From 1af250ae1a87924719c09a26918854b19520703a Mon Sep 17 00:00:00 2001 From: Jakub Zych Date: Mon, 27 Apr 2026 23:21:39 +0200 Subject: [PATCH] Initial version --- .env.broadcaster.example | 27 +++ .env.example | 38 ++++ .gitignore | 12 + README.md | 225 +++++++++++++++++++ _twitch.py | 121 ++++++++++ bot.py | 462 +++++++++++++++++++++++++++++++++++++++ examples/stream.sh | 150 +++++++++++++ search-game.py | 104 +++++++++ set-channel.py | 102 +++++++++ 9 files changed, 1241 insertions(+) create mode 100644 .env.broadcaster.example create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 _twitch.py create mode 100644 bot.py create mode 100755 examples/stream.sh create mode 100755 search-game.py create mode 100755 set-channel.py diff --git a/.env.broadcaster.example b/.env.broadcaster.example new file mode 100644 index 0000000..9f77e82 --- /dev/null +++ b/.env.broadcaster.example @@ -0,0 +1,27 @@ +# Copy to `.env.broadcaster` and fill in. `.env.broadcaster` is gitignored; +# this template is not. +# +# Used by: +# set-channel.py — PATCH /helix/channels (title + game) +# search-game.py — GET /helix/search/categories +# examples/stream.sh — wraps both into one interactive pre-stream prompt +# +# This file holds BROADCASTER credentials (the channel-owner account). The +# chat bot's separate .env holds the BOT account credentials — different +# scopes, different tokens. Keep them separate; do not merge. + +# ─── Twitch broadcaster auth ──────────────────────────────────────────────── +# Generate at https://twitchtokengenerator.com — sign in as the broadcaster +# (the channel owner) and select the scope `channel:manage:broadcast`. +# Twitch enforces broadcaster_id == token user_id on PATCH /helix/channels, +# so the token MUST belong to the broadcaster account. + +TWITCH_ACCESS_TOKEN= +TWITCH_REFRESH_TOKEN= +TWITCH_CLIENT_ID= + +# Numeric Twitch user id for the broadcaster. Find it with any "twitch user +# id lookup" tool, or run: +# curl -H "Authorization: Bearer " -H "Client-Id: " \ +# "https://api.twitch.tv/helix/users?login=" +TWITCH_BROADCASTER_ID= diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..a59a709 --- /dev/null +++ b/.env.example @@ -0,0 +1,38 @@ +# Copy to `.env` and fill in. `.env` is gitignored; this template is not. + +# ─── Twitch chat auth ─────────────────────────────────────────────────────── +# Generated together at https://twitchtokengenerator.com +# (scopes: chat:read + chat:edit). Access token is what the bot uses today; +# refresh + client id are stored so we can rotate the access token later +# without re-running the generator. + +# Chat OAuth token. Paste the raw token — the bot adds the `oauth:` prefix +# automatically if you omit it. +TWITCH_ACCESS_TOKEN= + +# Refresh token. Currently unused by bot.py; kept here so the value survives +# alongside the access token it's paired with. +TWITCH_REFRESH_TOKEN= + +# App client ID shown by twitchtokengenerator. Needed together with the +# refresh token to mint a new access token via Twitch's OAuth endpoint. +# Currently unused by bot.py. +TWITCH_CLIENT_ID= + +# ─── Identity / where to connect ──────────────────────────────────────────── +# Bot login name (lowercase). Posting as your own streamer account is fine +# for a personal bot — set this to your channel name in that case. +TWITCH_NICK=ophi118 + +# Channel to join (no leading #). Defaults to TWITCH_NICK if omitted. +TWITCH_CHANNEL=ophi118 + +# ─── Mattermost: going-live notifications (optional) ──────────────────────── +# When set, the bot polls https://ophi118.com/api/status.json (the public +# status service in station/server/) and POSTs a single message to this +# Mattermost incoming-webhook URL on every offline→live transition. Unset +# (or blank) disables notifications entirely; the bot still runs. +# +# Override the polled endpoint with STATUS_URL if running against a private +# status service. +MM_HOOK= diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..052ce61 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# This is now a standalone repo, so the gitignore has to do its own work +# (no parent .gitignore to fall back to). The `.env*` glob catches every +# secret-bearing variant; the `!*.example` re-include keeps templates tracked. +.env* +!.env.example +!.env.broadcaster.example + +# Output of examples/stream.sh — generated per session, no value in tracking. +examples/stream.json + +# Python bytecode cache +__pycache__/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..353d342 --- /dev/null +++ b/README.md @@ -0,0 +1,225 @@ +# twitch-bot + +Lightweight Twitch chat bot that controls a local [MPD](https://www.musicpd.org/) music daemon, plus optional pre-stream automation and going-live notifications. + +Built for a personal Twitch rig (`ophi118`) and extracted here as a stand-alone bot. Stdlib + `python-mpd2`, no other dependencies. + +## What it does + +- **Chat-driven music control:** viewers run `!skip`, `!queue`, `!info` and the bot drives MPD on your machine. Per-user 60s cooldown on `!skip` so a single viewer can't spam. +- **Pre-stream helper:** `examples/stream.sh` walks you through one prompt, resolves the game name to a Twitch category id, updates your channel title + game in one PATCH, and writes a `stream.json` manifest any overlay/timer can read. +- **Going-live notifier (optional):** when `MM_HOOK` is set the bot polls a public status JSON every 60s and POSTs to the webhook on each offline → live transition. Works against any Mattermost/Slack-shape incoming webhook. + +## Requirements + +- Python 3.10+ +- [MPD](https://www.musicpd.org/) reachable on `localhost:6600` (override with `MPD_HOST` / `MPD_PORT`). Only needed for the chat commands — the pre-stream helper and notifier work standalone. +- One Twitch token for the **bot account** (`chat:read` + `chat:edit` scopes) +- One Twitch token for the **broadcaster account** if you want `examples/stream.sh` (`channel:manage:broadcast` scope) +- Easiest way to mint tokens: + +```bash +pip install python-mpd2 +``` + +## Layout + +``` +twitch-bot/ +├── bot.py # the chat bot (long-running) +├── _twitch.py # shared Helix auth helpers (urllib only) +├── search-game.py # CLI: free-form game query → "\t" +├── set-channel.py # CLI: PATCH /helix/channels (title + game) +├── .env.example # template for bot.py +├── .env.broadcaster.example # template for set-channel.py / stream.sh +└── examples/ + └── stream.sh # interactive pre-stream helper +``` + +The bot reads `.env`. The pre-stream tools read `.env.broadcaster`. Two files because they hold credentials for two **different** Twitch accounts with **different** scopes — do not merge them. + +## Quick start + +### 1. Configure the bot + +```bash +cp .env.example .env +$EDITOR .env +``` + +Minimum required: + +```env +TWITCH_ACCESS_TOKEN= +TWITCH_NICK=mybot # bot account login (lowercase) +TWITCH_CHANNEL=mychannel # channel to join, no leading '#' +``` + +### 2. Run + +```bash +python3 bot.py +``` + +Open chat in your channel and try `!skip`, `!queue`, `!info`. The bot replies in chat and the music skip lands on stream because MPD is the audio source — the bot just sends `next` to it. + +### 3. (Optional) pre-stream helper + +```bash +cp .env.broadcaster.example .env.broadcaster +$EDITOR .env.broadcaster +./examples/stream.sh +``` + +See [Pre-stream helper](#pre-stream-helper) below. + +## Chat commands + +| Command | Effect | +| -------- | ------ | +| `!skip` | Advance MPD to the next track. Per-user 60s cooldown (`SKIP_COOLDOWN` in `bot.py`). | +| `!queue` | Print the next 3 queued tracks (`QUEUE_PEEK`). | +| `!info` | Print full metadata for now-playing — `artist · title · album · year · genre`. | + +There is intentionally **no moderator gate**. Anyone in chat can `!skip`. The 60s per-user cooldown is the only abuse mitigation. If you want stricter controls, fork — the dispatch lives in `bot.py:TwitchBot._handle`. + +## Pre-stream helper + +`examples/stream.sh` is an interactive prompt that: + +1. Asks you for a game name and resolves it to a Twitch category id via `search-game.py` (you pick from up to 20 matches; auto-picks when there's only one). +2. Asks you for a stream title and a countdown duration. +3. Writes `stream.json` next to the script with the resolved values. +4. PATCHes your Twitch channel — sets title and game in one call. + +Re-running uses your previous answers as defaults. + +```text +$ ./examples/stream.sh +── stream manifest ── + (press Enter to keep [defaults]) + + Game (search): doom +── 5 matches for 'doom' ── + 1) DOOM + 2) DOOM Eternal + 3) DOOM (1993) + ... + 0) cancel +Pick: 2 + Title (Twitch stream title): Slayer is back — Nightmare run + Countdown (minutes) [5]: + + ✓ wrote /home/you/twitch-bot/examples/stream.json + ✓ Twitch channel updated → game_id=517520, title='Slayer is back — Nightmare run' +``` + +Resulting `stream.json`: + +```json +{ + "compiledAt": "2026-04-27T22:55:11Z", + "game": "DOOM Eternal", + "gameId": "517520", + "title": "Slayer is back — Nightmare run", + "countdownMin": 5 +} +``` + +Read it from any starting-soon overlay, OBS browser source, OBS text source, or static HTML page that wants to show what's coming up. Override the output path with `STREAM_JSON=/some/where.json ./examples/stream.sh`. + +If `set-channel.py` or `.env.broadcaster` aren't present, the script soft-fails the Twitch sync step — the local `stream.json` is still written. + +### Token refresh + +`set-channel.py` and `search-game.py` both go through `_twitch.py`, which auto-refreshes the access token via twitchtokengenerator on a 401, persists the new token back to `.env.broadcaster`, and retries. You don't need to babysit token expiry. + +### Direct invocation + +The Python helpers are fine to call directly without the shell wrapper: + +```bash +./search-game.py "Heroes" # stdout: "\t" +./set-channel.py --game-id 517520 "..." +./set-channel.py "DOOM Eternal" "..." # exact-match lookup, no fuzzy search +./set-channel.py --dry-run --game-id 517520 "..." +``` + +## Going-live notifications + +If `MM_HOOK` is set in `.env`, the bot starts a second async task (`live_watch`) that polls a status JSON every 60 seconds and POSTs to the webhook on every offline → live transition. + +```env +MM_HOOK=https://chat.example.com/hooks/abc123 +STATUS_URL=https://your-status-page.example.com/api/status.json +``` + +The status URL must serve a JSON document of this shape: + +```json +{ + "live": true, + "started_at": "2026-04-27T18:30:00Z", + "title": "...", + "game_name": "..." +} +``` + +Transition detection keys on `started_at`, **not** `live` alone. Twitch Helix occasionally flaps the `live` flag mid-stream; ignoring those phantom edges is the whole point of polling a stable upstream value. + +Cold start mid-stream is silent on purpose: the bot records the current `started_at` on its first poll and only fires when it changes. Restarting the bot during an active stream will not double-post. + +You provide the status endpoint — the bot is a consumer, not a server. Easy ways to roll one: + +- A small worker that calls Helix `GET /streams?user_login=...` every 60s and writes a JSON file to your web root. +- A tiny webapp serving the same shape from any framework. +- Skip notifications entirely by leaving `MM_HOOK` unset. + +## Running as a systemd user service + +```ini +# ~/.config/systemd/user/twitch-bot.service +[Unit] +Description=Twitch chat → MPD control bot +After=mpd.service mpd.socket network-online.target +Wants=mpd.service network-online.target + +[Service] +Type=simple +WorkingDirectory=%h/path/to/twitch-bot +ExecStart=/usr/bin/python3 %h/path/to/twitch-bot/bot.py +Restart=on-failure +RestartSec=5 + +[Install] +WantedBy=default.target +``` + +```bash +systemctl --user daemon-reload +systemctl --user enable --now twitch-bot.service +journalctl --user -u twitch-bot -f +``` + +`Restart=on-failure` covers Twitch-IRC drops and MPD restarts. The bot's reconnect loop also handles transient drops without exiting; the systemd restart is the ultimate safety net. + +## Troubleshooting + +| Symptom | What to check | +| ------- | ------------- | +| Bot connects but never sees chat | Brand-new Twitch accounts can be silently throttled. Set `TWITCH_BOT_DEBUG=1` in `.env` to dump every received IRC line. If you see `JOIN` lines but no `PRIVMSG`, the account is the problem — not the bot. | +| `!skip` says "MPD is not playing anything" but it is | Wrong host/port. The bot connects to `localhost:6600` by default; override with `MPD_HOST` / `MPD_PORT` env vars. | +| `set-channel.py` returns 401 / 403 | The broadcaster token must (a) carry the `channel:manage:broadcast` scope **and** (b) belong to the same account whose numeric id is in `TWITCH_BROADCASTER_ID`. Twitch enforces both. | +| Mattermost notifier never fires | Hit your `STATUS_URL` with `curl` first — it must serve `live: true` with a stable `started_at` for the bot to detect a transition. The bot logs `[live]` lines to stdout/journal on each poll if you need to confirm it's actually polling. | +| `auth_call` keeps refreshing every request | Your `TWITCH_ACCESS_TOKEN` is wrong or revoked but the refresh token still mints a working one. Regenerate the access token at twitchtokengenerator and update `.env.broadcaster`. | + +## Notes on the code + +- All HTTP is `urllib.request`. No `requests`. No async HTTP. The bot's IRC loop is async (`asyncio` + raw TLS sockets), the helpers are not — keeps the helpers usable as plain CLI scripts. +- `_twitch.py` looks for `.env.broadcaster` first, falls back to `.env.ophi118` (legacy filename from the rig this was extracted from). Either works. +- `bot.py:load_env` is a separate 12-line `.env` parser instead of pulling in `python-dotenv`. Three vars don't justify the dependency. +- `bot.py` does **not** currently use the bot account's refresh token. If access tokens start expiring mid-stream, refresh would land in the IRC reconnect handler. + +## License + +No license file ships in this repo yet — add one before redistributing. MIT or 0BSD if you want maximally permissive; AGPL if you want forks to stay open. diff --git a/_twitch.py b/_twitch.py new file mode 100644 index 0000000..8b604d8 --- /dev/null +++ b/_twitch.py @@ -0,0 +1,121 @@ +""" +Shared helpers for set-channel.py and search-game.py. + +Stdlib-only on purpose — the bot dir intentionally avoids a venv. If a third +caller appears that needs richer behavior (retries with backoff, async, etc.), +revisit; until then minimal urllib is fine. + +Auth model: every Helix call goes through `auth_call`, which runs the request, +refreshes the access token via twitchtokengenerator on 401, persists the new +tokens to .env.ophi118, mutates the in-memory env dict, and retries once. +Callers stay agnostic about whether a refresh happened. +""" + +import json +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path + +HELIX = "https://api.twitch.tv/helix" +REFRESH_URL = "https://twitchtokengenerator.com/api/refresh/{refresh}" + +# Default broadcaster env file. Looks for `.env.broadcaster` (the documented +# OSS-style filename) first, then falls back to the legacy `.env.ophi118` +# kept around for the original ophi118 rig that birthed this code. Whichever +# exists wins; if neither exists yet, ENV_FILE points at `.env.broadcaster` +# so error messages tell the user the right file to create. +_DIR = Path(__file__).resolve().parent +ENV_FILE = _DIR / ".env.broadcaster" +if not ENV_FILE.exists() and (_DIR / ".env.ophi118").exists(): + ENV_FILE = _DIR / ".env.ophi118" + + +# ─── env file I/O ─────────────────────────────────────────────────────────── + +def load_env(path: Path = ENV_FILE) -> dict: + out = {} + if not path.exists(): + return out + for raw in path.read_text().splitlines(): + line = raw.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, _, v = line.partition("=") + out[k.strip()] = v.strip().strip('"').strip("'") + return out + + +def update_env(updates: dict, path: Path = ENV_FILE) -> None: + """Atomic in-place rewrite of KEY=VALUE lines, comments preserved.""" + lines = path.read_text().splitlines() if path.exists() else [] + seen, out_lines = set(), [] + for line in lines: + s = line.strip() + if s and not s.startswith("#") and "=" in s: + k = s.split("=", 1)[0].strip() + if k in updates: + out_lines.append(f"{k}={updates[k]}") + seen.add(k) + continue + out_lines.append(line) + for k, v in updates.items(): + if k not in seen: + out_lines.append(f"{k}={v}") + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text("\n".join(out_lines) + "\n") + tmp.replace(path) + + +# ─── HTTP ─────────────────────────────────────────────────────────────────── + +def http(method: str, url: str, *, headers=None, body=None): + """Returns (status, parsed_json_or_text). Never raises on HTTP errors — + surfaces them as the status code so callers can branch on 401 cleanly.""" + data = body.encode("utf-8") if isinstance(body, str) else body + req = urllib.request.Request(url, method=method, headers=headers or {}, data=data) + try: + with urllib.request.urlopen(req, timeout=10) as r: + raw = r.read().decode("utf-8") + try: return r.status, (json.loads(raw) if raw else None) + except json.JSONDecodeError: return r.status, raw + except urllib.error.HTTPError as e: + raw = e.read().decode("utf-8", errors="replace") + try: return e.code, (json.loads(raw) if raw else None) + except json.JSONDecodeError: return e.code, raw + + +# ─── auth ─────────────────────────────────────────────────────────────────── + +def refresh_token(env: dict) -> dict: + """Mint new tokens via twitchtokengenerator. Returns the merge dict for + `update_env` — caller persists. Raises on refresh failure (no point + retrying — the refresh token is dead, the user must re-generate).""" + refresh = env["TWITCH_REFRESH_TOKEN"] + url = REFRESH_URL.format(refresh=urllib.parse.quote(refresh, safe="")) + status, body = http("GET", url) + if status != 200 or not isinstance(body, dict) or not body.get("success"): + raise RuntimeError(f"token refresh failed (status={status}): {body}") + return { + "TWITCH_ACCESS_TOKEN": body["token"], + "TWITCH_REFRESH_TOKEN": body["refresh"], + } + + +def auth_call(method: str, url: str, env: dict, *, body=None): + """Authenticated Helix request with 401 → refresh → retry. Mutates `env` + in place on refresh so subsequent calls in the same process see the new + token without another reload.""" + def headers_for(tok): + h = {"Authorization": f"Bearer {tok}", "Client-Id": env["TWITCH_CLIENT_ID"]} + if body is not None: + h["Content-Type"] = "application/json" + return h + + status, resp = http(method, url, headers=headers_for(env["TWITCH_ACCESS_TOKEN"]), body=body) + if status == 401: + new = refresh_token(env) + update_env(new) + env.update(new) + status, resp = http(method, url, headers=headers_for(env["TWITCH_ACCESS_TOKEN"]), body=body) + return status, resp diff --git a/bot.py b/bot.py new file mode 100644 index 0000000..317d167 --- /dev/null +++ b/bot.py @@ -0,0 +1,462 @@ +#!/usr/bin/env python3 +""" +Twitch chat → MPD control bot, plus optional "going live" notifications. + +Connects to Twitch IRC over TLS, joins the configured channel, and responds to: + !skip advance MPD to the next track (per-user cooldown — see SKIP_COOLDOWN) + !queue print the next few queued tracks + !info print full metadata (artist · title · album · year · genre) for now-playing +The new track shows up on stream because bridges/mpd-state.py is already +broadcasting MPD state to the loading / game / music-box overlays. + +If MM_HOOK is set, a second task polls the public ophi118.com status service +(which already polls Twitch Helix for live/title/game) and POSTs a single +Mattermost notification per offline→live transition. See live_watch(). + +Reads .env from the same directory: + TWITCH_ACCESS_TOKEN chat OAuth token (with or without `oauth:` prefix) + TWITCH_NICK bot login name (lowercase) + TWITCH_CHANNEL channel to join, no leading `#` — defaults to TWITCH_NICK + MM_HOOK optional Mattermost incoming-webhook URL; enables + going-live notifications when set + STATUS_URL optional override for the status API endpoint + (default: https://ophi118.com/api/status.json) + +TWITCH_REFRESH_TOKEN and TWITCH_CLIENT_ID are also stored in .env (paired with +the access token at generation time) but the bot doesn't use them yet — token +refresh would land here later if access tokens start expiring mid-stream. +""" + +import asyncio +import json +import os +import ssl +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path + +from mpd.asyncio import MPDClient + +HERE = Path(__file__).resolve().parent +ENV = HERE / ".env" + +MPD_HOST = os.environ.get("MPD_HOST", "localhost") +MPD_PORT = int(os.environ.get("MPD_PORT", "6600")) + +IRC_HOST = "irc.chat.twitch.tv" +IRC_PORT = 6697 +PING_TIMEOUT = 360.0 # twitch PINGs ~every 5min; reconnect if silent longer +SKIP_COOLDOWN = 60.0 # per-user seconds between successful !skip invocations +QUEUE_PEEK = 3 # how many upcoming tracks !queue should print + +# Going-live watcher: poll cadence matches the server's Twitch poll (60s) so we +# never lag the upstream signal by more than ~2 polls. +STATUS_URL = os.environ.get("STATUS_URL", "https://ophi118.com/api/status.json") +STATUS_POLL_S = 60.0 + +# Set TWITCH_BOT_DEBUG=1 (env or .env) to dump every received IRC line. Useful +# when a brand-new bot account looks "connected" but doesn't see chat — Twitch +# silently degrades unverified accounts in ways that don't surface as errors. +DEBUG_RAW = os.environ.get("TWITCH_BOT_DEBUG", "").strip() not in ("", "0", "false") + + +# ─── tiny .env loader ─────────────────────────────────────────────────────── + +def load_env(path: Path) -> dict: + """KEY=VALUE lines, # comments, optional surrounding quotes. Avoids a + python-dotenv dependency for three vars.""" + out = {} + if not path.exists(): + return out + for raw in path.read_text().splitlines(): + line = raw.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, _, v = line.partition("=") + out[k.strip()] = v.strip().strip('"').strip("'") + return out + + +# ─── MPD ──────────────────────────────────────────────────────────────────── + +class Mpd: + """One shared MPDClient with reconnect-on-failure. + + python-mpd2's asyncio client doesn't auto-reconnect, so each command is + wrapped to retry once after dropping the connection — covers MPD + restarts and idle-socket timeouts without surfacing transient errors. + """ + + def __init__(self): + self._cli = None + self._lock = asyncio.Lock() + + async def _ensure(self): + if self._cli is None: + cli = MPDClient() + await cli.connect(MPD_HOST, MPD_PORT) + self._cli = cli + + async def _retry(self, op): + """Run an MPD coroutine with one reconnect-on-failure retry.""" + async with self._lock: + last = None + for attempt in (1, 2): + try: + await self._ensure() + return await op(self._cli) + except Exception as e: + last = e + self._cli = None # force reconnect on retry + raise last # type: ignore[misc] + + async def skip(self) -> str: + async def op(cli): + await cli.next() + return _fmt_song(await cli.currentsong()) + return await self._retry(op) + + async def currentsong(self) -> dict: + return await self._retry(lambda cli: cli.currentsong()) + + async def status(self) -> dict: + return await self._retry(lambda cli: cli.status()) + + async def playlistinfo(self, pos: int) -> list: + return await self._retry(lambda cli: cli.playlistinfo(pos)) + + +def _flat(v): + """MPD tags can be lists (multi-value); flatten to a comma-joined string + with empties removed. Returns None for missing/empty so callers can skip + fields without empty parens.""" + if v is None: + return None + if isinstance(v, list): + v = ", ".join(x for x in v if x) + v = (v or "").strip() + return v or None + + +def _fmt_song(song: dict) -> str: + artist = _flat(song.get("artist") or song.get("albumartist")) or "" + title = _flat(song.get("title")) or (song.get("file") or "").rsplit("/", 1)[-1] + out = f"{artist} — {title}".strip(" —") + return out or "(unknown track)" + + +def _fmt_info(song: dict) -> str: + """Compose the !info reply: 'Artist — Title · Album (Year) · Genre'. + Skip any segment whose underlying tag is missing.""" + head = _fmt_song(song) + parts = [head] if head != "(unknown track)" else [] + album = _flat(song.get("album")) + year = _flat(song.get("date")) + if year: + # MPD often stores ISO dates ('2018-04-12') — show year only. + year = year.split("-", 1)[0] + if album and year: + parts.append(f"{album} ({year})") + elif album: + parts.append(album) + elif year: + parts.append(f"({year})") + genre = _flat(song.get("genre")) + if genre: + parts.append(genre) + return " · ".join(parts) if parts else "(unknown track)" + + +# ─── Twitch IRC ───────────────────────────────────────────────────────────── + +class TwitchBot: + def __init__(self, nick: str, token: str, channel: str, mpd: Mpd): + self.nick = nick.lower() + self.token = token if token.startswith("oauth:") else f"oauth:{token}" + self.channel = "#" + channel.lower().lstrip("#") + self.mpd = mpd + # Per-user !skip cooldown — keyed by lowercased nick. The dict grows + # unboundedly with viewer count over a stream session; not worth + # pruning at typical chat sizes (one float per chatter). + self._skip_seen: dict[str, float] = {} + + async def _send(self, writer: asyncio.StreamWriter, raw: str): + writer.write((raw + "\r\n").encode("utf-8")) + await writer.drain() + + async def _say(self, writer: asyncio.StreamWriter, msg: str): + # twitch chat lines max ~500 chars; truncate defensively, strip newlines + msg = msg.replace("\r", " ").replace("\n", " ")[:480] + await self._send(writer, f"PRIVMSG {self.channel} :{msg}") + + async def run(self): + """Connect/auth/listen loop with quiet exponential backoff. + + Mirrors the connect-quietly pattern in bridges/mpd-state.py: + first failure prints once, then silent retry until the connection + comes back. Auth failure is fatal — no point retrying with a bad token. + """ + backoff = 2 + BACKOFF_MAX = 30 + prev_status = None # None | "up" | "down" + ctx = ssl.create_default_context() + + while True: + try: + reader, writer = await asyncio.open_connection( + IRC_HOST, IRC_PORT, ssl=ctx, + ) + try: + await self._send(writer, f"PASS {self.token}") + await self._send(writer, f"NICK {self.nick}") + await self._send(writer, f"JOIN {self.channel}") + + if prev_status != "up": + print(f"[twitch] connected → {self.channel} as {self.nick}", + flush=True) + prev_status = "up" + backoff = 2 + + while True: + line_bytes = await asyncio.wait_for( + reader.readline(), timeout=PING_TIMEOUT, + ) + if not line_bytes: + raise ConnectionError("server closed connection") + line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n") + await self._handle(writer, line) + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + except RuntimeError as e: + # auth-failed and similar fatals — let systemd flag it + print(f"[twitch] fatal: {e}", file=sys.stderr, flush=True) + raise + except (asyncio.TimeoutError, ConnectionError, OSError) as e: + if prev_status == "up": + print(f"[twitch] disconnected ({type(e).__name__}: {e}) — retrying", + flush=True) + elif prev_status is None: + print(f"[twitch] unreachable ({type(e).__name__}) — will retry silently", + flush=True) + prev_status = "down" + await asyncio.sleep(backoff) + backoff = min(backoff * 2, BACKOFF_MAX) + + async def _handle(self, writer: asyncio.StreamWriter, line: str): + if DEBUG_RAW: + print(f"[rx] {line}", flush=True) + + # PING/PONG keepalive — twitch sends `PING :tmi.twitch.tv` periodically + if line.startswith("PING "): + await self._send(writer, "PONG " + line[5:]) + return + + # NOTICE on bad creds: ":tmi.twitch.tv NOTICE * :Login authentication failed" + if " NOTICE " in line and "authentication failed" in line.lower(): + raise RuntimeError("twitch auth failed — check TWITCH_KEY / TWITCH_NICK") + + # PRIVMSG format: ":user!user@user.tmi.twitch.tv PRIVMSG #chan :body" + if " PRIVMSG " not in line: + return + try: + prefix, rest = line.split(" PRIVMSG ", 1) + user = prefix[1:].split("!", 1)[0] + _, _, body = rest.partition(":") + except ValueError: + return + + # Twitch's anti-duplicate logic appends an invisible Tags-block + # codepoint (U+E0000–U+E007F) to back-to-back identical messages + # from the same sender within ~30s. Without stripping it, every + # other `!skip` in a series silently fails to match. + body = "".join(c for c in body if not (0xE0000 <= ord(c) <= 0xE007F)) + head = body.strip().split(" ", 1)[0].lower() + if head == "!skip": + await self._cmd_skip(writer, user) + elif head == "!queue": + await self._cmd_queue(writer, user) + elif head == "!info": + await self._cmd_info(writer, user) + + async def _cmd_skip(self, writer: asyncio.StreamWriter, user: str): + # Per-user cooldown: each chatter gets one skip per SKIP_COOLDOWN + # seconds. Reject with a wait-time hint so the user knows it's their + # own gate, not the bot being broken. + now = time.monotonic() + ukey = user.lower() + last = self._skip_seen.get(ukey, 0.0) + wait = SKIP_COOLDOWN - (now - last) + if wait > 0: + await self._say( + writer, + f"@{user} you can only !skip once per minute (wait {int(wait)+1}s)", + ) + return + try: + now_playing = await self.mpd.skip() + except Exception as e: + print(f"[skip] failed for @{user}: {type(e).__name__}: {e}", flush=True) + await self._say(writer, f"@{user} couldn't skip — MPD didn't respond") + return + self._skip_seen[ukey] = now + print(f"[skip] @{user} → {now_playing}", flush=True) + await self._say(writer, f"⏭ skipped by @{user} → now: {now_playing}") + + async def _cmd_queue(self, writer: asyncio.StreamWriter, user: str): + # Read-only; no cooldown. Pulls current pos from status, then peeks + # the next QUEUE_PEEK tracks one-by-one (small N — full playlistinfo + # would dump the whole queue). + try: + status = await self.mpd.status() + pos = int(status.get("song", -1)) + total = int(status.get("playlistlength") or 0) + except Exception as e: + print(f"[queue] failed for @{user}: {type(e).__name__}: {e}", flush=True) + await self._say(writer, f"@{user} couldn't read queue — MPD didn't respond") + return + if pos < 0 or pos + 1 >= total: + await self._say(writer, "🎵 nothing else queued") + return + upcoming = [] + for i, p in enumerate(range(pos + 1, min(pos + 1 + QUEUE_PEEK, total)), start=1): + try: + rows = await self.mpd.playlistinfo(p) + if rows: + upcoming.append(f"{i}) {_fmt_song(rows[0])}") + except Exception: + continue + if not upcoming: + await self._say(writer, "🎵 nothing else queued") + return + await self._say(writer, "🎵 next: " + " · ".join(upcoming)) + + async def _cmd_info(self, writer: asyncio.StreamWriter, user: str): + try: + song = await self.mpd.currentsong() + except Exception as e: + print(f"[info] failed for @{user}: {type(e).__name__}: {e}", flush=True) + await self._say(writer, f"@{user} couldn't read track info — MPD didn't respond") + return + if not song: + await self._say(writer, "ℹ nothing playing") + return + await self._say(writer, "ℹ " + _fmt_info(song)) + + +# ─── Mattermost "going live" notifications ────────────────────────────────── + +def _fetch_status(url: str) -> dict: + """GET the status JSON. Raises on transport/parse errors so live_watch() + can log + back off uniformly.""" + req = urllib.request.Request(url, method="GET", + headers={"User-Agent": "obs-twitch-bot/1.0"}) + with urllib.request.urlopen(req, timeout=10) as r: + return json.loads(r.read().decode("utf-8")) + + +def _post_mattermost(hook_url: str, channel: str, title: str, game: str) -> None: + """POST a single going-live notification. Markdown renders in Mattermost.""" + parts = [f"🔴 **{channel}** is live on Twitch"] + if title: + parts.append(f"> {title}") + if game: + parts.append(f"_{game}_") + parts.append(f"https://twitch.tv/{channel}") + body = json.dumps({"text": "\n".join(parts)}).encode("utf-8") + req = urllib.request.Request( + hook_url, method="POST", data=body, + headers={"Content-Type": "application/json", + "User-Agent": "obs-twitch-bot/1.0"}, + ) + with urllib.request.urlopen(req, timeout=10) as r: + if r.status >= 300: + raise RuntimeError(f"mattermost hook returned HTTP {r.status}") + + +async def live_watch(mm_hook: str, channel: str, status_url: str = STATUS_URL): + """Poll the status service; POST to Mattermost on offline→live transitions. + + Transition detection is keyed on `started_at`, not just the `live` flag, so + a brief Helix hiccup (live=true → false → true within one stream session, + same `started_at`) doesn't double-fire. + + Cold-start during an ongoing stream is intentionally silent: we record the + current `started_at` so we won't fire for it later, but we don't announce + it (we can't tell "they just went live" from "the bot restarted mid-stream" + without prior offline observation). Once we see live=False at least once, + the next started_at change becomes a real transition we'll announce. + """ + last_announced = None # started_at we've already handled (announced or recorded) + seen_offline = False # have we observed live=False since last_announced? + + while True: + try: + data = await asyncio.to_thread(_fetch_status, status_url) + twitch = (data or {}).get("twitch") or {} + live = bool(twitch.get("live")) + started = (twitch.get("started_at") or None) if live else None + except Exception as e: + print(f"[live] poll failed: {type(e).__name__}: {e}", flush=True) + await asyncio.sleep(STATUS_POLL_S) + continue + + if not live: + seen_offline = True + elif started: + if last_announced is None and not seen_offline: + # cold start mid-stream: lock in this started_at so we won't + # falsely announce it later + last_announced = started + elif started != last_announced and seen_offline: + title = twitch.get("title") or "" + game = twitch.get("game") or "" + try: + await asyncio.to_thread(_post_mattermost, mm_hook, channel, title, game) + print(f"[live] {channel} went live → posted to mattermost " + f"(title={title!r}, game={game!r})", flush=True) + except Exception as e: + # log but don't retry — a missed notification is better + # than a duplicate; the next genuine transition will fire + print(f"[live] mattermost post failed: {type(e).__name__}: {e}", + flush=True) + last_announced = started + seen_offline = False + + await asyncio.sleep(STATUS_POLL_S) + + +# ─── entrypoint ───────────────────────────────────────────────────────────── + +async def main(): + env = {**load_env(ENV), **os.environ} # process env wins over .env + token = env.get("TWITCH_ACCESS_TOKEN", "").strip() + nick = env.get("TWITCH_NICK", "").strip() + channel = env.get("TWITCH_CHANNEL", nick).strip() + mm_hook = env.get("MM_HOOK", "").strip() + + missing = [k for k, v in (("TWITCH_ACCESS_TOKEN", token), ("TWITCH_NICK", nick)) if not v] + if missing: + print(f"[err] missing in {ENV}: {', '.join(missing)}", file=sys.stderr) + sys.exit(1) + + bot = TwitchBot(nick=nick, token=token, channel=channel, mpd=Mpd()) + tasks = [bot.run()] + if mm_hook: + print(f"[live] watching {STATUS_URL} → mattermost on offline→live", flush=True) + tasks.append(live_watch(mm_hook, channel)) + else: + print("[live] MM_HOOK unset — going-live notifications disabled", flush=True) + await asyncio.gather(*tasks) + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + sys.exit(0) diff --git a/examples/stream.sh b/examples/stream.sh new file mode 100755 index 0000000..97c19d7 --- /dev/null +++ b/examples/stream.sh @@ -0,0 +1,150 @@ +#!/usr/bin/env bash +# stream.sh — interactive pre-stream helper. +# +# ./stream.sh +# +# Prompts for game + title + countdown, resolves the game to a Twitch +# category id via ../search-game.py, updates the channel via ../set-channel.py, +# and writes stream.json next to this script for any overlay / starting-soon +# timer / status page that wants to read the manifest. +# +# Re-running uses your previous answers as defaults — press Enter to keep them. +# +# Output path can be overridden with STREAM_JSON=/path/to/file.json ./stream.sh +set -euo pipefail + +DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +BOT_DIR="$(cd "$DIR/.." && pwd)" +JSON="${STREAM_JSON:-$DIR/stream.json}" + +SEARCH_GAME="$BOT_DIR/search-game.py" +SET_CHANNEL="$BOT_DIR/set-channel.py" + +# Reads a top-level field from the previous stream.json. Always exits 0; +# prints empty string if file/field is missing. +prev() { + [[ -f "$JSON" ]] || return 0 + python3 - "$JSON" "$1" <<'PY' 2>/dev/null || true +import json, sys +try: + d = json.load(open(sys.argv[1])) + v = d.get(sys.argv[2]) + print('' if v is None else v) +except Exception: + pass +PY +} + +ask() { + # ask "question" "default" → echoes user input or default if blank + local q="$1" def="${2:-}" + local prompt=" $q" + [[ -n "$def" ]] && prompt+=" [$def]" + prompt+=": " + local v + read -rp "$prompt" v + [[ -z "$v" && -n "$def" ]] && v="$def" + printf '%s' "$v" +} + +ask_int() { + local v + while :; do + v=$(ask "$1" "${2:-}") + [[ "$v" =~ ^[0-9]+$ ]] && { printf '%s' "$v"; return; } + echo " ✗ must be a whole number" >&2 + done +} + +sanitize() { + printf '%s' "${1:-}" \ + | tr -d '"\\' \ + | tr '\n\r\t' ' ' \ + | sed 's/ */ /g; s/^ //; s/ $//' +} +emit_str() { [[ -z "${1:-}" ]] && printf 'null' || printf '"%s"' "$1"; } + +# ── Pull previous answers as defaults ─────────────── +prev_game=$(prev game) +prev_game_id=$(prev gameId) +prev_title=$(prev title) +prev_count=$(prev countdownMin); [[ -z "$prev_count" ]] && prev_count="5" + +echo "── stream manifest ──" +echo " (press Enter to keep [defaults])" +echo + +# Resolve Game via Twitch search → exact directory entry. The picker prints +# "\t" to stdout on success, or non-zero on no-match / cancel. +# Re-running and accepting the previous game unchanged reuses the cached id +# so we don't burn an API call to re-resolve a known answer. +have_twitch=0 +[[ -x "$SEARCH_GAME" ]] && have_twitch=1 + +game=""; game_id="" +while :; do + query=$(ask "Game (search)" "$prev_game") + if [[ -z "$query" ]]; then + echo " ✗ Game is required" >&2 + continue + fi + if (( have_twitch )); then + # Reuse cached id only if it looks like a real Twitch numeric id. A past + # bug (search-game.py prompt leaking into stdout) wrote "Pick: 506462" + # here; the regex makes sure such corruption falls back to a fresh search + # instead of getting passed straight to PATCH /helix/channels. + if [[ -n "$prev_game_id" && "$query" == "$prev_game" && "$prev_game_id" =~ ^[0-9]+$ ]]; then + game="$prev_game"; game_id="$prev_game_id" + echo " ↻ reusing cached: $game (id=$game_id)" + break + fi + if line=$("$SEARCH_GAME" "$query"); then + IFS=$'\t' read -r game_id game <<<"$line" + break + fi + # search-game.py already printed its own error; loop and re-ask + else + # no twitch helper available — accept the raw input, no id resolution + game="$query"; game_id="" + break + fi +done + +title=$(ask "Title (Twitch stream title)" "$prev_title") +countdown=$(ask_int "Countdown (minutes)" "$prev_count") + +game=$(sanitize "$game") +title=$(sanitize "$title") + +now_iso=$(date -u +%FT%TZ) +tmp="$JSON.tmp.$$" +cat > "$tmp" </dev/null 2>&1; then + echo " ✗ produced invalid JSON, leaving $tmp for inspection" >&2 + exit 1 +fi +mv -f "$tmp" "$JSON" + +echo +echo " ✓ wrote $JSON" + +# ── Push to Twitch (game + title) via the broadcaster token ───────────── +# Soft-fail: a Twitch hiccup must not block the local manifest from being +# written — anything reading stream.json can still load offline. +# Title falls back to the game name when the user left it blank. +if (( have_twitch )) && [[ -x "$SET_CHANNEL" && -n "$game_id" ]]; then + twitch_title="${title:-$game}" + echo + if ! "$SET_CHANNEL" --game-id "$game_id" "$twitch_title"; then + echo " ! Twitch sync failed (manifest still saved) — fix and re-run if needed" >&2 + fi +fi diff --git a/search-game.py b/search-game.py new file mode 100755 index 0000000..bfb3074 --- /dev/null +++ b/search-game.py @@ -0,0 +1,104 @@ +#!/usr/bin/env python3 +""" +Interactive Twitch category search. Used by loading.sh to resolve a free-form +query into an exact directory entry. + + ./search-game.py "Heroes" + stderr: numbered list of up to 20 matches + "Pick: " prompt + stdout: a single line "\\t" (only on success) + exit: 0 success | 1 no-match / cancel / error | 130 ctrl-c + +Auto-picks when there's exactly one result. Tempting to also auto-pick on an +exact-name match within a larger result set (e.g. typing "Doom Eternal" and +having that exact entry near the top), but a real test surfaced the failure: +typing "Heroes" returns "Heroes" (TV show) plus all the Heroes-* games, and +auto-picking the standalone match silently steals from the user — who is +*using search* precisely because the query is ambiguous. So: only one rule. + +Endpoint: /helix/search/categories — same one Twitch's dashboard autocomplete +uses. Accepts any user/app token; we reuse the broadcaster token from +.env.ophi118 because it's already there and refreshable. +""" + +import sys +import urllib.parse + +import _twitch as tw + +MAX_RESULTS = 20 + + +def err(msg=""): + print(msg, file=sys.stderr, flush=True) + + +def search(env: dict, query: str) -> list: + qs = urllib.parse.urlencode({"query": query, "first": MAX_RESULTS}) + status, body = tw.auth_call("GET", f"{tw.HELIX}/search/categories?{qs}", env) + if status != 200: + raise RuntimeError(f"search failed (status={status}): {body}") + return (body or {}).get("data") or [] + + +def pick(results: list, query: str) -> dict: + if not results: + raise RuntimeError(f"no Twitch categories match {query!r}") + + if len(results) == 1: + err(f" ✓ only match: {results[0]['name']}") + return results[0] + + err(f"── {len(results)} matches for {query!r} ──") + for i, g in enumerate(results, 1): + err(f" {i:2}) {g['name']}") + err(" 0) cancel") + + while True: + # input()'s prompt arg writes to stdout — fatal here because bash + # captures stdout for the resolved \t line. Print to stderr + # explicitly, then call input() with no prompt. + print("Pick: ", end="", file=sys.stderr, flush=True) + try: + raw = input().strip() + except EOFError: + raise RuntimeError("cancelled (EOF)") + if not raw.isdigit(): + err(" ! enter a number") + continue + n = int(raw) + if n == 0: + raise RuntimeError("cancelled") + if 1 <= n <= len(results): + return results[n - 1] + err(f" ! must be 0..{len(results)}") + + +def main(): + if len(sys.argv) != 2 or not sys.argv[1].strip(): + print("usage: search-game.py ", file=sys.stderr) + sys.exit(2) + query = sys.argv[1].strip() + + env = tw.load_env() + needed = ("TWITCH_ACCESS_TOKEN", "TWITCH_REFRESH_TOKEN", "TWITCH_CLIENT_ID") + missing = [k for k in needed if not env.get(k)] + if missing: + err(f"[err] missing in {tw.ENV_FILE.name}: {', '.join(missing)}") + sys.exit(1) + + try: + results = search(env, query) + chosen = pick(results, query) + except Exception as e: + err(f"[err] {type(e).__name__}: {e}") + sys.exit(1) + + # ONLY the resolved tuple goes to stdout — bash captures this. + print(f"{chosen['id']}\t{chosen['name']}") + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + sys.exit(130) diff --git a/set-channel.py b/set-channel.py new file mode 100755 index 0000000..6a892d9 --- /dev/null +++ b/set-channel.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +Update Twitch channel info (game + title) for the broadcaster account. + + ./set-channel.py --game-id 517520 "Slayer is back — Nightmare run" + ./set-channel.py "Doom Eternal" "..." # exact-name lookup, fails if no exact hit + ./set-channel.py --dry-run --game-id 517520 "..." + +For free-form / fuzzy game queries, use search-game.py first to resolve the +id (loading.sh does this automatically). The /helix/games?name= fallback in +this script is exact-and-case-sensitive. + +Loads `.env.ophi118` — see _twitch.py for the env contract. The token MUST +belong to the broadcaster (Twitch enforces broadcaster_id == token user_id +on PATCH /helix/channels), so this script intentionally targets that file +and not `.env` (which holds the chat-bot's account). +""" + +import json +import sys +import urllib.parse + +import _twitch as tw + +TITLE_LIMIT = 140 + + +def lookup_game_id(env: dict, name: str) -> str: + qs = urllib.parse.urlencode({"name": name}) + status, body = tw.auth_call("GET", f"{tw.HELIX}/games?{qs}", env) + if status != 200: + raise RuntimeError(f"games lookup failed (status={status}): {body}") + data = (body or {}).get("data") or [] + if not data: + raise RuntimeError( + f"no Twitch game matches {name!r} exactly. /helix/games?name= is " + f"case-sensitive — use search-game.py for fuzzy lookup." + ) + return data[0]["id"] + + +def patch_channel(env: dict, game_id: str, title: str) -> None: + qs = urllib.parse.urlencode({"broadcaster_id": env["TWITCH_BROADCASTER_ID"]}) + payload = json.dumps({"game_id": game_id, "title": title}) + status, body = tw.auth_call("PATCH", f"{tw.HELIX}/channels?{qs}", env, body=payload) + if status not in (200, 204): + raise RuntimeError(f"channel update failed (status={status}): {body}") + + +def usage_and_die(): + print("usage: set-channel.py [--dry-run] (--game-id | ) ", + file=sys.stderr) + sys.exit(2) + + +def main(): + args = sys.argv[1:] + dry = False + if args and args[0] == "--dry-run": + dry, args = True, args[1:] + + game_id = None + if args and args[0] == "--game-id": + if len(args) < 3: + usage_and_die() + game_id, args = args[1], args[2:] + + expected = 1 if game_id else 2 + if len(args) != expected: + usage_and_die() + + if game_id: + title = args[0][:TITLE_LIMIT] + else: + game_name = args[0] + title = args[1][:TITLE_LIMIT] + + env = tw.load_env() + needed = ("TWITCH_ACCESS_TOKEN", "TWITCH_REFRESH_TOKEN", + "TWITCH_CLIENT_ID", "TWITCH_BROADCASTER_ID") + missing = [k for k in needed if not env.get(k)] + if missing: + print(f"[err] missing in {tw.ENV_FILE.name}: {', '.join(missing)}", file=sys.stderr) + sys.exit(1) + + if game_id is None: + game_id = lookup_game_id(env, game_name) + + if dry: + print(f" ✓ dry-run: would set game_id={game_id}, title={title!r} (no PATCH issued)") + return + + patch_channel(env, game_id, title) + print(f" ✓ Twitch channel updated → game_id={game_id}, title={title!r}") + + +if __name__ == "__main__": + try: + main() + except Exception as e: + print(f"[err] {type(e).__name__}: {e}", file=sys.stderr) + sys.exit(1)