Files
twitch-bot/bot.py
2026-04-27 23:21:39 +02:00

463 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Twitch chat → MPD control bot, plus optional "going live" notifications.
Connects to Twitch IRC over TLS, joins the configured channel, and responds to:
!skip advance MPD to the next track (per-user cooldown — see SKIP_COOLDOWN)
!queue print the next few queued tracks
!info print full metadata (artist · title · album · year · genre) for now-playing
The new track shows up on stream because bridges/mpd-state.py is already
broadcasting MPD state to the loading / game / music-box overlays.
If MM_HOOK is set, a second task polls the public ophi118.com status service
(which already polls Twitch Helix for live/title/game) and POSTs a single
Mattermost notification per offline→live transition. See live_watch().
Reads .env from the same directory:
TWITCH_ACCESS_TOKEN chat OAuth token (with or without `oauth:` prefix)
TWITCH_NICK bot login name (lowercase)
TWITCH_CHANNEL channel to join, no leading `#` — defaults to TWITCH_NICK
MM_HOOK optional Mattermost incoming-webhook URL; enables
going-live notifications when set
STATUS_URL optional override for the status API endpoint
(default: https://ophi118.com/api/status.json)
TWITCH_REFRESH_TOKEN and TWITCH_CLIENT_ID are also stored in .env (paired with
the access token at generation time) but the bot doesn't use them yet — token
refresh would land here later if access tokens start expiring mid-stream.
"""
import asyncio
import json
import os
import ssl
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from mpd.asyncio import MPDClient
HERE = Path(__file__).resolve().parent
ENV = HERE / ".env"
MPD_HOST = os.environ.get("MPD_HOST", "localhost")
MPD_PORT = int(os.environ.get("MPD_PORT", "6600"))
IRC_HOST = "irc.chat.twitch.tv"
IRC_PORT = 6697
PING_TIMEOUT = 360.0 # twitch PINGs ~every 5min; reconnect if silent longer
SKIP_COOLDOWN = 60.0 # per-user seconds between successful !skip invocations
QUEUE_PEEK = 3 # how many upcoming tracks !queue should print
# Going-live watcher: poll cadence matches the server's Twitch poll (60s) so we
# never lag the upstream signal by more than ~2 polls.
STATUS_URL = os.environ.get("STATUS_URL", "https://ophi118.com/api/status.json")
STATUS_POLL_S = 60.0
# Set TWITCH_BOT_DEBUG=1 (env or .env) to dump every received IRC line. Useful
# when a brand-new bot account looks "connected" but doesn't see chat — Twitch
# silently degrades unverified accounts in ways that don't surface as errors.
DEBUG_RAW = os.environ.get("TWITCH_BOT_DEBUG", "").strip() not in ("", "0", "false")
# ─── tiny .env loader ───────────────────────────────────────────────────────
def load_env(path: Path) -> dict:
"""KEY=VALUE lines, # comments, optional surrounding quotes. Avoids a
python-dotenv dependency for three vars."""
out = {}
if not path.exists():
return out
for raw in path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
out[k.strip()] = v.strip().strip('"').strip("'")
return out
# ─── MPD ────────────────────────────────────────────────────────────────────
class Mpd:
"""One shared MPDClient with reconnect-on-failure.
python-mpd2's asyncio client doesn't auto-reconnect, so each command is
wrapped to retry once after dropping the connection — covers MPD
restarts and idle-socket timeouts without surfacing transient errors.
"""
def __init__(self):
self._cli = None
self._lock = asyncio.Lock()
async def _ensure(self):
if self._cli is None:
cli = MPDClient()
await cli.connect(MPD_HOST, MPD_PORT)
self._cli = cli
async def _retry(self, op):
"""Run an MPD coroutine with one reconnect-on-failure retry."""
async with self._lock:
last = None
for attempt in (1, 2):
try:
await self._ensure()
return await op(self._cli)
except Exception as e:
last = e
self._cli = None # force reconnect on retry
raise last # type: ignore[misc]
async def skip(self) -> str:
async def op(cli):
await cli.next()
return _fmt_song(await cli.currentsong())
return await self._retry(op)
async def currentsong(self) -> dict:
return await self._retry(lambda cli: cli.currentsong())
async def status(self) -> dict:
return await self._retry(lambda cli: cli.status())
async def playlistinfo(self, pos: int) -> list:
return await self._retry(lambda cli: cli.playlistinfo(pos))
def _flat(v):
"""MPD tags can be lists (multi-value); flatten to a comma-joined string
with empties removed. Returns None for missing/empty so callers can skip
fields without empty parens."""
if v is None:
return None
if isinstance(v, list):
v = ", ".join(x for x in v if x)
v = (v or "").strip()
return v or None
def _fmt_song(song: dict) -> str:
artist = _flat(song.get("artist") or song.get("albumartist")) or ""
title = _flat(song.get("title")) or (song.get("file") or "").rsplit("/", 1)[-1]
out = f"{artist}{title}".strip("")
return out or "(unknown track)"
def _fmt_info(song: dict) -> str:
"""Compose the !info reply: 'Artist — Title · Album (Year) · Genre'.
Skip any segment whose underlying tag is missing."""
head = _fmt_song(song)
parts = [head] if head != "(unknown track)" else []
album = _flat(song.get("album"))
year = _flat(song.get("date"))
if year:
# MPD often stores ISO dates ('2018-04-12') — show year only.
year = year.split("-", 1)[0]
if album and year:
parts.append(f"{album} ({year})")
elif album:
parts.append(album)
elif year:
parts.append(f"({year})")
genre = _flat(song.get("genre"))
if genre:
parts.append(genre)
return " · ".join(parts) if parts else "(unknown track)"
# ─── Twitch IRC ─────────────────────────────────────────────────────────────
class TwitchBot:
def __init__(self, nick: str, token: str, channel: str, mpd: Mpd):
self.nick = nick.lower()
self.token = token if token.startswith("oauth:") else f"oauth:{token}"
self.channel = "#" + channel.lower().lstrip("#")
self.mpd = mpd
# Per-user !skip cooldown — keyed by lowercased nick. The dict grows
# unboundedly with viewer count over a stream session; not worth
# pruning at typical chat sizes (one float per chatter).
self._skip_seen: dict[str, float] = {}
async def _send(self, writer: asyncio.StreamWriter, raw: str):
writer.write((raw + "\r\n").encode("utf-8"))
await writer.drain()
async def _say(self, writer: asyncio.StreamWriter, msg: str):
# twitch chat lines max ~500 chars; truncate defensively, strip newlines
msg = msg.replace("\r", " ").replace("\n", " ")[:480]
await self._send(writer, f"PRIVMSG {self.channel} :{msg}")
async def run(self):
"""Connect/auth/listen loop with quiet exponential backoff.
Mirrors the connect-quietly pattern in bridges/mpd-state.py:
first failure prints once, then silent retry until the connection
comes back. Auth failure is fatal — no point retrying with a bad token.
"""
backoff = 2
BACKOFF_MAX = 30
prev_status = None # None | "up" | "down"
ctx = ssl.create_default_context()
while True:
try:
reader, writer = await asyncio.open_connection(
IRC_HOST, IRC_PORT, ssl=ctx,
)
try:
await self._send(writer, f"PASS {self.token}")
await self._send(writer, f"NICK {self.nick}")
await self._send(writer, f"JOIN {self.channel}")
if prev_status != "up":
print(f"[twitch] connected → {self.channel} as {self.nick}",
flush=True)
prev_status = "up"
backoff = 2
while True:
line_bytes = await asyncio.wait_for(
reader.readline(), timeout=PING_TIMEOUT,
)
if not line_bytes:
raise ConnectionError("server closed connection")
line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n")
await self._handle(writer, line)
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
except RuntimeError as e:
# auth-failed and similar fatals — let systemd flag it
print(f"[twitch] fatal: {e}", file=sys.stderr, flush=True)
raise
except (asyncio.TimeoutError, ConnectionError, OSError) as e:
if prev_status == "up":
print(f"[twitch] disconnected ({type(e).__name__}: {e}) — retrying",
flush=True)
elif prev_status is None:
print(f"[twitch] unreachable ({type(e).__name__}) — will retry silently",
flush=True)
prev_status = "down"
await asyncio.sleep(backoff)
backoff = min(backoff * 2, BACKOFF_MAX)
async def _handle(self, writer: asyncio.StreamWriter, line: str):
if DEBUG_RAW:
print(f"[rx] {line}", flush=True)
# PING/PONG keepalive — twitch sends `PING :tmi.twitch.tv` periodically
if line.startswith("PING "):
await self._send(writer, "PONG " + line[5:])
return
# NOTICE on bad creds: ":tmi.twitch.tv NOTICE * :Login authentication failed"
if " NOTICE " in line and "authentication failed" in line.lower():
raise RuntimeError("twitch auth failed — check TWITCH_KEY / TWITCH_NICK")
# PRIVMSG format: ":user!user@user.tmi.twitch.tv PRIVMSG #chan :body"
if " PRIVMSG " not in line:
return
try:
prefix, rest = line.split(" PRIVMSG ", 1)
user = prefix[1:].split("!", 1)[0]
_, _, body = rest.partition(":")
except ValueError:
return
# Twitch's anti-duplicate logic appends an invisible Tags-block
# codepoint (U+E0000U+E007F) to back-to-back identical messages
# from the same sender within ~30s. Without stripping it, every
# other `!skip` in a series silently fails to match.
body = "".join(c for c in body if not (0xE0000 <= ord(c) <= 0xE007F))
head = body.strip().split(" ", 1)[0].lower()
if head == "!skip":
await self._cmd_skip(writer, user)
elif head == "!queue":
await self._cmd_queue(writer, user)
elif head == "!info":
await self._cmd_info(writer, user)
async def _cmd_skip(self, writer: asyncio.StreamWriter, user: str):
# Per-user cooldown: each chatter gets one skip per SKIP_COOLDOWN
# seconds. Reject with a wait-time hint so the user knows it's their
# own gate, not the bot being broken.
now = time.monotonic()
ukey = user.lower()
last = self._skip_seen.get(ukey, 0.0)
wait = SKIP_COOLDOWN - (now - last)
if wait > 0:
await self._say(
writer,
f"@{user} you can only !skip once per minute (wait {int(wait)+1}s)",
)
return
try:
now_playing = await self.mpd.skip()
except Exception as e:
print(f"[skip] failed for @{user}: {type(e).__name__}: {e}", flush=True)
await self._say(writer, f"@{user} couldn't skip — MPD didn't respond")
return
self._skip_seen[ukey] = now
print(f"[skip] @{user}{now_playing}", flush=True)
await self._say(writer, f"⏭ skipped by @{user} → now: {now_playing}")
async def _cmd_queue(self, writer: asyncio.StreamWriter, user: str):
# Read-only; no cooldown. Pulls current pos from status, then peeks
# the next QUEUE_PEEK tracks one-by-one (small N — full playlistinfo
# would dump the whole queue).
try:
status = await self.mpd.status()
pos = int(status.get("song", -1))
total = int(status.get("playlistlength") or 0)
except Exception as e:
print(f"[queue] failed for @{user}: {type(e).__name__}: {e}", flush=True)
await self._say(writer, f"@{user} couldn't read queue — MPD didn't respond")
return
if pos < 0 or pos + 1 >= total:
await self._say(writer, "🎵 nothing else queued")
return
upcoming = []
for i, p in enumerate(range(pos + 1, min(pos + 1 + QUEUE_PEEK, total)), start=1):
try:
rows = await self.mpd.playlistinfo(p)
if rows:
upcoming.append(f"{i}) {_fmt_song(rows[0])}")
except Exception:
continue
if not upcoming:
await self._say(writer, "🎵 nothing else queued")
return
await self._say(writer, "🎵 next: " + " · ".join(upcoming))
async def _cmd_info(self, writer: asyncio.StreamWriter, user: str):
try:
song = await self.mpd.currentsong()
except Exception as e:
print(f"[info] failed for @{user}: {type(e).__name__}: {e}", flush=True)
await self._say(writer, f"@{user} couldn't read track info — MPD didn't respond")
return
if not song:
await self._say(writer, " nothing playing")
return
await self._say(writer, " " + _fmt_info(song))
# ─── Mattermost "going live" notifications ──────────────────────────────────
def _fetch_status(url: str) -> dict:
"""GET the status JSON. Raises on transport/parse errors so live_watch()
can log + back off uniformly."""
req = urllib.request.Request(url, method="GET",
headers={"User-Agent": "obs-twitch-bot/1.0"})
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read().decode("utf-8"))
def _post_mattermost(hook_url: str, channel: str, title: str, game: str) -> None:
"""POST a single going-live notification. Markdown renders in Mattermost."""
parts = [f"🔴 **{channel}** is live on Twitch"]
if title:
parts.append(f"> {title}")
if game:
parts.append(f"_{game}_")
parts.append(f"https://twitch.tv/{channel}")
body = json.dumps({"text": "\n".join(parts)}).encode("utf-8")
req = urllib.request.Request(
hook_url, method="POST", data=body,
headers={"Content-Type": "application/json",
"User-Agent": "obs-twitch-bot/1.0"},
)
with urllib.request.urlopen(req, timeout=10) as r:
if r.status >= 300:
raise RuntimeError(f"mattermost hook returned HTTP {r.status}")
async def live_watch(mm_hook: str, channel: str, status_url: str = STATUS_URL):
"""Poll the status service; POST to Mattermost on offline→live transitions.
Transition detection is keyed on `started_at`, not just the `live` flag, so
a brief Helix hiccup (live=true → false → true within one stream session,
same `started_at`) doesn't double-fire.
Cold-start during an ongoing stream is intentionally silent: we record the
current `started_at` so we won't fire for it later, but we don't announce
it (we can't tell "they just went live" from "the bot restarted mid-stream"
without prior offline observation). Once we see live=False at least once,
the next started_at change becomes a real transition we'll announce.
"""
last_announced = None # started_at we've already handled (announced or recorded)
seen_offline = False # have we observed live=False since last_announced?
while True:
try:
data = await asyncio.to_thread(_fetch_status, status_url)
twitch = (data or {}).get("twitch") or {}
live = bool(twitch.get("live"))
started = (twitch.get("started_at") or None) if live else None
except Exception as e:
print(f"[live] poll failed: {type(e).__name__}: {e}", flush=True)
await asyncio.sleep(STATUS_POLL_S)
continue
if not live:
seen_offline = True
elif started:
if last_announced is None and not seen_offline:
# cold start mid-stream: lock in this started_at so we won't
# falsely announce it later
last_announced = started
elif started != last_announced and seen_offline:
title = twitch.get("title") or ""
game = twitch.get("game") or ""
try:
await asyncio.to_thread(_post_mattermost, mm_hook, channel, title, game)
print(f"[live] {channel} went live → posted to mattermost "
f"(title={title!r}, game={game!r})", flush=True)
except Exception as e:
# log but don't retry — a missed notification is better
# than a duplicate; the next genuine transition will fire
print(f"[live] mattermost post failed: {type(e).__name__}: {e}",
flush=True)
last_announced = started
seen_offline = False
await asyncio.sleep(STATUS_POLL_S)
# ─── entrypoint ─────────────────────────────────────────────────────────────
async def main():
env = {**load_env(ENV), **os.environ} # process env wins over .env
token = env.get("TWITCH_ACCESS_TOKEN", "").strip()
nick = env.get("TWITCH_NICK", "").strip()
channel = env.get("TWITCH_CHANNEL", nick).strip()
mm_hook = env.get("MM_HOOK", "").strip()
missing = [k for k, v in (("TWITCH_ACCESS_TOKEN", token), ("TWITCH_NICK", nick)) if not v]
if missing:
print(f"[err] missing in {ENV}: {', '.join(missing)}", file=sys.stderr)
sys.exit(1)
bot = TwitchBot(nick=nick, token=token, channel=channel, mpd=Mpd())
tasks = [bot.run()]
if mm_hook:
print(f"[live] watching {STATUS_URL} → mattermost on offline→live", flush=True)
tasks.append(live_watch(mm_hook, channel))
else:
print("[live] MM_HOOK unset — going-live notifications disabled", flush=True)
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
sys.exit(0)