Initial version

This commit is contained in:
Jakub Zych
2026-04-27 23:21:39 +02:00
commit 1af250ae1a
9 changed files with 1241 additions and 0 deletions

27
.env.broadcaster.example Normal file
View File

@@ -0,0 +1,27 @@
# Copy to `.env.broadcaster` and fill in. `.env.broadcaster` is gitignored;
# this template is not.
#
# Used by:
# set-channel.py — PATCH /helix/channels (title + game)
# search-game.py — GET /helix/search/categories
# examples/stream.sh — wraps both into one interactive pre-stream prompt
#
# This file holds BROADCASTER credentials (the channel-owner account). The
# chat bot's separate .env holds the BOT account credentials — different
# scopes, different tokens. Keep them separate; do not merge.
# ─── Twitch broadcaster auth ────────────────────────────────────────────────
# Generate at https://twitchtokengenerator.com — sign in as the broadcaster
# (the channel owner) and select the scope `channel:manage:broadcast`.
# Twitch enforces broadcaster_id == token user_id on PATCH /helix/channels,
# so the token MUST belong to the broadcaster account.
TWITCH_ACCESS_TOKEN=
TWITCH_REFRESH_TOKEN=
TWITCH_CLIENT_ID=
# Numeric Twitch user id for the broadcaster. Find it with any "twitch user
# id lookup" tool, or run:
# curl -H "Authorization: Bearer <token>" -H "Client-Id: <client-id>" \
# "https://api.twitch.tv/helix/users?login=<your_login>"
TWITCH_BROADCASTER_ID=

38
.env.example Normal file
View File

@@ -0,0 +1,38 @@
# Copy to `.env` and fill in. `.env` is gitignored; this template is not.
# ─── Twitch chat auth ───────────────────────────────────────────────────────
# Generated together at https://twitchtokengenerator.com
# (scopes: chat:read + chat:edit). Access token is what the bot uses today;
# refresh + client id are stored so we can rotate the access token later
# without re-running the generator.
# Chat OAuth token. Paste the raw token — the bot adds the `oauth:` prefix
# automatically if you omit it.
TWITCH_ACCESS_TOKEN=
# Refresh token. Currently unused by bot.py; kept here so the value survives
# alongside the access token it's paired with.
TWITCH_REFRESH_TOKEN=
# App client ID shown by twitchtokengenerator. Needed together with the
# refresh token to mint a new access token via Twitch's OAuth endpoint.
# Currently unused by bot.py.
TWITCH_CLIENT_ID=
# ─── Identity / where to connect ────────────────────────────────────────────
# Bot login name (lowercase). Posting as your own streamer account is fine
# for a personal bot — set this to your channel name in that case.
TWITCH_NICK=ophi118
# Channel to join (no leading #). Defaults to TWITCH_NICK if omitted.
TWITCH_CHANNEL=ophi118
# ─── Mattermost: going-live notifications (optional) ────────────────────────
# When set, the bot polls https://ophi118.com/api/status.json (the public
# status service in station/server/) and POSTs a single message to this
# Mattermost incoming-webhook URL on every offline→live transition. Unset
# (or blank) disables notifications entirely; the bot still runs.
#
# Override the polled endpoint with STATUS_URL if running against a private
# status service.
MM_HOOK=

12
.gitignore vendored Normal file
View File

@@ -0,0 +1,12 @@
# This is now a standalone repo, so the gitignore has to do its own work
# (no parent .gitignore to fall back to). The `.env*` glob catches every
# secret-bearing variant; the `!*.example` re-include keeps templates tracked.
.env*
!.env.example
!.env.broadcaster.example
# Output of examples/stream.sh — generated per session, no value in tracking.
examples/stream.json
# Python bytecode cache
__pycache__/

225
README.md Normal file
View File

@@ -0,0 +1,225 @@
# twitch-bot
Lightweight Twitch chat bot that controls a local [MPD](https://www.musicpd.org/) music daemon, plus optional pre-stream automation and going-live notifications.
Built for a personal Twitch rig (`ophi118`) and extracted here as a stand-alone bot. Stdlib + `python-mpd2`, no other dependencies.
## What it does
- **Chat-driven music control:** viewers run `!skip`, `!queue`, `!info` and the bot drives MPD on your machine. Per-user 60s cooldown on `!skip` so a single viewer can't spam.
- **Pre-stream helper:** `examples/stream.sh` walks you through one prompt, resolves the game name to a Twitch category id, updates your channel title + game in one PATCH, and writes a `stream.json` manifest any overlay/timer can read.
- **Going-live notifier (optional):** when `MM_HOOK` is set the bot polls a public status JSON every 60s and POSTs to the webhook on each offline → live transition. Works against any Mattermost/Slack-shape incoming webhook.
## Requirements
- Python 3.10+
- [MPD](https://www.musicpd.org/) reachable on `localhost:6600` (override with `MPD_HOST` / `MPD_PORT`). Only needed for the chat commands — the pre-stream helper and notifier work standalone.
- One Twitch token for the **bot account** (`chat:read` + `chat:edit` scopes)
- One Twitch token for the **broadcaster account** if you want `examples/stream.sh` (`channel:manage:broadcast` scope)
- Easiest way to mint tokens: <https://twitchtokengenerator.com>
```bash
pip install python-mpd2
```
## Layout
```
twitch-bot/
├── bot.py # the chat bot (long-running)
├── _twitch.py # shared Helix auth helpers (urllib only)
├── search-game.py # CLI: free-form game query → "<id>\t<name>"
├── set-channel.py # CLI: PATCH /helix/channels (title + game)
├── .env.example # template for bot.py
├── .env.broadcaster.example # template for set-channel.py / stream.sh
└── examples/
└── stream.sh # interactive pre-stream helper
```
The bot reads `.env`. The pre-stream tools read `.env.broadcaster`. Two files because they hold credentials for two **different** Twitch accounts with **different** scopes — do not merge them.
## Quick start
### 1. Configure the bot
```bash
cp .env.example .env
$EDITOR .env
```
Minimum required:
```env
TWITCH_ACCESS_TOKEN=<oauth token from twitchtokengenerator>
TWITCH_NICK=mybot # bot account login (lowercase)
TWITCH_CHANNEL=mychannel # channel to join, no leading '#'
```
### 2. Run
```bash
python3 bot.py
```
Open chat in your channel and try `!skip`, `!queue`, `!info`. The bot replies in chat and the music skip lands on stream because MPD is the audio source — the bot just sends `next` to it.
### 3. (Optional) pre-stream helper
```bash
cp .env.broadcaster.example .env.broadcaster
$EDITOR .env.broadcaster
./examples/stream.sh
```
See [Pre-stream helper](#pre-stream-helper) below.
## Chat commands
| Command | Effect |
| -------- | ------ |
| `!skip` | Advance MPD to the next track. Per-user 60s cooldown (`SKIP_COOLDOWN` in `bot.py`). |
| `!queue` | Print the next 3 queued tracks (`QUEUE_PEEK`). |
| `!info` | Print full metadata for now-playing — `artist · title · album · year · genre`. |
There is intentionally **no moderator gate**. Anyone in chat can `!skip`. The 60s per-user cooldown is the only abuse mitigation. If you want stricter controls, fork — the dispatch lives in `bot.py:TwitchBot._handle`.
## Pre-stream helper
`examples/stream.sh` is an interactive prompt that:
1. Asks you for a game name and resolves it to a Twitch category id via `search-game.py` (you pick from up to 20 matches; auto-picks when there's only one).
2. Asks you for a stream title and a countdown duration.
3. Writes `stream.json` next to the script with the resolved values.
4. PATCHes your Twitch channel — sets title and game in one call.
Re-running uses your previous answers as defaults.
```text
$ ./examples/stream.sh
── stream manifest ──
(press Enter to keep [defaults])
Game (search): doom
── 5 matches for 'doom' ──
1) DOOM
2) DOOM Eternal
3) DOOM (1993)
...
0) cancel
Pick: 2
Title (Twitch stream title): Slayer is back — Nightmare run
Countdown (minutes) [5]:
✓ wrote /home/you/twitch-bot/examples/stream.json
✓ Twitch channel updated → game_id=517520, title='Slayer is back — Nightmare run'
```
Resulting `stream.json`:
```json
{
"compiledAt": "2026-04-27T22:55:11Z",
"game": "DOOM Eternal",
"gameId": "517520",
"title": "Slayer is back — Nightmare run",
"countdownMin": 5
}
```
Read it from any starting-soon overlay, OBS browser source, OBS text source, or static HTML page that wants to show what's coming up. Override the output path with `STREAM_JSON=/some/where.json ./examples/stream.sh`.
If `set-channel.py` or `.env.broadcaster` aren't present, the script soft-fails the Twitch sync step — the local `stream.json` is still written.
### Token refresh
`set-channel.py` and `search-game.py` both go through `_twitch.py`, which auto-refreshes the access token via twitchtokengenerator on a 401, persists the new token back to `.env.broadcaster`, and retries. You don't need to babysit token expiry.
### Direct invocation
The Python helpers are fine to call directly without the shell wrapper:
```bash
./search-game.py "Heroes" # stdout: "<id>\t<canonical_name>"
./set-channel.py --game-id 517520 "..."
./set-channel.py "DOOM Eternal" "..." # exact-match lookup, no fuzzy search
./set-channel.py --dry-run --game-id 517520 "..."
```
## Going-live notifications
If `MM_HOOK` is set in `.env`, the bot starts a second async task (`live_watch`) that polls a status JSON every 60 seconds and POSTs to the webhook on every offline → live transition.
```env
MM_HOOK=https://chat.example.com/hooks/abc123
STATUS_URL=https://your-status-page.example.com/api/status.json
```
The status URL must serve a JSON document of this shape:
```json
{
"live": true,
"started_at": "2026-04-27T18:30:00Z",
"title": "...",
"game_name": "..."
}
```
Transition detection keys on `started_at`, **not** `live` alone. Twitch Helix occasionally flaps the `live` flag mid-stream; ignoring those phantom edges is the whole point of polling a stable upstream value.
Cold start mid-stream is silent on purpose: the bot records the current `started_at` on its first poll and only fires when it changes. Restarting the bot during an active stream will not double-post.
You provide the status endpoint — the bot is a consumer, not a server. Easy ways to roll one:
- A small worker that calls Helix `GET /streams?user_login=...` every 60s and writes a JSON file to your web root.
- A tiny webapp serving the same shape from any framework.
- Skip notifications entirely by leaving `MM_HOOK` unset.
## Running as a systemd user service
```ini
# ~/.config/systemd/user/twitch-bot.service
[Unit]
Description=Twitch chat → MPD control bot
After=mpd.service mpd.socket network-online.target
Wants=mpd.service network-online.target
[Service]
Type=simple
WorkingDirectory=%h/path/to/twitch-bot
ExecStart=/usr/bin/python3 %h/path/to/twitch-bot/bot.py
Restart=on-failure
RestartSec=5
[Install]
WantedBy=default.target
```
```bash
systemctl --user daemon-reload
systemctl --user enable --now twitch-bot.service
journalctl --user -u twitch-bot -f
```
`Restart=on-failure` covers Twitch-IRC drops and MPD restarts. The bot's reconnect loop also handles transient drops without exiting; the systemd restart is the ultimate safety net.
## Troubleshooting
| Symptom | What to check |
| ------- | ------------- |
| Bot connects but never sees chat | Brand-new Twitch accounts can be silently throttled. Set `TWITCH_BOT_DEBUG=1` in `.env` to dump every received IRC line. If you see `JOIN` lines but no `PRIVMSG`, the account is the problem — not the bot. |
| `!skip` says "MPD is not playing anything" but it is | Wrong host/port. The bot connects to `localhost:6600` by default; override with `MPD_HOST` / `MPD_PORT` env vars. |
| `set-channel.py` returns 401 / 403 | The broadcaster token must (a) carry the `channel:manage:broadcast` scope **and** (b) belong to the same account whose numeric id is in `TWITCH_BROADCASTER_ID`. Twitch enforces both. |
| Mattermost notifier never fires | Hit your `STATUS_URL` with `curl` first — it must serve `live: true` with a stable `started_at` for the bot to detect a transition. The bot logs `[live]` lines to stdout/journal on each poll if you need to confirm it's actually polling. |
| `auth_call` keeps refreshing every request | Your `TWITCH_ACCESS_TOKEN` is wrong or revoked but the refresh token still mints a working one. Regenerate the access token at twitchtokengenerator and update `.env.broadcaster`. |
## Notes on the code
- All HTTP is `urllib.request`. No `requests`. No async HTTP. The bot's IRC loop is async (`asyncio` + raw TLS sockets), the helpers are not — keeps the helpers usable as plain CLI scripts.
- `_twitch.py` looks for `.env.broadcaster` first, falls back to `.env.ophi118` (legacy filename from the rig this was extracted from). Either works.
- `bot.py:load_env` is a separate 12-line `.env` parser instead of pulling in `python-dotenv`. Three vars don't justify the dependency.
- `bot.py` does **not** currently use the bot account's refresh token. If access tokens start expiring mid-stream, refresh would land in the IRC reconnect handler.
## License
No license file ships in this repo yet — add one before redistributing. MIT or 0BSD if you want maximally permissive; AGPL if you want forks to stay open.

121
_twitch.py Normal file
View File

@@ -0,0 +1,121 @@
"""
Shared helpers for set-channel.py and search-game.py.
Stdlib-only on purpose — the bot dir intentionally avoids a venv. If a third
caller appears that needs richer behavior (retries with backoff, async, etc.),
revisit; until then minimal urllib is fine.
Auth model: every Helix call goes through `auth_call`, which runs the request,
refreshes the access token via twitchtokengenerator on 401, persists the new
tokens to .env.ophi118, mutates the in-memory env dict, and retries once.
Callers stay agnostic about whether a refresh happened.
"""
import json
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
HELIX = "https://api.twitch.tv/helix"
REFRESH_URL = "https://twitchtokengenerator.com/api/refresh/{refresh}"
# Default broadcaster env file. Looks for `.env.broadcaster` (the documented
# OSS-style filename) first, then falls back to the legacy `.env.ophi118`
# kept around for the original ophi118 rig that birthed this code. Whichever
# exists wins; if neither exists yet, ENV_FILE points at `.env.broadcaster`
# so error messages tell the user the right file to create.
_DIR = Path(__file__).resolve().parent
ENV_FILE = _DIR / ".env.broadcaster"
if not ENV_FILE.exists() and (_DIR / ".env.ophi118").exists():
ENV_FILE = _DIR / ".env.ophi118"
# ─── env file I/O ───────────────────────────────────────────────────────────
def load_env(path: Path = ENV_FILE) -> dict:
out = {}
if not path.exists():
return out
for raw in path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
out[k.strip()] = v.strip().strip('"').strip("'")
return out
def update_env(updates: dict, path: Path = ENV_FILE) -> None:
"""Atomic in-place rewrite of KEY=VALUE lines, comments preserved."""
lines = path.read_text().splitlines() if path.exists() else []
seen, out_lines = set(), []
for line in lines:
s = line.strip()
if s and not s.startswith("#") and "=" in s:
k = s.split("=", 1)[0].strip()
if k in updates:
out_lines.append(f"{k}={updates[k]}")
seen.add(k)
continue
out_lines.append(line)
for k, v in updates.items():
if k not in seen:
out_lines.append(f"{k}={v}")
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text("\n".join(out_lines) + "\n")
tmp.replace(path)
# ─── HTTP ───────────────────────────────────────────────────────────────────
def http(method: str, url: str, *, headers=None, body=None):
"""Returns (status, parsed_json_or_text). Never raises on HTTP errors —
surfaces them as the status code so callers can branch on 401 cleanly."""
data = body.encode("utf-8") if isinstance(body, str) else body
req = urllib.request.Request(url, method=method, headers=headers or {}, data=data)
try:
with urllib.request.urlopen(req, timeout=10) as r:
raw = r.read().decode("utf-8")
try: return r.status, (json.loads(raw) if raw else None)
except json.JSONDecodeError: return r.status, raw
except urllib.error.HTTPError as e:
raw = e.read().decode("utf-8", errors="replace")
try: return e.code, (json.loads(raw) if raw else None)
except json.JSONDecodeError: return e.code, raw
# ─── auth ───────────────────────────────────────────────────────────────────
def refresh_token(env: dict) -> dict:
"""Mint new tokens via twitchtokengenerator. Returns the merge dict for
`update_env` — caller persists. Raises on refresh failure (no point
retrying — the refresh token is dead, the user must re-generate)."""
refresh = env["TWITCH_REFRESH_TOKEN"]
url = REFRESH_URL.format(refresh=urllib.parse.quote(refresh, safe=""))
status, body = http("GET", url)
if status != 200 or not isinstance(body, dict) or not body.get("success"):
raise RuntimeError(f"token refresh failed (status={status}): {body}")
return {
"TWITCH_ACCESS_TOKEN": body["token"],
"TWITCH_REFRESH_TOKEN": body["refresh"],
}
def auth_call(method: str, url: str, env: dict, *, body=None):
"""Authenticated Helix request with 401 → refresh → retry. Mutates `env`
in place on refresh so subsequent calls in the same process see the new
token without another reload."""
def headers_for(tok):
h = {"Authorization": f"Bearer {tok}", "Client-Id": env["TWITCH_CLIENT_ID"]}
if body is not None:
h["Content-Type"] = "application/json"
return h
status, resp = http(method, url, headers=headers_for(env["TWITCH_ACCESS_TOKEN"]), body=body)
if status == 401:
new = refresh_token(env)
update_env(new)
env.update(new)
status, resp = http(method, url, headers=headers_for(env["TWITCH_ACCESS_TOKEN"]), body=body)
return status, resp

462
bot.py Normal file
View File

@@ -0,0 +1,462 @@
#!/usr/bin/env python3
"""
Twitch chat → MPD control bot, plus optional "going live" notifications.
Connects to Twitch IRC over TLS, joins the configured channel, and responds to:
!skip advance MPD to the next track (per-user cooldown — see SKIP_COOLDOWN)
!queue print the next few queued tracks
!info print full metadata (artist · title · album · year · genre) for now-playing
The new track shows up on stream because bridges/mpd-state.py is already
broadcasting MPD state to the loading / game / music-box overlays.
If MM_HOOK is set, a second task polls the public ophi118.com status service
(which already polls Twitch Helix for live/title/game) and POSTs a single
Mattermost notification per offline→live transition. See live_watch().
Reads .env from the same directory:
TWITCH_ACCESS_TOKEN chat OAuth token (with or without `oauth:` prefix)
TWITCH_NICK bot login name (lowercase)
TWITCH_CHANNEL channel to join, no leading `#` — defaults to TWITCH_NICK
MM_HOOK optional Mattermost incoming-webhook URL; enables
going-live notifications when set
STATUS_URL optional override for the status API endpoint
(default: https://ophi118.com/api/status.json)
TWITCH_REFRESH_TOKEN and TWITCH_CLIENT_ID are also stored in .env (paired with
the access token at generation time) but the bot doesn't use them yet — token
refresh would land here later if access tokens start expiring mid-stream.
"""
import asyncio
import json
import os
import ssl
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from mpd.asyncio import MPDClient
HERE = Path(__file__).resolve().parent
ENV = HERE / ".env"
MPD_HOST = os.environ.get("MPD_HOST", "localhost")
MPD_PORT = int(os.environ.get("MPD_PORT", "6600"))
IRC_HOST = "irc.chat.twitch.tv"
IRC_PORT = 6697
PING_TIMEOUT = 360.0 # twitch PINGs ~every 5min; reconnect if silent longer
SKIP_COOLDOWN = 60.0 # per-user seconds between successful !skip invocations
QUEUE_PEEK = 3 # how many upcoming tracks !queue should print
# Going-live watcher: poll cadence matches the server's Twitch poll (60s) so we
# never lag the upstream signal by more than ~2 polls.
STATUS_URL = os.environ.get("STATUS_URL", "https://ophi118.com/api/status.json")
STATUS_POLL_S = 60.0
# Set TWITCH_BOT_DEBUG=1 (env or .env) to dump every received IRC line. Useful
# when a brand-new bot account looks "connected" but doesn't see chat — Twitch
# silently degrades unverified accounts in ways that don't surface as errors.
DEBUG_RAW = os.environ.get("TWITCH_BOT_DEBUG", "").strip() not in ("", "0", "false")
# ─── tiny .env loader ───────────────────────────────────────────────────────
def load_env(path: Path) -> dict:
"""KEY=VALUE lines, # comments, optional surrounding quotes. Avoids a
python-dotenv dependency for three vars."""
out = {}
if not path.exists():
return out
for raw in path.read_text().splitlines():
line = raw.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
out[k.strip()] = v.strip().strip('"').strip("'")
return out
# ─── MPD ────────────────────────────────────────────────────────────────────
class Mpd:
"""One shared MPDClient with reconnect-on-failure.
python-mpd2's asyncio client doesn't auto-reconnect, so each command is
wrapped to retry once after dropping the connection — covers MPD
restarts and idle-socket timeouts without surfacing transient errors.
"""
def __init__(self):
self._cli = None
self._lock = asyncio.Lock()
async def _ensure(self):
if self._cli is None:
cli = MPDClient()
await cli.connect(MPD_HOST, MPD_PORT)
self._cli = cli
async def _retry(self, op):
"""Run an MPD coroutine with one reconnect-on-failure retry."""
async with self._lock:
last = None
for attempt in (1, 2):
try:
await self._ensure()
return await op(self._cli)
except Exception as e:
last = e
self._cli = None # force reconnect on retry
raise last # type: ignore[misc]
async def skip(self) -> str:
async def op(cli):
await cli.next()
return _fmt_song(await cli.currentsong())
return await self._retry(op)
async def currentsong(self) -> dict:
return await self._retry(lambda cli: cli.currentsong())
async def status(self) -> dict:
return await self._retry(lambda cli: cli.status())
async def playlistinfo(self, pos: int) -> list:
return await self._retry(lambda cli: cli.playlistinfo(pos))
def _flat(v):
"""MPD tags can be lists (multi-value); flatten to a comma-joined string
with empties removed. Returns None for missing/empty so callers can skip
fields without empty parens."""
if v is None:
return None
if isinstance(v, list):
v = ", ".join(x for x in v if x)
v = (v or "").strip()
return v or None
def _fmt_song(song: dict) -> str:
artist = _flat(song.get("artist") or song.get("albumartist")) or ""
title = _flat(song.get("title")) or (song.get("file") or "").rsplit("/", 1)[-1]
out = f"{artist}{title}".strip("")
return out or "(unknown track)"
def _fmt_info(song: dict) -> str:
"""Compose the !info reply: 'Artist — Title · Album (Year) · Genre'.
Skip any segment whose underlying tag is missing."""
head = _fmt_song(song)
parts = [head] if head != "(unknown track)" else []
album = _flat(song.get("album"))
year = _flat(song.get("date"))
if year:
# MPD often stores ISO dates ('2018-04-12') — show year only.
year = year.split("-", 1)[0]
if album and year:
parts.append(f"{album} ({year})")
elif album:
parts.append(album)
elif year:
parts.append(f"({year})")
genre = _flat(song.get("genre"))
if genre:
parts.append(genre)
return " · ".join(parts) if parts else "(unknown track)"
# ─── Twitch IRC ─────────────────────────────────────────────────────────────
class TwitchBot:
def __init__(self, nick: str, token: str, channel: str, mpd: Mpd):
self.nick = nick.lower()
self.token = token if token.startswith("oauth:") else f"oauth:{token}"
self.channel = "#" + channel.lower().lstrip("#")
self.mpd = mpd
# Per-user !skip cooldown — keyed by lowercased nick. The dict grows
# unboundedly with viewer count over a stream session; not worth
# pruning at typical chat sizes (one float per chatter).
self._skip_seen: dict[str, float] = {}
async def _send(self, writer: asyncio.StreamWriter, raw: str):
writer.write((raw + "\r\n").encode("utf-8"))
await writer.drain()
async def _say(self, writer: asyncio.StreamWriter, msg: str):
# twitch chat lines max ~500 chars; truncate defensively, strip newlines
msg = msg.replace("\r", " ").replace("\n", " ")[:480]
await self._send(writer, f"PRIVMSG {self.channel} :{msg}")
async def run(self):
"""Connect/auth/listen loop with quiet exponential backoff.
Mirrors the connect-quietly pattern in bridges/mpd-state.py:
first failure prints once, then silent retry until the connection
comes back. Auth failure is fatal — no point retrying with a bad token.
"""
backoff = 2
BACKOFF_MAX = 30
prev_status = None # None | "up" | "down"
ctx = ssl.create_default_context()
while True:
try:
reader, writer = await asyncio.open_connection(
IRC_HOST, IRC_PORT, ssl=ctx,
)
try:
await self._send(writer, f"PASS {self.token}")
await self._send(writer, f"NICK {self.nick}")
await self._send(writer, f"JOIN {self.channel}")
if prev_status != "up":
print(f"[twitch] connected → {self.channel} as {self.nick}",
flush=True)
prev_status = "up"
backoff = 2
while True:
line_bytes = await asyncio.wait_for(
reader.readline(), timeout=PING_TIMEOUT,
)
if not line_bytes:
raise ConnectionError("server closed connection")
line = line_bytes.decode("utf-8", errors="replace").rstrip("\r\n")
await self._handle(writer, line)
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
except RuntimeError as e:
# auth-failed and similar fatals — let systemd flag it
print(f"[twitch] fatal: {e}", file=sys.stderr, flush=True)
raise
except (asyncio.TimeoutError, ConnectionError, OSError) as e:
if prev_status == "up":
print(f"[twitch] disconnected ({type(e).__name__}: {e}) — retrying",
flush=True)
elif prev_status is None:
print(f"[twitch] unreachable ({type(e).__name__}) — will retry silently",
flush=True)
prev_status = "down"
await asyncio.sleep(backoff)
backoff = min(backoff * 2, BACKOFF_MAX)
async def _handle(self, writer: asyncio.StreamWriter, line: str):
if DEBUG_RAW:
print(f"[rx] {line}", flush=True)
# PING/PONG keepalive — twitch sends `PING :tmi.twitch.tv` periodically
if line.startswith("PING "):
await self._send(writer, "PONG " + line[5:])
return
# NOTICE on bad creds: ":tmi.twitch.tv NOTICE * :Login authentication failed"
if " NOTICE " in line and "authentication failed" in line.lower():
raise RuntimeError("twitch auth failed — check TWITCH_KEY / TWITCH_NICK")
# PRIVMSG format: ":user!user@user.tmi.twitch.tv PRIVMSG #chan :body"
if " PRIVMSG " not in line:
return
try:
prefix, rest = line.split(" PRIVMSG ", 1)
user = prefix[1:].split("!", 1)[0]
_, _, body = rest.partition(":")
except ValueError:
return
# Twitch's anti-duplicate logic appends an invisible Tags-block
# codepoint (U+E0000U+E007F) to back-to-back identical messages
# from the same sender within ~30s. Without stripping it, every
# other `!skip` in a series silently fails to match.
body = "".join(c for c in body if not (0xE0000 <= ord(c) <= 0xE007F))
head = body.strip().split(" ", 1)[0].lower()
if head == "!skip":
await self._cmd_skip(writer, user)
elif head == "!queue":
await self._cmd_queue(writer, user)
elif head == "!info":
await self._cmd_info(writer, user)
async def _cmd_skip(self, writer: asyncio.StreamWriter, user: str):
# Per-user cooldown: each chatter gets one skip per SKIP_COOLDOWN
# seconds. Reject with a wait-time hint so the user knows it's their
# own gate, not the bot being broken.
now = time.monotonic()
ukey = user.lower()
last = self._skip_seen.get(ukey, 0.0)
wait = SKIP_COOLDOWN - (now - last)
if wait > 0:
await self._say(
writer,
f"@{user} you can only !skip once per minute (wait {int(wait)+1}s)",
)
return
try:
now_playing = await self.mpd.skip()
except Exception as e:
print(f"[skip] failed for @{user}: {type(e).__name__}: {e}", flush=True)
await self._say(writer, f"@{user} couldn't skip — MPD didn't respond")
return
self._skip_seen[ukey] = now
print(f"[skip] @{user}{now_playing}", flush=True)
await self._say(writer, f"⏭ skipped by @{user} → now: {now_playing}")
async def _cmd_queue(self, writer: asyncio.StreamWriter, user: str):
# Read-only; no cooldown. Pulls current pos from status, then peeks
# the next QUEUE_PEEK tracks one-by-one (small N — full playlistinfo
# would dump the whole queue).
try:
status = await self.mpd.status()
pos = int(status.get("song", -1))
total = int(status.get("playlistlength") or 0)
except Exception as e:
print(f"[queue] failed for @{user}: {type(e).__name__}: {e}", flush=True)
await self._say(writer, f"@{user} couldn't read queue — MPD didn't respond")
return
if pos < 0 or pos + 1 >= total:
await self._say(writer, "🎵 nothing else queued")
return
upcoming = []
for i, p in enumerate(range(pos + 1, min(pos + 1 + QUEUE_PEEK, total)), start=1):
try:
rows = await self.mpd.playlistinfo(p)
if rows:
upcoming.append(f"{i}) {_fmt_song(rows[0])}")
except Exception:
continue
if not upcoming:
await self._say(writer, "🎵 nothing else queued")
return
await self._say(writer, "🎵 next: " + " · ".join(upcoming))
async def _cmd_info(self, writer: asyncio.StreamWriter, user: str):
try:
song = await self.mpd.currentsong()
except Exception as e:
print(f"[info] failed for @{user}: {type(e).__name__}: {e}", flush=True)
await self._say(writer, f"@{user} couldn't read track info — MPD didn't respond")
return
if not song:
await self._say(writer, " nothing playing")
return
await self._say(writer, " " + _fmt_info(song))
# ─── Mattermost "going live" notifications ──────────────────────────────────
def _fetch_status(url: str) -> dict:
"""GET the status JSON. Raises on transport/parse errors so live_watch()
can log + back off uniformly."""
req = urllib.request.Request(url, method="GET",
headers={"User-Agent": "obs-twitch-bot/1.0"})
with urllib.request.urlopen(req, timeout=10) as r:
return json.loads(r.read().decode("utf-8"))
def _post_mattermost(hook_url: str, channel: str, title: str, game: str) -> None:
"""POST a single going-live notification. Markdown renders in Mattermost."""
parts = [f"🔴 **{channel}** is live on Twitch"]
if title:
parts.append(f"> {title}")
if game:
parts.append(f"_{game}_")
parts.append(f"https://twitch.tv/{channel}")
body = json.dumps({"text": "\n".join(parts)}).encode("utf-8")
req = urllib.request.Request(
hook_url, method="POST", data=body,
headers={"Content-Type": "application/json",
"User-Agent": "obs-twitch-bot/1.0"},
)
with urllib.request.urlopen(req, timeout=10) as r:
if r.status >= 300:
raise RuntimeError(f"mattermost hook returned HTTP {r.status}")
async def live_watch(mm_hook: str, channel: str, status_url: str = STATUS_URL):
"""Poll the status service; POST to Mattermost on offline→live transitions.
Transition detection is keyed on `started_at`, not just the `live` flag, so
a brief Helix hiccup (live=true → false → true within one stream session,
same `started_at`) doesn't double-fire.
Cold-start during an ongoing stream is intentionally silent: we record the
current `started_at` so we won't fire for it later, but we don't announce
it (we can't tell "they just went live" from "the bot restarted mid-stream"
without prior offline observation). Once we see live=False at least once,
the next started_at change becomes a real transition we'll announce.
"""
last_announced = None # started_at we've already handled (announced or recorded)
seen_offline = False # have we observed live=False since last_announced?
while True:
try:
data = await asyncio.to_thread(_fetch_status, status_url)
twitch = (data or {}).get("twitch") or {}
live = bool(twitch.get("live"))
started = (twitch.get("started_at") or None) if live else None
except Exception as e:
print(f"[live] poll failed: {type(e).__name__}: {e}", flush=True)
await asyncio.sleep(STATUS_POLL_S)
continue
if not live:
seen_offline = True
elif started:
if last_announced is None and not seen_offline:
# cold start mid-stream: lock in this started_at so we won't
# falsely announce it later
last_announced = started
elif started != last_announced and seen_offline:
title = twitch.get("title") or ""
game = twitch.get("game") or ""
try:
await asyncio.to_thread(_post_mattermost, mm_hook, channel, title, game)
print(f"[live] {channel} went live → posted to mattermost "
f"(title={title!r}, game={game!r})", flush=True)
except Exception as e:
# log but don't retry — a missed notification is better
# than a duplicate; the next genuine transition will fire
print(f"[live] mattermost post failed: {type(e).__name__}: {e}",
flush=True)
last_announced = started
seen_offline = False
await asyncio.sleep(STATUS_POLL_S)
# ─── entrypoint ─────────────────────────────────────────────────────────────
async def main():
env = {**load_env(ENV), **os.environ} # process env wins over .env
token = env.get("TWITCH_ACCESS_TOKEN", "").strip()
nick = env.get("TWITCH_NICK", "").strip()
channel = env.get("TWITCH_CHANNEL", nick).strip()
mm_hook = env.get("MM_HOOK", "").strip()
missing = [k for k, v in (("TWITCH_ACCESS_TOKEN", token), ("TWITCH_NICK", nick)) if not v]
if missing:
print(f"[err] missing in {ENV}: {', '.join(missing)}", file=sys.stderr)
sys.exit(1)
bot = TwitchBot(nick=nick, token=token, channel=channel, mpd=Mpd())
tasks = [bot.run()]
if mm_hook:
print(f"[live] watching {STATUS_URL} → mattermost on offline→live", flush=True)
tasks.append(live_watch(mm_hook, channel))
else:
print("[live] MM_HOOK unset — going-live notifications disabled", flush=True)
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
sys.exit(0)

150
examples/stream.sh Executable file
View File

@@ -0,0 +1,150 @@
#!/usr/bin/env bash
# stream.sh — interactive pre-stream helper.
#
# ./stream.sh
#
# Prompts for game + title + countdown, resolves the game to a Twitch
# category id via ../search-game.py, updates the channel via ../set-channel.py,
# and writes stream.json next to this script for any overlay / starting-soon
# timer / status page that wants to read the manifest.
#
# Re-running uses your previous answers as defaults — press Enter to keep them.
#
# Output path can be overridden with STREAM_JSON=/path/to/file.json ./stream.sh
set -euo pipefail
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
BOT_DIR="$(cd "$DIR/.." && pwd)"
JSON="${STREAM_JSON:-$DIR/stream.json}"
SEARCH_GAME="$BOT_DIR/search-game.py"
SET_CHANNEL="$BOT_DIR/set-channel.py"
# Reads a top-level field from the previous stream.json. Always exits 0;
# prints empty string if file/field is missing.
prev() {
[[ -f "$JSON" ]] || return 0
python3 - "$JSON" "$1" <<'PY' 2>/dev/null || true
import json, sys
try:
d = json.load(open(sys.argv[1]))
v = d.get(sys.argv[2])
print('' if v is None else v)
except Exception:
pass
PY
}
ask() {
# ask "question" "default" → echoes user input or default if blank
local q="$1" def="${2:-}"
local prompt=" $q"
[[ -n "$def" ]] && prompt+=" [$def]"
prompt+=": "
local v
read -rp "$prompt" v
[[ -z "$v" && -n "$def" ]] && v="$def"
printf '%s' "$v"
}
ask_int() {
local v
while :; do
v=$(ask "$1" "${2:-}")
[[ "$v" =~ ^[0-9]+$ ]] && { printf '%s' "$v"; return; }
echo " ✗ must be a whole number" >&2
done
}
sanitize() {
printf '%s' "${1:-}" \
| tr -d '"\\' \
| tr '\n\r\t' ' ' \
| sed 's/ */ /g; s/^ //; s/ $//'
}
emit_str() { [[ -z "${1:-}" ]] && printf 'null' || printf '"%s"' "$1"; }
# ── Pull previous answers as defaults ───────────────
prev_game=$(prev game)
prev_game_id=$(prev gameId)
prev_title=$(prev title)
prev_count=$(prev countdownMin); [[ -z "$prev_count" ]] && prev_count="5"
echo "── stream manifest ──"
echo " (press Enter to keep [defaults])"
echo
# Resolve Game via Twitch search → exact directory entry. The picker prints
# "<id>\t<name>" to stdout on success, or non-zero on no-match / cancel.
# Re-running and accepting the previous game unchanged reuses the cached id
# so we don't burn an API call to re-resolve a known answer.
have_twitch=0
[[ -x "$SEARCH_GAME" ]] && have_twitch=1
game=""; game_id=""
while :; do
query=$(ask "Game (search)" "$prev_game")
if [[ -z "$query" ]]; then
echo " ✗ Game is required" >&2
continue
fi
if (( have_twitch )); then
# Reuse cached id only if it looks like a real Twitch numeric id. A past
# bug (search-game.py prompt leaking into stdout) wrote "Pick: 506462"
# here; the regex makes sure such corruption falls back to a fresh search
# instead of getting passed straight to PATCH /helix/channels.
if [[ -n "$prev_game_id" && "$query" == "$prev_game" && "$prev_game_id" =~ ^[0-9]+$ ]]; then
game="$prev_game"; game_id="$prev_game_id"
echo " ↻ reusing cached: $game (id=$game_id)"
break
fi
if line=$("$SEARCH_GAME" "$query"); then
IFS=$'\t' read -r game_id game <<<"$line"
break
fi
# search-game.py already printed its own error; loop and re-ask
else
# no twitch helper available — accept the raw input, no id resolution
game="$query"; game_id=""
break
fi
done
title=$(ask "Title (Twitch stream title)" "$prev_title")
countdown=$(ask_int "Countdown (minutes)" "$prev_count")
game=$(sanitize "$game")
title=$(sanitize "$title")
now_iso=$(date -u +%FT%TZ)
tmp="$JSON.tmp.$$"
cat > "$tmp" <<JSON
{
"compiledAt": "$now_iso",
"game": $(emit_str "$game"),
"gameId": $(emit_str "$game_id"),
"title": $(emit_str "$title"),
"countdownMin": $countdown
}
JSON
if ! python3 -m json.tool "$tmp" >/dev/null 2>&1; then
echo " ✗ produced invalid JSON, leaving $tmp for inspection" >&2
exit 1
fi
mv -f "$tmp" "$JSON"
echo
echo " ✓ wrote $JSON"
# ── Push to Twitch (game + title) via the broadcaster token ─────────────
# Soft-fail: a Twitch hiccup must not block the local manifest from being
# written — anything reading stream.json can still load offline.
# Title falls back to the game name when the user left it blank.
if (( have_twitch )) && [[ -x "$SET_CHANNEL" && -n "$game_id" ]]; then
twitch_title="${title:-$game}"
echo
if ! "$SET_CHANNEL" --game-id "$game_id" "$twitch_title"; then
echo " ! Twitch sync failed (manifest still saved) — fix and re-run if needed" >&2
fi
fi

104
search-game.py Executable file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
Interactive Twitch category search. Used by loading.sh to resolve a free-form
query into an exact directory entry.
./search-game.py "Heroes"
stderr: numbered list of up to 20 matches + "Pick: " prompt
stdout: a single line "<game_id>\\t<canonical_name>" (only on success)
exit: 0 success | 1 no-match / cancel / error | 130 ctrl-c
Auto-picks when there's exactly one result. Tempting to also auto-pick on an
exact-name match within a larger result set (e.g. typing "Doom Eternal" and
having that exact entry near the top), but a real test surfaced the failure:
typing "Heroes" returns "Heroes" (TV show) plus all the Heroes-* games, and
auto-picking the standalone match silently steals from the user — who is
*using search* precisely because the query is ambiguous. So: only one rule.
Endpoint: /helix/search/categories — same one Twitch's dashboard autocomplete
uses. Accepts any user/app token; we reuse the broadcaster token from
.env.ophi118 because it's already there and refreshable.
"""
import sys
import urllib.parse
import _twitch as tw
MAX_RESULTS = 20
def err(msg=""):
print(msg, file=sys.stderr, flush=True)
def search(env: dict, query: str) -> list:
qs = urllib.parse.urlencode({"query": query, "first": MAX_RESULTS})
status, body = tw.auth_call("GET", f"{tw.HELIX}/search/categories?{qs}", env)
if status != 200:
raise RuntimeError(f"search failed (status={status}): {body}")
return (body or {}).get("data") or []
def pick(results: list, query: str) -> dict:
if not results:
raise RuntimeError(f"no Twitch categories match {query!r}")
if len(results) == 1:
err(f" ✓ only match: {results[0]['name']}")
return results[0]
err(f"── {len(results)} matches for {query!r} ──")
for i, g in enumerate(results, 1):
err(f" {i:2}) {g['name']}")
err(" 0) cancel")
while True:
# input()'s prompt arg writes to stdout — fatal here because bash
# captures stdout for the resolved <id>\t<name> line. Print to stderr
# explicitly, then call input() with no prompt.
print("Pick: ", end="", file=sys.stderr, flush=True)
try:
raw = input().strip()
except EOFError:
raise RuntimeError("cancelled (EOF)")
if not raw.isdigit():
err(" ! enter a number")
continue
n = int(raw)
if n == 0:
raise RuntimeError("cancelled")
if 1 <= n <= len(results):
return results[n - 1]
err(f" ! must be 0..{len(results)}")
def main():
if len(sys.argv) != 2 or not sys.argv[1].strip():
print("usage: search-game.py <query>", file=sys.stderr)
sys.exit(2)
query = sys.argv[1].strip()
env = tw.load_env()
needed = ("TWITCH_ACCESS_TOKEN", "TWITCH_REFRESH_TOKEN", "TWITCH_CLIENT_ID")
missing = [k for k in needed if not env.get(k)]
if missing:
err(f"[err] missing in {tw.ENV_FILE.name}: {', '.join(missing)}")
sys.exit(1)
try:
results = search(env, query)
chosen = pick(results, query)
except Exception as e:
err(f"[err] {type(e).__name__}: {e}")
sys.exit(1)
# ONLY the resolved tuple goes to stdout — bash captures this.
print(f"{chosen['id']}\t{chosen['name']}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
sys.exit(130)

102
set-channel.py Executable file
View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Update Twitch channel info (game + title) for the broadcaster account.
./set-channel.py --game-id 517520 "Slayer is back — Nightmare run"
./set-channel.py "Doom Eternal" "..." # exact-name lookup, fails if no exact hit
./set-channel.py --dry-run --game-id 517520 "..."
For free-form / fuzzy game queries, use search-game.py first to resolve the
id (loading.sh does this automatically). The /helix/games?name= fallback in
this script is exact-and-case-sensitive.
Loads `.env.ophi118` — see _twitch.py for the env contract. The token MUST
belong to the broadcaster (Twitch enforces broadcaster_id == token user_id
on PATCH /helix/channels), so this script intentionally targets that file
and not `.env` (which holds the chat-bot's account).
"""
import json
import sys
import urllib.parse
import _twitch as tw
TITLE_LIMIT = 140
def lookup_game_id(env: dict, name: str) -> str:
qs = urllib.parse.urlencode({"name": name})
status, body = tw.auth_call("GET", f"{tw.HELIX}/games?{qs}", env)
if status != 200:
raise RuntimeError(f"games lookup failed (status={status}): {body}")
data = (body or {}).get("data") or []
if not data:
raise RuntimeError(
f"no Twitch game matches {name!r} exactly. /helix/games?name= is "
f"case-sensitive — use search-game.py for fuzzy lookup."
)
return data[0]["id"]
def patch_channel(env: dict, game_id: str, title: str) -> None:
qs = urllib.parse.urlencode({"broadcaster_id": env["TWITCH_BROADCASTER_ID"]})
payload = json.dumps({"game_id": game_id, "title": title})
status, body = tw.auth_call("PATCH", f"{tw.HELIX}/channels?{qs}", env, body=payload)
if status not in (200, 204):
raise RuntimeError(f"channel update failed (status={status}): {body}")
def usage_and_die():
print("usage: set-channel.py [--dry-run] (--game-id <id> | <game-name>) <title>",
file=sys.stderr)
sys.exit(2)
def main():
args = sys.argv[1:]
dry = False
if args and args[0] == "--dry-run":
dry, args = True, args[1:]
game_id = None
if args and args[0] == "--game-id":
if len(args) < 3:
usage_and_die()
game_id, args = args[1], args[2:]
expected = 1 if game_id else 2
if len(args) != expected:
usage_and_die()
if game_id:
title = args[0][:TITLE_LIMIT]
else:
game_name = args[0]
title = args[1][:TITLE_LIMIT]
env = tw.load_env()
needed = ("TWITCH_ACCESS_TOKEN", "TWITCH_REFRESH_TOKEN",
"TWITCH_CLIENT_ID", "TWITCH_BROADCASTER_ID")
missing = [k for k in needed if not env.get(k)]
if missing:
print(f"[err] missing in {tw.ENV_FILE.name}: {', '.join(missing)}", file=sys.stderr)
sys.exit(1)
if game_id is None:
game_id = lookup_game_id(env, game_name)
if dry:
print(f" ✓ dry-run: would set game_id={game_id}, title={title!r} (no PATCH issued)")
return
patch_channel(env, game_id, title)
print(f" ✓ Twitch channel updated → game_id={game_id}, title={title!r}")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"[err] {type(e).__name__}: {e}", file=sys.stderr)
sys.exit(1)