engagement-worker.js (8692B)
// Coarse debounce for the durable `lastSeenAt` write on the message hot path.
// The room TTL only needs second-ish granularity, so a write + alarm reschedule
// on every keystroke broadcast is pure churn. In-memory, so it resets to a
// guaranteed write on each hibernation wake — which is fine.
const TOUCH_DEBOUNCE_MS = 30 * 1000;

// Maximum number of keys handed to a single `storage.delete([...])` call when
// pruning nonces.
const STORAGE_DELETE_BATCH_SIZE = 128;

/**
 * One engagement room: authenticates WebSocket upgrades, relays every frame a
 * socket sends to all other sockets in the room, and deletes its own storage
 * once the room has been idle (no sockets, no activity) for the room TTL.
 *
 * Storage keys used: `secretHash`, `createdAt`, `lastSeenAt`, `nonce:<nonce>`.
 * Environment knobs (all in seconds, all optional): `MAX_AUTH_SKEW_SECONDS`
 * (default 120), `NONCE_TTL_SECONDS` (default 300), `ROOM_TTL_SECONDS`
 * (default 600).
 */
export class EngagementRoom {
  /**
   * @param {object} state - Durable Object state (storage + WebSocket registry).
   * @param {object} env - Environment bindings / configuration variables.
   */
  constructor(state, env) {
    this.state = state;
    this.env = env;
    // Timestamp (ms) of the last durable `lastSeenAt` write; 0 means "never",
    // which forces the first `touch()` to write.
    this.lastTouchAt = 0;
    // Answer keepalive pings without waking the Durable Object, so an otherwise
    // idle foreground socket stays warm through NAT / edge idle timeouts. Pairs
    // with the client's periodic "ping" (EngagementHost.startPing). Auto-answered
    // frames never reach `webSocketMessage`, so they cost nothing here.
    this.state.setWebSocketAutoResponse(new WebSocketRequestResponsePair("ping", "pong"));
  }

  /**
   * Handles a WebSocket upgrade request for this room.
   *
   * @param {Request} request - Incoming request; must carry `Upgrade: websocket`
   *   and the auth query parameters read by `authenticate`.
   * @returns {Promise<Response>} 101 with the client socket on success; 426 if
   *   not an upgrade, 400 if the path has no room ID, or the auth failure
   *   status (401/403) otherwise.
   */
  async fetch(request) {
    // Header tokens are case-insensitive, so accept e.g. "WebSocket" as well.
    const upgrade = request.headers.get("Upgrade") || "";
    if (upgrade.toLowerCase() !== "websocket") {
      return new Response("Expected WebSocket upgrade", { status: 426 });
    }

    const url = new URL(request.url);
    const roomID = roomIDFromPath(url.pathname);
    if (!roomID) {
      return new Response("Missing room ID", { status: 400 });
    }

    const auth = await this.authenticate(roomID, url.searchParams);
    if (!auth.ok) {
      return new Response(auth.message, { status: auth.status });
    }

    const pair = new WebSocketPair();
    const [client, server] = Object.values(pair);
    server.serializeAttachment({
      authorID: auth.authorID,
      deviceID: auth.deviceID,
      connectedAt: Date.now()
    });
    // Supersede any earlier socket from the same device. A reconnect would
    // otherwise leave the stale one lingering until its TCP dies, so the room
    // fans every broadcast at a zombie and can echo the device's own prior
    // frames back to it. `server` is not accepted yet, so it is not in the set.
    for (const existing of this.state.getWebSockets()) {
      const attachment = existing.deserializeAttachment();
      if (attachment && attachment.authorID === auth.authorID && attachment.deviceID === auth.deviceID) {
        try {
          existing.close(1000, "Superseded by a newer connection");
        } catch {
          // Already closing; the runtime will reap it.
        }
      }
    }
    this.state.acceptWebSocket(server);

    return new Response(null, {
      status: 101,
      webSocket: client
    });
  }

  /**
   * Validates the auth query parameters of an upgrade request.
   *
   * Checks, in order: all parameters present, timestamp within the allowed
   * skew, nonce not seen before, HMAC signature over
   * `roomID|authorID|deviceID|timestamp|nonce` keyed by `secret`, and `secret`
   * matching the hash stored for the room. The first successful caller's
   * secret hash is stored and becomes the room secret. On success the nonce
   * and `lastSeenAt` are recorded, old nonces pruned, and the expiry alarm
   * rescheduled.
   *
   * @param {string} roomID - Room identifier taken from the request path.
   * @param {URLSearchParams} params - Query parameters of the request.
   * @returns {Promise<{ok: true, authorID: string, deviceID: string} |
   *   {ok: false, status: number, message: string}>}
   */
  async authenticate(roomID, params) {
    const authorID = params.get("authorID") || "";
    const deviceID = params.get("deviceID") || "";
    const timestamp = params.get("timestamp") || "";
    const nonce = params.get("nonce") || "";
    const secret = params.get("secret") || "";
    const signature = params.get("signature") || "";

    if (!authorID || !deviceID || !timestamp || !nonce || !secret || !signature) {
      return { ok: false, status: 401, message: "Missing auth parameters" };
    }

    const nowSeconds = Math.floor(Date.now() / 1000);
    const timestampSeconds = Number(timestamp);
    const maxSkewSeconds = envSeconds(this.env.MAX_AUTH_SKEW_SECONDS, 120);
    if (!Number.isFinite(timestampSeconds) || Math.abs(nowSeconds - timestampSeconds) > maxSkewSeconds) {
      return { ok: false, status: 401, message: "Stale auth timestamp" };
    }

    const nonceKey = `nonce:${nonce}`;
    if (await this.state.storage.get(nonceKey)) {
      return { ok: false, status: 401, message: "Nonce already used" };
    }

    const payload = [roomID, authorID, deviceID, timestamp, nonce].join("|");
    let expectedSignature;
    try {
      expectedSignature = await hmacSHA256(secret, payload);
    } catch {
      // `secret` is caller-supplied: invalid base64url or an unusable key makes
      // decoding / key import throw. Such a secret can never yield a matching
      // signature, so report it as an auth failure instead of crashing.
      return { ok: false, status: 401, message: "Invalid signature" };
    }
    if (!timingSafeEqual(signature, expectedSignature)) {
      return { ok: false, status: 401, message: "Invalid signature" };
    }

    const secretHash = await sha256(secret);
    const storedSecretHash = await this.state.storage.get("secretHash");
    if (storedSecretHash && storedSecretHash !== secretHash) {
      return { ok: false, status: 403, message: "Wrong room secret" };
    }
    if (!storedSecretHash) {
      // First authenticated caller claims the room with its secret.
      await this.state.storage.put("secretHash", secretHash);
      await this.state.storage.put("createdAt", Date.now());
    }

    await this.state.storage.put("lastSeenAt", Date.now());
    await this.state.storage.put(nonceKey, Date.now());
    await this.pruneNonces();
    await this.scheduleExpiry();
    this.lastTouchAt = Date.now();

    return { ok: true, authorID, deviceID };
  }

  /**
   * Relays a frame from one socket to every other socket in the room.
   *
   * @param {WebSocket} ws - Socket that sent the frame (excluded from fan-out).
   * @param {string|ArrayBuffer} message - Frame payload; binary payloads are
   *   copied before being forwarded.
   * @returns {Promise<void>}
   */
  async webSocketMessage(ws, message) {
    const data = typeof message === "string" ? message : message.slice(0);
    await this.touch();
    for (const peer of this.state.getWebSockets()) {
      if (peer === ws) continue;
      try {
        peer.send(data);
      } catch {
        // A peer caught mid-close throws here; skip it so the rest of the room
        // still receives this frame. The dead socket is reaped via
        // `webSocketClose` / runtime cleanup.
      }
    }
  }

  /**
   * Records activity when a socket closes and completes the close handshake.
   *
   * @param {WebSocket} ws - Socket that is closing.
   * @param {number} code - Close code received from the peer / runtime.
   * @param {string} reason - Close reason received from the peer / runtime.
   * @returns {Promise<void>}
   */
  async webSocketClose(ws, code, reason) {
    try {
      await this.state.storage.put("lastSeenAt", Date.now());
      await this.scheduleExpiry();
    } finally {
      // Always answer the close, even if the bookkeeping above failed.
      completeClose(ws, code, reason);
    }
  }

  /**
   * Expiry alarm: prunes nonces, then either re-arms (sockets still connected
   * or TTL not yet reached) or wipes all room storage.
   *
   * @returns {Promise<void>}
   */
  async alarm() {
    await this.pruneNonces();
    const ttlMs = envSeconds(this.env.ROOM_TTL_SECONDS, 600) * 1000;
    const sockets = this.state.getWebSockets();
    if (sockets.length > 0) {
      // Still connected — the room is alive regardless of how long since the
      // last broadcast (keepalive pings are auto-answered and don't advance
      // `lastSeenAt`). Re-arm from now, not the stale deadline, or a long idle
      // session would refire the alarm in a tight loop.
      await this.state.storage.setAlarm(Date.now() + ttlMs);
      return;
    }

    const lastSeenAt = await this.state.storage.get("lastSeenAt");
    const idleMs = Date.now() - (lastSeenAt || Date.now());
    // `>=`: the alarm is scheduled for exactly `lastSeenAt + ttlMs`, so firing
    // right on the deadline must expire the room rather than re-arm for "now".
    if (idleMs >= ttlMs) {
      await this.state.storage.deleteAll();
      return;
    }
    await this.scheduleExpiry();
  }

  /**
   * Marks the room as active: persists `lastSeenAt` and reschedules expiry, at
   * most once per `TOUCH_DEBOUNCE_MS`.
   *
   * @returns {Promise<void>}
   */
  async touch() {
    const now = Date.now();
    // Debounce the durable write + alarm reschedule off the message hot path.
    if (this.lastTouchAt && now - this.lastTouchAt < TOUCH_DEBOUNCE_MS) return;
    this.lastTouchAt = now;
    await this.state.storage.put("lastSeenAt", now);
    await this.scheduleExpiry();
  }

  /**
   * Deletes stored nonces older than `NONCE_TTL_SECONDS`.
   *
   * @returns {Promise<void>}
   */
  async pruneNonces() {
    const maxAgeMs = envSeconds(this.env.NONCE_TTL_SECONDS, 300) * 1000;
    const cutoff = Date.now() - maxAgeMs;
    const nonces = await this.state.storage.list({ prefix: "nonce:" });

    const staleKeys = [];
    for (const [key, createdAt] of nonces) {
      if (createdAt < cutoff) {
        staleKeys.push(key);
      }
    }

    // Delete in batches instead of one awaited round trip per key.
    for (let start = 0; start < staleKeys.length; start += STORAGE_DELETE_BATCH_SIZE) {
      await this.state.storage.delete(staleKeys.slice(start, start + STORAGE_DELETE_BATCH_SIZE));
    }
  }

  /**
   * Sets the expiry alarm to `lastSeenAt + ROOM_TTL_SECONDS` (using the
   * current time when `lastSeenAt` has not been stored).
   *
   * @returns {Promise<void>}
   */
  async scheduleExpiry() {
    const ttlMs = envSeconds(this.env.ROOM_TTL_SECONDS, 600) * 1000;
    const lastSeenAt = (await this.state.storage.get("lastSeenAt")) || Date.now();
    await this.state.storage.setAlarm(lastSeenAt + ttlMs);
  }
}

export default {
  /**
   * Worker entry point: answers `/health` directly and forwards room socket
   * requests to the room instance named after the room ID.
   *
   * @param {Request} request - Incoming request.
   * @param {object} env - Bindings; `ENGAGEMENT_ROOMS` is the room namespace.
   * @returns {Promise<Response>}
   */
  async fetch(request, env) {
    const url = new URL(request.url);
    if (url.pathname === "/health") {
      return new Response("ok");
    }
    const roomID = roomIDFromPath(url.pathname);
    if (!roomID) {
      return new Response("Not found", { status: 404 });
    }
    const id = env.ENGAGEMENT_ROOMS.idFromName(roomID);
    return env.ENGAGEMENT_ROOMS.get(id).fetch(request);
  }
};

/**
 * Extracts the room ID from a `/rooms/<id>/socket` path.
 *
 * @param {string} pathname - URL pathname.
 * @returns {string|null} The room ID, or null when the path does not match.
 */
function roomIDFromPath(pathname) {
  const match = pathname.match(/^\/rooms\/([^/]+)\/socket$/);
  return match ? match[1] : null;
}

/**
 * Reads a duration in seconds from an environment value.
 *
 * @param {string|number|undefined} raw - Configured value; falsy means unset.
 * @param {number} fallbackSeconds - Default used when unset or not a finite
 *   number (a `NaN` here would silently disable comparisons downstream).
 * @returns {number} Duration in seconds.
 */
function envSeconds(raw, fallbackSeconds) {
  const seconds = Number(raw || fallbackSeconds);
  return Number.isFinite(seconds) ? seconds : fallbackSeconds;
}

/**
 * Completes a WebSocket close handshake without throwing.
 *
 * Tries to echo the received code/reason first. Codes that only ever appear on
 * the receiving side (e.g. 1005 "no status", 1006 "abnormal closure") are
 * rejected by `close()`, so fall back to a normal 1000 closure.
 *
 * @param {WebSocket} ws - Socket to close.
 * @param {number} code - Close code that was received.
 * @param {string} reason - Close reason that was received.
 * @returns {void}
 */
function completeClose(ws, code, reason) {
  try {
    ws.close(code, reason);
    return;
  } catch {
    // Fall through to the generic close below.
  }
  try {
    ws.close(1000, "Closing");
  } catch {
    // Socket is already closed; nothing left to do.
  }
}

/**
 * Computes HMAC-SHA256 of `payload` keyed by a base64url-encoded secret.
 *
 * @param {string} secret - base64url-encoded key bytes.
 * @param {string} payload - Text to sign (UTF-8 encoded before signing).
 * @returns {Promise<string>} Unpadded base64url signature.
 * @throws If `secret` is not valid base64url or the key cannot be imported.
 */
async function hmacSHA256(secret, payload) {
  const key = await crypto.subtle.importKey(
    "raw",
    base64URLDecode(secret),
    { name: "HMAC", hash: "SHA-256" },
    false,
    ["sign"]
  );
  const signature = await crypto.subtle.sign("HMAC", key, new TextEncoder().encode(payload));
  return base64URLEncode(new Uint8Array(signature));
}

/**
 * Computes the SHA-256 digest of a string's UTF-8 bytes.
 *
 * @param {string} value - Text to hash.
 * @returns {Promise<string>} Unpadded base64url digest.
 */
async function sha256(value) {
  const digest = await crypto.subtle.digest("SHA-256", new TextEncoder().encode(value));
  return base64URLEncode(new Uint8Array(digest));
}

/**
 * Decodes an (optionally unpadded) base64url string into bytes.
 *
 * @param {string} value - base64url text.
 * @returns {Uint8Array} Decoded bytes.
 * @throws If `value` is not valid base64 after alphabet translation.
 */
function base64URLDecode(value) {
  const base64 = value.replace(/-/g, "+").replace(/_/g, "/").padEnd(Math.ceil(value.length / 4) * 4, "=");
  const binary = atob(base64);
  return Uint8Array.from(binary, (char) => char.charCodeAt(0));
}

/**
 * Encodes bytes as unpadded base64url.
 *
 * @param {Uint8Array} bytes - Bytes to encode.
 * @returns {string} base64url text without trailing `=`.
 */
function base64URLEncode(bytes) {
  let binary = "";
  for (const byte of bytes) {
    binary += String.fromCharCode(byte);
  }
  return btoa(binary).replace(/\+/g, "-").replace(/\//g, "_").replace(/=+$/g, "");
}

/**
 * Compares two strings without short-circuiting on the first differing byte.
 * Strings of different encoded length are rejected up front.
 *
 * @param {string} a - First string.
 * @param {string} b - Second string.
 * @returns {boolean} True when both strings encode to identical bytes.
 */
function timingSafeEqual(a, b) {
  const left = new TextEncoder().encode(a);
  const right = new TextEncoder().encode(b);
  if (left.length !== right.length) return false;
  let diff = 0;
  for (let index = 0; index < left.length; index += 1) {
    diff |= left[index] ^ right[index];
  }
  return diff === 0;
}