#!/usr/bin/env python3 """ Twitch chat → MPD control bot, plus optional "going live" notifications. Connects to Twitch IRC over TLS, joins the configured channel, and responds to: !skip advance MPD to the next track (per-user cooldown — see SKIP_COOLDOWN) !queue print the next few queued tracks !info print full metadata (artist · title · album · year · genre) for now-playing The new track shows up on stream because bridges/mpd-state.py is already broadcasting MPD state to the loading / game / music-box overlays. If MM_HOOK is set, a second task polls the public ophi118.com status service (which already polls Twitch Helix for live/title/game) and POSTs a single Mattermost notification per offline→live transition. See live_watch(). Reads .env from the same directory: TWITCH_ACCESS_TOKEN chat OAuth token (with or without `oauth:` prefix) TWITCH_NICK bot login name (lowercase) TWITCH_CHANNEL channel to join, no leading `#` — defaults to TWITCH_NICK MM_HOOK optional Mattermost incoming-webhook URL; enables going-live notifications when set STATUS_URL optional override for the status API endpoint (default: https://ophi118.com/api/status.json) TWITCH_REFRESH_TOKEN and TWITCH_CLIENT_ID are also stored in .env (paired with the access token at generation time) but the bot doesn't use them yet — token refresh would land here later if access tokens start expiring mid-stream. """ import asyncio import json import os import ssl import sys import time import urllib.error import urllib.request from pathlib import Path from mpd.asyncio import MPDClient HERE = Path(__file__).resolve().parent ENV = HERE / ".env" MPD_HOST = os.environ.get("MPD_HOST", "localhost") MPD_PORT = int(os.environ.get("MPD_PORT", "6600")) IRC_HOST = "irc.chat.twitch.tv" IRC_PORT = 6697 PING_TIMEOUT = 360.0 # twitch PINGs ~every 5min; reconnect if silent longer SKIP_COOLDOWN = 60.0 # per-user seconds between successful !skip invocations QUEUE_PEEK = 3 # how many upcoming tracks !queue should print # Going-live watcher: poll cadence matches the server's Twitch poll (60s) so we # never lag the upstream signal by more than ~2 polls. STATUS_URL = os.environ.get("STATUS_URL", "https://ophi118.com/api/status.json") STATUS_POLL_S = 60.0 # Set TWITCH_BOT_DEBUG=1 (env or .env) to dump every received IRC line. Useful # when a brand-new bot account looks "connected" but doesn't see chat — Twitch # silently degrades unverified accounts in ways that don't surface as errors. DEBUG_RAW = os.environ.get("TWITCH_BOT_DEBUG", "").strip() not in ("", "0", "false") # ─── tiny .env loader ─────────────────────────────────────────────────────── def load_env(path: Path) -> dict: """KEY=VALUE lines, # comments, optional surrounding quotes. Avoids a python-dotenv dependency for three vars.""" out = {} if not path.exists(): return out for raw in path.read_text().splitlines(): line = raw.strip() if not line or line.startswith("#") or "=" not in line: continue k, _, v = line.partition("=") out[k.strip()] = v.strip().strip('"').strip("'") return out # ─── MPD ──────────────────────────────────────────────────────────────────── class Mpd: """One shared MPDClient with reconnect-on-failure. python-mpd2's asyncio client doesn't auto-reconnect, so each command is wrapped to retry once after dropping the connection — covers MPD restarts and idle-socket timeouts without surfacing transient errors. """ def __init__(self): self._cli = None self._lock = asyncio.Lock() async def _ensure(self): if self._cli is None: cli = MPDClient() await cli.connect(MPD_HOST, MPD_PORT) self._cli = cli async def _retry(self, op): """Run an MPD coroutine with one reconnect-on-failure retry.""" async with self._lock: last = None for attempt in (1, 2): try: await self._ensure() return await op(self._cli) except Exception as e: last = e self._cli = None # force reconnect on retry raise last # type: ignore[misc] async def skip(self) -> str: async def op(cli): await cli.next() return _fmt_song(await cli.currentsong()) return await self._retry(op) async def currentsong(self) -> dict: return await self._retry(lambda cli: cli.currentsong()) async def status(self) -> dict: return await self._retry(lambda cli: cli.status()) async def playlistinfo(self, pos: int) -> list: return await self._retry(lambda cli: cli.playlistinfo(pos)) def _flat(v): """MPD tags can be lists (multi-value); flatten to a comma-joined string with empties removed. Returns None for missing/empty so callers can skip fields without empty parens.""" if v is None: return None if isinstance(v, list): v = ", ".join(x for x in v if x) v = (v or "").strip() return v or None def _fmt_song(song: dict) -> str: artist = _flat(song.get("artist") or song.get("albumartist")) or "" title = _flat(song.get("title")) or (song.get("file") or "").rsplit("/", 1)[-1] out = f"{artist} — {title}".strip(" —") return out or "(unknown track)" def _fmt_info(song: dict) -> str: """Compose the !info reply: 'Artist — Title · Album (Year) · Genre'. Skip any segment whose underlying tag is missing.""" head = _fmt_song(song) parts = [head] if head != "(unknown track)" else [] album = _flat(song.get("album")) year = _flat(song.get("date")) if year: # MPD often stores ISO dates ('2018-04-12') — show year only. year = year.split("-", 1)[0] if album and year: parts.append(f"{album} ({year})") elif album: parts.append(album) elif year: parts.append(f"({year})") genre = _flat(song.get("genre")) if genre: parts.append(genre) return " · ".join(parts) if parts else "(unknown track)" # ─── Twitch IRC ───────────────────────────────────────────────────────────── class TwitchBot: def __init__(self, nick: str, token: str, channel: str, mpd: Mpd): self.nick = nick.lower() self.token = token if token.startswith("oauth:") else f"oauth:{token}" self.channel = "#" + channel.lower().lstrip("#") self.mpd = mpd # Per-user !skip cooldown — keyed by lowercased nick. The dict grows # unboundedly with viewer count over a stream session; not worth # pruning at typical chat sizes (one float per chatter). self._skip_seen: dict[str, float] = {} async def _send(self, writer: asyncio.StreamWriter, raw: str): writer.write((raw + "\r\n").encode("utf-8")) await writer.drain() async def _say(self, writer: asyncio.StreamWriter, msg: str): # twitch chat lines max ~500 chars; truncate defensively, strip newlines msg = msg.replace("\r", " ").replace("\n", " ")[:480] await self._send(writer, f"PRIVMSG {self.channel} :{msg}") async def run(self): """Connect/auth/listen loop with quiet exponential backoff. Mirrors the connect-quietly pattern in bridges/mpd-state.py: first failure prints once, then silent retry until the connection comes back. Auth failure is fatal — no point retrying with a bad token. """ backoff = 2 BACKOFF_MAX = 30 prev_status = None # None | "up" | "down" ctx = ssl.create_default_context() while True: try: reader, writer = await asyncio.open_connection( IRC_HOST, IRC_PORT, ssl=ctx, ) try: await self._send(writer, f"PASS {self.token}") await self._send(writer, f"NICK {self.nick}") await self._send(writer, f"JOIN {self.channel}") if prev_status != "up": print(f"[twitch] connected → {self.channel} as {self.nick}", flush=True) prev_status = "up" backoff = 2 while True: line_bytes = await asyncio.wait_for( reader.readline(), timeout=PING_TIMEOUT, ) if not line_bytes: raise ConnectionError("server closed connection") line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n") await self._handle(writer, line) finally: writer.close() try: await writer.wait_closed() except Exception: pass except RuntimeError as e: # auth-failed and similar fatals — let systemd flag it print(f"[twitch] fatal: {e}", file=sys.stderr, flush=True) raise except (asyncio.TimeoutError, ConnectionError, OSError) as e: if prev_status == "up": print(f"[twitch] disconnected ({type(e).__name__}: {e}) — retrying", flush=True) elif prev_status is None: print(f"[twitch] unreachable ({type(e).__name__}) — will retry silently", flush=True) prev_status = "down" await asyncio.sleep(backoff) backoff = min(backoff * 2, BACKOFF_MAX) async def _handle(self, writer: asyncio.StreamWriter, line: str): if DEBUG_RAW: print(f"[rx] {line}", flush=True) # PING/PONG keepalive — twitch sends `PING :tmi.twitch.tv` periodically if line.startswith("PING "): await self._send(writer, "PONG " + line[5:]) return # NOTICE on bad creds: ":tmi.twitch.tv NOTICE * :Login authentication failed" if " NOTICE " in line and "authentication failed" in line.lower(): raise RuntimeError("twitch auth failed — check TWITCH_KEY / TWITCH_NICK") # PRIVMSG format: ":user!user@user.tmi.twitch.tv PRIVMSG #chan :body" if " PRIVMSG " not in line: return try: prefix, rest = line.split(" PRIVMSG ", 1) user = prefix[1:].split("!", 1)[0] _, _, body = rest.partition(":") except ValueError: return # Twitch's anti-duplicate logic appends an invisible Tags-block # codepoint (U+E0000–U+E007F) to back-to-back identical messages # from the same sender within ~30s. Without stripping it, every # other `!skip` in a series silently fails to match. body = "".join(c for c in body if not (0xE0000 <= ord(c) <= 0xE007F)) head = body.strip().split(" ", 1)[0].lower() if head == "!skip": await self._cmd_skip(writer, user) elif head == "!queue": await self._cmd_queue(writer, user) elif head == "!info": await self._cmd_info(writer, user) async def _cmd_skip(self, writer: asyncio.StreamWriter, user: str): # Per-user cooldown: each chatter gets one skip per SKIP_COOLDOWN # seconds. Reject with a wait-time hint so the user knows it's their # own gate, not the bot being broken. now = time.monotonic() ukey = user.lower() last = self._skip_seen.get(ukey, 0.0) wait = SKIP_COOLDOWN - (now - last) if wait > 0: await self._say( writer, f"@{user} you can only !skip once per minute (wait {int(wait)+1}s)", ) return try: now_playing = await self.mpd.skip() except Exception as e: print(f"[skip] failed for @{user}: {type(e).__name__}: {e}", flush=True) await self._say(writer, f"@{user} couldn't skip — MPD didn't respond") return self._skip_seen[ukey] = now print(f"[skip] @{user} → {now_playing}", flush=True) await self._say(writer, f"⏭ skipped by @{user} → now: {now_playing}") async def _cmd_queue(self, writer: asyncio.StreamWriter, user: str): # Read-only; no cooldown. Pulls current pos from status, then peeks # the next QUEUE_PEEK tracks one-by-one (small N — full playlistinfo # would dump the whole queue). try: status = await self.mpd.status() pos = int(status.get("song", -1)) total = int(status.get("playlistlength") or 0) except Exception as e: print(f"[queue] failed for @{user}: {type(e).__name__}: {e}", flush=True) await self._say(writer, f"@{user} couldn't read queue — MPD didn't respond") return if pos < 0 or pos + 1 >= total: await self._say(writer, "🎵 nothing else queued") return upcoming = [] for i, p in enumerate(range(pos + 1, min(pos + 1 + QUEUE_PEEK, total)), start=1): try: rows = await self.mpd.playlistinfo(p) if rows: upcoming.append(f"{i}) {_fmt_song(rows[0])}") except Exception: continue if not upcoming: await self._say(writer, "🎵 nothing else queued") return await self._say(writer, "🎵 next: " + " · ".join(upcoming)) async def _cmd_info(self, writer: asyncio.StreamWriter, user: str): try: song = await self.mpd.currentsong() except Exception as e: print(f"[info] failed for @{user}: {type(e).__name__}: {e}", flush=True) await self._say(writer, f"@{user} couldn't read track info — MPD didn't respond") return if not song: await self._say(writer, "ℹ nothing playing") return await self._say(writer, "ℹ " + _fmt_info(song)) # ─── Mattermost "going live" notifications ────────────────────────────────── def _fetch_status(url: str) -> dict: """GET the status JSON. Raises on transport/parse errors so live_watch() can log + back off uniformly.""" req = urllib.request.Request(url, method="GET", headers={"User-Agent": "obs-twitch-bot/1.0"}) with urllib.request.urlopen(req, timeout=10) as r: return json.loads(r.read().decode("utf-8")) def _post_mattermost(hook_url: str, channel: str, title: str, game: str) -> None: """POST a single going-live notification. Markdown renders in Mattermost.""" parts = [f"🔴 **{channel}** is live on Twitch"] if title: parts.append(f"> {title}") if game: parts.append(f"_{game}_") parts.append(f"https://twitch.tv/{channel}") body = json.dumps({"text": "\n".join(parts)}).encode("utf-8") req = urllib.request.Request( hook_url, method="POST", data=body, headers={"Content-Type": "application/json", "User-Agent": "obs-twitch-bot/1.0"}, ) with urllib.request.urlopen(req, timeout=10) as r: if r.status >= 300: raise RuntimeError(f"mattermost hook returned HTTP {r.status}") async def live_watch(mm_hook: str, channel: str, status_url: str = STATUS_URL): """Poll the status service; POST to Mattermost on offline→live transitions. Transition detection is keyed on `started_at`, not just the `live` flag, so a brief Helix hiccup (live=true → false → true within one stream session, same `started_at`) doesn't double-fire. Cold-start during an ongoing stream is intentionally silent: we record the current `started_at` so we won't fire for it later, but we don't announce it (we can't tell "they just went live" from "the bot restarted mid-stream" without prior offline observation). Once we see live=False at least once, the next started_at change becomes a real transition we'll announce. """ last_announced = None # started_at we've already handled (announced or recorded) seen_offline = False # have we observed live=False since last_announced? while True: try: data = await asyncio.to_thread(_fetch_status, status_url) twitch = (data or {}).get("twitch") or {} live = bool(twitch.get("live")) started = (twitch.get("started_at") or None) if live else None except Exception as e: print(f"[live] poll failed: {type(e).__name__}: {e}", flush=True) await asyncio.sleep(STATUS_POLL_S) continue if not live: seen_offline = True elif started: if last_announced is None and not seen_offline: # cold start mid-stream: lock in this started_at so we won't # falsely announce it later last_announced = started elif started != last_announced and seen_offline: title = twitch.get("title") or "" game = twitch.get("game") or "" try: await asyncio.to_thread(_post_mattermost, mm_hook, channel, title, game) print(f"[live] {channel} went live → posted to mattermost " f"(title={title!r}, game={game!r})", flush=True) except Exception as e: # log but don't retry — a missed notification is better # than a duplicate; the next genuine transition will fire print(f"[live] mattermost post failed: {type(e).__name__}: {e}", flush=True) last_announced = started seen_offline = False await asyncio.sleep(STATUS_POLL_S) # ─── entrypoint ───────────────────────────────────────────────────────────── async def main(): env = {**load_env(ENV), **os.environ} # process env wins over .env token = env.get("TWITCH_ACCESS_TOKEN", "").strip() nick = env.get("TWITCH_NICK", "").strip() channel = env.get("TWITCH_CHANNEL", nick).strip() mm_hook = env.get("MM_HOOK", "").strip() missing = [k for k, v in (("TWITCH_ACCESS_TOKEN", token), ("TWITCH_NICK", nick)) if not v] if missing: print(f"[err] missing in {ENV}: {', '.join(missing)}", file=sys.stderr) sys.exit(1) bot = TwitchBot(nick=nick, token=token, channel=channel, mpd=Mpd()) tasks = [bot.run()] if mm_hook: print(f"[live] watching {STATUS_URL} → mattermost on offline→live", flush=True) tasks.append(live_watch(mm_hook, channel)) else: print("[live] MM_HOOK unset — going-live notifications disabled", flush=True) await asyncio.gather(*tasks) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: sys.exit(0)