Description

I’m trying to find (or confirm if it exists) a Discord bot for Diablo that tracks Terror Zones in a very minimal way. The best would be a bot that just updates its own name to show which zone is terrorized and for how long. Similar to this one that someone has made, but for 3-day instance reset timers for World of Warcraft:

https://www.reddit.com/r/classicwow/com ... et_timers/
This is a quick and basic Python Discord bot that does what you want.
I took the liberty of also adding the TZ time remaining, the immunities displayed as emojis in the name, and the loot/exp tier for said zones.
As a necessary side effect, I also abbreviated the zone names when the bot changes its nickname, since Discord has a hard 32-character limit on nicknames.
You will need to run it locally and invite it to your server.
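
To give an idea of how the 32-character limit is handled, here is a rough sketch of the approach: build the full nickname first and only fall back to abbreviations when it does not fit. This is not the bot's exact logic (the real code further down has more fallback steps), and the `shorten` helper and the two-entry table are made up for illustration.

```python
MAX_NICKNAME_LEN = 32  # nickname limit mentioned above

# Hypothetical two-entry table, only for this example.
ABBREVIATIONS = {"Tal Rasha's Tomb": "TRT", "Canyon of the Magi": "CotM"}


def shorten(zone_names, suffix):
    """Return the full name if it fits, otherwise an abbreviated one."""
    full = " / ".join(zone_names) + suffix
    if len(full) <= MAX_NICKNAME_LEN:
        return full
    # Swap in abbreviations where we have one, keep the rest unchanged.
    short = " / ".join(ABBREVIATIONS.get(z, z) for z in zone_names) + suffix
    # Last resort: hard cut so the name is never rejected for its length.
    return short[:MAX_NICKNAME_LEN]


print(shorten(["Tal Rasha's Tomb", "Canyon of the Magi"], " (42m)"))
# prints: TRT / CotM (42m)
```

Short names such as "Tristram (5m)" pass through untouched, so the abbreviations only show up when they are really needed.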

It needs a .env file located in the same directory as the Python file, with the following format:

DISCORD_BOT_TOKEN=discord_bot_token
# Take care: the d2tz token expires every month
D2TZ_TOKEN=your_d2tz_token
# Refresh duration in seconds
CHECK_EVERY_SECONDS=60
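
If you want to sanity-check these values before starting the bot, something along these lines could work. It is only a sketch using the standard library: the `read_settings` name, the 60-second fallback and the error messages are my own assumptions, not part of the bot.

```python
import os
from typing import Mapping


def read_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Validate the three settings and return them as a dict."""
    settings = {}
    for name in ("DISCORD_BOT_TOKEN", "D2TZ_TOKEN"):
        value = env.get(name, "").strip()
        if not value:
            raise RuntimeError(f"{name} is not set in .env")
        settings[name] = value

    # Assumed fallback: use 60 seconds when the value is missing.
    raw_interval = env.get("CHECK_EVERY_SECONDS", "60").strip()
    if not raw_interval.isdigit() or int(raw_interval) < 1:
        raise RuntimeError("CHECK_EVERY_SECONDS must be a positive whole number")
    settings["CHECK_EVERY_SECONDS"] = int(raw_interval)
    return settings
```

A value with trailing text (for example a note in parentheses after the number) fails the number check, so keep any notes on their own `#` comment lines.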

The following is the python code needed to run the bot:

import asyncio
import json
import os
import sqlite3
import time
from pathlib import Path
from typing import Any, Optional, Tuple
import aiohttp
import discord
from discord.ext import commands
from dotenv import load_dotenv
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timezone


load_dotenv()

# =========================================================
# CONFIG
# =========================================================
BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
D2TZ_TOKEN = os.getenv("D2TZ_TOKEN")
MAX_NICKNAME_LEN = 32

# Refresh a little before expiry to avoid boundary issues
REFRESH_EARLY_SECONDS = 10

D2TZ_API_URL = "https://api.d2tz.info/public/tz"
DB_PATH = Path("bot_data.sqlite3")

if not BOT_TOKEN:
    raise RuntimeError("DISCORD_BOT_TOKEN is not set in .env")

if not D2TZ_TOKEN:
    raise RuntimeError("D2TZ_TOKEN is not set in .env")

# =========================================================
# LOGGING SETUP
# =========================================================
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)

ACTIVITY_LOG_FILE = LOG_DIR / "activity.log"
ERROR_LOG_FILE = LOG_DIR / "error.log"

LOG_FORMAT = "[%(asctime)s] [%(levelname)s] %(message)s"
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"

formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATEFMT)

logger = logging.getLogger("d2tzbot")
logger.setLevel(logging.INFO)
logger.propagate = False

# Prevent duplicate handlers on reload/restart
if logger.handlers:
    logger.handlers.clear()

# Console / systemd journal
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

# Activity log: everything INFO and above
activity_handler = RotatingFileHandler(
    ACTIVITY_LOG_FILE,
    maxBytes=5 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",
)
activity_handler.setLevel(logging.INFO)
activity_handler.setFormatter(formatter)

# Error log: WARNING and ERROR only
error_handler = RotatingFileHandler(
    ERROR_LOG_FILE,
    maxBytes=5 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",
)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)

logger.addHandler(console_handler)
logger.addHandler(activity_handler)
logger.addHandler(error_handler)

# =========================================================
# DISCORD SETUP
# =========================================================
intents = discord.Intents.default()
intents.message_content = True

bot = commands.Bot(command_prefix="!", intents=intents, help_command=None)

http_session: Optional[aiohttp.ClientSession] = None

# Per-guild last announced zone key cache
guild_config_key_cache: dict[int, str] = {}

# Cached TZ data
tz_cache: dict[str, Any] = {
    "zone_name": None,
    "end_time": None,
    "immunity_icons": "",
    "tier_text": "",
    "zone_key": None,
    "next_zone_name": None,
    "next_start_time": None,
    "last_fetch_time": 0,
}

bg_tasks_started = False
bot_start_time = datetime.now(timezone.utc)
# =========================================================
# ABBREVIATIONS / DISPLAY
# =========================================================
ZONE_ABBREVIATIONS = {
    "Abaddon": "Ab",
    "Ancient Tunnels": "AT",
    "Arachnid Lair": "AL",
    "Arcane Sanctuary": "AS",
    "Arreat Plateau": "AP",
    "Barracks": "Bar",
    "Black Marsh": "BM",
    "Blood Moor": "BldM",
    "Burial Grounds": "BG",
    "Canyon of the Magi": "CotM",
    "Catacombs": "Cat",
    "Cathedral": "Cath",
    "Cold Plains": "CP",
    "Crystalline Passage": "CrP",
    "Dark Wood": "DW",
    "Den of Evil": "DoE",
    "Drifter Cavern": "DC",
    "Dry Hills": "DH",
    "Durance Of Hate": "DoH",
    "Far Oasis": "FO",
    "Flayer Dungeon": "FD",
    "Flayer Jungle": "FJ",
    "Forgotten Tower": "FT",
    "Frigid Highlands": "FH",
    "Frozen River": "FR",
    "Glacial Trail": "GT",
    "Great Marsh": "GM",
    "Halls of Pain": "HoP",
    "Halls Of The Dead": "HotD",
    "Halls of Vaught": "HoV",
    "Jail": "Jail",
    "Kurast Bazaar": "KB",
    "Kurast Sewers": "KS",
    "Kurast Causeway": "KC",
    "Lost City": "LC",
    "Lower Kurast": "LK",
    "Maggot Lair": "ML",
    "Marsh Of Pain": "MoP",
    "Mausoleum": "Maus",
    "Moo Moo Farm": "MMF",
    "Pit of Acheron": "PoA",
    "Plains of Despair": "PoD",
    "Pit": "Pit",
    "River of Flame": "RoF",
    "Rocky Waste": "RW",
    "Sewers": "Sew",
    "Spider Cavern": "SC",
    "Spider Forest": "SF",
    "Stony Field": "StF",
    "Stony Tomb": "ST",
    "Swampy Pit": "SwP",
    "Tal Rasha's Tomb": "TRT",
    "Tamoe Highland": "TH",
    "The Crypt": "Crypt",
    "The Hole": "Hole",
    "The Pit": "Pit",
    "The Secret Cow Level": "Cows",
    "Tower Cellar": "TC",
    "Travincal": "Trav",
    "Tristram": "Tris",
    "Underground Passage": "UP",
    "Valley of Snakes": "VoS",
    "Worldstone Keep": "WSK",
}

IMMUNITY_EMOJIS = {
    "f": "🔥",
    "c": "❄️",
    "l": "⚡",
    "p": "☠️",
    "ph": "🟣",
    "m": "🧪",
}

IMMUNITY_LABELS = {
    "f": "Fire",
    "c": "Cold",
    "l": "Lightning",
    "p": "Poison",
    "ph": "Physical",
    "m": "Magic",
}

# =========================================================
# DATABASE
# =========================================================
from typing import Optional


def cleanup_guild(guild_id: int, reason: str, guild_name: Optional[str] = None) -> None:
    """
    Centralized cleanup for a guild config.
    Logs clearly why it happened.
    """
    delete_guild_config(guild_id)
    guild_config_key_cache.pop(guild_id, None)

    name_part = f" ({guild_name})" if guild_name else ""
    logger.info(f"[CLEANUP] Guild {guild_id}{name_part} removed | reason: {reason}")


def audit_guild_configs(bot_guilds: list) -> None:
    """
    Startup audit:
    Removes any guild configs that no longer exist in bot.guilds.
    """
    configs = get_all_guild_configs()
    bot_guild_map = {g.id: g.name for g in bot_guilds}

    removed = 0

    for config in configs:
        guild_id = int(config["guild_id"])

        if guild_id not in bot_guild_map:
            cleanup_guild(
                guild_id,
                "startup audit: bot not in guild",
                guild_name=None,
            )
            removed += 1

    logger.info(f"[AUDIT] Startup guild audit complete | removed stale configs: {removed}")


def get_db_connection() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    with get_db_connection() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS guild_config (
                guild_id INTEGER PRIMARY KEY,
                announce_channel_id INTEGER,
                nickname_enabled INTEGER NOT NULL DEFAULT 1,
                announcements_enabled INTEGER NOT NULL DEFAULT 1
            )
            """
        )
        conn.commit()


def upsert_guild_config(
    guild_id: int,
    announce_channel_id: Optional[int] = None,
    nickname_enabled: Optional[bool] = None,
    announcements_enabled: Optional[bool] = None,
) -> None:
    with get_db_connection() as conn:
        existing = conn.execute(
            "SELECT * FROM guild_config WHERE guild_id = ?",
            (guild_id,),
        ).fetchone()

        if existing is None:
            conn.execute(
                """
                INSERT INTO guild_config (
                    guild_id,
                    announce_channel_id,
                    nickname_enabled,
                    announcements_enabled
                ) VALUES (?, ?, ?, ?)
                """,
                (
                    guild_id,
                    announce_channel_id,
                    1 if nickname_enabled is None else int(nickname_enabled),
                    1 if announcements_enabled is None else int(announcements_enabled),
                ),
            )
        else:
            new_channel_id = announce_channel_id if announce_channel_id is not None else existing["announce_channel_id"]
            new_nickname_enabled = int(nickname_enabled) if nickname_enabled is not None else existing["nickname_enabled"]
            new_announcements_enabled = (
                int(announcements_enabled)
                if announcements_enabled is not None
                else existing["announcements_enabled"]
            )

            conn.execute(
                """
                UPDATE guild_config
                SET announce_channel_id = ?,
                    nickname_enabled = ?,
                    announcements_enabled = ?
                WHERE guild_id = ?
                """,
                (
                    new_channel_id,
                    new_nickname_enabled,
                    new_announcements_enabled,
                    guild_id,
                ),
            )
        conn.commit()


def get_guild_config(guild_id: int) -> Optional[sqlite3.Row]:
    with get_db_connection() as conn:
        return conn.execute(
            "SELECT * FROM guild_config WHERE guild_id = ?",
            (guild_id,),
        ).fetchone()


def get_all_guild_configs() -> list[sqlite3.Row]:
    with get_db_connection() as conn:
        return conn.execute("SELECT * FROM guild_config ORDER BY guild_id").fetchall()


def delete_guild_config(guild_id: int) -> None:
    with get_db_connection() as conn:
        conn.execute("DELETE FROM guild_config WHERE guild_id = ?", (guild_id,))
        conn.commit()


# =========================================================
# HTTP
# =========================================================
async def get_http_session() -> aiohttp.ClientSession:
    global http_session

    if http_session is None or http_session.closed:
        timeout = aiohttp.ClientTimeout(total=15)
        http_session = aiohttp.ClientSession(timeout=timeout)

    return http_session


# =========================================================
# D2TZ API
# =========================================================
async def fetch_current_zone_data() -> Any:
    session = await get_http_session()
    params = {"token": D2TZ_TOKEN}

    async with session.get(D2TZ_API_URL, params=params) as resp:
        text = await resp.text()

        if resp.status != 200:
            logger.error(f"D2TZ API returned HTTP {resp.status}: {text[:300]}")
            raise RuntimeError(f"D2TZ API returned HTTP {resp.status}: {text[:300]}")

        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            logger.error(f"D2TZ API did not return valid JSON: {text[:300]}")
            raise RuntimeError(f"D2TZ API did not return valid JSON: {text[:300]}") from e

        if isinstance(data, dict) and data.get("status") == "error":
            logger.error(f"D2TZ API error: {data.get('message', 'unknown error')}")
            raise RuntimeError(f"D2TZ API error: {data.get('message', 'unknown error')}")

        return data


def normalize_zone_name(name: str) -> str:
    return name.replace("_", " ").strip()


def format_zone_name(zone_names: Any) -> Optional[str]:
    if isinstance(zone_names, list) and zone_names:
        cleaned = [normalize_zone_name(str(z)) for z in zone_names if str(z).strip()]
        if cleaned:
            return " / ".join(cleaned)

    if isinstance(zone_names, str) and zone_names.strip():
        return normalize_zone_name(zone_names)

    return None


def format_immunities(immunities: Any) -> str:
    if not isinstance(immunities, list):
        return ""

    out = []
    for code in immunities:
        code_str = str(code).strip().lower()
        emoji = IMMUNITY_EMOJIS.get(code_str)
        out.append(emoji if emoji else code_str.upper())

    return "".join(out)


def format_tiers(entry: dict) -> str:
    exp_tier = str(entry.get("tier-exp", "")).strip()
    loot_tier = str(entry.get("tier-loot", "")).strip()

    if exp_tier and loot_tier:
        return f"{exp_tier}/{loot_tier}"
    if exp_tier:
        return f"{exp_tier}/?"
    if loot_tier:
        return f"?/{loot_tier}"
    return ""


def make_zone_key(zone_name: str, end_time: Optional[int]) -> str:
    return f"{zone_name}|{end_time}"


def parse_zone_entry(entry: dict, now: int) -> Tuple[Optional[str], Optional[int], Optional[int], str, str, Optional[str], Optional[int]]:
    zone_name = format_zone_name(entry.get("zone_name"))

    start_time = entry.get("time")
    end_time = entry.get("end_time")

    start_ts = int(start_time) if isinstance(start_time, (int, float)) else None
    end_ts = int(end_time) if isinstance(end_time, (int, float)) else None

    minutes_left = None
    if end_ts is not None:
        seconds_left = max(0, end_ts - now)
        minutes_left = seconds_left // 60

    immunity_icons = format_immunities(entry.get("immunities"))
    tier_text = format_tiers(entry)
    zone_key = make_zone_key(zone_name, end_ts) if zone_name else None

    return zone_name, minutes_left, end_ts, immunity_icons, tier_text, zone_key, start_ts


def extract_current_and_next_zone_info(
    data: Any,
) -> Tuple[
    Optional[str], Optional[int], Optional[int], str, str, Optional[str],
    Optional[str], Optional[int]
]:
    now = int(time.time())

    if isinstance(data, list):
        entries = [item for item in data if isinstance(item, dict)]

        if len(entries) >= 2:
            # Entries without a numeric end_time sort last
            sorted_entries = sorted(
                entries,
                key=lambda e: int(e.get("end_time")) if isinstance(e.get("end_time"), (int, float)) else 10**18
            )

            current_entry = sorted_entries[0]
            next_entry = sorted_entries[1]

            current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(current_entry, now)
            next_zone_name, _, _, _, _, _, next_start_time = parse_zone_entry(next_entry, now)

            next_seconds_until = None
            if next_start_time is not None:
                next_seconds_until = max(0, next_start_time - now)

            return (
                current_zone_name,
                current_minutes_left,
                current_end_time,
                current_immunity_icons,
                current_tier_text,
                current_zone_key,
                next_zone_name,
                next_seconds_until,
            )

        elif len(entries) == 1:
            current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(entries[0], now)
            return (
                current_zone_name,
                current_minutes_left,
                current_end_time,
                current_immunity_icons,
                current_tier_text,
                current_zone_key,
                None,
                None,
            )

    if isinstance(data, dict):
        current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(data, now)
        return (
            current_zone_name,
            current_minutes_left,
            current_end_time,
            current_immunity_icons,
            current_tier_text,
            current_zone_key,
            None,
            None,
        )

    return None, None, None, "", "", None, None, None


async def refresh_tz_cache(force: bool = False) -> bool:
    """
    Refresh API cache only when needed.
    Returns True if zone key changed.
    """
    global tz_cache

    now = int(time.time())

    current_end_time = tz_cache.get("end_time")
    old_zone_key = tz_cache.get("zone_key")

    should_refresh = force
    if not should_refresh:
        if current_end_time is None:
            should_refresh = True
        elif now >= int(current_end_time) - REFRESH_EARLY_SECONDS:
            should_refresh = True

    if not should_refresh:
        return False

    data = await fetch_current_zone_data()

    (
        current_zone_name,
        current_minutes_left,
        current_end_time,
        current_immunity_icons,
        current_tier_text,
        current_zone_key,
        next_zone_name,
        next_seconds_until,
    ) = extract_current_and_next_zone_info(data)

    if not current_zone_name:
        raise RuntimeError(f"Could not find current zone in API response: {data}")

    next_start_time = None
    if next_seconds_until is not None:
        next_start_time = now + next_seconds_until

    tz_cache = {
        "zone_name": current_zone_name,
        "end_time": current_end_time,
        "immunity_icons": current_immunity_icons,
        "tier_text": current_tier_text,
        "zone_key": current_zone_key,
        "next_zone_name": next_zone_name,
        "next_start_time": next_start_time,
        "last_fetch_time": now,
    }

    return old_zone_key != current_zone_key


def get_cached_zone_snapshot() -> Tuple[str, Optional[int], Optional[int], str, str, Optional[str], Optional[str], Optional[int]]:
    zone_name = tz_cache.get("zone_name")
    end_time = tz_cache.get("end_time")
    immunity_icons = tz_cache.get("immunity_icons", "")
    tier_text = tz_cache.get("tier_text", "")
    zone_key = tz_cache.get("zone_key")
    next_zone_name = tz_cache.get("next_zone_name")
    next_start_time = tz_cache.get("next_start_time")

    if not zone_name:
        raise RuntimeError("TZ cache is empty")

    now = int(time.time())

    minutes_left = None
    if isinstance(end_time, int):
        seconds_left = max(0, end_time - now)
        minutes_left = seconds_left // 60

    next_seconds_until = None
    if isinstance(next_start_time, int):
        next_seconds_until = max(0, next_start_time - now)

    return zone_name, minutes_left, end_time, immunity_icons, tier_text, zone_key, next_zone_name, next_seconds_until

# =========================================================
# NICKNAME BUILDING
# =========================================================
def abbreviate_single_zone(zone_name: str) -> str:
    normalized = normalize_zone_name(zone_name)

    if normalized in ZONE_ABBREVIATIONS:
        return ZONE_ABBREVIATIONS[normalized]

    words = [w for w in normalized.replace("'", "").split() if w]
    if not words:
        return normalized

    if len(words) == 1:
        return words[0][:4]
    return "".join(word[0].upper() for word in words)


def abbreviate_zone_string(zone_string: str) -> str:
    parts = [part.strip() for part in zone_string.split("/")]
    abbreviated_parts = [abbreviate_single_zone(part) for part in parts if part.strip()]
    return " / ".join(abbreviated_parts)


def build_nickname(zone_name: str, minutes_left: Optional[int], immunity_icons: str, tier_text: str) -> str:
    time_part = f" ({minutes_left}m)" if minutes_left is not None else ""

    extras = []
    if immunity_icons:
        extras.append(immunity_icons)
    if tier_text:
        extras.append(tier_text)

    extra_part = f" {' '.join(extras)}" if extras else ""
    full_name = f"{zone_name}{extra_part}{time_part}"

    if len(full_name) <= MAX_NICKNAME_LEN:
        return full_name

    abbreviated_zone = abbreviate_zone_string(zone_name)
    abbreviated_full = f"{abbreviated_zone}{extra_part}{time_part}"
    if len(abbreviated_full) <= MAX_NICKNAME_LEN:
        return abbreviated_full

    if immunity_icons and tier_text:
        reduced_extra = f" {immunity_icons}"
        reduced_name = f"{abbreviated_zone}{reduced_extra}{time_part}"
        if len(reduced_name) <= MAX_NICKNAME_LEN:
            return reduced_name

    tiny_parts = []
    for part in [p.strip() for p in zone_name.split("/")]:
        abbr = abbreviate_single_zone(part)
        tiny_parts.append(abbr[:3] if len(abbr) > 3 else abbr)

    tiny_zone = " / ".join(tiny_parts)
    tiny_full = f"{tiny_zone}{extra_part}{time_part}"
    if len(tiny_full) <= MAX_NICKNAME_LEN:
        return tiny_full

    tiny_reduced = f"{tiny_zone}{(' ' + immunity_icons) if immunity_icons else ''}{time_part}"
    if len(tiny_reduced) <= MAX_NICKNAME_LEN:
        return tiny_reduced

    absolute_fallback = f"{tiny_zone}{time_part}"
    if len(absolute_fallback) <= MAX_NICKNAME_LEN:
        return absolute_fallback

    return absolute_fallback[:MAX_NICKNAME_LEN]

# =========================================================
# STATUS HELPERS
# =========================================================
def format_duration(seconds: int) -> str:
    seconds = max(0, int(seconds))
    days, rem = divmod(seconds, 86400)
    hours, rem = divmod(rem, 3600)
    minutes, secs = divmod(rem, 60)

    parts = []
    if days:
        parts.append(f"{days}d")
    if hours:
        parts.append(f"{hours}h")
    if minutes:
        parts.append(f"{minutes}m")
    if secs or not parts:
        parts.append(f"{secs}s")

    return " ".join(parts)


def get_status_snapshot() -> dict[str, Any]:
    now = int(time.time())

    zone_name = tz_cache.get("zone_name")
    end_time = tz_cache.get("end_time")
    next_zone_name = tz_cache.get("next_zone_name")
    next_start_time = tz_cache.get("next_start_time")
    last_fetch_time = tz_cache.get("last_fetch_time", 0)

    seconds_left = None
    if isinstance(end_time, int):
        seconds_left = max(0, end_time - now)

    next_seconds_until = None
    if isinstance(next_start_time, int):
        next_seconds_until = max(0, next_start_time - now)

    uptime_seconds = int((datetime.now(timezone.utc) - bot_start_time).total_seconds())

    return {
        "uptime_seconds": uptime_seconds,
        "guild_count_live": len(bot.guilds),
        "guild_count_configured": len(get_all_guild_configs()),
        "zone_name": zone_name,
        "seconds_left": seconds_left,
        "next_zone_name": next_zone_name,
        "next_seconds_until": next_seconds_until,
        "last_fetch_time": last_fetch_time,
        "zone_key": tz_cache.get("zone_key"),
    }


# =========================================================
# DISCORD HELPERS
# =========================================================
async def get_bot_member(guild: discord.Guild) -> Optional[discord.Member]:
    me = guild.me
    if me is None and bot.user is not None:
        try:
            me = await guild.fetch_member(bot.user.id)
        except discord.HTTPException:
            return None
    return me


async def get_text_channel(channel_id: int) -> Optional[discord.TextChannel]:
    channel = bot.get_channel(channel_id)
    if channel is None:
        try:
            channel = await bot.fetch_channel(channel_id)
        except discord.HTTPException:
            return None

    return channel if isinstance(channel, discord.TextChannel) else None


async def update_guild_nickname(
    guild: discord.Guild,
    zone_name: str,
    minutes_left: Optional[int],
    immunity_icons: str,
    tier_text: str,
) -> None:
    me = await get_bot_member(guild)
    if me is None:
        logger.warning(f"Could not fetch bot member in guild {guild.id}")
        return

    nickname = build_nickname(zone_name, minutes_left, immunity_icons, tier_text)

    if me.nick != nickname:
        try:
            await me.edit(nick=nickname, reason="Updating nickname with cached TZ info")
            logger.info(f"[{guild.id}] Nickname updated to: {nickname}")
        except discord.Forbidden:
            logger.warning(f"[{guild.id}] Missing permission to change nickname.")
        except discord.HTTPException as e:
            logger.error(f"[{guild.id}] Failed to change nickname: {e}")


async def announce_zone_change(
    channel_id: int,
    old_zone: Optional[str],
    new_zone: str,
    minutes_left: Optional[int],
    immunity_icons: str,
    tier_text: str,
) -> None:
    channel = await get_text_channel(channel_id)
    if channel is None:
        logger.warning(f"Could not access announce channel {channel_id}")
        return

    bits = []
    if immunity_icons:
        bits.append(immunity_icons)
    if tier_text:
        bits.append(f"EXP/LOOT {tier_text}")
    if minutes_left is not None:
        bits.append(f"{minutes_left}m left")

    suffix = f" ({', '.join(bits)})" if bits else ""

    if old_zone is None:
        message = f"Current zone detected: **{new_zone}**{suffix}"
    else:
        message = f"Zone changed: **{old_zone}** → **{new_zone}**{suffix}"

    try:
        await channel.send(message)
        logger.info(f"[{channel.guild.id}] Announcement sent: {message}")
    except discord.Forbidden:
        logger.warning(f"[{channel.guild.id}] Missing permission to send messages in {channel_id}.")
    except discord.HTTPException as e:
        logger.error(f"[{channel.guild.id}] Failed to send message: {e}")


# =========================================================
# PER-GUILD PROCESSING
# =========================================================
async def process_single_guild(
    guild_config: sqlite3.Row,
    zone_name: str,
    minutes_left: Optional[int],
    immunity_icons: str,
    tier_text: str,
    zone_key: Optional[str],
    allow_announcement: bool,
) -> None:
    guild_id = int(guild_config["guild_id"])
    guild = bot.get_guild(guild_id)

    if guild is None:
        cleanup_guild(
            guild_id,
            "stale config detected during loop",
            guild_name=None,
        )
        return

    if guild_config["nickname_enabled"]:
        await update_guild_nickname(guild, zone_name, minutes_left, immunity_icons, tier_text)

    if (
        allow_announcement
        and guild_config["announcements_enabled"]
        and guild_config["announce_channel_id"]
        and zone_key
    ):
        old_zone_key = guild_config_key_cache.get(guild_id)
        if zone_key != old_zone_key:
            old_zone = old_zone_key.split("|", 1)[0] if old_zone_key else None
            await announce_zone_change(
                int(guild_config["announce_channel_id"]),
                old_zone,
                zone_name,
                minutes_left,
                immunity_icons,
                tier_text,
            )
            guild_config_key_cache[guild_id] = zone_key


# =========================================================
# BACKGROUND TASKS
# =========================================================
async def minute_aligned_updater() -> None:
    await bot.wait_until_ready()

    while not bot.is_closed():
        try:
            now = time.time()
            sleep_for = 60 - (now % 60)
            await asyncio.sleep(sleep_for)

            if not tz_cache.get("zone_name"):
                continue

            zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()

            configs = get_all_guild_configs()
            for config in configs:
                # minute updater does not send zone-change announcements
                await process_single_guild(
                    config,
                    zone_name,
                    minutes_left,
                    immunity_icons,
                    tier_text,
                    zone_key,
                    allow_announcement=False,
                )

        except Exception as e:
            logger.error(f"minute_aligned_updater error: {e}")
            await asyncio.sleep(5)


async def api_refresh_loop() -> None:
    await bot.wait_until_ready()

    while not bot.is_closed():
        try:
            if not tz_cache.get("zone_name") or not tz_cache.get("end_time"):
                changed = await refresh_tz_cache(force=True)

                zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
                configs = get_all_guild_configs()
                for config in configs:
                    await process_single_guild(
                        config,
                        zone_name,
                        minutes_left,
                        immunity_icons,
                        tier_text,
                        zone_key,
                        allow_announcement=changed,
                    )

                await asyncio.sleep(5)
                continue

            now = int(time.time())
            refresh_at = int(tz_cache["end_time"]) - REFRESH_EARLY_SECONDS
            sleep_for = max(1, refresh_at - now)

            await asyncio.sleep(sleep_for)

            changed = await refresh_tz_cache(force=True)

            zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
            configs = get_all_guild_configs()
            for config in configs:
                await process_single_guild(
                    config,
                    zone_name,
                    minutes_left,
                    immunity_icons,
                    tier_text,
                    zone_key,
                    allow_announcement=changed,
                )

        except Exception as e:
            logger.error(f"api_refresh_loop error: {e}")
            await asyncio.sleep(10)


# =========================================================
# COMMANDS
# =========================================================
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def setup(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announce_channel_id=ctx.channel.id,
        nickname_enabled=True,
        announcements_enabled=True,
    )
    await ctx.send(
        f"Setup complete.\n"
        f"Announcements channel: {ctx.channel.mention}\n"
        f"Nickname updates: enabled\n"
        f"Announcements: enabled"
    )


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def setchannel(ctx: commands.Context, channel: discord.TextChannel) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announce_channel_id=channel.id,
    )
    await ctx.send(f"Announcement channel set to {channel.mention}.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def enabletz(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=True,
        announcements_enabled=True,
    )
    await ctx.send("Terror Zone updates enabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def disabletz(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=False,
        announcements_enabled=False,
    )
    await ctx.send("Terror Zone updates disabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def nickon(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=True,
    )
    await ctx.send("Nickname updates enabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def nickoff(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=False,
    )
    await ctx.send("Nickname updates disabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def announceon(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announcements_enabled=True,
    )
    await ctx.send("Announcements enabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def announceoff(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announcements_enabled=False,
    )
    await ctx.send("Announcements disabled for this server.")


@bot.command()
@commands.guild_only()
async def showconfig(ctx: commands.Context) -> None:
    config = get_guild_config(ctx.guild.id)

    if config is None:
        await ctx.send("This server is not configured yet. Run `!setup` in the channel you want to use.")
        return

    channel_id = config["announce_channel_id"]
    channel_text = f"<#{channel_id}>" if channel_id else "Not set"

    await ctx.send(
        f"Guild config:\n"
        f"- Announce channel: {channel_text}\n"
        f"- Nickname updates: {'enabled' if config['nickname_enabled'] else 'disabled'}\n"
        f"- Announcements: {'enabled' if config['announcements_enabled'] else 'disabled'}"
    )


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def removetz(ctx: commands.Context) -> None:
    delete_guild_config(ctx.guild.id)
    guild_config_key_cache.pop(ctx.guild.id, None)
    await ctx.send("This server's Terror Zone configuration was removed.")


@bot.command()
async def zone(ctx: commands.Context) -> None:
    try:
        if not tz_cache.get("zone_name"):
            await refresh_tz_cache(force=True)

        zone_name, minutes_left, end_time, immunity_icons, tier_text, _zone_key, next_zone_name, next_seconds_until = get_cached_zone_snapshot()

        nickname_preview = build_nickname(zone_name, minutes_left, immunity_icons, tier_text)

        msg = f"Current zone: **{zone_name}**"
        if immunity_icons:
            msg += f"\nImmunities: {immunity_icons}"
        if tier_text:
            msg += f"\nEXP/LOOT tier: `{tier_text}`"
        if minutes_left is not None:
            msg += f"\nTime left: `{minutes_left}m`"

        if next_zone_name:
            if next_seconds_until is not None:
                next_minutes = next_seconds_until // 60
                msg += f"\nNext zone: **{next_zone_name}** (in {next_minutes}m)"
            else:
                msg += f"\nNext zone: **{next_zone_name}**"

        msg += f"\nNickname preview: `{nickname_preview}`"
        if end_time is not None:
            msg += f"\nEnd time (unix): `{end_time}`"
        msg += f"\nLast API fetch: `<t:{tz_cache['last_fetch_time']}:R>`"

        await ctx.send(msg)

    except Exception as e:
        await ctx.send(f"Error reading current zone: `{e}`")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def zonedebug(ctx: commands.Context) -> None:
    try:
        data = await fetch_current_zone_data()
        pretty = json.dumps(data, indent=2, ensure_ascii=False)

        if len(pretty) > 3500:
            pretty = pretty[:3500] + "\n... (truncated)"

        await ctx.send(f"```json\n{pretty}\n```")
    except Exception as e:
        await ctx.send(f"Debug failed: `{e}`")


@bot.command()
async def legend(ctx: commands.Context) -> None:
    lines = ["Zone abbreviation legend:"]
    for full_name in sorted(ZONE_ABBREVIATIONS):
        lines.append(f"{ZONE_ABBREVIATIONS[full_name]} = {full_name}")

    lines.append("")
    lines.append("Immunity legend:")
    for code in ["f", "c", "l", "p", "ph", "m"]:
        if code in IMMUNITY_EMOJIS:
            lines.append(f"{IMMUNITY_EMOJIS[code]} = {IMMUNITY_LABELS.get(code, code)}")

    message = "\n".join(lines)

    if len(message) <= 1900:
        await ctx.send(f"```text\n{message}\n```")
        return

    chunks = []
    current = ""
    for line in lines:
        if len(current) + len(line) + 1 > 1900:
            chunks.append(current)
            current = line + "\n"
        else:
            current += line + "\n"
    if current:
        chunks.append(current)

    for chunk in chunks:
        await ctx.send(f"```text\n{chunk}\n```")


@bot.command()
async def status(ctx: commands.Context) -> None:
    try:
        snapshot = get_status_snapshot()

        uptime_text = format_duration(snapshot["uptime_seconds"])
        configured = snapshot["guild_count_configured"]
        live = snapshot["guild_count_live"]

        msg = [
            "**Bot status**",
            f"Uptime: `{uptime_text}`",
            f"Connected guilds: `{live}`",
            f"Configured guilds: `{configured}`",
        ]

        if snapshot["zone_name"]:
            msg.append(f"Cached current zone: **{snapshot['zone_name']}**")
        else:
            msg.append("Cached current zone: `none`")

        if snapshot["seconds_left"] is not None:
            msg.append(f"Time left: `{format_duration(snapshot['seconds_left'])}`")

        if snapshot["next_zone_name"]:
            if snapshot["next_seconds_until"] is not None:
                msg.append(
                    f"Next zone: **{snapshot['next_zone_name']}** "
                    f"(in {format_duration(snapshot['next_seconds_until'])})"
                )
            else:
                msg.append(f"Next zone: **{snapshot['next_zone_name']}**")

        if snapshot["last_fetch_time"]:
            msg.append(f"Last API fetch: <t:{snapshot['last_fetch_time']}:F>")
            msg.append(f"Last API fetch (relative): <t:{snapshot['last_fetch_time']}:R>")

        if snapshot["zone_key"]:
            msg.append(f"Zone key: `{snapshot['zone_key']}`")

        await ctx.send("\n".join(msg))

    except Exception as e:
        logger.error(f"status command error: {e}")
        await ctx.send(f"Status error: `{e}`")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def forcezone(ctx: commands.Context) -> None:
    try:
        changed = await refresh_tz_cache(force=True)
        (
            zone_name, minutes_left, _end_time, immunity_icons,
            tier_text, zone_key, _next_zone_name, _next_seconds_until,
        ) = get_cached_zone_snapshot()

        config = get_guild_config(ctx.guild.id)
        if config is None:
            await ctx.send("This server is not configured yet. Run `!setup` first.")
            return

        if config["nickname_enabled"]:
            await update_guild_nickname(ctx.guild, zone_name, minutes_left, immunity_icons, tier_text)

        if config["announcements_enabled"] and config["announce_channel_id"]:
            old_key = guild_config_key_cache.get(ctx.guild.id)
            old_zone = old_key.split("|", 1)[0] if old_key else None
            await announce_zone_change(
                int(config["announce_channel_id"]),
                old_zone,
                zone_name,
                minutes_left,
                immunity_icons,
                tier_text,
            )
            if zone_key:
                guild_config_key_cache[ctx.guild.id] = zone_key

        await ctx.send(
            f"Forced refresh complete: **{build_nickname(zone_name, minutes_left, immunity_icons, tier_text)}**"
            f"\nAPI returned {'a new zone' if changed else 'the same zone'}."
        )

    except Exception as e:
        await ctx.send(f"Force update failed: `{e}`")


@bot.command(name="help")
async def help_command(ctx: commands.Context) -> None:
    is_guild = ctx.guild is not None
    can_manage_guild = (
        is_guild and
        isinstance(ctx.author, discord.Member) and
        ctx.author.guild_permissions.manage_guild
    )

    lines = [
        "**D2TZ Bot Help**",
        "",
        "**Public commands**",
        "`!zone` - Show current Terror Zone, time left, next zone, and nickname preview.",
        "`!status` - Show bot uptime, cached TZ info, and basic health/status.",
        "`!legend` - Show zone abbreviations and immunity emoji meanings.",
    ]

    if can_manage_guild:
        lines.extend([
            "",
            "**Server admin commands**",
            "`!setup` - Initialize the bot in this server and use the current channel for announcements.",
            "`!setchannel #channel` - Set the announcement channel for this server.",
            "`!showconfig` - Show this server's current bot configuration.",
            "`!enabletz` - Enable both nickname updates and announcements.",
            "`!disabletz` - Disable both nickname updates and announcements.",
            "`!nickon` - Enable nickname updates only.",
            "`!nickoff` - Disable nickname updates only.",
            "`!announceon` - Enable announcements only.",
            "`!announceoff` - Disable announcements only.",
            "`!removetz` - Remove this server's TZ configuration.",
            "`!forcezone` - Force an immediate TZ refresh and update.",
            "`!zonedebug` - Show raw API data for debugging.",
        ])

    if not is_guild:
        lines.extend([
            "",
            "_Note: server setup commands only work inside a server, not in DMs._"
        ])
    elif not can_manage_guild:
        lines.extend([
            "",
            "_You do not currently have the 'Manage Server' permission, so admin commands are hidden._"
        ])

    await ctx.send("\n".join(lines))


# =========================================================
# EVENTS
# =========================================================
@bot.event
async def on_ready() -> None:
    global bg_tasks_started

    logger.info(f"Logged in as {bot.user} ({bot.user.id})")

    init_db()
    await get_http_session()
    await refresh_tz_cache(force=True)

    # 🔍 Startup audit
    audit_guild_configs(bot.guilds)

    # Rebuild cache safely
    configs = get_all_guild_configs()
    for config in configs:
        guild_config_key_cache.setdefault(int(config["guild_id"]), "")

    # on_ready can fire again after a reconnect, so start the loops only once.
    if not bg_tasks_started:
        bg_tasks_started = True
        asyncio.create_task(minute_aligned_updater())
        asyncio.create_task(api_refresh_loop())


@bot.event
async def on_guild_remove(guild: discord.Guild) -> None:
    cleanup_guild(
        guild.id,
        "bot removed from server",
        guild.name,
    )


@bot.event
async def on_command_error(ctx: commands.Context, error: Exception) -> None:
    # MissingPermissions and NoPrivateMessage both subclass CheckFailure,
    # so they have to be tested before the generic CheckFailure branch.
    if isinstance(error, commands.MissingPermissions):
        await ctx.send("You need the 'Manage Server' permission to use that command.")

    elif isinstance(error, commands.NoPrivateMessage):
        await ctx.send("This command can only be used inside a server.")

    elif isinstance(error, commands.CheckFailure):
        await ctx.send("You do not have permission to use that command.")

    elif isinstance(error, commands.MissingRequiredArgument):
        await ctx.send("Missing argument for that command.")

    elif isinstance(error, commands.BadArgument):
        await ctx.send("Invalid argument.")

    elif isinstance(error, commands.CommandNotFound):
        return

    else:
        logger.error(f"Command error ({ctx.command}): {error}")
        await ctx.send("An unexpected error occurred.")


# =========================================================
# MAIN
# =========================================================
async def main() -> None:
    global http_session

    init_db()
    try:
        await bot.start(BOT_TOKEN)
    finally:
        # Close the shared aiohttp session on shutdown.
        if http_session and not http_session.closed:
            await http_session.close()


if __name__ == "__main__":
    asyncio.run(main())

It also sends a text message when the zone changes, but you can disable that if you want (I would keep it, since the announcement uses the full zone names, without abbreviations).
A practical note: because nicknames are limited to 32 characters, the code tries these formats in priority order and uses the first one that fits:

full zone names + immunities + tiers + time
abbreviated zone names + immunities + tiers + time
abbreviated zone names + immunities + time
abbreviated zone names + time

So if the nickname gets too long, the zone names are abbreviated first, then the tiers are dropped, then the immunities. The timer is always kept, so the most useful info stays visible.
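
To make the fallback order above concrete, here is a minimal sketch of how such a chain could be written. It is not the bot's actual build_nickname: the function name, the "/" separator, the tiny abbreviation table and the final truncation step are all assumptions made for illustration, and immunities are shown as plain letters instead of emojis.

```python
from typing import Dict, List

MAX_NICK_LEN = 32  # nickname limit mentioned earlier in the thread

# Hypothetical abbreviation table; the real bot ships its own mapping.
ABBREVIATIONS: Dict[str, str] = {
    "Chaos Sanctuary": "Chaos",
    "Worldstone Keep": "WSK",
}


def build_nickname_sketch(zones: List[str], minutes_left: int,
                          immunities: str = "", tiers: str = "") -> str:
    """Return the most informative nickname variant that fits the limit."""
    full = "/".join(zones)
    short = "/".join(ABBREVIATIONS.get(z, z) for z in zones)
    timer = f"{minutes_left}m"

    # Candidates in priority order, most informative first.
    candidates = [
        [full, immunities, tiers, timer],
        [short, immunities, tiers, timer],
        [short, immunities, timer],
        [short, timer],
    ]
    for parts in candidates:
        nick = " ".join(p for p in parts if p)  # skip empty pieces
        if len(nick) <= MAX_NICK_LEN:
            return nick

    # Nothing fits (assumed last resort): cut the name, keep the timer.
    room = MAX_NICK_LEN - len(timer) - 1
    return f"{short[:room]} {timer}"
```

With one zone everything fits, so the full name is kept; with two long zone names the sketch falls through to the abbreviated form.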
After testing it for about a day, I decided to host it myself, so anyone who needs it can use it without the hassle of setting it up on their own computer.
I have further modified the original bot so that everyone can use it out of the box, without having to fumble through .env files and such (I have also updated the source code above to reflect the new changes).

To get started, once the bot is invited to a server, just type !setup in the chat channel you want the bot to announce in (you must run !setup at least once so that the bot actually starts working).

These are all the various bot commands so far:

Admin (by Admin, I mean both Admins and anyone who has the Manage Server permission in the server):

!setup - initial setup of the bot; it automatically adds the GUILD_ID and ANNOUNCE_CHANNEL_ID to the SQLite db the bot uses, so it knows which server/channel to post in
!setchannel - self-explanatory, use it in case you need to change the channel it announces in (for example: !setchannel #tz-updates)
!enabletz / !disabletz - enables/disables the whole bot functionality, i.e. both nickname updates and announcements
!nickon / !nickoff - enables/disables the nickname updates
!announceon / !announceoff - enables/disables the announcements via text channel
!removetz - deletes the entry for your server from the sqlite database the bot uses, basically it stops working XD
!forcezone - forces an update to the nickname, sends an announcement, ignores cooldown
!showconfig - shows the current config of the bot
!zonedebug - mostly for internal use while coding it, but left it in


Everyone:

!zone - general zone information
!legend - legend of what everything means
!status - status of bot
!help - shows the list of commands
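
For anyone curious what the per-server storage behind !setup, !setchannel and !removetz might look like, here is a rough sketch using Python's built-in sqlite3 module. The table layout and helper names are assumptions for illustration only; the column names simply mirror the config keys the bot code reads (guild_id, announce_channel_id, nickname_enabled, announcements_enabled).

```python
import sqlite3
from typing import Optional


def create_schema(conn: sqlite3.Connection) -> None:
    """Create the (hypothetical) per-guild config table if it is missing."""
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS guild_config (
            guild_id              INTEGER PRIMARY KEY,
            announce_channel_id   INTEGER,
            nickname_enabled      INTEGER NOT NULL DEFAULT 1,
            announcements_enabled INTEGER NOT NULL DEFAULT 1
        )
        """
    )
    conn.commit()


def save_guild(conn: sqlite3.Connection, guild_id: int, channel_id: int) -> None:
    """Roughly what !setup / !setchannel need: create the row, then set the channel."""
    conn.execute("INSERT OR IGNORE INTO guild_config (guild_id) VALUES (?)", (guild_id,))
    conn.execute(
        "UPDATE guild_config SET announce_channel_id = ? WHERE guild_id = ?",
        (channel_id, guild_id),
    )
    conn.commit()


def load_guild(conn: sqlite3.Connection, guild_id: int) -> Optional[sqlite3.Row]:
    """Return the guild's row, or None if the server was never set up."""
    cur = conn.execute("SELECT * FROM guild_config WHERE guild_id = ?", (guild_id,))
    return cur.fetchone()


def delete_guild(conn: sqlite3.Connection, guild_id: int) -> None:
    """Roughly what !removetz (or the bot being kicked) needs."""
    conn.execute("DELETE FROM guild_config WHERE guild_id = ?", (guild_id,))
    conn.commit()
```

Setting conn.row_factory = sqlite3.Row on the connection lets the rows be read by column name, e.g. row["announce_channel_id"].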

Here is the invite link for the bot: https://discord.com/oauth2/authorize?cl ... 7407664188

I hope it helps anyone who may need it, and in case you do use it and run into any bugs, drop me a message and I'll see what's up.
Bot update:

Added next-zone preview support for display commands.
Refactored the bot from a single-server design into a multi-guild design.
Removed hardcoded GUILD_ID and ANNOUNCE_CHANNEL_ID from .env.
Added SQLite storage for per-server configuration.
Added per-server setup and control commands.
Made the bot usable in multiple Discord servers through one hosted instance.
Adjusted permissions so setup/admin commands use Manage Server instead of requiring full Administrator.
Clarified which commands are public and which are admin-only.
Added a custom !help command and disabled the default Discord.py help command.
Restricted !zonedebug to server managers/admins instead of everyone.
Improved command error handling so users get clean messages instead of raw exceptions.
Deployed the bot to a VPS.
Added cleanup behavior for servers that remove the bot.
Added on_guild_remove cleanup so the bot deletes server config automatically when kicked.
Added loop-time stale-config cleanup as a fallback if a guild record remains after the bot is gone.
Added startup audit cleanup, so on every restart the bot checks database guilds against actual connected guilds and removes stale entries.
Centralized guild cleanup logic into a single helper and improved cleanup logging.
Added structured logging, with logs split into activity.log and error.log.
Kept console logging so logs also remain visible through journalctl.
Added a !status command to show runtime and cache health information.
Improved logging and status so the bot is easier to monitor in production.
Reduced API usage by introducing caching.
Changed nickname updates to use cached data and update locally every minute.
Added a refresh policy so the API is only called again shortly before the current zone expires.
Aligned nickname updates to the real minute boundary, so countdown changes happen at exact minute marks instead of drifting from startup time.
Split background behavior into two internal loops: a minute-aligned nickname updater and an API refresh loop based on expiry.
Kept !forcezone available for manual refresh/testing.
Added automatic SQLite database backups on the VPS.
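
As a rough illustration of the minute alignment and the refresh policy from the update list above, the two timing decisions could be sketched like this. The helper names and the 30-second margin are assumptions, not the values or code the bot actually uses.

```python
import time
from typing import Optional

REFRESH_MARGIN_SECONDS = 30  # hypothetical: how early to re-query before expiry


def seconds_until_next_minute(now: Optional[float] = None) -> float:
    """Seconds to sleep so that the next wake-up lands on a minute boundary."""
    now = time.time() if now is None else now
    return 60.0 - (now % 60.0)


def should_refresh(now: float, zone_end_time: Optional[float]) -> bool:
    """Call the API only when nothing is cached or the zone is about to expire."""
    if zone_end_time is None:
        return True
    return now >= zone_end_time - REFRESH_MARGIN_SECONDS
```

A minute-aligned loop would then do something like await asyncio.sleep(seconds_until_next_minute()) before each nickname update, so the countdown changes at exact minute marks no matter when the bot was started.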