RecordApplier.swift (25238B)
import CloudKit
import CoreData
import Foundation

/// Side effects accumulated while one batch of inbound records/deletions is
/// applied inside a Core Data `performAndWait` block. The batch context can't
/// `await`, so everything the caller must do afterwards (callbacks,
/// notifications, diagnostics) is collected here and dispatched once the
/// block returns.
struct BatchEffects {
    /// Games for which an inbound Moves record reported changed cells. Each
    /// one gets its cell cache replayed before the batch saves and is handed
    /// to `onRemoteMovesUpdated` afterwards.
    var movesUpdated = Set<UUID>()
    /// Games whose roster-relevant state changed in this batch — a Player
    /// record (name / selection / readAt), a Game record (share metadata), a
    /// deletion (participant removal), or a *new* contributor's first Moves
    /// row (a participant the roster can only discover from their moves; see
    /// `applyMovesRecord`'s `onNewAuthor`). Drives `.playerRosterShouldRefresh`.
    /// A repeat Moves row from a known contributor is excluded: it changes the
    /// grid, not the roster, and during a co-solve flurry those land at
    /// keystroke cadence — refreshing the roster (and re-evaluating the grid
    /// view) on each one was pure overhead.
    var rosterRelevant = Set<UUID>()
    /// NOTE(review): not written by the direct-apply path in this file —
    /// presumably filled by another apply path; confirm before relying on it.
    var pings: [Ping] = []
    /// Games for which a Player row was created for the first time in this
    /// batch (`applyPlayerRecord`'s `onFirstTime`).
    var playersUpdated = Set<UUID>()
    /// Games where a remote author's selection or `updatedAt` changed
    /// (`applyPlayerRecord`'s `onPresenceChange`).
    var playerPresenceChanged = Set<UUID>()
    /// Games reported through `applyGameRecord`'s `onEngagementChange`.
    var engagementChanged = Set<UUID>()
    /// Games whose inbound Game record changed the notification content key, so
    /// the caller re-mirrors the App Group key directory the NSE reads.
    var contentKeysChanged = Set<UUID>()
    /// NOTE(review): not written by the direct-apply path in this file —
    /// presumably filled by another apply path; confirm before relying on it.
    var removed = Set<UUID>()
    /// Per-game incoming read cursor from one of *our own* devices: the
    /// `readAt` horizon plus the encoded per-peer "seen" baseline that sibling
    /// shipped on its `Player.sessionSnapshot` (nil on older writes). Drives
    /// cross-device baseline adoption.
    var readCursors: [(UUID, Date, Data?)] = []
    /// Games that just transitioned to completed via an inbound Game record, so
    /// this device uploads its journal for replay even though it didn't run the
    /// local completion path.
    var completedTransitions = Set<UUID>()
    /// Games for which an inbound `Journal` record landed — wakes a waiting
    /// finish-banner replay to re-check completeness.
    var journalsSynced = Set<UUID>()
    /// Account-level push address decisions seen in the private account zone.
    var accountPushAddresses: [String] = []
    /// Account-level push *secret* decisions seen in the private account zone,
    /// each with its generation. Drive re-derivation of every per-game push
    /// address (see `RecordSerializer.deriveGameAddress`); the version gates
    /// adoption so a stale inbound copy can't undo a rotation.
    var accountPushSecrets: [(secret: String, version: Int64)] = []
    /// Versions of *our own* name Decision echoed back by sync. Adopted into
    /// the local rename counter so the next rename supersedes the highest
    /// generation any of this account's devices has published.
    var selfNameVersions: [Int64] = []
    /// A `name` or `nickname` Decision changed a friend row in this batch —
    /// either side of an App Group nickname-directory entry, so the caller
    /// rebuilds the directory after the batch saves.
    var friendNamesChanged = false
    /// Diagnostics emitted while applying the batch inside `performAndWait` —
    /// chiefly Core Data fetch/save failures, which silently drop records (the
    /// engine's change token has already advanced, so they never redeliver).
    /// The batch context can't `await`, so messages accumulate here and the
    /// caller traces them afterwards — the same shape as the send path's
    /// failure messages. A bare `print` is invisible in Production, where the
    /// on-device diagnostics log is the only observability.
    var traces: [String] = []
}

extension SyncEngine {
    /// Applies one batch of inbound records and deletions to a fresh
    /// background context, saves it once, and then dispatches the accumulated
    /// `BatchEffects` to the engine's callbacks and to
    /// `.playerRosterShouldRefresh`.
    ///
    /// - Parameters:
    ///   - records: Inbound records, dispatched on `recordType`; unknown types
    ///     are ignored.
    ///   - deletions: Inbound deletions as `(recordID, recordType)` pairs.
    ///   - scopeValue: Database scope forwarded to `applyGameRecord`.
    func applyDirectRecordZoneChanges(
        records: [CKRecord],
        deletions: [(CKRecord.ID, CKRecord.RecordType)],
        scopeValue: Int16
    ) async {
        guard !records.isEmpty || !deletions.isEmpty else { return }
        let ctx = persistence.container.newBackgroundContext()
        ctx.mergePolicy = NSMergePolicy.mergeByPropertyObjectTrump
        let localAuthorID = await currentLocalAuthorID()
        let effects: BatchEffects = ctx.performAndWait {
            var effects = BatchEffects()
            for record in records {
                switch record.recordType {
                case "Game":
                    let entity = RecordSerializer.applyGameRecord(
                        record,
                        to: ctx,
                        databaseScope: scopeValue,
                        onEngagementChange: { effects.engagementChanged.insert($0) },
                        onCompletedTransition: { effects.completedTransitions.insert($0) },
                        onContentKeyChange: { effects.contentKeysChanged.insert($0) }
                    )
                    if let id = entity.id { effects.rosterRelevant.insert(id) }
                case "Moves":
                    if let value = RecordSerializer.parseMovesRecord(record) {
                        let cellsChanged = RecordSerializer.applyMovesRecord(
                            record,
                            value: value,
                            to: ctx,
                            localAuthorID: localAuthorID,
                            onNewAuthor: { _ in effects.rosterRelevant.insert(value.gameID) }
                        )
                        if cellsChanged { effects.movesUpdated.insert(value.gameID) }
                    }
                case "Player":
                    if let (gameID, _) = RecordSerializer.parsePlayerRecordName(record.recordID.recordName) {
                        self.applyPlayerRecord(
                            record,
                            in: ctx,
                            localAuthorID: localAuthorID,
                            onFirstTime: { effects.playersUpdated.insert($0) },
                            onPresenceChange: { effects.playerPresenceChanged.insert($0) },
                            onReadCursor: { effects.readCursors.append(($0, $1, $2)) },
                            onTrace: { effects.traces.append($0) }
                        )
                        effects.rosterRelevant.insert(gameID)
                    }
                case Archive.recordType:
                    let materializedID = self.applyArchiveRecord(
                        record,
                        in: ctx,
                        onTrace: { effects.traces.append($0) }
                    )
                    if let id = materializedID {
                        effects.rosterRelevant.insert(id)
                    }
                default:
                    break
                }
            }
            for deletion in deletions {
                self.applyDeletion(
                    recordID: deletion.0,
                    recordType: deletion.1,
                    in: ctx,
                    onTrace: { effects.traces.append($0) }
                )
                if let id = self.gameID(fromRecordName: deletion.0.recordName) {
                    effects.rosterRelevant.insert(id)
                }
            }
            for gameID in effects.movesUpdated {
                effects.traces += self.replayCellCache(for: gameID, in: ctx)
            }
            if ctx.hasChanges {
                do {
                    try ctx.save()
                } catch {
                    // NOTE(review): a failed save is only traced; the effects
                    // below are still dispatched for this batch.
                    let nsError = error as NSError
                    effects.traces.append(
                        "direct-push ctx.save FAILED " +
                            "— domain=\(nsError.domain) code=\(nsError.code) " +
                            "\(nsError.localizedDescription)"
                    )
                }
            }
            // Re-mirror the App Group key directory once the batch is saved, so
            // a just-adopted content key is available to the NSE immediately.
            if !effects.contentKeysChanged.isEmpty {
                GameEntity.rebuildContentKeyDirectory(in: ctx)
            }
            return effects
        }

        for message in effects.traces {
            await trace(message)
        }
        if let onRemoteMovesUpdated, !effects.movesUpdated.isEmpty {
            await onRemoteMovesUpdated(effects.movesUpdated)
        }
        if let onRemotePlayersUpdated, !effects.playersUpdated.isEmpty {
            await onRemotePlayersUpdated(effects.playersUpdated)
        }
        if let onRemotePlayerPresenceChanged, !effects.playerPresenceChanged.isEmpty {
            await onRemotePlayerPresenceChanged(effects.playerPresenceChanged)
        }
        if let onRemoteEngagementChanged, !effects.engagementChanged.isEmpty {
            await onRemoteEngagementChanged(effects.engagementChanged)
        }
        if let onIncomingReadCursor, !effects.readCursors.isEmpty {
            await onIncomingReadCursor(effects.readCursors)
        }
        // A game just learned it's complete via sync: upload this device's
        // journal (no-op if it logged nothing) so replay can converge. The
        // enqueue defers its CKSyncEngine drain via `sendChangesDetached`.
        if let localAuthorID, !localAuthorID.isEmpty {
            for id in effects.completedTransitions {
                enqueueJournalUpload(gameID: id, authorID: localAuthorID)
            }
        }
        if let onGameCompleted {
            for id in effects.completedTransitions {
                await onGameCompleted(id)
            }
        }
        let deletedPings = deletions.compactMap { deletion -> (recordName: String, gameID: UUID)? in
            let recordName = deletion.0.recordName
            guard recordName.hasPrefix("ping-"),
                  let gameID = gameID(fromRecordName: recordName)
            else { return nil }
            return (recordName, gameID)
        }
        if let onPingDeleted, !deletedPings.isEmpty {
            await onPingDeleted(deletedPings)
        }
        if !effects.rosterRelevant.isEmpty {
            NotificationCenter.default.post(
                name: .playerRosterShouldRefresh,
                object: nil,
                userInfo: ["gameIDs": effects.rosterRelevant]
            )
        }
    }

    /// Applies an inbound `Player` record to the matching `PlayerEntity`,
    /// creating the row (and, via `ensureGameEntity`, its game) when none
    /// exists. Must be called inside a `performAndWait` block on `ctx`.
    ///
    /// Records whose name doesn't parse, that carry no `name` string, or that
    /// fail the system-fields freshness check are ignored. A Core Data fetch
    /// failure drops the record and reports it through `onTrace` — treating
    /// the failure as "row not found" would insert a second row for the same
    /// `ckRecordName`.
    ///
    /// - Parameters:
    ///   - record: The inbound Player record.
    ///   - ctx: Context the entity is fetched from / inserted into.
    ///   - localAuthorID: This account's author id, if known.
    ///   - onFirstTime: Called with the game id when a new row was created and
    ///     its value fields were adopted.
    ///   - onPresenceChange: Called with the game id when a remote author's
    ///     selection or `updatedAt` changed.
    ///   - onReadCursor: Called for our own record when `readAt` or the
    ///     session snapshot changed.
    ///   - onTrace: Receives a diagnostic message for each dropped record;
    ///     defaults to discarding it.
    nonisolated func applyPlayerRecord(
        _ record: CKRecord,
        in ctx: NSManagedObjectContext,
        localAuthorID: String?,
        onFirstTime: (UUID) -> Void,
        onPresenceChange: (UUID) -> Void,
        onReadCursor: (UUID, Date, Data?) -> Void,
        onTrace: (String) -> Void = { _ in }
    ) {
        let ckName = record.recordID.recordName
        guard let (gameID, authorID) = RecordSerializer.parsePlayerRecordName(ckName) else {
            return
        }
        guard let renderedName = record["name"] as? String else { return }
        let updatedAt = record["updatedAt"] as? Date
            ?? record.modificationDate
            ?? Date()

        let req = NSFetchRequest<PlayerEntity>(entityName: "PlayerEntity")
        req.predicate = NSPredicate(format: "ckRecordName == %@", ckName)
        req.fetchLimit = 1

        let existingRow: PlayerEntity?
        do {
            existingRow = try ctx.fetch(req).first
        } catch {
            // The change token has already advanced, so this record will not
            // be redelivered; make the drop visible rather than inserting a
            // duplicate row on top of one we merely failed to read.
            onTrace(Self.syncErrorMessage("applyPlayerRecord player fetch", gameID: gameID, error: error))
            return
        }

        let entity: PlayerEntity
        let foundExisting: Bool
        if let existingRow {
            entity = existingRow
            foundExisting = true
        } else {
            let game = RecordSerializer.ensureGameEntity(
                forGameID: gameID,
                zoneID: record.recordID.zoneID,
                in: ctx
            )
            entity = PlayerEntity(context: ctx)
            entity.game = game
            foundExisting = false
        }

        // Drop fetched snapshots older than what we already have. After a
        // successful push the writeback adopts the new etag; if a query
        // that started before the push lands later, applying its older
        // snapshot would downgrade our local etag and OpLock-fail the next
        // save (see `applyMovesRecord` for the same guard).
        if foundExisting,
           !RecordSerializer.incomingIsAtLeastAsFresh(record, existingFields: entity.ckSystemFields) {
            return
        }

        let oldSelection = (entity.selRow, entity.selCol, entity.selDir)
        let hadSelection = oldSelection.0 != nil && oldSelection.1 != nil && oldSelection.2 != nil
        let oldUpdatedAt = entity.updatedAt

        // Adopt the server's system fields — that's etag tracking and is
        // independent of which side has the freshest data.
        entity.ckRecordName = ckName
        entity.ckSystemFields = RecordSerializer.encodeSystemFields(of: record)
        entity.authorID = authorID

        // The read cursor and session snapshot are account-scoped convergence
        // state — "what this account has already seen" — not the live cursor, so
        // they are adopted ahead of (and independent of) the selection's
        // `updatedAt` LWW below. A sibling commits the catch-up baseline on leave
        // (`handlePuzzleLeft`), which advances the read cursor and writes the
        // snapshot but does *not* bump `updatedAt`; the outbound record therefore
        // ships a stale `updatedAt`. Gating these on it would let a device with a
        // fresher local cursor drop the baseline and re-report the same moves as
        // a duplicate catch-up banner. The etag guard above already rejects
        // genuinely stale fetches. Adopting the cursor here is *not* monotonic:
        // `noteIncomingReadCursor` adopts the inbound value directly under
        // last-writer-wins, so a leaving sibling's past horizon *can* pull the
        // account horizon back below another sibling's live presence lease.
        // That collapse is bounded and self-healing: the still-present device
        // re-asserts its lease as soon as it processes the inbound close
        // (AppServices' incoming-cursor drain re-runs `publishReadCursor`
        // ahead of the 5-min refresh floor), and a foreground device marks
        // inbound peer moves read on arrival regardless. Keeping "A left
        // while C is still here" representable without the dip would need
        // per-device Player rows, which don't exist (one row per author).
        let previousReadAt = entity.readAt
        let previousSessionSnapshot = entity.sessionSnapshot
        let incomingReadAt = RecordSerializer.parsePlayerReadAt(from: record)
        entity.readAt = incomingReadAt
        let incomingReadThrough = RecordSerializer.parsePlayerReadThrough(from: record)
        entity.readThrough = incomingReadThrough
        entity.sessionSnapshot = RecordSerializer.parsePlayerSessionSnapshot(from: record)
        // `timeLog` is the device-keyed solve-time log. Touch it only when the
        // fetched record actually carries the field. A partial fetch (CloudQuery
        // restricts `desiredKeys`) or an older record omits it, and because the
        // log only ever grows, an absent field never means "cleared" — adopting
        // nil there would erase a peer's or a sibling's intervals until the next
        // full sync re-delivered them. Applied outside the `updatedAt` freshness
        // guard below: like the read cursor, a sibling's leave-write can ship a
        // stale `updatedAt`.
        if record.allKeys().contains("timeLog") {
            let incomingTimeLog = RecordSerializer.parsePlayerTimeLog(from: record)
            if authorID == localAuthorID {
                // Our own record echoing back from a sibling: merge by device,
                // keeping this device's own slot (we are its sole writer, and the
                // sibling's copy may have dropped a session we have open now).
                var local = TimeLog.decode(entity.timeLog)
                local.merge(
                    inbound: TimeLog.decode(incomingTimeLog),
                    preservingDevice: RecordSerializer.localDeviceID
                )
                entity.timeLog = local.devices.isEmpty ? nil : TimeLog.encode(local)
            } else {
                entity.timeLog = incomingTimeLog
            }
        }
        // Only surface the cursor when the row actually changed. A re-application
        // of values we already hold — e.g. a catch-up query snapshot racing this
        // device's in-flight lease save shares the old etag, so the freshness
        // guard above admits it — carries no new account-horizon information,
        // and adopting it would rewind the just-minted lease and trigger a
        // redundant re-assert save (the open-time double mint).
        if authorID == localAuthorID, let readAt = incomingReadAt,
           readAt != previousReadAt || entity.sessionSnapshot != previousSessionSnapshot {
            onReadCursor(gameID, readAt, entity.sessionSnapshot)
        }
        // Our own record coming back from another of this account's devices
        // carries that device's read watermark. Adopt it monotonically onto the
        // shared game so the unread badge clears here once any device has read,
        // mirroring how the presence lease converges via `onReadCursor` above.
        if authorID == localAuthorID,
           let incomingReadThrough,
           let game = entity.game,
           (game.readThroughAt ?? .distantPast) < incomingReadThrough {
            game.readThroughAt = incomingReadThrough
        }

        // The remaining value fields are only adopted when the incoming record
        // is at least as new as what we have locally; otherwise a stale-but-
        // current server record (e.g. our own pending writes haven't landed yet)
        // would clobber the user's live selection on every fetch.
        let localUpdatedAt = entity.updatedAt
        let incomingIsFresher = localUpdatedAt.map { updatedAt >= $0 } ?? true
        guard incomingIsFresher else { return }
        // An empty `name` is what older builds shipped from the selection publisher
        // before the fix; treat it as "no information" rather than letting it
        // clobber a previously-resolved name.
        if !renderedName.isEmpty {
            entity.name = renderedName
        }
        entity.updatedAt = updatedAt
        if let selection = RecordSerializer.parsePlayerSelection(from: record) {
            entity.selRow = NSNumber(value: Int64(selection.row))
            entity.selCol = NSNumber(value: Int64(selection.col))
            entity.selDir = NSNumber(value: Int64(selection.direction.rawValue))
        } else {
            entity.selRow = nil
            entity.selCol = nil
            entity.selDir = nil
        }
        // Adopt the record's push address. For a peer this is how the sender
        // learns where to address pushes; for our own record synced from a
        // sibling device, it's how the account's devices converge on one
        // per-game address (the LWW winner of this record).
        entity.pushAddress = RecordSerializer.parsePlayerPushAddress(from: record)
        let isRemoteAuthor = authorID != localAuthorID && authorID != CKCurrentUserDefaultName
        let hasSelection = entity.selRow != nil && entity.selCol != nil && entity.selDir != nil
        if isRemoteAuthor,
           (hadSelection || hasSelection),
           oldUpdatedAt != entity.updatedAt ||
           oldSelection.0 != entity.selRow ||
           oldSelection.1 != entity.selCol ||
           oldSelection.2 != entity.selDir {
            onPresenceChange(gameID)
        }
        if !foundExisting {
            onFirstTime(gameID)
        }
    }

    /// Merges every device's `MovesEntity` row for `gameID` and reconciles the
    /// `CellEntity` cache against the resulting grid. Must be called inside a
    /// `performAndWait` block on the same context. Returns diagnostic messages
    /// for any fetch failure (normally empty) — the caller folds them into
    /// `BatchEffects.traces` so they reach the diagnostics log once the batch
    /// context unwinds.
    nonisolated func replayCellCache(
        for gameID: UUID,
        in ctx: NSManagedObjectContext
    ) -> [String] {
        let gameReq = NSFetchRequest<GameEntity>(entityName: "GameEntity")
        gameReq.predicate = NSPredicate(format: "id == %@", gameID as CVarArg)
        gameReq.fetchLimit = 1
        let game: GameEntity?
        do {
            game = try ctx.fetch(gameReq).first
        } catch {
            // CKSyncEngine commits the batch when the delegate returns
            // (see fetchedRecordZoneChanges save-failure note), so re-throwing
            // won't redeliver — surface the failure instead of silently
            // leaving the cell cache stale for this game.
            return [Self.syncErrorMessage("replayCellCache game fetch", gameID: gameID, error: error)]
        }
        guard let game else { return [] }

        let movesReq = NSFetchRequest<MovesEntity>(entityName: "MovesEntity")
        movesReq.predicate = NSPredicate(format: "game == %@", game)
        let movesEntities: [MovesEntity]
        do {
            movesEntities = try ctx.fetch(movesReq)
        } catch {
            return [Self.syncErrorMessage("replayCellCache moves fetch", gameID: gameID, error: error)]
        }
        let values: [MovesValue] = movesEntities.compactMap { Self.movesValue(from: $0) }
        let gridState = GridStateMerger.merge(values)

        let existingCells = (game.cells as? Set<CellEntity>) ?? []
        var byPosition: [GridPosition: CellEntity] = [:]
        for cell in existingCells {
            byPosition[GridPosition(row: Int(cell.row), col: Int(cell.col))] = cell
        }

        // Upsert a cache row for every position present in the merged grid.
        for (pos, gridCell) in gridState {
            let cell: CellEntity
            if let existing = byPosition[pos] {
                cell = existing
            } else {
                cell = CellEntity(context: ctx)
                cell.game = game
                cell.row = Int16(pos.row)
                cell.col = Int16(pos.col)
            }
            cell.letter = gridCell.letter
            cell.markCode = gridCell.mark.code
            cell.letterAuthorID = gridCell.authorID
        }

        // Cached rows the merged grid no longer mentions are blanked in place
        // rather than deleted.
        for (pos, cell) in byPosition where gridState[pos] == nil {
            cell.letter = ""
            cell.markCode = 0
            cell.letterAuthorID = nil
        }
        return []
    }

    /// Hydrates a `MovesValue` from a `MovesEntity`. Returns `nil` if the row
    /// is missing required fields (e.g. an unpopulated stub from a partial
    /// fetch). Missing or undecodable `cells` data yields an empty cell map
    /// rather than `nil`.
    nonisolated static func movesValue(from entity: MovesEntity) -> MovesValue? {
        guard let gameID = entity.game?.id,
              let authorID = entity.authorID,
              let deviceID = entity.deviceID,
              let updatedAt = entity.updatedAt
        else { return nil }
        let cells = (entity.cells.flatMap { try? MovesCodec.decode($0) }) ?? [:]
        return MovesValue(
            gameID: gameID,
            authorID: authorID,
            deviceID: deviceID,
            cells: cells,
            updatedAt: updatedAt
        )
    }

    /// Formats a sync-context fetch/save failure for the diagnostics log. The
    /// engine's change token has already advanced by the time these helpers
    /// run inside performAndWait, so the only available remediation is making
    /// the drop visible — and visible means traced (the on-device diagnostics
    /// log), not printed: console output never reaches a collected log.
    nonisolated static func syncErrorMessage(_ label: String, gameID: UUID, error: Error) -> String {
        let nsError = error as NSError
        return "\(label) FAILED for \(gameID.uuidString) " +
            "— domain=\(nsError.domain) code=\(nsError.code) " +
            "\(nsError.localizedDescription)"
    }

    /// Applies an inbound `Archive` record. Inert while a live (non-revoked)
    /// copy of the original game still exists on this device — the device already
    /// holds the data, so surfacing a second row would duplicate it. On a device
    /// without the original (fresh install / reinstall), it hydrates the snapshot
    /// into a standalone completed, owned game. Returns the materialized game id
    /// when a row was created, else `nil`.
    ///
    /// If the live-copy lookup itself fails, nothing is materialized (an
    /// unreadable store is not evidence that the original is absent) and the
    /// failure is reported through `onTrace`, which defaults to discarding it.
    @discardableResult
    nonisolated func applyArchiveRecord(
        _ record: CKRecord,
        in ctx: NSManagedObjectContext,
        onTrace: (String) -> Void = { _ in }
    ) -> UUID? {
        guard let payload = Archive.payload(from: record) else { return nil }

        let liveReq = NSFetchRequest<GameEntity>(entityName: "GameEntity")
        liveReq.predicate = NSPredicate(
            format: "id == %@ AND isAccessRevoked == NO",
            payload.originalGameID as CVarArg
        )
        liveReq.fetchLimit = 1

        let liveCopy: GameEntity?
        do {
            liveCopy = try ctx.fetch(liveReq).first
        } catch {
            let nsError = error as NSError
            onTrace(
                "applyArchiveRecord live-game fetch FAILED for \(payload.originalGameID) " +
                    "— domain=\(nsError.domain) code=\(nsError.code) " +
                    "\(nsError.localizedDescription)"
            )
            return nil
        }
        if liveCopy != nil { return nil }

        let created = Archive.materialize(payload, in: ctx)
        return created?.id
    }

    /// Deletes the local row mirroring a deleted CloudKit record. The entity
    /// is chosen from the record-name prefix (`moves-`, `player-`, `game-`),
    /// falling back to `recordType`; any other record is ignored. A fetch
    /// failure leaves the local row in place and is reported through
    /// `onTrace`, which defaults to discarding the message.
    nonisolated func applyDeletion(
        recordID: CKRecord.ID,
        recordType: CKRecord.RecordType,
        in ctx: NSManagedObjectContext,
        onTrace: (String) -> Void = { _ in }
    ) {
        let name = recordID.recordName
        let entityName: String
        if name.hasPrefix("moves-") {
            entityName = "MovesEntity"
        } else if name.hasPrefix("player-") {
            entityName = "PlayerEntity"
        } else if name.hasPrefix("game-") {
            entityName = "GameEntity"
        } else {
            switch recordType {
            case "Moves": entityName = "MovesEntity"
            case "Player": entityName = "PlayerEntity"
            case "Game": entityName = "GameEntity"
            default: return
            }
        }
        let req = NSFetchRequest<NSManagedObject>(entityName: entityName)
        req.predicate = NSPredicate(format: "ckRecordName == %@", name)
        req.fetchLimit = 1
        do {
            if let obj = try ctx.fetch(req).first {
                ctx.delete(obj)
            }
        } catch {
            // The deletion won't be redelivered, so a stale local row is the
            // outcome either way — but it must at least be visible in the log.
            let nsError = error as NSError
            onTrace(
                "applyDeletion \(entityName) fetch FAILED for \(name) " +
                    "— domain=\(nsError.domain) code=\(nsError.code) " +
                    "\(nsError.localizedDescription)"
            )
        }
    }
}