EngagementCoordinator.swift (15864B)
import Foundation

/// Identifies who a message is addressed to: an author and, optionally, one
/// specific device of that author.
struct EngagementAddressee: Equatable, Sendable {
    var authorID: String
    var deviceID: String?

    /// Wire form: `"author:device"` when a non-empty device is set, otherwise
    /// just `"author"`.
    var rawValue: String {
        if let deviceID, !deviceID.isEmpty {
            return "\(authorID):\(deviceID)"
        }
        return authorID
    }

    /// Parses the wire form produced by `rawValue`.
    ///
    /// Only the first `:` separates author from device, so a device ID may
    /// itself contain colons. An empty device part is normalised to `nil`.
    ///
    /// - Returns: `nil` when the input is `nil`, empty, or has an empty author.
    static func parse(_ rawValue: String?) -> EngagementAddressee? {
        guard let rawValue, !rawValue.isEmpty else { return nil }
        let parts = rawValue.split(separator: ":", maxSplits: 1, omittingEmptySubsequences: false)
        guard let author = parts.first, !author.isEmpty else { return nil }
        let device = parts.count == 2 ? String(parts[1]) : nil
        return EngagementAddressee(authorID: String(author), deviceID: device?.isEmpty == true ? nil : device)
    }

    /// Whether this addressee targets the given author/device pair.
    ///
    /// An addressee without a device targets every device of the author. An
    /// empty `deviceID` counts as "no device", matching how `rawValue` and
    /// `parse` treat it, so the result is stable across a wire round trip.
    func matches(authorID: String, deviceID: String) -> Bool {
        guard self.authorID == authorID else { return false }
        guard let targetDeviceID = self.deviceID, !targetDeviceID.isEmpty else { return true }
        return targetDeviceID == deviceID
    }
}

/// Credentials for a live room: the room identifier, its shared secret, and
/// the validity window.
struct EngagementRoomCredentials: Codable, Equatable, Sendable {
    var ver: Int
    var roomID: UUID
    var secret: String
    var createdAt: Date
    var expiresAt: Date

    /// Memberwise initializer; `roomID` defaults to a new UUID, `createdAt` to
    /// the current date, and `ver` to 2.
    init(
        roomID: UUID = UUID(),
        secret: String,
        createdAt: Date = Date(),
        expiresAt: Date,
        ver: Int = 2
    ) {
        self.ver = ver
        self.roomID = roomID
        self.secret = secret
        self.createdAt = createdAt
        self.expiresAt = expiresAt
    }

    /// JSON-encodes the credentials and returns the result as a UTF-8 string.
    ///
    /// - Throws: Any `JSONEncoder` error, or
    ///   `EngagementCoordinatorError.invalidPayloadEncoding` if the encoded
    ///   bytes cannot be read back as UTF-8.
    func encoded() throws -> String {
        let data = try JSONEncoder().encode(self)
        guard let string = String(data: data, encoding: .utf8) else {
            throw EngagementCoordinatorError.invalidPayloadEncoding
        }
        return string
    }

    /// Decodes credentials from the string form produced by `encoded()`.
    ///
    /// - Returns: `nil` when `string` is `nil` or does not decode.
    static func decode(_ string: String?) -> EngagementRoomCredentials? {
        guard let data = string?.data(using: .utf8) else { return nil }
        return try? JSONDecoder().decode(EngagementRoomCredentials.self, from: data)
    }

    /// Mints credentials for a new room that expire `ttl` seconds after `now`
    /// (ten minutes by default).
    ///
    /// The secret is built from 32 bytes obtained from `Data.secureRandom`, a
    /// helper declared outside this file.
    ///
    /// - Throws: Whatever `Data.secureRandom(count:)` throws.
    static func fresh(now: Date = Date(), ttl: TimeInterval = 10 * 60) throws -> EngagementRoomCredentials {
        try EngagementRoomCredentials(
            secret: Data.secureRandom(count: 32).base64URLEncodedString(),
            createdAt: now,
            expiresAt: now.addingTimeInterval(ttl)
        )
    }
}

/// A single payload sent over a live engagement channel. `kind` says which of
/// the optional payload fields the convenience initializers populate.
struct EngagementMessage: Codable, Equatable, Sendable {
    enum Kind: String, Codable, Sendable {
        case debugText
        case cellEdit
        case cellEditBatch
        case selection
    }

    var kind: Kind
    var text: String
    var cellEdit: RealtimeCellEdit?
    var cellEdits: [RealtimeCellEdit]?
    var selection: EngagementSelectionUpdate?
    var sentAt: Date
    var ver: Int

    /// General-purpose initializer; with only `text` supplied it builds a
    /// `.debugText` message.
    init(
        kind: Kind = .debugText,
        text: String,
        cellEdit: RealtimeCellEdit? = nil,
        cellEdits: [RealtimeCellEdit]? = nil,
        selection: EngagementSelectionUpdate? = nil,
        sentAt: Date = Date(),
        ver: Int = 1
    ) {
        self.kind = kind
        self.text = text
        self.cellEdit = cellEdit
        self.cellEdits = cellEdits
        self.selection = selection
        self.sentAt = sentAt
        self.ver = ver
    }

    /// Builds a `.cellEdit` message carrying one edit.
    init(cellEdit: RealtimeCellEdit, sentAt: Date = Date(), ver: Int = 1) {
        self.kind = .cellEdit
        self.text = ""
        self.cellEdit = cellEdit
        self.cellEdits = nil
        self.selection = nil
        self.sentAt = sentAt
        self.ver = ver
    }

    /// Carries one bulk gesture (check/clear/multi-cell undo) as a single
    /// message. Peers that predate this kind fail to decode it and drop the
    /// live update; the same cells still arrive durably via the Moves/CloudKit
    /// path, so they degrade to "appears on sync" rather than breaking.
    init(cellEdits: [RealtimeCellEdit], sentAt: Date = Date(), ver: Int = 1) {
        self.kind = .cellEditBatch
        self.text = ""
        self.cellEdit = nil
        self.cellEdits = cellEdits
        self.selection = nil
        self.sentAt = sentAt
        self.ver = ver
    }

    /// Builds a `.selection` message carrying one selection update.
    init(selection: EngagementSelectionUpdate, sentAt: Date = Date(), ver: Int = 1) {
        self.kind = .selection
        self.text = ""
        self.cellEdit = nil
        self.cellEdits = nil
        self.selection = selection
        self.sentAt = sentAt
        self.ver = ver
    }

    /// JSON-encodes the message for the transport.
    func encodedData() throws -> Data {
        try JSONEncoder().encode(self)
    }

    /// Decodes a message received from the transport; `nil` if it does not
    /// decode (for example, an unknown `kind`).
    static func decode(_ data: Data) -> EngagementMessage? {
        try? JSONDecoder().decode(EngagementMessage.self, from: data)
    }
}

enum EngagementReconcileOutcome: Equatable, Sendable {
    /// The reconcile ran; any connect failure is transient and covered by the
    /// normal retry backstops.
    case reconciled
    /// The worker refused the advertised room because it is registered under a
    /// different secret. Retrying with the same creds can never succeed; the
    /// caller should clear the advertised creds so a fresh room is minted.
    case roomRejected
}

/// The transport the coordinator drives. Each connection is addressed by an
/// `engagementID` that the coordinator generates per connect attempt.
@MainActor
protocol EngagementTransporting: AnyObject, Sendable {
    func connect(
        engagementID: UUID,
        room: EngagementRoomCredentials,
        authorID: String,
        deviceID: String
    ) async throws
    func send(engagementID: UUID, message: Data) async throws
    func disconnect(engagementID: UUID)
}

/// Tracks one connection state per game and drives the transport so each game
/// is connected to the room it should be in.
actor EngagementCoordinator {
    typealias Log = @Sendable (_ message: String) async -> Void

    /// Connection state of a single game.
    private enum State: Equatable {
        case idle
        case connecting(engagementID: UUID, room: EngagementRoomCredentials, at: Date)
        case live(engagementID: UUID, room: EngagementRoomCredentials)

        /// The engagement this state tracks, if any.
        var engagementID: UUID? {
            switch self {
            case .idle:
                nil
            case .connecting(let engagementID, _, _),
                 .live(let engagementID, _):
                engagementID
            }
        }

        /// The room this state targets, if any.
        var roomID: UUID? {
            switch self {
            case .idle:
                nil
            case .connecting(_, let room, _),
                 .live(_, let room):
                room.roomID
            }
        }
    }

    private let host: any EngagementTransporting
    private let localAuthorID: @Sendable () async -> String?
    private let localDeviceID: String
    private let log: Log
    private let now: @Sendable () -> Date
    private let connectionTimeout: TimeInterval
    private var states: [UUID: State] = [:]

    /// - Parameters:
    ///   - host: Transport used to open, use, and close connections.
    ///   - localAuthorID: Resolves the local author; connects are skipped while
    ///     it yields `nil` or an empty string.
    ///   - localDeviceID: Device identifier passed to the transport on connect.
    ///   - log: Sink for diagnostic messages; defaults to a no-op.
    ///   - now: Clock used to timestamp and age `.connecting` states.
    ///   - connectionTimeout: Seconds a game may stay in `.connecting` before a
    ///     sweep demotes it to `.idle`.
    init(
        host: any EngagementTransporting,
        localAuthorID: @escaping @Sendable () async -> String?,
        localDeviceID: String = RecordSerializer.localDeviceID,
        log: @escaping Log = { _ in },
        now: @escaping @Sendable () -> Date = Date.init,
        connectionTimeout: TimeInterval = 30
    ) {
        self.host = host
        self.localAuthorID = localAuthorID
        self.localDeviceID = localDeviceID
        self.log = log
        self.now = now
        self.connectionTimeout = connectionTimeout
    }

    /// Drives this game's live connection toward the room the shared Game
    /// record currently advertises. `creds` is the decoded `engagement` field
    /// (nil if none minted yet); `hasPeer` is whether any peer holds a live
    /// read lease. The desired state is: connected to `creds.roomID` when a
    /// peer is present and creds exist, disconnected otherwise.
    ///
    /// This subsumes connect, reconnect, migrate (a peer rotated the room), and
    /// teardown (peer left) — and the create race needs no arbiter: if two
    /// participants mint, the Game record's LWW picks one set of creds, and
    /// every device reconciles onto it, the loser migrating off its own room.
    @discardableResult
    func reconcile(gameID: UUID, creds: EngagementRoomCredentials?, hasPeer: Bool) async -> EngagementReconcileOutcome {
        await sweepStaleConnections()
        let current = state(for: gameID)
        guard hasPeer, let creds else {
            // No peer (or no creds) → disconnect. Leave state for `.channelClose`
            // to clear so the normal cleanup runs downstream.
            if let engagementID = current.engagementID {
                await log("engagement: no present peer for \(gameID.uuidString), tearing down \(engagementID.uuidString)")
                await host.disconnect(engagementID: engagementID)
            }
            return .reconciled
        }
        // Already connecting/live to the advertised room — nothing to do.
        if current.roomID == creds.roomID { return .reconciled }
        // Connect to the desired room first (so the old socket's `.channelClose`
        // sees a state that has already moved on and no-ops), then drop the old.
        let staleEngagementID = current.engagementID
        let outcome = await connect(gameID: gameID, room: creds)
        if let staleEngagementID {
            await host.disconnect(engagementID: staleEngagementID)
        }
        return outcome
    }

    /// Demotes every game that has sat in `.connecting` longer than
    /// `connectionTimeout` to `.idle`, then disconnects and logs each one.
    private func sweepStaleConnections() async {
        let cutoff = now()
        var demoted: [(gameID: UUID, engagementID: UUID, age: TimeInterval)] = []
        // All state changes happen in this loop, before the first suspension
        // point below.
        for (gameID, state) in states {
            guard case .connecting(let engagementID, _, let at) = state else { continue }
            let age = cutoff.timeIntervalSince(at)
            if age > connectionTimeout {
                demoted.append((gameID, engagementID, age))
                states[gameID] = .idle
            }
        }
        for entry in demoted {
            await host.disconnect(engagementID: entry.engagementID)
            await log(
                "engagement: connection timed out for \(entry.gameID.uuidString) " +
                "after \(Int(entry.age))s, engagement \(entry.engagementID.uuidString)"
            )
        }
    }

    /// Marks the game idle immediately and disconnects its engagement, if any.
    func teardown(gameID: UUID) async {
        let state = state(for: gameID)
        states[gameID] = .idle
        if let engagementID = state.engagementID {
            await host.disconnect(engagementID: engagementID)
        }
    }

    /// Records that the channel for `engagementID` opened, promoting its game
    /// to `.live`.
    ///
    /// - Returns: The game the engagement belongs to, or `nil` when no game
    ///   tracks that engagement any more.
    func channelOpened(engagementID: UUID) async -> UUID? {
        guard let (gameID, state) = stateEntry(for: engagementID) else { return nil }
        switch state {
        case .idle:
            return nil
        case .connecting(_, let room, _),
             .live(_, let room):
            states[gameID] = .live(engagementID: engagementID, room: room)
            return gameID
        }
    }

    /// Records that the channel for `engagementID` closed, returning its game
    /// to `.idle`.
    ///
    /// - Returns: The game the engagement belonged to, or `nil` when the game
    ///   has already moved on to another engagement (or none).
    func channelClosed(engagementID: UUID) async -> UUID? {
        // `stateEntry(for:)` only yields entries whose state tracks this
        // engagement, so a hit is always safe to clear.
        guard let entry = stateEntry(for: engagementID) else { return nil }
        states[entry.0] = .idle
        return entry.0
    }

    /// Sends a debug text message on the game's live channel; logs and returns
    /// when the channel is not live or the send fails.
    func sendDebugMessage(gameID: UUID, text: String) async {
        guard case .live(let engagementID, _) = state(for: gameID) else {
            await log("engagement: test message skipped for \(gameID.uuidString), channel is not live")
            return
        }
        do {
            let message = EngagementMessage(text: text)
            try await host.send(engagementID: engagementID, message: message.encodedData())
            await log("engagement: sent test message \(engagementID.uuidString)")
        } catch {
            await log("engagement: test message failed \(engagementID.uuidString): \(error.localizedDescription)")
        }
    }

    /// Sends one cell edit on its game's live channel. Silently does nothing
    /// when the channel is not live; send failures are logged, not thrown.
    func sendCellEdit(_ edit: RealtimeCellEdit) async {
        guard case .live(let engagementID, _) = state(for: edit.gameID) else { return }
        do {
            let message = EngagementMessage(cellEdit: edit)
            try await host.send(engagementID: engagementID, message: message.encodedData())
            await log(
                "engagement: sent cellEdit \(engagementID.uuidString) " +
                "r=\(edit.row) c=\(edit.col) device=\(edit.deviceID.prefix(8))"
            )
        } catch {
            await log("engagement: cell edit send failed \(engagementID.uuidString): \(error.localizedDescription)")
        }
    }

    /// Ships a bulk gesture as one batched message instead of one per cell, so
    /// a whole-grid action lands on the peer in a single frame. All edits in a
    /// batch share a game (they originate from one local gesture).
    func sendCellEdits(_ edits: [RealtimeCellEdit]) async {
        // The first edit's game selects the channel; an empty batch is a no-op.
        guard let first = edits.first else { return }
        guard case .live(let engagementID, _) = state(for: first.gameID) else { return }
        do {
            let message = EngagementMessage(cellEdits: edits)
            try await host.send(engagementID: engagementID, message: message.encodedData())
            await log(
                "engagement: sent cellEditBatch \(engagementID.uuidString) " +
                "count=\(edits.count) device=\(first.deviceID.prefix(8))"
            )
        } catch {
            await log("engagement: cell edit batch send failed \(engagementID.uuidString): \(error.localizedDescription)")
        }
    }

    /// Sends one selection update on its game's live channel. Silently does
    /// nothing when the channel is not live; send failures are logged, not
    /// thrown.
    func sendSelection(_ selection: EngagementSelectionUpdate) async {
        guard case .live(let engagementID, _) = state(for: selection.gameID) else { return }
        do {
            let message = EngagementMessage(selection: selection)
            try await host.send(engagementID: engagementID, message: message.encodedData())
            await log(
                "engagement: sent selection \(engagementID.uuidString) " +
                "r=\(selection.row) c=\(selection.col) device=\(selection.deviceID.prefix(8))"
            )
        } catch {
            await log("engagement: selection send failed \(engagementID.uuidString): \(error.localizedDescription)")
        }
    }

    /// Opens a socket to `room` and parks the game in `.connecting`; the
    /// `.channelOpen` callback promotes it to `.live`. Callers that are
    /// migrating off a previous room disconnect the stale engagement *after*
    /// this returns, so the old socket's close races against a state that has
    /// already moved on.
    ///
    /// - Returns: `.roomRejected` when the transport throws
    ///   `EngagementHostError.roomSecretMismatch`; `.reconciled` otherwise,
    ///   including when the connect is skipped or fails for another reason.
    private func connect(gameID: UUID, room: EngagementRoomCredentials) async -> EngagementReconcileOutcome {
        guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else {
            await log("engagement: connect skipped for \(gameID.uuidString), missing local author")
            return .reconciled
        }
        let engagementID = UUID()
        states[gameID] = .connecting(engagementID: engagementID, room: room, at: now())
        do {
            try await host.connect(
                engagementID: engagementID,
                room: room,
                authorID: localAuthorID,
                deviceID: localDeviceID
            )
            await log("engagement: connecting \(gameID.uuidString) to room \(room.roomID.uuidString)")
        } catch EngagementHostError.roomSecretMismatch {
            resetToIdle(gameID: gameID, ifTracking: engagementID)
            await log(
                "engagement: room \(room.roomID.uuidString) rejected for \(gameID.uuidString): " +
                "registered with a different secret"
            )
            return .roomRejected
        } catch {
            resetToIdle(gameID: gameID, ifTracking: engagementID)
            await log("engagement: connect failed for \(gameID.uuidString): \(error.localizedDescription)")
        }
        return .reconciled
    }

    /// Returns `gameID` to `.idle` only while its state still tracks
    /// `engagementID`.
    ///
    /// The actor is reentrant across `await host.connect(...)`, so by the time
    /// a connect attempt fails the game may already track a newer engagement.
    /// Clearing unconditionally would orphan that newer connection: its
    /// `channelOpened` would find no entry and nothing would disconnect it.
    private func resetToIdle(gameID: UUID, ifTracking engagementID: UUID) {
        guard states[gameID]?.engagementID == engagementID else { return }
        states[gameID] = .idle
    }

    /// Current state of `gameID`; games never seen before are `.idle`.
    private func state(for gameID: UUID) -> State {
        states[gameID] ?? .idle
    }

    /// Finds the game whose state currently tracks `engagementID`.
    private func stateEntry(for engagementID: UUID) -> (UUID, State)? {
        states.first { _, state in
            state.engagementID == engagementID
        }
    }
}

enum EngagementCoordinatorError: LocalizedError {
    case invalidPayloadEncoding

    var errorDescription: String? {
        switch self {
        case .invalidPayloadEncoding:
            "Unable to encode engagement room payload."
        }
    }
}