463 lines
19 KiB
Python
463 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Twitch chat → MPD control bot, plus optional "going live" notifications.
|
||
|
||
Connects to Twitch IRC over TLS, joins the configured channel, and responds to:
|
||
!skip advance MPD to the next track (per-user cooldown — see SKIP_COOLDOWN)
|
||
!queue print the next few queued tracks
|
||
!info print full metadata (artist · title · album · year · genre) for now-playing
|
||
The new track shows up on stream because bridges/mpd-state.py is already
|
||
broadcasting MPD state to the loading / game / music-box overlays.
|
||
|
||
If MM_HOOK is set, a second task polls the public ophi118.com status service
|
||
(which already polls Twitch Helix for live/title/game) and POSTs a single
|
||
Mattermost notification per offline→live transition. See live_watch().
|
||
|
||
Reads .env from the same directory:
|
||
TWITCH_ACCESS_TOKEN chat OAuth token (with or without `oauth:` prefix)
|
||
TWITCH_NICK bot login name (lowercase)
|
||
TWITCH_CHANNEL channel to join, no leading `#` — defaults to TWITCH_NICK
|
||
MM_HOOK optional Mattermost incoming-webhook URL; enables
|
||
going-live notifications when set
|
||
STATUS_URL optional override for the status API endpoint
|
||
(default: https://ophi118.com/api/status.json)
|
||
|
||
TWITCH_REFRESH_TOKEN and TWITCH_CLIENT_ID are also stored in .env (paired with
|
||
the access token at generation time) but the bot doesn't use them yet — token
|
||
refresh would land here later if access tokens start expiring mid-stream.
|
||
"""
|
||
|
||
import asyncio
|
||
import json
|
||
import os
|
||
import ssl
|
||
import sys
|
||
import time
|
||
import urllib.error
|
||
import urllib.request
|
||
from pathlib import Path
|
||
|
||
from mpd.asyncio import MPDClient
|
||
|
||
HERE = Path(__file__).resolve().parent
|
||
ENV = HERE / ".env"
|
||
|
||
MPD_HOST = os.environ.get("MPD_HOST", "localhost")
|
||
MPD_PORT = int(os.environ.get("MPD_PORT", "6600"))
|
||
|
||
IRC_HOST = "irc.chat.twitch.tv"
|
||
IRC_PORT = 6697
|
||
PING_TIMEOUT = 360.0 # twitch PINGs ~every 5min; reconnect if silent longer
|
||
SKIP_COOLDOWN = 60.0 # per-user seconds between successful !skip invocations
|
||
QUEUE_PEEK = 3 # how many upcoming tracks !queue should print
|
||
|
||
# Going-live watcher: poll cadence matches the server's Twitch poll (60s) so we
|
||
# never lag the upstream signal by more than ~2 polls.
|
||
STATUS_URL = os.environ.get("STATUS_URL", "https://ophi118.com/api/status.json")
|
||
STATUS_POLL_S = 60.0
|
||
|
||
# Set TWITCH_BOT_DEBUG=1 (env or .env) to dump every received IRC line. Useful
|
||
# when a brand-new bot account looks "connected" but doesn't see chat — Twitch
|
||
# silently degrades unverified accounts in ways that don't surface as errors.
|
||
DEBUG_RAW = os.environ.get("TWITCH_BOT_DEBUG", "").strip() not in ("", "0", "false")
|
||
|
||
|
||
# ─── tiny .env loader ───────────────────────────────────────────────────────
|
||
|
||
def load_env(path: Path) -> dict:
|
||
"""KEY=VALUE lines, # comments, optional surrounding quotes. Avoids a
|
||
python-dotenv dependency for three vars."""
|
||
out = {}
|
||
if not path.exists():
|
||
return out
|
||
for raw in path.read_text().splitlines():
|
||
line = raw.strip()
|
||
if not line or line.startswith("#") or "=" not in line:
|
||
continue
|
||
k, _, v = line.partition("=")
|
||
out[k.strip()] = v.strip().strip('"').strip("'")
|
||
return out
|
||
|
||
|
||
# ─── MPD ────────────────────────────────────────────────────────────────────
|
||
|
||
class Mpd:
|
||
"""One shared MPDClient with reconnect-on-failure.
|
||
|
||
python-mpd2's asyncio client doesn't auto-reconnect, so each command is
|
||
wrapped to retry once after dropping the connection — covers MPD
|
||
restarts and idle-socket timeouts without surfacing transient errors.
|
||
"""
|
||
|
||
def __init__(self):
|
||
self._cli = None
|
||
self._lock = asyncio.Lock()
|
||
|
||
async def _ensure(self):
|
||
if self._cli is None:
|
||
cli = MPDClient()
|
||
await cli.connect(MPD_HOST, MPD_PORT)
|
||
self._cli = cli
|
||
|
||
async def _retry(self, op):
|
||
"""Run an MPD coroutine with one reconnect-on-failure retry."""
|
||
async with self._lock:
|
||
last = None
|
||
for attempt in (1, 2):
|
||
try:
|
||
await self._ensure()
|
||
return await op(self._cli)
|
||
except Exception as e:
|
||
last = e
|
||
self._cli = None # force reconnect on retry
|
||
raise last # type: ignore[misc]
|
||
|
||
async def skip(self) -> str:
|
||
async def op(cli):
|
||
await cli.next()
|
||
return _fmt_song(await cli.currentsong())
|
||
return await self._retry(op)
|
||
|
||
async def currentsong(self) -> dict:
|
||
return await self._retry(lambda cli: cli.currentsong())
|
||
|
||
async def status(self) -> dict:
|
||
return await self._retry(lambda cli: cli.status())
|
||
|
||
async def playlistinfo(self, pos: int) -> list:
|
||
return await self._retry(lambda cli: cli.playlistinfo(pos))
|
||
|
||
|
||
def _flat(v):
|
||
"""MPD tags can be lists (multi-value); flatten to a comma-joined string
|
||
with empties removed. Returns None for missing/empty so callers can skip
|
||
fields without empty parens."""
|
||
if v is None:
|
||
return None
|
||
if isinstance(v, list):
|
||
v = ", ".join(x for x in v if x)
|
||
v = (v or "").strip()
|
||
return v or None
|
||
|
||
|
||
def _fmt_song(song: dict) -> str:
|
||
artist = _flat(song.get("artist") or song.get("albumartist")) or ""
|
||
title = _flat(song.get("title")) or (song.get("file") or "").rsplit("/", 1)[-1]
|
||
out = f"{artist} — {title}".strip(" —")
|
||
return out or "(unknown track)"
|
||
|
||
|
||
def _fmt_info(song: dict) -> str:
|
||
"""Compose the !info reply: 'Artist — Title · Album (Year) · Genre'.
|
||
Skip any segment whose underlying tag is missing."""
|
||
head = _fmt_song(song)
|
||
parts = [head] if head != "(unknown track)" else []
|
||
album = _flat(song.get("album"))
|
||
year = _flat(song.get("date"))
|
||
if year:
|
||
# MPD often stores ISO dates ('2018-04-12') — show year only.
|
||
year = year.split("-", 1)[0]
|
||
if album and year:
|
||
parts.append(f"{album} ({year})")
|
||
elif album:
|
||
parts.append(album)
|
||
elif year:
|
||
parts.append(f"({year})")
|
||
genre = _flat(song.get("genre"))
|
||
if genre:
|
||
parts.append(genre)
|
||
return " · ".join(parts) if parts else "(unknown track)"
|
||
|
||
|
||
# ─── Twitch IRC ─────────────────────────────────────────────────────────────
|
||
|
||
class TwitchBot:
|
||
def __init__(self, nick: str, token: str, channel: str, mpd: Mpd):
|
||
self.nick = nick.lower()
|
||
self.token = token if token.startswith("oauth:") else f"oauth:{token}"
|
||
self.channel = "#" + channel.lower().lstrip("#")
|
||
self.mpd = mpd
|
||
# Per-user !skip cooldown — keyed by lowercased nick. The dict grows
|
||
# unboundedly with viewer count over a stream session; not worth
|
||
# pruning at typical chat sizes (one float per chatter).
|
||
self._skip_seen: dict[str, float] = {}
|
||
|
||
async def _send(self, writer: asyncio.StreamWriter, raw: str):
|
||
writer.write((raw + "\r\n").encode("utf-8"))
|
||
await writer.drain()
|
||
|
||
async def _say(self, writer: asyncio.StreamWriter, msg: str):
|
||
# twitch chat lines max ~500 chars; truncate defensively, strip newlines
|
||
msg = msg.replace("\r", " ").replace("\n", " ")[:480]
|
||
await self._send(writer, f"PRIVMSG {self.channel} :{msg}")
|
||
|
||
async def run(self):
|
||
"""Connect/auth/listen loop with quiet exponential backoff.
|
||
|
||
Mirrors the connect-quietly pattern in bridges/mpd-state.py:
|
||
first failure prints once, then silent retry until the connection
|
||
comes back. Auth failure is fatal — no point retrying with a bad token.
|
||
"""
|
||
backoff = 2
|
||
BACKOFF_MAX = 30
|
||
prev_status = None # None | "up" | "down"
|
||
ctx = ssl.create_default_context()
|
||
|
||
while True:
|
||
try:
|
||
reader, writer = await asyncio.open_connection(
|
||
IRC_HOST, IRC_PORT, ssl=ctx,
|
||
)
|
||
try:
|
||
await self._send(writer, f"PASS {self.token}")
|
||
await self._send(writer, f"NICK {self.nick}")
|
||
await self._send(writer, f"JOIN {self.channel}")
|
||
|
||
if prev_status != "up":
|
||
print(f"[twitch] connected → {self.channel} as {self.nick}",
|
||
flush=True)
|
||
prev_status = "up"
|
||
backoff = 2
|
||
|
||
while True:
|
||
line_bytes = await asyncio.wait_for(
|
||
reader.readline(), timeout=PING_TIMEOUT,
|
||
)
|
||
if not line_bytes:
|
||
raise ConnectionError("server closed connection")
|
||
line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n")
|
||
await self._handle(writer, line)
|
||
finally:
|
||
writer.close()
|
||
try:
|
||
await writer.wait_closed()
|
||
except Exception:
|
||
pass
|
||
except RuntimeError as e:
|
||
# auth-failed and similar fatals — let systemd flag it
|
||
print(f"[twitch] fatal: {e}", file=sys.stderr, flush=True)
|
||
raise
|
||
except (asyncio.TimeoutError, ConnectionError, OSError) as e:
|
||
if prev_status == "up":
|
||
print(f"[twitch] disconnected ({type(e).__name__}: {e}) — retrying",
|
||
flush=True)
|
||
elif prev_status is None:
|
||
print(f"[twitch] unreachable ({type(e).__name__}) — will retry silently",
|
||
flush=True)
|
||
prev_status = "down"
|
||
await asyncio.sleep(backoff)
|
||
backoff = min(backoff * 2, BACKOFF_MAX)
|
||
|
||
async def _handle(self, writer: asyncio.StreamWriter, line: str):
|
||
if DEBUG_RAW:
|
||
print(f"[rx] {line}", flush=True)
|
||
|
||
# PING/PONG keepalive — twitch sends `PING :tmi.twitch.tv` periodically
|
||
if line.startswith("PING "):
|
||
await self._send(writer, "PONG " + line[5:])
|
||
return
|
||
|
||
# NOTICE on bad creds: ":tmi.twitch.tv NOTICE * :Login authentication failed"
|
||
if " NOTICE " in line and "authentication failed" in line.lower():
|
||
raise RuntimeError("twitch auth failed — check TWITCH_KEY / TWITCH_NICK")
|
||
|
||
# PRIVMSG format: ":user!user@user.tmi.twitch.tv PRIVMSG #chan :body"
|
||
if " PRIVMSG " not in line:
|
||
return
|
||
try:
|
||
prefix, rest = line.split(" PRIVMSG ", 1)
|
||
user = prefix[1:].split("!", 1)[0]
|
||
_, _, body = rest.partition(":")
|
||
except ValueError:
|
||
return
|
||
|
||
# Twitch's anti-duplicate logic appends an invisible Tags-block
|
||
# codepoint (U+E0000–U+E007F) to back-to-back identical messages
|
||
# from the same sender within ~30s. Without stripping it, every
|
||
# other `!skip` in a series silently fails to match.
|
||
body = "".join(c for c in body if not (0xE0000 <= ord(c) <= 0xE007F))
|
||
head = body.strip().split(" ", 1)[0].lower()
|
||
if head == "!skip":
|
||
await self._cmd_skip(writer, user)
|
||
elif head == "!queue":
|
||
await self._cmd_queue(writer, user)
|
||
elif head == "!info":
|
||
await self._cmd_info(writer, user)
|
||
|
||
async def _cmd_skip(self, writer: asyncio.StreamWriter, user: str):
|
||
# Per-user cooldown: each chatter gets one skip per SKIP_COOLDOWN
|
||
# seconds. Reject with a wait-time hint so the user knows it's their
|
||
# own gate, not the bot being broken.
|
||
now = time.monotonic()
|
||
ukey = user.lower()
|
||
last = self._skip_seen.get(ukey, 0.0)
|
||
wait = SKIP_COOLDOWN - (now - last)
|
||
if wait > 0:
|
||
await self._say(
|
||
writer,
|
||
f"@{user} you can only !skip once per minute (wait {int(wait)+1}s)",
|
||
)
|
||
return
|
||
try:
|
||
now_playing = await self.mpd.skip()
|
||
except Exception as e:
|
||
print(f"[skip] failed for @{user}: {type(e).__name__}: {e}", flush=True)
|
||
await self._say(writer, f"@{user} couldn't skip — MPD didn't respond")
|
||
return
|
||
self._skip_seen[ukey] = now
|
||
print(f"[skip] @{user} → {now_playing}", flush=True)
|
||
await self._say(writer, f"⏭ skipped by @{user} → now: {now_playing}")
|
||
|
||
async def _cmd_queue(self, writer: asyncio.StreamWriter, user: str):
|
||
# Read-only; no cooldown. Pulls current pos from status, then peeks
|
||
# the next QUEUE_PEEK tracks one-by-one (small N — full playlistinfo
|
||
# would dump the whole queue).
|
||
try:
|
||
status = await self.mpd.status()
|
||
pos = int(status.get("song", -1))
|
||
total = int(status.get("playlistlength") or 0)
|
||
except Exception as e:
|
||
print(f"[queue] failed for @{user}: {type(e).__name__}: {e}", flush=True)
|
||
await self._say(writer, f"@{user} couldn't read queue — MPD didn't respond")
|
||
return
|
||
if pos < 0 or pos + 1 >= total:
|
||
await self._say(writer, "🎵 nothing else queued")
|
||
return
|
||
upcoming = []
|
||
for i, p in enumerate(range(pos + 1, min(pos + 1 + QUEUE_PEEK, total)), start=1):
|
||
try:
|
||
rows = await self.mpd.playlistinfo(p)
|
||
if rows:
|
||
upcoming.append(f"{i}) {_fmt_song(rows[0])}")
|
||
except Exception:
|
||
continue
|
||
if not upcoming:
|
||
await self._say(writer, "🎵 nothing else queued")
|
||
return
|
||
await self._say(writer, "🎵 next: " + " · ".join(upcoming))
|
||
|
||
async def _cmd_info(self, writer: asyncio.StreamWriter, user: str):
|
||
try:
|
||
song = await self.mpd.currentsong()
|
||
except Exception as e:
|
||
print(f"[info] failed for @{user}: {type(e).__name__}: {e}", flush=True)
|
||
await self._say(writer, f"@{user} couldn't read track info — MPD didn't respond")
|
||
return
|
||
if not song:
|
||
await self._say(writer, "ℹ nothing playing")
|
||
return
|
||
await self._say(writer, "ℹ " + _fmt_info(song))
|
||
|
||
|
||
# ─── Mattermost "going live" notifications ──────────────────────────────────
|
||
|
||
def _fetch_status(url: str) -> dict:
|
||
"""GET the status JSON. Raises on transport/parse errors so live_watch()
|
||
can log + back off uniformly."""
|
||
req = urllib.request.Request(url, method="GET",
|
||
headers={"User-Agent": "obs-twitch-bot/1.0"})
|
||
with urllib.request.urlopen(req, timeout=10) as r:
|
||
return json.loads(r.read().decode("utf-8"))
|
||
|
||
|
||
def _post_mattermost(hook_url: str, channel: str, title: str, game: str) -> None:
|
||
"""POST a single going-live notification. Markdown renders in Mattermost."""
|
||
parts = [f"🔴 **{channel}** is live on Twitch"]
|
||
if title:
|
||
parts.append(f"> {title}")
|
||
if game:
|
||
parts.append(f"_{game}_")
|
||
parts.append(f"https://twitch.tv/{channel}")
|
||
body = json.dumps({"text": "\n".join(parts)}).encode("utf-8")
|
||
req = urllib.request.Request(
|
||
hook_url, method="POST", data=body,
|
||
headers={"Content-Type": "application/json",
|
||
"User-Agent": "obs-twitch-bot/1.0"},
|
||
)
|
||
with urllib.request.urlopen(req, timeout=10) as r:
|
||
if r.status >= 300:
|
||
raise RuntimeError(f"mattermost hook returned HTTP {r.status}")
|
||
|
||
|
||
async def live_watch(mm_hook: str, channel: str, status_url: str = STATUS_URL):
|
||
"""Poll the status service; POST to Mattermost on offline→live transitions.
|
||
|
||
Transition detection is keyed on `started_at`, not just the `live` flag, so
|
||
a brief Helix hiccup (live=true → false → true within one stream session,
|
||
same `started_at`) doesn't double-fire.
|
||
|
||
Cold-start during an ongoing stream is intentionally silent: we record the
|
||
current `started_at` so we won't fire for it later, but we don't announce
|
||
it (we can't tell "they just went live" from "the bot restarted mid-stream"
|
||
without prior offline observation). Once we see live=False at least once,
|
||
the next started_at change becomes a real transition we'll announce.
|
||
"""
|
||
last_announced = None # started_at we've already handled (announced or recorded)
|
||
seen_offline = False # have we observed live=False since last_announced?
|
||
|
||
while True:
|
||
try:
|
||
data = await asyncio.to_thread(_fetch_status, status_url)
|
||
twitch = (data or {}).get("twitch") or {}
|
||
live = bool(twitch.get("live"))
|
||
started = (twitch.get("started_at") or None) if live else None
|
||
except Exception as e:
|
||
print(f"[live] poll failed: {type(e).__name__}: {e}", flush=True)
|
||
await asyncio.sleep(STATUS_POLL_S)
|
||
continue
|
||
|
||
if not live:
|
||
seen_offline = True
|
||
elif started:
|
||
if last_announced is None and not seen_offline:
|
||
# cold start mid-stream: lock in this started_at so we won't
|
||
# falsely announce it later
|
||
last_announced = started
|
||
elif started != last_announced and seen_offline:
|
||
title = twitch.get("title") or ""
|
||
game = twitch.get("game") or ""
|
||
try:
|
||
await asyncio.to_thread(_post_mattermost, mm_hook, channel, title, game)
|
||
print(f"[live] {channel} went live → posted to mattermost "
|
||
f"(title={title!r}, game={game!r})", flush=True)
|
||
except Exception as e:
|
||
# log but don't retry — a missed notification is better
|
||
# than a duplicate; the next genuine transition will fire
|
||
print(f"[live] mattermost post failed: {type(e).__name__}: {e}",
|
||
flush=True)
|
||
last_announced = started
|
||
seen_offline = False
|
||
|
||
await asyncio.sleep(STATUS_POLL_S)
|
||
|
||
|
||
# ─── entrypoint ─────────────────────────────────────────────────────────────
|
||
|
||
async def main():
|
||
env = {**load_env(ENV), **os.environ} # process env wins over .env
|
||
token = env.get("TWITCH_ACCESS_TOKEN", "").strip()
|
||
nick = env.get("TWITCH_NICK", "").strip()
|
||
channel = env.get("TWITCH_CHANNEL", nick).strip()
|
||
mm_hook = env.get("MM_HOOK", "").strip()
|
||
|
||
missing = [k for k, v in (("TWITCH_ACCESS_TOKEN", token), ("TWITCH_NICK", nick)) if not v]
|
||
if missing:
|
||
print(f"[err] missing in {ENV}: {', '.join(missing)}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
bot = TwitchBot(nick=nick, token=token, channel=channel, mpd=Mpd())
|
||
tasks = [bot.run()]
|
||
if mm_hook:
|
||
print(f"[live] watching {STATUS_URL} → mattermost on offline→live", flush=True)
|
||
tasks.append(live_watch(mm_hook, channel))
|
||
else:
|
||
print("[live] MM_HOOK unset — going-live notifications disabled", flush=True)
|
||
await asyncio.gather(*tasks)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
asyncio.run(main())
|
||
except KeyboardInterrupt:
|
||
sys.exit(0)
|