I’m trying to find (or confirm whether one exists) a Discord bot for Diablo that tracks Terror Zones in a very minimal way. Ideally the bot would just update its own name to show which zone is terrorized and for how long. It would be similar to this one that someone made, but for 3-day instance reset timers in World of Warcraft:
https://www.reddit.com/r/classicwow/com ... et_timers/
You could build your own using this free API: https://www.d2tz.info/api
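As a rough sketch of what building on that API could look like, the function below picks the currently active zone out of an already-decoded response. The field names (`zone_name`, `end_time`) and the list-of-entries shape are assumptions based on how the bot posted later in this thread reads the response, not on official documentation, so check a real response first. The `current_zone` helper and the sample data are made up for illustration.

```python
import time
from typing import Any, Optional


def current_zone(entries: list[dict[str, Any]], now: Optional[int] = None) -> Optional[str]:
    """Return the zone(s) of the entry that ends soonest but has not ended yet."""
    now = int(time.time()) if now is None else now
    active = [
        e for e in entries
        if isinstance(e.get("end_time"), (int, float)) and e["end_time"] > now
    ]
    if not active:
        return None
    entry = min(active, key=lambda e: e["end_time"])
    names = entry.get("zone_name") or []
    if isinstance(names, str):
        names = [names]
    # Assumed convention: underscores stand in for spaces in zone names.
    return " / ".join(n.replace("_", " ") for n in names) or None


# Made-up sample data in the assumed shape.
sample = [
    {"zone_name": ["Tal_Rasha's_Tomb"], "end_time": 2000},
    {"zone_name": ["Blood_Moor", "Den_of_Evil"], "end_time": 1000},
]
print(current_zone(sample, now=500))
```

Passing `now` explicitly keeps the function easy to test; in a real bot you would leave it out so the current time is used.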
This is a quick and basic Python Discord bot that does what you want.
I took the liberty of also adding the TZ time remaining, the immunities displayed as emojis in the name, and the loot/exp tier for the zones.
As a necessary side effect, I also abbreviated the zone names when the bot changes its nickname, since Discord has a hard 32-character limit on nicknames.
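To illustrate the 32-character limit, here is a minimal sketch of the shortening idea on its own: try the full zone names first, then abbreviations, then hard truncation. The `fit_nickname` helper and the two-entry `ABBREVIATIONS` table are made up for this illustration; the bot itself goes through more fallback steps.

```python
MAX_NICKNAME_LEN = 32  # Discord's nickname length limit

# Hypothetical, tiny abbreviation table just for this sketch.
ABBREVIATIONS = {
    "Crystalline Passage": "CrP",
    "Frozen River": "FR",
}


def fit_nickname(zones: list[str], minutes_left: int) -> str:
    """Return a nickname no longer than MAX_NICKNAME_LEN characters."""
    suffix = f" ({minutes_left}m)"
    full = " / ".join(zones) + suffix
    if len(full) <= MAX_NICKNAME_LEN:
        return full
    # Fall back to abbreviations; unknown zones use the initials of each word.
    short = " / ".join(
        ABBREVIATIONS.get(z, "".join(w[0].upper() for w in z.split()))
        for z in zones
    )
    # Last resort: cut the zone part so the time suffix always survives.
    room = MAX_NICKNAME_LEN - len(suffix)
    return short[:room] + suffix


print(fit_nickname(["Crystalline Passage", "Frozen River"], 42))
```

The full form of that example would be 40 characters, so the sketch falls back to `CrP / FR (42m)`.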
You will need to run it locally and invite it to your server.
It needs a .env file located in the same directory as the Python file, with the following format:
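# Keep notes on their own lines: text after a value on the same line is read as part of the value.
# The d2tz token expires every month, so remember to renew it.
# CHECK_EVERY_SECONDS is the refresh interval in seconds.
DISCORD_BOT_TOKEN=discord_bot_token
D2TZ_TOKEN=your_d2tz_token
CHECK_EVERY_SECONDS=60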
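A small amount of validation helps catch a missing token or a malformed interval early. This is a minimal sketch, assuming the variables have already been loaded into the environment (for example by `load_dotenv()`); the `read_settings` helper is hypothetical and not part of the bot's code.

```python
import os


def read_settings(env=None) -> dict:
    """Validate the three settings expected in the .env file."""
    env = os.environ if env is None else env
    settings = {}
    for name in ("DISCORD_BOT_TOKEN", "D2TZ_TOKEN"):
        value = env.get(name, "").strip()
        if not value:
            raise RuntimeError(f"{name} is not set")
        settings[name] = value
    # Fall back to 60 seconds when the interval is missing.
    raw_interval = env.get("CHECK_EVERY_SECONDS", "60").strip()
    if not raw_interval.isdigit() or int(raw_interval) < 1:
        raise RuntimeError("CHECK_EVERY_SECONDS must be a positive whole number")
    settings["CHECK_EVERY_SECONDS"] = int(raw_interval)
    return settings


example = {"DISCORD_BOT_TOKEN": "abc", "D2TZ_TOKEN": "xyz", "CHECK_EVERY_SECONDS": "60"}
print(read_settings(example))
```

Because the interval must consist of digits only, a value with a trailing note on the same line is rejected instead of being silently misread.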
The following is the Python code needed to run the bot:
import asyncio
import json
import os
import sqlite3
import time
from pathlib import Path
from typing import Any, Optional, Tuple
import aiohttp
import discord
from discord.ext import commands
from dotenv import load_dotenv
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timezone
load_dotenv()
# =========================================================
# CONFIG
# =========================================================
BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
D2TZ_TOKEN = os.getenv("D2TZ_TOKEN")
MAX_NICKNAME_LEN = 32
# Refresh a little before expiry to avoid boundary issues
REFRESH_EARLY_SECONDS = 10
D2TZ_API_URL = "https://api.d2tz.info/public/tz"
DB_PATH = Path("bot_data.sqlite3")
if not BOT_TOKEN:
    raise RuntimeError("DISCORD_BOT_TOKEN is not set in .env")
if not D2TZ_TOKEN:
    raise RuntimeError("D2TZ_TOKEN is not set in .env")
# =========================================================
# LOGGING SETUP
# =========================================================
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)
ACTIVITY_LOG_FILE = LOG_DIR / "activity.log"
ERROR_LOG_FILE = LOG_DIR / "error.log"
LOG_FORMAT = "[%(asctime)s] [%(levelname)s] %(message)s"
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATEFMT)
logger = logging.getLogger("d2tzbot")
logger.setLevel(logging.INFO)
logger.propagate = False
# Prevent duplicate handlers on reload/restart
if logger.handlers:
    logger.handlers.clear()
# Console / systemd journal
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
# Activity log: everything INFO and above
activity_handler = RotatingFileHandler(
    ACTIVITY_LOG_FILE,
    maxBytes=5 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",
)
activity_handler.setLevel(logging.INFO)
activity_handler.setFormatter(formatter)
# Error log: WARNING and ERROR only
error_handler = RotatingFileHandler(
    ERROR_LOG_FILE,
    maxBytes=5 * 1024 * 1024,
    backupCount=5,
    encoding="utf-8",
)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(activity_handler)
logger.addHandler(error_handler)
# =========================================================
# DISCORD SETUP
# =========================================================
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents, help_command=None)
http_session: Optional[aiohttp.ClientSession] = None
# Per-guild cache of the last announced zone key
guild_config_key_cache: dict[int, str] = {}
# Cached TZ data
tz_cache: dict[str, Any] = {
    "zone_name": None,
    "end_time": None,
    "immunity_icons": "",
    "tier_text": "",
    "zone_key": None,
    "next_zone_name": None,
    "next_start_time": None,
    "last_fetch_time": 0,
}
bg_tasks_started = False
bot_start_time = datetime.now(timezone.utc)
# =========================================================
# ABBREVIATIONS / DISPLAY
# =========================================================
# Keys must not carry leading spaces: lookups use the stripped zone name.
ZONE_ABBREVIATIONS = {
    "Abaddon": "Ab",
    "Ancient Tunnels": "AT",
    "Arachnid Lair": "AL",
    "Arcane Sanctuary": "AS",
    "Arreat Plateau": "AP",
    "Barracks": "Bar",
    "Black Marsh": "BM",
    "Blood Moor": "BldM",
    "Burial Grounds": "BG",
    "Canyon of the Magi": "CotM",
    "Catacombs": "Cat",
    "Cathedral": "Cath",
    "Cold Plains": "CP",
    "Crystalline Passage": "CrP",
    "Dark Wood": "DW",
    "Den of Evil": "DoE",
    "Drifter Cavern": "DC",
    "Dry Hills": "DH",
    "Durance Of Hate": "DoH",
    "Far Oasis": "FO",
    "Flayer Dungeon": "FD",
    "Flayer Jungle": "FJ",
    "Forgotten Tower": "FT",
    "Frigid Highlands": "FH",
    "Frozen River": "FR",
    "Glacial Trail": "GT",
    "Great Marsh": "GM",
    "Halls of Pain": "HoP",
    "Halls Of The Dead": "HotD",
    "Halls of Vaught": "HoV",
    "Jail": "Jail",
    "Kurast Bazaar": "KB",
    "Kurast Sewers": "KS",
    "Kurast Causeway": "KC",
    "Lost City": "LC",
    "Lower Kurast": "LK",
    "Maggot Lair": "ML",
    "Marsh Of Pain": "MoP",
    "Mausoleum": "Maus",
    "Moo Moo Farm": "MMF",
    "Pit of Acheron": "PoA",
    "Plains of Despair": "PoD",
    "Pit": "Pit",
    "River of Flame": "RoF",
    "Rocky Waste": "RW",
    "Sewers": "Sew",
    "Spider Cavern": "SC",
    "Spider Forest": "SF",
    "Stony Field": "StF",
    "Stony Tomb": "ST",
    "Swampy Pit": "SwP",
    "Tal Rasha's Tomb": "TRT",
    "Tamoe Highland": "TH",
    "The Crypt": "Crypt",
    "The Hole": "Hole",
    "The Pit": "Pit",
    "The Secret Cow Level": "Cows",
    "Tower Cellar": "TC",
    "Travincal": "Trav",
    "Tristram": "Tris",
    "Underground Passage": "UP",
    "Valley of Snakes": "VoS",
    "Worldstone Keep": "WSK",
}
IMMUNITY_EMOJIS = {
    "f": "🔥",
    "c": "❄️",
    "l": "⚡",
    "p": "☠️",
    "ph": "🟣",
    "m": "🧪",
}
IMMUNITY_LABELS = {
    "f": "Fire",
    "c": "Cold",
    "l": "Lightning",
    "p": "Poison",
    "ph": "Physical",
    "m": "Magic",
}
# =========================================================
# DATABASE
# =========================================================
def cleanup_guild(guild_id: int, reason: str, guild_name: Optional[str] = None) -> None:
    """
    Centralized cleanup for a guild config.
    Logs clearly why it happened.
    """
    delete_guild_config(guild_id)
    guild_config_key_cache.pop(guild_id, None)
    name_part = f" ({guild_name})" if guild_name else ""
    logger.info(f"[CLEANUP] Guild {guild_id}{name_part} removed | reason: {reason}")


def audit_guild_configs(bot_guilds: list) -> None:
    """
    Startup audit:
    Removes any guild configs that no longer exist in bot.guilds.
    """
    configs = get_all_guild_configs()
    bot_guild_map = {g.id: g.name for g in bot_guilds}
    removed = 0
    for config in configs:
        guild_id = int(config["guild_id"])
        if guild_id not in bot_guild_map:
            cleanup_guild(
                guild_id,
                "startup audit: bot not in guild",
                guild_name=None,
            )
            removed += 1
    logger.info(f"[AUDIT] Startup guild audit complete | removed stale configs: {removed}")
def get_db_connection() -> sqlite3.Connection:
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_db() -> None:
    with get_db_connection() as conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS guild_config (
                guild_id INTEGER PRIMARY KEY,
                announce_channel_id INTEGER,
                nickname_enabled INTEGER NOT NULL DEFAULT 1,
                announcements_enabled INTEGER NOT NULL DEFAULT 1
            )
            """
        )
        conn.commit()


def upsert_guild_config(
    guild_id: int,
    announce_channel_id: Optional[int] = None,
    nickname_enabled: Optional[bool] = None,
    announcements_enabled: Optional[bool] = None,
) -> None:
    with get_db_connection() as conn:
        existing = conn.execute(
            "SELECT * FROM guild_config WHERE guild_id = ?",
            (guild_id,),
        ).fetchone()
        if existing is None:
            conn.execute(
                """
                INSERT INTO guild_config (
                    guild_id,
                    announce_channel_id,
                    nickname_enabled,
                    announcements_enabled
                ) VALUES (?, ?, ?, ?)
                """,
                (
                    guild_id,
                    announce_channel_id,
                    1 if nickname_enabled is None else int(nickname_enabled),
                    1 if announcements_enabled is None else int(announcements_enabled),
                ),
            )
        else:
            # Only overwrite the fields that were explicitly passed in
            new_channel_id = announce_channel_id if announce_channel_id is not None else existing["announce_channel_id"]
            new_nickname_enabled = int(nickname_enabled) if nickname_enabled is not None else existing["nickname_enabled"]
            new_announcements_enabled = (
                int(announcements_enabled)
                if announcements_enabled is not None
                else existing["announcements_enabled"]
            )
            conn.execute(
                """
                UPDATE guild_config
                SET announce_channel_id = ?,
                    nickname_enabled = ?,
                    announcements_enabled = ?
                WHERE guild_id = ?
                """,
                (
                    new_channel_id,
                    new_nickname_enabled,
                    new_announcements_enabled,
                    guild_id,
                ),
            )
        conn.commit()


def get_guild_config(guild_id: int) -> Optional[sqlite3.Row]:
    with get_db_connection() as conn:
        return conn.execute(
            "SELECT * FROM guild_config WHERE guild_id = ?",
            (guild_id,),
        ).fetchone()


def get_all_guild_configs() -> list[sqlite3.Row]:
    with get_db_connection() as conn:
        return conn.execute("SELECT * FROM guild_config ORDER BY guild_id").fetchall()


def delete_guild_config(guild_id: int) -> None:
    with get_db_connection() as conn:
        conn.execute("DELETE FROM guild_config WHERE guild_id = ?", (guild_id,))
        conn.commit()
# =========================================================
# HTTP
# =========================================================
async def get_http_session() -> aiohttp.ClientSession:
    global http_session
    if http_session is None or http_session.closed:
        timeout = aiohttp.ClientTimeout(total=15)
        http_session = aiohttp.ClientSession(timeout=timeout)
    return http_session
# =========================================================
# D2TZ API
# =========================================================
async def fetch_current_zone_data() -> Any:
    session = await get_http_session()
    params = {"token": D2TZ_TOKEN}
    async with session.get(D2TZ_API_URL, params=params) as resp:
        text = await resp.text()
        if resp.status != 200:
            logger.error(f"D2TZ API returned HTTP {resp.status}: {text[:300]}")
            raise RuntimeError(f"D2TZ API returned HTTP {resp.status}: {text[:300]}")
        try:
            data = json.loads(text)
        except json.JSONDecodeError as e:
            logger.error(f"D2TZ API did not return valid JSON: {text[:300]}")
            raise RuntimeError(f"D2TZ API did not return valid JSON: {text[:300]}") from e
        if isinstance(data, dict) and data.get("status") == "error":
            logger.error(f"D2TZ API error: {data.get('message', 'unknown error')}")
            raise RuntimeError(f"D2TZ API error: {data.get('message', 'unknown error')}")
        return data


def normalize_zone_name(name: str) -> str:
    return name.replace("_", " ").strip()


def format_zone_name(zone_names: Any) -> Optional[str]:
    if isinstance(zone_names, list) and zone_names:
        cleaned = [normalize_zone_name(str(z)) for z in zone_names if str(z).strip()]
        if cleaned:
            return " / ".join(cleaned)
    if isinstance(zone_names, str) and zone_names.strip():
        return normalize_zone_name(zone_names)
    return None


def format_immunities(immunities: Any) -> str:
    if not isinstance(immunities, list):
        return ""
    out = []
    for code in immunities:
        code_str = str(code).strip().lower()
        emoji = IMMUNITY_EMOJIS.get(code_str)
        out.append(emoji if emoji else code_str.upper())
    return "".join(out)


def format_tiers(entry: dict) -> str:
    exp_tier = str(entry.get("tier-exp", "")).strip()
    loot_tier = str(entry.get("tier-loot", "")).strip()
    if exp_tier and loot_tier:
        return f"{exp_tier}/{loot_tier}"
    if exp_tier:
        return f"{exp_tier}/?"
    if loot_tier:
        return f"?/{loot_tier}"
    return ""


def make_zone_key(zone_name: str, end_time: Optional[int]) -> str:
    return f"{zone_name}|{end_time}"


def parse_zone_entry(entry: dict, now: int) -> Tuple[Optional[str], Optional[int], Optional[int], str, str, Optional[str], Optional[int]]:
    zone_name = format_zone_name(entry.get("zone_name"))
    start_time = entry.get("time")
    end_time = entry.get("end_time")
    start_ts = int(start_time) if isinstance(start_time, (int, float)) else None
    end_ts = int(end_time) if isinstance(end_time, (int, float)) else None
    minutes_left = None
    if end_ts is not None:
        seconds_left = max(0, end_ts - now)
        minutes_left = seconds_left // 60
    immunity_icons = format_immunities(entry.get("immunities"))
    tier_text = format_tiers(entry)
    zone_key = make_zone_key(zone_name, end_ts) if zone_name else None
    return zone_name, minutes_left, end_ts, immunity_icons, tier_text, zone_key, start_ts
def extract_current_and_next_zone_info(
    data: Any,
) -> Tuple[
    Optional[str], Optional[int], Optional[int], str, str, Optional[str],
    Optional[str], Optional[int]
]:
    now = int(time.time())
    if isinstance(data, list):
        entries = [item for item in data if isinstance(item, dict)]
        if len(entries) >= 2:
            # Earliest end_time first; entries without a numeric end_time sort last
            sorted_entries = sorted(
                entries,
                key=lambda e: int(e.get("end_time")) if isinstance(e.get("end_time"), (int, float)) else 10**18,
            )
            current_entry = sorted_entries[0]
            next_entry = sorted_entries[1]
            current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(current_entry, now)
            next_zone_name, _, _, _, _, _, next_start_time = parse_zone_entry(next_entry, now)
            next_seconds_until = None
            if next_start_time is not None:
                next_seconds_until = max(0, next_start_time - now)
            return (
                current_zone_name,
                current_minutes_left,
                current_end_time,
                current_immunity_icons,
                current_tier_text,
                current_zone_key,
                next_zone_name,
                next_seconds_until,
            )
        elif len(entries) == 1:
            current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(entries[0], now)
            return (
                current_zone_name,
                current_minutes_left,
                current_end_time,
                current_immunity_icons,
                current_tier_text,
                current_zone_key,
                None,
                None,
            )
    if isinstance(data, dict):
        current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(data, now)
        return (
            current_zone_name,
            current_minutes_left,
            current_end_time,
            current_immunity_icons,
            current_tier_text,
            current_zone_key,
            None,
            None,
        )
    return None, None, None, "", "", None, None, None
async def refresh_tz_cache(force: bool = False) -> bool:
    """
    Refresh API cache only when needed.
    Returns True if zone key changed.
    """
    global tz_cache
    now = int(time.time())
    current_end_time = tz_cache.get("end_time")
    old_zone_key = tz_cache.get("zone_key")
    should_refresh = force
    if not should_refresh:
        if current_end_time is None:
            should_refresh = True
        elif now >= int(current_end_time) - REFRESH_EARLY_SECONDS:
            should_refresh = True
    if not should_refresh:
        return False
    data = await fetch_current_zone_data()
    (
        current_zone_name,
        current_minutes_left,
        current_end_time,
        current_immunity_icons,
        current_tier_text,
        current_zone_key,
        next_zone_name,
        next_seconds_until,
    ) = extract_current_and_next_zone_info(data)
    if not current_zone_name:
        raise RuntimeError(f"Could not find current zone in API response: {data}")
    next_start_time = None
    if next_seconds_until is not None:
        next_start_time = now + next_seconds_until
    tz_cache = {
        "zone_name": current_zone_name,
        "end_time": current_end_time,
        "immunity_icons": current_immunity_icons,
        "tier_text": current_tier_text,
        "zone_key": current_zone_key,
        "next_zone_name": next_zone_name,
        "next_start_time": next_start_time,
        "last_fetch_time": now,
    }
    return old_zone_key != current_zone_key


def get_cached_zone_snapshot() -> Tuple[str, Optional[int], Optional[int], str, str, Optional[str], Optional[str], Optional[int]]:
    zone_name = tz_cache.get("zone_name")
    end_time = tz_cache.get("end_time")
    immunity_icons = tz_cache.get("immunity_icons", "")
    tier_text = tz_cache.get("tier_text", "")
    zone_key = tz_cache.get("zone_key")
    next_zone_name = tz_cache.get("next_zone_name")
    next_start_time = tz_cache.get("next_start_time")
    if not zone_name:
        raise RuntimeError("TZ cache is empty")
    now = int(time.time())
    minutes_left = None
    if isinstance(end_time, int):
        seconds_left = max(0, end_time - now)
        minutes_left = seconds_left // 60
    next_seconds_until = None
    if isinstance(next_start_time, int):
        next_seconds_until = max(0, next_start_time - now)
    return zone_name, minutes_left, end_time, immunity_icons, tier_text, zone_key, next_zone_name, next_seconds_until
# =========================================================
# NICKNAME BUILDING
# =========================================================
def abbreviate_single_zone(zone_name: str) -> str:
    normalized = normalize_zone_name(zone_name)
    if normalized in ZONE_ABBREVIATIONS:
        return ZONE_ABBREVIATIONS[normalized]
    # Unknown zone: fall back to initials (or the first 4 letters of a single word)
    words = [w for w in normalized.replace("'", "").split() if w]
    if not words:
        return normalized
    if len(words) == 1:
        return words[0][:4]
    return "".join(word[0].upper() for word in words)


def abbreviate_zone_string(zone_string: str) -> str:
    parts = [part.strip() for part in zone_string.split("/")]
    abbreviated_parts = [abbreviate_single_zone(part) for part in parts if part.strip()]
    return " / ".join(abbreviated_parts)


def build_nickname(zone_name: str, minutes_left: Optional[int], immunity_icons: str, tier_text: str) -> str:
    time_part = f" ({minutes_left}m)" if minutes_left is not None else ""
    extras = []
    if immunity_icons:
        extras.append(immunity_icons)
    if tier_text:
        extras.append(tier_text)
    extra_part = f" {' '.join(extras)}" if extras else ""
    # 1) Full zone names
    full_name = f"{zone_name}{extra_part}{time_part}"
    if len(full_name) <= MAX_NICKNAME_LEN:
        return full_name
    # 2) Abbreviated zone names
    abbreviated_zone = abbreviate_zone_string(zone_name)
    abbreviated_full = f"{abbreviated_zone}{extra_part}{time_part}"
    if len(abbreviated_full) <= MAX_NICKNAME_LEN:
        return abbreviated_full
    # 3) Abbreviated zone names without the tier text
    if immunity_icons and tier_text:
        reduced_extra = f" {immunity_icons}"
        reduced_name = f"{abbreviated_zone}{reduced_extra}{time_part}"
        if len(reduced_name) <= MAX_NICKNAME_LEN:
            return reduced_name
    # 4) Abbreviations cut to at most 3 characters each
    tiny_parts = []
    for part in [p.strip() for p in zone_name.split("/")]:
        abbr = abbreviate_single_zone(part)
        tiny_parts.append(abbr[:3] if len(abbr) > 3 else abbr)
    tiny_zone = " / ".join(tiny_parts)
    tiny_full = f"{tiny_zone}{extra_part}{time_part}"
    if len(tiny_full) <= MAX_NICKNAME_LEN:
        return tiny_full
    tiny_reduced = f"{tiny_zone}{(' ' + immunity_icons) if immunity_icons else ''}{time_part}"
    if len(tiny_reduced) <= MAX_NICKNAME_LEN:
        return tiny_reduced
    # 5) Zone and time only, hard-truncated if still too long
    absolute_fallback = f"{tiny_zone}{time_part}"
    if len(absolute_fallback) <= MAX_NICKNAME_LEN:
        return absolute_fallback
    return absolute_fallback[:MAX_NICKNAME_LEN]
# =========================================================
# STATUS HELPERS
# =========================================================
def format_duration(seconds: int) -> str:
    seconds = max(0, int(seconds))
    days, rem = divmod(seconds, 86400)
    hours, rem = divmod(rem, 3600)
    minutes, secs = divmod(rem, 60)
    parts = []
    if days:
        parts.append(f"{days}d")
    if hours:
        parts.append(f"{hours}h")
    if minutes:
        parts.append(f"{minutes}m")
    if secs or not parts:
        parts.append(f"{secs}s")
    return " ".join(parts)


def get_status_snapshot() -> dict[str, Any]:
    now = int(time.time())
    zone_name = tz_cache.get("zone_name")
    end_time = tz_cache.get("end_time")
    next_zone_name = tz_cache.get("next_zone_name")
    next_start_time = tz_cache.get("next_start_time")
    last_fetch_time = tz_cache.get("last_fetch_time", 0)
    seconds_left = None
    if isinstance(end_time, int):
        seconds_left = max(0, end_time - now)
    next_seconds_until = None
    if isinstance(next_start_time, int):
        next_seconds_until = max(0, next_start_time - now)
    uptime_seconds = int((datetime.now(timezone.utc) - bot_start_time).total_seconds())
    return {
        "uptime_seconds": uptime_seconds,
        "guild_count_live": len(bot.guilds),
        "guild_count_configured": len(get_all_guild_configs()),
        "zone_name": zone_name,
        "seconds_left": seconds_left,
        "next_zone_name": next_zone_name,
        "next_seconds_until": next_seconds_until,
        "last_fetch_time": last_fetch_time,
        "zone_key": tz_cache.get("zone_key"),
    }
# =========================================================
# DISCORD HELPERS
# =========================================================
async def get_bot_member(guild: discord.Guild) -> Optional[discord.Member]:
    me = guild.me
    if me is None and bot.user is not None:
        try:
            me = await guild.fetch_member(bot.user.id)
        except discord.HTTPException:
            return None
    return me


async def get_text_channel(channel_id: int) -> Optional[discord.TextChannel]:
    channel = bot.get_channel(channel_id)
    if channel is None:
        try:
            channel = await bot.fetch_channel(channel_id)
        except discord.HTTPException:
            return None
    return channel if isinstance(channel, discord.TextChannel) else None


async def update_guild_nickname(
    guild: discord.Guild,
    zone_name: str,
    minutes_left: Optional[int],
    immunity_icons: str,
    tier_text: str,
) -> None:
    me = await get_bot_member(guild)
    if me is None:
        logger.warning(f"Could not fetch bot member in guild {guild.id}")
        return
    nickname = build_nickname(zone_name, minutes_left, immunity_icons, tier_text)
    # Skip the API call when the nickname is already up to date
    if me.nick != nickname:
        try:
            await me.edit(nick=nickname, reason="Updating nickname with cached TZ info")
            logger.info(f"[{guild.id}] Nickname updated to: {nickname}")
        except discord.Forbidden:
            logger.warning(f"[{guild.id}] Missing permission to change nickname.")
        except discord.HTTPException as e:
            logger.error(f"[{guild.id}] Failed to change nickname: {e}")


async def announce_zone_change(
    channel_id: int,
    old_zone: Optional[str],
    new_zone: str,
    minutes_left: Optional[int],
    immunity_icons: str,
    tier_text: str,
) -> None:
    channel = await get_text_channel(channel_id)
    if channel is None:
        logger.warning(f"Could not access announce channel {channel_id}")
        return
    bits = []
    if immunity_icons:
        bits.append(immunity_icons)
    if tier_text:
        bits.append(f"EXP/LOOT {tier_text}")
    if minutes_left is not None:
        bits.append(f"{minutes_left}m left")
    suffix = f" ({', '.join(bits)})" if bits else ""
    if old_zone is None:
        message = f"Current zone detected: **{new_zone}**{suffix}"
    else:
        message = f"Zone changed: **{old_zone}** → **{new_zone}**{suffix}"
    try:
        await channel.send(message)
        logger.info(f"[{channel.guild.id}] Announcement sent: {message}")
    except discord.Forbidden:
        logger.warning(f"[{channel.guild.id}] Missing permission to send messages in {channel_id}.")
    except discord.HTTPException as e:
        logger.error(f"[{channel.guild.id}] Failed to send message: {e}")
# =========================================================
# PER-GUILD PROCESSING
# =========================================================
async def process_single_guild(
    guild_config: sqlite3.Row,
    zone_name: str,
    minutes_left: Optional[int],
    immunity_icons: str,
    tier_text: str,
    zone_key: Optional[str],
    allow_announcement: bool,
) -> None:
    guild_id = int(guild_config["guild_id"])
    guild = bot.get_guild(guild_id)
    if guild is None:
        cleanup_guild(
            guild_id,
            "stale config detected during loop",
            guild_name=None,
        )
        return
    if guild_config["nickname_enabled"]:
        await update_guild_nickname(guild, zone_name, minutes_left, immunity_icons, tier_text)
    if (
        allow_announcement
        and guild_config["announcements_enabled"]
        and guild_config["announce_channel_id"]
        and zone_key
    ):
        old_zone_key = guild_config_key_cache.get(guild_id)
        if zone_key != old_zone_key:
            old_zone = old_zone_key.split("|", 1)[0] if old_zone_key else None
            await announce_zone_change(
                int(guild_config["announce_channel_id"]),
                old_zone,
                zone_name,
                minutes_left,
                immunity_icons,
                tier_text,
            )
            guild_config_key_cache[guild_id] = zone_key
# =========================================================
# BACKGROUND TASKS
# =========================================================
async def minute_aligned_updater() -> None:
    await bot.wait_until_ready()
    while not bot.is_closed():
        try:
            # Sleep until the start of the next wall-clock minute
            now = time.time()
            sleep_for = 60 - (now % 60)
            await asyncio.sleep(sleep_for)
            if not tz_cache.get("zone_name"):
                continue
            zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
            configs = get_all_guild_configs()
            for config in configs:
                # minute updater does not send zone-change announcements
                await process_single_guild(
                    config,
                    zone_name,
                    minutes_left,
                    immunity_icons,
                    tier_text,
                    zone_key,
                    allow_announcement=False,
                )
        except Exception as e:
            logger.error(f"minute_aligned_updater error: {e}")
            await asyncio.sleep(5)


async def api_refresh_loop() -> None:
    await bot.wait_until_ready()
    while not bot.is_closed():
        try:
            # Empty cache: fetch immediately instead of waiting for an expiry time
            if not tz_cache.get("zone_name") or not tz_cache.get("end_time"):
                changed = await refresh_tz_cache(force=True)
                zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
                configs = get_all_guild_configs()
                for config in configs:
                    await process_single_guild(
                        config,
                        zone_name,
                        minutes_left,
                        immunity_icons,
                        tier_text,
                        zone_key,
                        allow_announcement=changed,
                    )
                await asyncio.sleep(5)
                continue
            # Otherwise sleep until shortly before the current zone expires
            now = int(time.time())
            refresh_at = int(tz_cache["end_time"]) - REFRESH_EARLY_SECONDS
            sleep_for = max(1, refresh_at - now)
            await asyncio.sleep(sleep_for)
            changed = await refresh_tz_cache(force=True)
            zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
            configs = get_all_guild_configs()
            for config in configs:
                await process_single_guild(
                    config,
                    zone_name,
                    minutes_left,
                    immunity_icons,
                    tier_text,
                    zone_key,
                    allow_announcement=changed,
                )
        except Exception as e:
            logger.error(f"api_refresh_loop error: {e}")
            await asyncio.sleep(10)
# =========================================================
# COMMANDS
# =========================================================
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def setup(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announce_channel_id=ctx.channel.id,
        nickname_enabled=True,
        announcements_enabled=True,
    )
    await ctx.send(
        f"Setup complete.\n"
        f"Announcements channel: {ctx.channel.mention}\n"
        f"Nickname updates: enabled\n"
        f"Announcements: enabled"
    )


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def setchannel(ctx: commands.Context, channel: discord.TextChannel) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announce_channel_id=channel.id,
    )
    await ctx.send(f"Announcement channel set to {channel.mention}.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def enabletz(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=True,
        announcements_enabled=True,
    )
    await ctx.send("Terror Zone updates enabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def disabletz(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=False,
        announcements_enabled=False,
    )
    await ctx.send("Terror Zone updates disabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def nickon(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=True,
    )
    await ctx.send("Nickname updates enabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def nickoff(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        nickname_enabled=False,
    )
    await ctx.send("Nickname updates disabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def announceon(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announcements_enabled=True,
    )
    await ctx.send("Announcements enabled for this server.")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def announceoff(ctx: commands.Context) -> None:
    upsert_guild_config(
        guild_id=ctx.guild.id,
        announcements_enabled=False,
    )
    await ctx.send("Announcements disabled for this server.")


@bot.command()
@commands.guild_only()
async def showconfig(ctx: commands.Context) -> None:
    config = get_guild_config(ctx.guild.id)
    if config is None:
        await ctx.send("This server is not configured yet. Run `!setup` in the channel you want to use.")
        return
    channel_id = config["announce_channel_id"]
    channel_text = f"<#{channel_id}>" if channel_id else "Not set"
    await ctx.send(
        f"Guild config:\n"
        f"- Announce channel: {channel_text}\n"
        f"- Nickname updates: {'enabled' if config['nickname_enabled'] else 'disabled'}\n"
        f"- Announcements: {'enabled' if config['announcements_enabled'] else 'disabled'}"
    )


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def removetz(ctx: commands.Context) -> None:
    delete_guild_config(ctx.guild.id)
    guild_config_key_cache.pop(ctx.guild.id, None)
    await ctx.send("This server's Terror Zone configuration was removed.")
@bot.command()
async def zone(ctx: commands.Context) -> None:
    try:
        if not tz_cache.get("zone_name"):
            await refresh_tz_cache(force=True)
        zone_name, minutes_left, end_time, immunity_icons, tier_text, _zone_key, next_zone_name, next_seconds_until = get_cached_zone_snapshot()
        nickname_preview = build_nickname(zone_name, minutes_left, immunity_icons, tier_text)
        msg = f"Current zone: **{zone_name}**"
        if immunity_icons:
            msg += f"\nImmunities: {immunity_icons}"
        if tier_text:
            msg += f"\nEXP/LOOT tier: `{tier_text}`"
        if minutes_left is not None:
            msg += f"\nTime left: `{minutes_left}m`"
        if next_zone_name:
            if next_seconds_until is not None:
                next_minutes = next_seconds_until // 60
                msg += f"\nNext zone: **{next_zone_name}** (in {next_minutes}m)"
            else:
                msg += f"\nNext zone: **{next_zone_name}**"
        msg += f"\nNickname preview: `{nickname_preview}`"
        if end_time is not None:
            msg += f"\nEnd time (unix): `{end_time}`"
        # No backticks here: Discord does not render <t:...> timestamps inside code spans
        msg += f"\nLast API fetch: <t:{tz_cache['last_fetch_time']}:R>"
        await ctx.send(msg)
    except Exception as e:
        await ctx.send(f"Error reading current zone: `{e}`")


@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def zonedebug(ctx: commands.Context) -> None:
    try:
        data = await fetch_current_zone_data()
        pretty = json.dumps(data, indent=2, ensure_ascii=False)
        if len(pretty) > 3500:
            pretty = pretty[:3500] + "\n... (truncated)"
        await ctx.send(f"```json\n{pretty}\n```")
    except Exception as e:
        await ctx.send(f"Debug failed: `{e}`")


@bot.command()
async def legend(ctx: commands.Context) -> None:
    lines = ["Zone abbreviation legend:"]
    for full_name in sorted(ZONE_ABBREVIATIONS):
        lines.append(f"{ZONE_ABBREVIATIONS[full_name]} = {full_name}")
    lines.append("")
    lines.append("Immunity legend:")
    for code in ["f", "c", "l", "p", "ph", "m"]:
        if code in IMMUNITY_EMOJIS:
            lines.append(f"{IMMUNITY_EMOJIS[code]} = {IMMUNITY_LABELS.get(code, code)}")
    message = "\n".join(lines)
    if len(message) <= 1900:
        await ctx.send(f"```text\n{message}\n```")
        return
    # Too long for one message: split into chunks of at most ~1900 characters
    chunks = []
    current = ""
    for line in lines:
        if len(current) + len(line) + 1 > 1900:
            chunks.append(current)
            current = line + "\n"
        else:
            current += line + "\n"
    if current:
        chunks.append(current)
    for chunk in chunks:
        await ctx.send(f"```text\n{chunk}\n```")
@bot.command()
async def status(ctx: commands.Context) -> None:
try:
snapshot = get_status_snapshot()
uptime_text = format_duration(snapshot["uptime_seconds"])
configured = snapshot["guild_count_configured"]
live = snapshot["guild_count_live"]
msg = [
"**Bot status**",
f"Uptime: `{uptime_text}`",
f"Connected guilds: `{live}`",
f"Configured guilds: `{configured}`",
]
if snapshot["zone_name"]:
msg.append(f"Cached current zone: **{snapshot['zone_name']}**")
else:
msg.append("Cached current zone: `none`")
if snapshot["seconds_left"] is not None:
msg.append(f"Time left: `{format_duration(snapshot['seconds_left'])}`")
if snapshot["next_zone_name"]:
if snapshot["next_seconds_until"] is not None:
msg.append(
f"Next zone: **{snapshot['next_zone_name']}** "
f"(in {format_duration(snapshot['next_seconds_until'])})"
)
else:
msg.append(f"Next zone: **{snapshot['next_zone_name']}**")
if snapshot["last_fetch_time"]:
msg.append(f"Last API fetch: <t:{snapshot['last_fetch_time']}:F>")
msg.append(f"Last API fetch (relative): <t:{snapshot['last_fetch_time']}:R>")
if snapshot["zone_key"]:
msg.append(f"Zone key: `{snapshot['zone_key']}`")
await ctx.send("\n".join(msg))
except Exception as e:
logger.error(f"status command error: {e}")
await ctx.send(f"Status error: `{e}`")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def forcezone(ctx: commands.Context) -> None:
try:
changed = await refresh_tz_cache(force=True)
zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
config = get_guild_config(ctx.guild.id)
if config is None:
await ctx.send("This server is not configured yet. Run `!setup` first.")
return
if config["nickname_enabled"]:
await update_guild_nickname(ctx.guild, zone_name, minutes_left, immunity_icons, tier_text)
if config["announcements_enabled"] and config["announce_channel_id"]:
old_key = guild_config_key_cache.get(ctx.guild.id)
old_zone = old_key.split("|", 1)[0] if old_key else None
await announce_zone_change(
int(config["announce_channel_id"]),
old_zone,
zone_name,
minutes_left,
immunity_icons,
tier_text,
)
if zone_key:
guild_config_key_cache[ctx.guild.id] = zone_key
await ctx.send(
f"Forced refresh complete: **{build_nickname(zone_name, minutes_left, immunity_icons, tier_text)}**"
f"\nAPI returned {'a new zone' if changed else 'the same zone'}."
)
except Exception as e:
await ctx.send(f"Force update failed: `{e}`")
@bot.command(name="help")
async def help_command(ctx: commands.Context) -> None:
is_guild = ctx.guild is not None
can_manage_guild = (
is_guild and
isinstance(ctx.author, discord.Member) and
ctx.author.guild_permissions.manage_guild
)
lines = [
"**D2TZ Bot Help**",
"",
"**Public commands**",
"`!zone` - Show current Terror Zone, time left, next zone, and nickname preview.",
"`!status` - Show bot uptime, cached TZ info, and basic health/status.",
"`!legend` - Show zone abbreviations and immunity emoji meanings.",
]
if can_manage_guild:
lines.extend([
"",
"**Server admin commands**",
"`!setup` - Initialize the bot in this server and use the current channel for announcements.",
"`!setchannel #channel` - Set the announcement channel for this server.",
"`!showconfig` - Show this server's current bot configuration.",
"`!enabletz` - Enable both nickname updates and announcements.",
"`!disabletz` - Disable both nickname updates and announcements.",
"`!nickon` - Enable nickname updates only.",
"`!nickoff` - Disable nickname updates only.",
"`!announceon` - Enable announcements only.",
"`!announceoff` - Disable announcements only.",
"`!removetz` - Remove this server's TZ configuration.",
"`!forcezone` - Force an immediate TZ refresh and update.",
"`!zonedebug` - Show raw API data for debugging.",
])
if not is_guild:
lines.extend([
"",
"_Note: server setup commands only work inside a server, not in DMs._"
])
elif not can_manage_guild:
lines.extend([
"",
"_You do not currently have the 'Manage Server' permission, so admin commands are hidden._"
])
await ctx.send("\n".join(lines))
# =========================================================
# EVENTS
# =========================================================
@bot.event
async def on_ready() -> None:
global bg_tasks_started
logger.info(f"Logged in as {bot.user} ({bot.user.id})")
init_db()
await get_http_session()
await refresh_tz_cache(force=True)
# 🔍 Startup audit
audit_guild_configs(bot.guilds)
# Rebuild cache safely
configs = get_all_guild_configs()
for config in configs:
guild_config_key_cache.setdefault(int(config["guild_id"]), "")
if not bg_tasks_started:
bg_tasks_started = True
asyncio.create_task(minute_aligned_updater())
asyncio.create_task(api_refresh_loop())
@bot.event
async def on_guild_remove(guild: discord.Guild) -> None:
cleanup_guild(
guild.id,
"bot removed from server",
guild.name,
)
@bot.event
async def on_command_error(ctx: commands.Context, error: Exception) -> None:
if isinstance(error, commands.MissingPermissions):
await ctx.send("You need the 'Manage Server' permission to use that command.")
# NoPrivateMessage is a subclass of CheckFailure, so it must be tested first
elif isinstance(error, commands.NoPrivateMessage):
await ctx.send("This command can only be used inside a server.")
elif isinstance(error, commands.CheckFailure):
await ctx.send("You do not have permission to use that command.")
elif isinstance(error, commands.MissingRequiredArgument):
await ctx.send("Missing argument for that command.")
elif isinstance(error, commands.BadArgument):
await ctx.send("Invalid argument.")
elif isinstance(error, commands.CommandNotFound):
return
else:
logger.error(f"Command error ({ctx.command}): {error}")
await ctx.send("An unexpected error occurred.")
# =========================================================
# MAIN
# =========================================================
async def main() -> None:
init_db()
try:
await bot.start(BOT_TOKEN)
finally:
global http_session
if http_session and not http_session.closed:
await http_session.close()
if __name__ == "__main__":
asyncio.run(main())
It also sends a text message when the zone changes. You can disable that if you want, but I would keep it, since the message shows the zone names without abbreviations.
A practical note: because nicknames are short, this code tries the following formats in priority order and uses the first one that fits:
full zone names + immunities + tiers + time
abbreviated zone names + immunities + tiers + time
abbreviated zone names + immunities + time
abbreviated zone names + time
So if the nickname gets too long, the zone names are abbreviated first, then the tiers are dropped, and the immunities and timer are kept as long as possible. That keeps the most useful info visible.
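The priority order above can be sketched as a simple "first candidate that fits" loop. This is only a minimal illustration, not the bot's actual `build_nickname` function: the name `pick_nickname` and its parameters are made up for this example, and it assumes the abbreviated zone string has already been computed.

```python
MAX_NICKNAME_LEN = 32  # nickname length limit mentioned in the post


def pick_nickname(full_zone: str, short_zone: str, immunities: str,
                  tiers: str, minutes: int) -> str:
    """Return the first nickname format that fits, in priority order."""
    time_part = f" ({minutes}m)"
    candidates = [
        f"{full_zone} {immunities} {tiers}{time_part}",   # full names + everything
        f"{short_zone} {immunities} {tiers}{time_part}",  # abbreviated + everything
        f"{short_zone} {immunities}{time_part}",          # tiers dropped
        f"{short_zone}{time_part}",                       # zone + time only
    ]
    for candidate in candidates:
        if len(candidate) <= MAX_NICKNAME_LEN:
            return candidate
    # Nothing fits: hard-truncate the shortest format.
    return candidates[-1][:MAX_NICKNAME_LEN]
```

Note that Python's `len` counts code points, so an emoji written with a variation selector (such as the cold icon) counts as two characters in this check.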
I took the liberty of also adding the TZ time remaining, the immunities displayed as emojis in the name, and the loot/exp tier for the zones.
As a necessary side effect, I also abbreviated the zone names when the bot changes its nickname, since there is a hard 32-character limit on nicknames on Discord.
You will need to run it locally and invite it to your server.
It needs a .env file located in the same directory as the Python file, with the following format:
DISCORD_BOT_TOKEN=discord_bot_token
# Take care: the d2tz token expires every month
D2TZ_TOKEN=your_d2tz_token
# Refresh interval in seconds
CHECK_EVERY_SECONDS=60
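As a small illustration of how a numeric setting like `CHECK_EVERY_SECONDS` could be read defensively, here is a minimal sketch. The helper name `read_check_interval` and its `default`/`minimum` values are assumptions made for this example and are not part of the bot's code; it reads from the process environment, which is where `load_dotenv()` places the values from the .env file.

```python
import os


def read_check_interval(default: int = 60, minimum: int = 10) -> int:
    """Read CHECK_EVERY_SECONDS from the environment, falling back to a default."""
    raw = os.getenv("CHECK_EVERY_SECONDS", "")
    try:
        value = int(raw.strip())
    except ValueError:
        # Missing or non-numeric value: use the default instead of crashing.
        return default
    # Clamp to a floor so a typo cannot make the bot poll too aggressively.
    return max(minimum, value)
```

The floor is a design choice for the sketch: it keeps an accidental `CHECK_EVERY_SECONDS=1` from generating a request every second.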
The following is the python code needed to run the bot:
import asyncio
import json
import os
import sqlite3
import time
from pathlib import Path
from typing import Any, Optional, Tuple
import aiohttp
import discord
from discord.ext import commands
from dotenv import load_dotenv
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timezone
load_dotenv()
# =========================================================
# CONFIG
# =========================================================
BOT_TOKEN = os.getenv("DISCORD_BOT_TOKEN")
D2TZ_TOKEN = os.getenv("D2TZ_TOKEN")
MAX_NICKNAME_LEN = 32
# Refresh a little before expiry to avoid boundary issues
REFRESH_EARLY_SECONDS = 10
D2TZ_API_URL = "https://api.d2tz.info/public/tz"
DB_PATH = Path("bot_data.sqlite3")
if not BOT_TOKEN:
raise RuntimeError("DISCORD_BOT_TOKEN is not set in .env")
if not D2TZ_TOKEN:
raise RuntimeError("D2TZ_TOKEN is not set in .env")
# =========================================================
# LOGGING SETUP
# =========================================================
LOG_DIR = Path("logs")
LOG_DIR.mkdir(exist_ok=True)
ACTIVITY_LOG_FILE = LOG_DIR / "activity.log"
ERROR_LOG_FILE = LOG_DIR / "error.log"
LOG_FORMAT = "[%(asctime)s] [%(levelname)s] %(message)s"
LOG_DATEFMT = "%Y-%m-%d %H:%M:%S"
formatter = logging.Formatter(LOG_FORMAT, datefmt=LOG_DATEFMT)
logger = logging.getLogger("d2tzbot")
logger.setLevel(logging.INFO)
logger.propagate = False
# Prevent duplicate handlers on reload/restart
if logger.handlers:
logger.handlers.clear()
# Console / systemd journal
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
# Activity log: everything INFO and above
activity_handler = RotatingFileHandler(
ACTIVITY_LOG_FILE,
maxBytes=5 * 1024 * 1024,
backupCount=5,
encoding="utf-8",
)
activity_handler.setLevel(logging.INFO)
activity_handler.setFormatter(formatter)
# Error log: WARNING and ERROR only
error_handler = RotatingFileHandler(
ERROR_LOG_FILE,
maxBytes=5 * 1024 * 1024,
backupCount=5,
encoding="utf-8",
)
error_handler.setLevel(logging.WARNING)
error_handler.setFormatter(formatter)
logger.addHandler(console_handler)
logger.addHandler(activity_handler)
logger.addHandler(error_handler)
# =========================================================
# DISCORD SETUP
# =========================================================
intents = discord.Intents.default()
intents.message_content = True
bot = commands.Bot(command_prefix="!", intents=intents, help_command=None)
http_session: Optional[aiohttp.ClientSession] = None
# Per-guild last announced zone key cache
guild_config_key_cache: dict[int, str] = {}
# Cached TZ data
tz_cache: dict[str, Any] = {
"zone_name": None,
"end_time": None,
"immunity_icons": "",
"tier_text": "",
"zone_key": None,
"next_zone_name": None,
"next_start_time": None,
"last_fetch_time": 0,
}
bg_tasks_started = False
bot_start_time = datetime.now(timezone.utc)
# =========================================================
# ABBREVIATIONS / DISPLAY
# =========================================================
# Keys must not have leading spaces: lookups use the stripped, normalized zone name
ZONE_ABBREVIATIONS = {
"Abaddon": "Ab",
"Ancient Tunnels": "AT",
"Arachnid Lair": "AL",
"Arcane Sanctuary": "AS",
"Arreat Plateau": "AP",
"Barracks": "Bar",
"Black Marsh": "BM",
"Blood Moor": "BldM",
"Burial Grounds": "BG",
"Canyon of the Magi": "CotM",
"Catacombs": "Cat",
"Cathedral": "Cath",
"Cold Plains": "CP",
"Crystalline Passage": "CrP",
"Dark Wood": "DW",
"Den of Evil": "DoE",
"Drifter Cavern": "DC",
"Dry Hills": "DH",
"Durance Of Hate": "DoH",
"Far Oasis": "FO",
"Flayer Dungeon": "FD",
"Flayer Jungle": "FJ",
"Forgotten Tower": "FT",
"Frigid Highlands": "FH",
"Frozen River": "FR",
"Glacial Trail": "GT",
"Great Marsh": "GM",
"Halls of Pain": "HoP",
"Halls Of The Dead": "HotD",
"Halls of Vaught": "HoV",
"Jail": "Jail",
"Kurast Bazaar": "KB",
"Kurast Sewers": "KS",
"Kurast Causeway": "KC",
"Lost City": "LC",
"Lower Kurast": "LK",
"Maggot Lair": "ML",
"Marsh Of Pain": "MoP",
"Mausoleum": "Maus",
"Moo Moo Farm": "MMF",
"Pit of Acheron": "PoA",
"Plains of Despair": "PoD",
"Pit": "Pit",
"River of Flame": "RoF",
"Rocky Waste": "RW",
"Sewers": "Sew",
"Spider Cavern": "SC",
"Spider Forest": "SF",
"Stony Field": "StF",
"Stony Tomb": "ST",
"Swampy Pit": "SwP",
"Tal Rasha's Tomb": "TRT",
"Tamoe Highland": "TH",
"The Crypt": "Crypt",
"The Hole": "Hole",
"The Pit": "Pit",
"The Secret Cow Level": "Cows",
"Tower Cellar": "TC",
"Travincal": "Trav",
"Tristram": "Tris",
"Underground Passage": "UP",
"Valley of Snakes": "VoS",
"Worldstone Keep": "WSK",
}
IMMUNITY_EMOJIS = {
"f": "🔥",
"c": "❄️",
"l": "⚡",
"p": "☠️",
"ph": "🟣",
"m": "🧪",
}
IMMUNITY_LABELS = {
"f": "Fire",
"c": "Cold",
"l": "Lightning",
"p": "Poison",
"ph": "Physical",
"m": "Magic",
}
# =========================================================
# DATABASE
# =========================================================
def cleanup_guild(guild_id: int, reason: str, guild_name: Optional[str] = None) -> None:
"""
Centralized cleanup for a guild config.
Logs clearly why it happened.
"""
delete_guild_config(guild_id)
guild_config_key_cache.pop(guild_id, None)
name_part = f" ({guild_name})" if guild_name else ""
logger.info(f"[CLEANUP] Guild {guild_id}{name_part} removed | reason: {reason}")
def audit_guild_configs(bot_guilds: list) -> None:
"""
Startup audit:
Removes any guild configs that no longer exist in bot.guilds.
"""
configs = get_all_guild_configs()
bot_guild_map = {g.id: g.name for g in bot_guilds}
removed = 0
for config in configs:
guild_id = int(config["guild_id"])
if guild_id not in bot_guild_map:
cleanup_guild(
guild_id,
"startup audit: bot not in guild",
guild_name=None,
)
removed += 1
logger.info(f"[AUDIT] Startup guild audit complete | removed stale configs: {removed}")
def get_db_connection() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_db() -> None:
with get_db_connection() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS guild_config (
guild_id INTEGER PRIMARY KEY,
announce_channel_id INTEGER,
nickname_enabled INTEGER NOT NULL DEFAULT 1,
announcements_enabled INTEGER NOT NULL DEFAULT 1
)
"""
)
conn.commit()
def upsert_guild_config(
guild_id: int,
announce_channel_id: Optional[int] = None,
nickname_enabled: Optional[bool] = None,
announcements_enabled: Optional[bool] = None,
) -> None:
with get_db_connection() as conn:
existing = conn.execute(
"SELECT * FROM guild_config WHERE guild_id = ?",
(guild_id,),
).fetchone()
if existing is None:
conn.execute(
"""
INSERT INTO guild_config (
guild_id,
announce_channel_id,
nickname_enabled,
announcements_enabled
) VALUES (?, ?, ?, ?)
""",
(
guild_id,
announce_channel_id,
1 if nickname_enabled is None else int(nickname_enabled),
1 if announcements_enabled is None else int(announcements_enabled),
),
)
else:
new_channel_id = announce_channel_id if announce_channel_id is not None else existing["announce_channel_id"]
new_nickname_enabled = int(nickname_enabled) if nickname_enabled is not None else existing["nickname_enabled"]
new_announcements_enabled = (
int(announcements_enabled)
if announcements_enabled is not None
else existing["announcements_enabled"]
)
conn.execute(
"""
UPDATE guild_config
SET announce_channel_id = ?,
nickname_enabled = ?,
announcements_enabled = ?
WHERE guild_id = ?
""",
(
new_channel_id,
new_nickname_enabled,
new_announcements_enabled,
guild_id,
),
)
conn.commit()
def get_guild_config(guild_id: int) -> Optional[sqlite3.Row]:
with get_db_connection() as conn:
return conn.execute(
"SELECT * FROM guild_config WHERE guild_id = ?",
(guild_id,),
).fetchone()
def get_all_guild_configs() -> list[sqlite3.Row]:
with get_db_connection() as conn:
return conn.execute("SELECT * FROM guild_config ORDER BY guild_id").fetchall()
def delete_guild_config(guild_id: int) -> None:
with get_db_connection() as conn:
conn.execute("DELETE FROM guild_config WHERE guild_id = ?", (guild_id,))
conn.commit()
# =========================================================
# HTTP
# =========================================================
async def get_http_session() -> aiohttp.ClientSession:
global http_session
if http_session is None or http_session.closed:
timeout = aiohttp.ClientTimeout(total=15)
http_session = aiohttp.ClientSession(timeout=timeout)
return http_session
# =========================================================
# D2TZ API
# =========================================================
async def fetch_current_zone_data() -> Any:
session = await get_http_session()
params = {"token": D2TZ_TOKEN}
async with session.get(D2TZ_API_URL, params=params) as resp:
text = await resp.text()
if resp.status != 200:
logger.error(f"D2TZ API returned HTTP {resp.status}: {text[:300]}")
raise RuntimeError(f"D2TZ API returned HTTP {resp.status}: {text[:300]}")
try:
data = json.loads(text)
except json.JSONDecodeError as e:
logger.error(f"D2TZ API did not return valid JSON: {text[:300]}")
raise RuntimeError(f"D2TZ API did not return valid JSON: {text[:300]}") from e
if isinstance(data, dict) and data.get("status") == "error":
logger.error(f"D2TZ API error: {data.get('message', 'unknown error')}")
raise RuntimeError(f"D2TZ API error: {data.get('message', 'unknown error')}")
return data
def normalize_zone_name(name: str) -> str:
return name.replace("_", " ").strip()
def format_zone_name(zone_names: Any) -> Optional[str]:
if isinstance(zone_names, list) and zone_names:
cleaned = [normalize_zone_name(str(z)) for z in zone_names if str(z).strip()]
if cleaned:
return " / ".join(cleaned)
if isinstance(zone_names, str) and zone_names.strip():
return normalize_zone_name(zone_names)
return None
def format_immunities(immunities: Any) -> str:
if not isinstance(immunities, list):
return ""
out = []
for code in immunities:
code_str = str(code).strip().lower()
emoji = IMMUNITY_EMOJIS.get(code_str)
out.append(emoji if emoji else code_str.upper())
return "".join(out)
def format_tiers(entry: dict) -> str:
exp_tier = str(entry.get("tier-exp", "")).strip()
loot_tier = str(entry.get("tier-loot", "")).strip()
if exp_tier and loot_tier:
return f"{exp_tier}/{loot_tier}"
if exp_tier:
return f"{exp_tier}/?"
if loot_tier:
return f"?/{loot_tier}"
return ""
def make_zone_key(zone_name: str, end_time: Optional[int]) -> str:
return f"{zone_name}|{end_time}"
def parse_zone_entry(entry: dict, now: int) -> Tuple[Optional[str], Optional[int], Optional[int], str, str, Optional[str], Optional[int]]:
zone_name = format_zone_name(entry.get("zone_name"))
start_time = entry.get("time")
end_time = entry.get("end_time")
start_ts = int(start_time) if isinstance(start_time, (int, float)) else None
end_ts = int(end_time) if isinstance(end_time, (int, float)) else None
minutes_left = None
if end_ts is not None:
seconds_left = max(0, end_ts - now)
minutes_left = seconds_left // 60
immunity_icons = format_immunities(entry.get("immunities"))
tier_text = format_tiers(entry)
zone_key = make_zone_key(zone_name, end_ts) if zone_name else None
return zone_name, minutes_left, end_ts, immunity_icons, tier_text, zone_key, start_ts
def extract_current_and_next_zone_info(
data: Any,
) -> Tuple[
Optional[str], Optional[int], Optional[int], str, str, Optional[str],
Optional[str], Optional[int]
]:
now = int(time.time())
if isinstance(data, list):
entries = [item for item in data if isinstance(item, dict)]
if len(entries) >= 2:
sorted_entries = sorted(
entries,
key=lambda e: int(e.get("end_time")) if isinstance(e.get("end_time"), (int, float)) else 10**18
)
current_entry = sorted_entries[0]
next_entry = sorted_entries[1]
current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(current_entry, now)
next_zone_name, _, _, _, _, _, next_start_time = parse_zone_entry(next_entry, now)
next_seconds_until = None
if next_start_time is not None:
next_seconds_until = max(0, next_start_time - now)
return (
current_zone_name,
current_minutes_left,
current_end_time,
current_immunity_icons,
current_tier_text,
current_zone_key,
next_zone_name,
next_seconds_until,
)
elif len(entries) == 1:
current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(entries[0], now)
return (
current_zone_name,
current_minutes_left,
current_end_time,
current_immunity_icons,
current_tier_text,
current_zone_key,
None,
None,
)
if isinstance(data, dict):
current_zone_name, current_minutes_left, current_end_time, current_immunity_icons, current_tier_text, current_zone_key, _ = parse_zone_entry(data, now)
return (
current_zone_name,
current_minutes_left,
current_end_time,
current_immunity_icons,
current_tier_text,
current_zone_key,
None,
None,
)
return None, None, None, "", "", None, None, None
async def refresh_tz_cache(force: bool = False) -> bool:
"""
Refresh API cache only when needed.
Returns True if zone key changed.
"""
global tz_cache
now = int(time.time())
current_end_time = tz_cache.get("end_time")
old_zone_key = tz_cache.get("zone_key")
should_refresh = force
if not should_refresh:
if current_end_time is None:
should_refresh = True
elif now >= int(current_end_time) - REFRESH_EARLY_SECONDS:
should_refresh = True
if not should_refresh:
return False
data = await fetch_current_zone_data()
(
current_zone_name,
current_minutes_left,
current_end_time,
current_immunity_icons,
current_tier_text,
current_zone_key,
next_zone_name,
next_seconds_until,
) = extract_current_and_next_zone_info(data)
if not current_zone_name:
raise RuntimeError(f"Could not find current zone in API response: {data}")
next_start_time = None
if next_seconds_until is not None:
next_start_time = now + next_seconds_until
tz_cache = {
"zone_name": current_zone_name,
"end_time": current_end_time,
"immunity_icons": current_immunity_icons,
"tier_text": current_tier_text,
"zone_key": current_zone_key,
"next_zone_name": next_zone_name,
"next_start_time": next_start_time,
"last_fetch_time": now,
}
return old_zone_key != current_zone_key
def get_cached_zone_snapshot() -> Tuple[str, Optional[int], Optional[int], str, str, Optional[str], Optional[str], Optional[int]]:
zone_name = tz_cache.get("zone_name")
end_time = tz_cache.get("end_time")
immunity_icons = tz_cache.get("immunity_icons", "")
tier_text = tz_cache.get("tier_text", "")
zone_key = tz_cache.get("zone_key")
next_zone_name = tz_cache.get("next_zone_name")
next_start_time = tz_cache.get("next_start_time")
if not zone_name:
raise RuntimeError("TZ cache is empty")
now = int(time.time())
minutes_left = None
if isinstance(end_time, int):
seconds_left = max(0, end_time - now)
minutes_left = seconds_left // 60
next_seconds_until = None
if isinstance(next_start_time, int):
next_seconds_until = max(0, next_start_time - now)
return zone_name, minutes_left, end_time, immunity_icons, tier_text, zone_key, next_zone_name, next_seconds_until
# =========================================================
# NICKNAME BUILDING
# =========================================================
def abbreviate_single_zone(zone_name: str) -> str:
normalized = normalize_zone_name(zone_name)
if normalized in ZONE_ABBREVIATIONS:
return ZONE_ABBREVIATIONS[normalized]
words = [w for w in normalized.replace("'", "").split() if w]
if not words:
return normalized
if len(words) == 1:
return words[0][:4]
return "".join(word[0].upper() for word in words)
def abbreviate_zone_string(zone_string: str) -> str:
parts = [part.strip() for part in zone_string.split("/")]
abbreviated_parts = [abbreviate_single_zone(part) for part in parts if part.strip()]
return " / ".join(abbreviated_parts)
def build_nickname(zone_name: str, minutes_left: Optional[int], immunity_icons: str, tier_text: str) -> str:
time_part = f" ({minutes_left}m)" if minutes_left is not None else ""
extras = []
if immunity_icons:
extras.append(immunity_icons)
if tier_text:
extras.append(tier_text)
extra_part = f" {' '.join(extras)}" if extras else ""
full_name = f"{zone_name}{extra_part}{time_part}"
if len(full_name) <= MAX_NICKNAME_LEN:
return full_name
abbreviated_zone = abbreviate_zone_string(zone_name)
abbreviated_full = f"{abbreviated_zone}{extra_part}{time_part}"
if len(abbreviated_full) <= MAX_NICKNAME_LEN:
return abbreviated_full
if immunity_icons and tier_text:
reduced_extra = f" {immunity_icons}"
reduced_name = f"{abbreviated_zone}{reduced_extra}{time_part}"
if len(reduced_name) <= MAX_NICKNAME_LEN:
return reduced_name
tiny_parts = []
for part in [p.strip() for p in zone_name.split("/")]:
abbr = abbreviate_single_zone(part)
tiny_parts.append(abbr[:3] if len(abbr) > 3 else abbr)
tiny_zone = " / ".join(tiny_parts)
tiny_full = f"{tiny_zone}{extra_part}{time_part}"
if len(tiny_full) <= MAX_NICKNAME_LEN:
return tiny_full
tiny_reduced = f"{tiny_zone}{(' ' + immunity_icons) if immunity_icons else ''}{time_part}"
if len(tiny_reduced) <= MAX_NICKNAME_LEN:
return tiny_reduced
absolute_fallback = f"{tiny_zone}{time_part}"
if len(absolute_fallback) <= MAX_NICKNAME_LEN:
return absolute_fallback
return absolute_fallback[:MAX_NICKNAME_LEN]
# =========================================================
# STATUS HELPERS
# =========================================================
def format_duration(seconds: int) -> str:
seconds = max(0, int(seconds))
days, rem = divmod(seconds, 86400)
hours, rem = divmod(rem, 3600)
minutes, secs = divmod(rem, 60)
parts = []
if days:
parts.append(f"{days}d")
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
if secs or not parts:
parts.append(f"{secs}s")
return " ".join(parts)
def get_status_snapshot() -> dict[str, Any]:
now = int(time.time())
zone_name = tz_cache.get("zone_name")
end_time = tz_cache.get("end_time")
next_zone_name = tz_cache.get("next_zone_name")
next_start_time = tz_cache.get("next_start_time")
last_fetch_time = tz_cache.get("last_fetch_time", 0)
seconds_left = None
if isinstance(end_time, int):
seconds_left = max(0, end_time - now)
next_seconds_until = None
if isinstance(next_start_time, int):
next_seconds_until = max(0, next_start_time - now)
uptime_seconds = int((datetime.now(timezone.utc) - bot_start_time).total_seconds())
return {
"uptime_seconds": uptime_seconds,
"guild_count_live": len(bot.guilds),
"guild_count_configured": len(get_all_guild_configs()),
"zone_name": zone_name,
"seconds_left": seconds_left,
"next_zone_name": next_zone_name,
"next_seconds_until": next_seconds_until,
"last_fetch_time": last_fetch_time,
"zone_key": tz_cache.get("zone_key"),
}
# =========================================================
# DISCORD HELPERS
# =========================================================
async def get_bot_member(guild: discord.Guild) -> Optional[discord.Member]:
me = guild.me
if me is None and bot.user is not None:
try:
me = await guild.fetch_member(bot.user.id)
except discord.HTTPException:
return None
return me
async def get_text_channel(channel_id: int) -> Optional[discord.TextChannel]:
channel = bot.get_channel(channel_id)
if channel is None:
try:
channel = await bot.fetch_channel(channel_id)
except discord.HTTPException:
return None
return channel if isinstance(channel, discord.TextChannel) else None
async def update_guild_nickname(
guild: discord.Guild,
zone_name: str,
minutes_left: Optional[int],
immunity_icons: str,
tier_text: str,
) -> None:
me = await get_bot_member(guild)
if me is None:
logger.warning(f"Could not fetch bot member in guild {guild.id}")
return
nickname = build_nickname(zone_name, minutes_left, immunity_icons, tier_text)
if me.nick != nickname:
try:
await me.edit(nick=nickname, reason="Updating nickname with cached TZ info")
logger.info(f"[{guild.id}] Nickname updated to: {nickname}")
except discord.Forbidden:
logger.warning(f"[{guild.id}] Missing permission to change nickname.")
except discord.HTTPException as e:
logger.error(f"[{guild.id}] Failed to change nickname: {e}")
async def announce_zone_change(
channel_id: int,
old_zone: Optional[str],
new_zone: str,
minutes_left: Optional[int],
immunity_icons: str,
tier_text: str,
) -> None:
channel = await get_text_channel(channel_id)
if channel is None:
logger.warning(f"Could not access announce channel {channel_id}")
return
bits = []
if immunity_icons:
bits.append(immunity_icons)
if tier_text:
bits.append(f"EXP/LOOT {tier_text}")
if minutes_left is not None:
bits.append(f"{minutes_left}m left")
suffix = f" ({', '.join(bits)})" if bits else ""
if old_zone is None:
message = f"Current zone detected: **{new_zone}**{suffix}"
else:
message = f"Zone changed: **{old_zone}** → **{new_zone}**{suffix}"
try:
await channel.send(message)
logger.info(f"[{channel.guild.id}] Announcement sent: {message}")
except discord.Forbidden:
logger.warning(f"[{channel.guild.id}] Missing permission to send messages in {channel_id}.")
except discord.HTTPException as e:
logger.error(f"[{channel.guild.id}] Failed to send message: {e}")
# =========================================================
# PER-GUILD PROCESSING
# =========================================================
async def process_single_guild(
guild_config: sqlite3.Row,
zone_name: str,
minutes_left: Optional[int],
immunity_icons: str,
tier_text: str,
zone_key: Optional[str],
allow_announcement: bool,
) -> None:
guild_id = int(guild_config["guild_id"])
guild = bot.get_guild(guild_id)
if guild is None:
cleanup_guild(
guild_id,
"stale config detected during loop",
guild_name=None,
)
return
if guild_config["nickname_enabled"]:
await update_guild_nickname(guild, zone_name, minutes_left, immunity_icons, tier_text)
if (
allow_announcement
and guild_config["announcements_enabled"]
and guild_config["announce_channel_id"]
and zone_key
):
old_zone_key = guild_config_key_cache.get(guild_id)
if zone_key != old_zone_key:
old_zone = old_zone_key.split("|", 1)[0] if old_zone_key else None
await announce_zone_change(
int(guild_config["announce_channel_id"]),
old_zone,
zone_name,
minutes_left,
immunity_icons,
tier_text,
)
guild_config_key_cache[guild_id] = zone_key
# =========================================================
# BACKGROUND TASKS
# =========================================================
async def minute_aligned_updater() -> None:
await bot.wait_until_ready()
while not bot.is_closed():
try:
now = time.time()
sleep_for = 60 - (now % 60)
await asyncio.sleep(sleep_for)
if not tz_cache.get("zone_name"):
continue
zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
configs = get_all_guild_configs()
for config in configs:
# minute updater does not send zone-change announcements
await process_single_guild(
config,
zone_name,
minutes_left,
immunity_icons,
tier_text,
zone_key,
allow_announcement=False,
)
except Exception as e:
logger.error(f"minute_aligned_updater error: {e}")
await asyncio.sleep(5)
async def api_refresh_loop() -> None:
await bot.wait_until_ready()
while not bot.is_closed():
try:
if not tz_cache.get("zone_name") or not tz_cache.get("end_time"):
changed = await refresh_tz_cache(force=True)
zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
configs = get_all_guild_configs()
for config in configs:
await process_single_guild(
config,
zone_name,
minutes_left,
immunity_icons,
tier_text,
zone_key,
allow_announcement=changed,
)
await asyncio.sleep(5)
continue
now = int(time.time())
refresh_at = int(tz_cache["end_time"]) - REFRESH_EARLY_SECONDS
sleep_for = max(1, refresh_at - now)
await asyncio.sleep(sleep_for)
changed = await refresh_tz_cache(force=True)
zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
configs = get_all_guild_configs()
for config in configs:
await process_single_guild(
config,
zone_name,
minutes_left,
immunity_icons,
tier_text,
zone_key,
allow_announcement=changed,
)
except Exception as e:
logger.error(f"api_refresh_loop error: {e}")
await asyncio.sleep(10)
# =========================================================
# COMMANDS
# =========================================================
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def setup(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
announce_channel_id=ctx.channel.id,
nickname_enabled=True,
announcements_enabled=True,
)
await ctx.send(
f"Setup complete.\n"
f"Announcements channel: {ctx.channel.mention}\n"
f"Nickname updates: enabled\n"
f"Announcements: enabled"
)
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def setchannel(ctx: commands.Context, channel: discord.TextChannel) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
announce_channel_id=channel.id,
)
await ctx.send(f"Announcement channel set to {channel.mention}.")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def enabletz(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
nickname_enabled=True,
announcements_enabled=True,
)
await ctx.send("Terror Zone updates enabled for this server.")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def disabletz(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
nickname_enabled=False,
announcements_enabled=False,
)
await ctx.send("Terror Zone updates disabled for this server.")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def nickon(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
nickname_enabled=True,
)
await ctx.send("Nickname updates enabled for this server.")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def nickoff(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
nickname_enabled=False,
)
await ctx.send("Nickname updates disabled for this server.")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def announceon(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
announcements_enabled=True,
)
await ctx.send("Announcements enabled for this server.")
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def announceoff(ctx: commands.Context) -> None:
upsert_guild_config(
guild_id=ctx.guild.id,
announcements_enabled=False,
)
await ctx.send("Announcements disabled for this server.")
@bot.command()
@commands.guild_only()
async def showconfig(ctx: commands.Context) -> None:
config = get_guild_config(ctx.guild.id)
if config is None:
await ctx.send("This server is not configured yet. Run `!setup` in the channel you want to use.")
return
channel_id = config["announce_channel_id"]
channel_text = f"<#{channel_id}>" if channel_id else "Not set"
await ctx.send(
f"Guild config:\n"
f"- Announce channel: {channel_text}\n"
f"- Nickname updates: {'enabled' if config['nickname_enabled'] else 'disabled'}\n"
f"- Announcements: {'enabled' if config['announcements_enabled'] else 'disabled'}"
)
@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def removetz(ctx: commands.Context) -> None:
delete_guild_config(ctx.guild.id)
guild_config_key_cache.pop(ctx.guild.id, None)
await ctx.send("This server's Terror Zone configuration was removed.")
@bot.command()
async def zone(ctx: commands.Context) -> None:
    try:
        # Fill the cache on first use so the command works right after startup.
        if not tz_cache.get("zone_name"):
            await refresh_tz_cache(force=True)
        zone_name, minutes_left, end_time, immunity_icons, tier_text, _zone_key, next_zone_name, next_seconds_until = get_cached_zone_snapshot()
        nickname_preview = build_nickname(zone_name, minutes_left, immunity_icons, tier_text)
        msg = f"Current zone: **{zone_name}**"
        if immunity_icons:
            msg += f"\nImmunities: {immunity_icons}"
        if tier_text:
            msg += f"\nEXP/LOOT tier: `{tier_text}`"
        if minutes_left is not None:
            msg += f"\nTime left: `{minutes_left}m`"
        if next_zone_name:
            if next_seconds_until is not None:
                next_minutes = next_seconds_until // 60
                msg += f"\nNext zone: **{next_zone_name}** (in {next_minutes}m)"
            else:
                msg += f"\nNext zone: **{next_zone_name}**"
        msg += f"\nNickname preview: `{nickname_preview}`"
        if end_time is not None:
            msg += f"\nEnd time (unix): `{end_time}`"
        # Discord does not render <t:...> timestamps inside a code span,
        # so this one is sent without backticks, and only once a fetch has happened.
        last_fetch = tz_cache.get("last_fetch_time")
        if last_fetch:
            msg += f"\nLast API fetch: <t:{int(last_fetch)}:R>"
        await ctx.send(msg)
    except Exception as e:
        await ctx.send(f"Error reading current zone: `{e}`")

@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def zonedebug(ctx: commands.Context) -> None:
    try:
        data = await fetch_current_zone_data()
        pretty = json.dumps(data, indent=2, ensure_ascii=False)
        # Discord rejects messages over 2000 characters, so truncate with
        # enough headroom for the code fence and the "(truncated)" marker.
        if len(pretty) > 1900:
            pretty = pretty[:1900] + "\n... (truncated)"
        await ctx.send(f"```json\n{pretty}\n```")
    except Exception as e:
        await ctx.send(f"Debug failed: `{e}`")

@bot.command()
async def legend(ctx: commands.Context) -> None:
    lines = ["Zone abbreviation legend:"]
    for full_name in sorted(ZONE_ABBREVIATIONS):
        lines.append(f"{ZONE_ABBREVIATIONS[full_name]} = {full_name}")
    lines.append("")
    lines.append("Immunity legend:")
    for code in ["f", "c", "l", "p", "ph", "m"]:
        if code in IMMUNITY_EMOJIS:
            lines.append(f"{IMMUNITY_EMOJIS[code]} = {IMMUNITY_LABELS.get(code, code)}")
    message = "\n".join(lines)
    if len(message) <= 1900:
        await ctx.send(f"```text\n{message}\n```")
        return
    # Too long for one message: split on line boundaries into chunks of at most ~1900 chars.
    chunks = []
    current = ""
    for line in lines:
        if len(current) + len(line) + 1 > 1900:
            chunks.append(current)
            current = line + "\n"
        else:
            current += line + "\n"
    if current:
        chunks.append(current)
    for chunk in chunks:
        await ctx.send(f"```text\n{chunk}\n```")

@bot.command()
async def status(ctx: commands.Context) -> None:
    try:
        snapshot = get_status_snapshot()
        uptime_text = format_duration(snapshot["uptime_seconds"])
        configured = snapshot["guild_count_configured"]
        live = snapshot["guild_count_live"]
        msg = [
            "**Bot status**",
            f"Uptime: `{uptime_text}`",
            f"Connected guilds: `{live}`",
            f"Configured guilds: `{configured}`",
        ]
        if snapshot["zone_name"]:
            msg.append(f"Cached current zone: **{snapshot['zone_name']}**")
        else:
            msg.append("Cached current zone: `none`")
        if snapshot["seconds_left"] is not None:
            msg.append(f"Time left: `{format_duration(snapshot['seconds_left'])}`")
        if snapshot["next_zone_name"]:
            if snapshot["next_seconds_until"] is not None:
                msg.append(
                    f"Next zone: **{snapshot['next_zone_name']}** "
                    f"(in {format_duration(snapshot['next_seconds_until'])})"
                )
            else:
                msg.append(f"Next zone: **{snapshot['next_zone_name']}**")
        if snapshot["last_fetch_time"]:
            msg.append(f"Last API fetch: <t:{snapshot['last_fetch_time']}:F>")
            msg.append(f"Last API fetch (relative): <t:{snapshot['last_fetch_time']}:R>")
        if snapshot["zone_key"]:
            msg.append(f"Zone Key: `{snapshot['zone_key']}`")
        await ctx.send("\n".join(msg))
    except Exception as e:
        logger.error(f"status command error: {e}")
        await ctx.send(f"Status error: `{e}`")

@bot.command()
@commands.guild_only()
@commands.has_permissions(manage_guild=True)
async def forcezone(ctx: commands.Context) -> None:
    try:
        changed = await refresh_tz_cache(force=True)
        zone_name, minutes_left, _end_time, immunity_icons, tier_text, zone_key, _next_zone_name, _next_seconds_until = get_cached_zone_snapshot()
        config = get_guild_config(ctx.guild.id)
        if config is None:
            await ctx.send("This server is not configured yet. Run `!setup` first.")
            return
        if config["nickname_enabled"]:
            await update_guild_nickname(ctx.guild, zone_name, minutes_left, immunity_icons, tier_text)
        if config["announcements_enabled"] and config["announce_channel_id"]:
            # The cached key starts with the zone name, followed by "|".
            old_key = guild_config_key_cache.get(ctx.guild.id)
            old_zone = old_key.split("|", 1)[0] if old_key else None
            await announce_zone_change(
                int(config["announce_channel_id"]),
                old_zone,
                zone_name,
                minutes_left,
                immunity_icons,
                tier_text,
            )
        if zone_key:
            guild_config_key_cache[ctx.guild.id] = zone_key
        await ctx.send(
            f"Forced refresh complete: **{build_nickname(zone_name, minutes_left, immunity_icons, tier_text)}**"
            f"\nAPI returned {'a new zone' if changed else 'the same zone'}."
        )
    except Exception as e:
        await ctx.send(f"Force update failed: `{e}`")

@bot.command(name="help")
async def help_command(ctx: commands.Context) -> None:
    is_guild = ctx.guild is not None
    can_manage_guild = (
        is_guild and
        isinstance(ctx.author, discord.Member) and
        ctx.author.guild_permissions.manage_guild
    )
    lines = [
        "**D2TZ Bot Help**",
        "",
        "**Public commands**",
        "`!zone` - Show current Terror Zone, time left, next zone, and nickname preview.",
        "`!status` - Show bot uptime, cached TZ info, and basic health/status.",
        "`!legend` - Show zone abbreviations and immunity emoji meanings.",
    ]
    if can_manage_guild:
        lines.extend([
            "",
            "**Server admin commands**",
            "`!setup` - Initialize the bot in this server and use the current channel for announcements.",
            "`!setchannel #channel` - Set the announcement channel for this server.",
            "`!showconfig` - Show this server's current bot configuration.",
            "`!enabletz` - Enable both nickname updates and announcements.",
            "`!disabletz` - Disable both nickname updates and announcements.",
            "`!nickon` - Enable nickname updates only.",
            "`!nickoff` - Disable nickname updates only.",
            "`!announceon` - Enable announcements only.",
            "`!announceoff` - Disable announcements only.",
            "`!removetz` - Remove this server's TZ configuration.",
            "`!forcezone` - Force an immediate TZ refresh and update.",
            "`!zonedebug` - Show raw API data for debugging.",
        ])
    if not is_guild:
        lines.extend([
            "",
            "_Note: server setup commands only work inside a server, not in DMs._"
        ])
    elif not can_manage_guild:
        lines.extend([
            "",
            "_You do not currently have the 'Manage Server' permission, so admin commands are hidden._"
        ])
    await ctx.send("\n".join(lines))

# =========================================================
# EVENTS
# =========================================================
@bot.event
async def on_ready() -> None:
    global bg_tasks_started
    logger.info(f"Logged in as {bot.user} ({bot.user.id})")
    init_db()
    await get_http_session()
    await refresh_tz_cache(force=True)
    # 🔍 Startup audit
    audit_guild_configs(bot.guilds)
    # Rebuild cache safely
    configs = get_all_guild_configs()
    for config in configs:
        guild_config_key_cache.setdefault(int(config["guild_id"]), "")
    # on_ready can fire again after a reconnect, so start the loops only once.
    if not bg_tasks_started:
        bg_tasks_started = True
        asyncio.create_task(minute_aligned_updater())
        asyncio.create_task(api_refresh_loop())


@bot.event
async def on_guild_remove(guild: discord.Guild) -> None:
    cleanup_guild(
        guild.id,
        "bot removed from server",
        guild.name,
    )

@bot.event
async def on_command_error(ctx: commands.Context, error: Exception) -> None:
    # MissingPermissions and NoPrivateMessage are both subclasses of CheckFailure,
    # so they have to be tested before the generic CheckFailure branch.
    if isinstance(error, commands.MissingPermissions):
        await ctx.send("You need the 'Manage Server' permission to use that command.")
    elif isinstance(error, commands.NoPrivateMessage):
        await ctx.send("This command can only be used inside a server.")
    elif isinstance(error, commands.CheckFailure):
        await ctx.send("You do not have permission to use that command.")
    elif isinstance(error, commands.MissingRequiredArgument):
        await ctx.send("Missing argument for that command.")
    elif isinstance(error, commands.BadArgument):
        await ctx.send("Invalid argument.")
    elif isinstance(error, commands.CommandNotFound):
        return
    else:
        logger.error(f"Command error ({ctx.command}): {error}")
        await ctx.send("An unexpected error occurred.")

# =========================================================
# MAIN
# =========================================================
async def main() -> None:
    init_db()
    try:
        await bot.start(BOT_TOKEN)
    finally:
        # Close the shared aiohttp session on shutdown.
        global http_session
        if http_session and not http_session.closed:
            await http_session.close()


if __name__ == "__main__":
    asyncio.run(main())

It also sends a text message when the zone changes. You can disable that if you want to, but I would keep it, since the message shows the zone names without abbreviations.
A practical note: because nicknames are capped at 32 characters, the code tries these formats in order and uses the first one that fits:
full zone names + immunities + tiers + time
abbreviated zone names + immunities + tiers + time
abbreviated zone names + immunities + time
abbreviated zone names + time
So if the nickname gets too long, it first switches to the abbreviated zone names, then drops the tiers, and only as a last resort drops the immunities. The timer always stays. That keeps the most useful info visible.
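To make the fallback order concrete, here is a minimal sketch of the idea. It is not the bot's actual build_nickname function: the function name, the abbreviation table, and the separators are all assumptions made for illustration, and len() counts Unicode code points, which may not match exactly how Discord measures emoji.

```python
from typing import List

NICK_LIMIT = 32  # Discord's maximum nickname length

# Hypothetical abbreviation table, for illustration only.
SKETCH_ABBREVIATIONS = {
    "Chaos Sanctuary": "CS",
    "Worldstone Keep": "WSK",
    "Throne of Destruction": "Throne",
}


def sketch_nickname(zones: List[str], minutes_left: int,
                    immunities: str = "", tier: str = "") -> str:
    """Return the richest nickname format that fits within NICK_LIMIT."""
    full = ", ".join(zones)
    short = "/".join(SKETCH_ABBREVIATIONS.get(z, z) for z in zones)
    timer = f"{minutes_left}m"

    # Candidates in priority order, richest first.
    candidates = [
        (full, immunities, tier, timer),
        (short, immunities, tier, timer),
        (short, immunities, timer),
        (short, timer),
    ]
    for parts in candidates:
        text = " ".join(p for p in parts if p)  # skip empty parts
        if len(text) <= NICK_LIMIT:
            return text

    # Nothing fits: hard-truncate the shortest format as a last resort.
    return " ".join(p for p in candidates[-1] if p)[:NICK_LIMIT]


if __name__ == "__main__":
    print(sketch_nickname(["Chaos Sanctuary"], 42, "FL", "S/A"))
    print(sketch_nickname(["Lost City", "Valley of Snakes"], 42, "FCL", "S/A"))
```

The first call fits with full names, while the second one falls all the way through to zone names plus timer because neither zone has an entry in the sample table.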
After testing it for about a day, I decided to host it myself so that anyone who needs it can use it without having to set anything up on their own computer.
I have further modified the original bot so that everyone can use it out of the box, without having to fumble through .env files and such. (I have updated the source code above as well, to reflect the new changes.)
Once the bot is invited to a server, just type !setup in the chat channel you want the bot to announce in (you must run !setup at least once, otherwise the bot does nothing on your server).
These are all the various bot commands so far:
Admin (by Admin, I mean both Admins and people who have the Manage Server permission in the server):
!setup - initial setup of the bot; it automatically saves your server's GUILD_ID and ANNOUNCE_CHANNEL_ID to the sqlite db the bot uses, so it knows which server/channel to post in
!setchannel - self-explanatory, use it in case you need to change the channel it announces in (for example: !setchannel #tz-updates)
!enabletz / !disabletz - enables/disables the whole bot functionality, i.e. both nickname updates and announcements
!nickon / !nickoff - enables/disables the nickname updates
!announceon / !announceoff - enables/disables the announcements via text channel
!removetz - deletes the entry for your server from the sqlite database the bot uses, so basically it stops working on your server XD
!forcezone - forces an immediate refresh from the API, updates the nickname and sends an announcement without waiting for the next scheduled update
!showconfig - shows the current config of the bot
!zonedebug - shows the raw API data; mostly for internal use while coding it, but I left it in
Everyone:
!zone - general zone information
!legend - legend of what everything means
!status - status of bot
!help - shows the list of commands
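For anyone curious how the per-server settings behind !setup, !nickon, !announceoff and friends could be stored, here is a small sketch using sqlite3. The table layout, the function names (save_guild_settings, load_guild_settings) and the column names are assumptions for illustration; the bot's real schema is not shown in this part of the post. The point is that each command only passes the fields it wants to change, and everything else keeps its stored value.

```python
import sqlite3
from typing import Optional

# Hypothetical schema: one row per Discord server.
SCHEMA = """
CREATE TABLE IF NOT EXISTS guild_config (
    guild_id              INTEGER PRIMARY KEY,
    announce_channel_id   INTEGER,
    nickname_enabled      INTEGER NOT NULL DEFAULT 1,
    announcements_enabled INTEGER NOT NULL DEFAULT 1
)
"""


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.row_factory = sqlite3.Row  # allows row["column"] access
    conn.execute(SCHEMA)
    return conn


def save_guild_settings(
    conn: sqlite3.Connection,
    guild_id: int,
    announce_channel_id: Optional[int] = None,
    nickname_enabled: Optional[bool] = None,
    announcements_enabled: Optional[bool] = None,
) -> None:
    """Create the row if needed, then update only the fields that were given."""
    updates = {
        "announce_channel_id": announce_channel_id,
        "nickname_enabled": None if nickname_enabled is None else int(nickname_enabled),
        "announcements_enabled": None if announcements_enabled is None else int(announcements_enabled),
    }
    with conn:  # commits on success, rolls back on error
        conn.execute("INSERT OR IGNORE INTO guild_config (guild_id) VALUES (?)", (guild_id,))
        for column, value in updates.items():
            if value is not None:
                # Column names come from the fixed dict above, never from user input.
                conn.execute(
                    f"UPDATE guild_config SET {column} = ? WHERE guild_id = ?",
                    (value, guild_id),
                )


def load_guild_settings(conn: sqlite3.Connection, guild_id: int) -> Optional[sqlite3.Row]:
    return conn.execute(
        "SELECT * FROM guild_config WHERE guild_id = ?", (guild_id,)
    ).fetchone()


if __name__ == "__main__":
    db = open_db()
    save_guild_settings(db, 1, announce_channel_id=10)  # like !setup
    save_guild_settings(db, 1, nickname_enabled=False)  # like !nickoff
    print(dict(load_guild_settings(db, 1)))
```

A server that never ran the setup step simply has no row, which is why a lookup returning nothing can be treated as "not configured yet".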
Here is the invite link for the bot: https://discord.com/oauth2/authorize?cl ... 7407664188
I hope it helps anyone who may need it, and in case you do use it and run into any bugs, drop me a message and I'll see what's up.