crossmate

A collaborative crossword app for iOS
Log | Files | Refs | LICENSE

RecordApplier.swift (25238B)


      1 import CloudKit
      2 import CoreData
      3 import Foundation
      4 
      5 struct BatchEffects {
      6     var movesUpdated = Set<UUID>()
      7     /// Games whose roster-relevant state changed in this batch — a Player
      8     /// record (name / selection / readAt), a Game record (share metadata), a
      9     /// deletion (participant removal), or a *new* contributor's first Moves
     10     /// row (a participant the roster can only discover from their moves; see
     11     /// `applyMovesRecord`'s `onNewAuthor`). Drives `.playerRosterShouldRefresh`.
     12     /// A repeat Moves row from a known contributor is excluded: it changes the
     13     /// grid, not the roster, and during a co-solve flurry those land at
     14     /// keystroke cadence — refreshing the roster (and re-evaluating the grid
     15     /// view) on each one was pure overhead.
     16     var rosterRelevant = Set<UUID>()
     17     var pings: [Ping] = []
     18     var playersUpdated = Set<UUID>()
     19     var playerPresenceChanged = Set<UUID>()
     20     var engagementChanged = Set<UUID>()
     21     /// Games whose inbound Game record changed the notification content key, so
     22     /// the caller re-mirrors the App Group key directory the NSE reads.
     23     var contentKeysChanged = Set<UUID>()
     24     var removed = Set<UUID>()
     25     /// Per-game incoming read cursor from one of *our own* devices: the
     26     /// `readAt` horizon plus the encoded per-peer "seen" baseline that sibling
     27     /// shipped on its `Player.sessionSnapshot` (nil on older writes). Drives
     28     /// cross-device baseline adoption.
     29     var readCursors: [(UUID, Date, Data?)] = []
     30     /// Games that just transitioned to completed via an inbound Game record, so
     31     /// this device uploads its journal for replay even though it didn't run the
     32     /// local completion path.
     33     var completedTransitions = Set<UUID>()
     34     /// Games for which an inbound `Journal` record landed — wakes a waiting
     35     /// finish-banner replay to re-check completeness.
     36     var journalsSynced = Set<UUID>()
     37     /// Account-level push address decisions seen in the private account zone.
     38     var accountPushAddresses: [String] = []
     39     /// Account-level push *secret* decisions seen in the private account zone,
     40     /// each with its generation. Drive re-derivation of every per-game push
     41     /// address (see `RecordSerializer.deriveGameAddress`); the version gates
     42     /// adoption so a stale inbound copy can't undo a rotation.
     43     var accountPushSecrets: [(secret: String, version: Int64)] = []
     44     /// Versions of *our own* name Decision echoed back by sync. Adopted into
     45     /// the local rename counter so the next rename supersedes the highest
     46     /// generation any of this account's devices has published.
     47     var selfNameVersions: [Int64] = []
     48     /// A `name` or `nickname` Decision changed a friend row in this batch —
     49     /// either side of an App Group nickname-directory entry, so the caller
     50     /// rebuilds the directory after the batch saves.
     51     var friendNamesChanged = false
     52     /// Diagnostics emitted while applying the batch inside `performAndWait` —
     53     /// chiefly Core Data fetch/save failures, which silently drop records (the
     54     /// engine's change token has already advanced, so they never redeliver).
     55     /// The batch context can't `await`, so messages accumulate here and the
     56     /// caller traces them afterwards — the same shape as the send path's
     57     /// failure messages. A bare `print` is invisible in Production, where the
     58     /// on-device diagnostics log is the only observability.
     59     var traces: [String] = []
     60 }
     61 
     62 extension SyncEngine {
     63     func applyDirectRecordZoneChanges(
     64         records: [CKRecord],
     65         deletions: [(CKRecord.ID, CKRecord.RecordType)],
     66         scopeValue: Int16
     67     ) async {
     68         guard !records.isEmpty || !deletions.isEmpty else { return }
     69         let ctx = persistence.container.newBackgroundContext()
     70         ctx.mergePolicy = NSMergePolicy.mergeByPropertyObjectTrump
     71         let localAuthorID = await currentLocalAuthorID()
     72         let effects: BatchEffects = ctx.performAndWait {
     73             var effects = BatchEffects()
     74             for record in records {
     75                 switch record.recordType {
     76                 case "Game":
     77                     let entity = RecordSerializer.applyGameRecord(
     78                         record,
     79                         to: ctx,
     80                         databaseScope: scopeValue,
     81                         onEngagementChange: { effects.engagementChanged.insert($0) },
     82                         onCompletedTransition: { effects.completedTransitions.insert($0) },
     83                         onContentKeyChange: { effects.contentKeysChanged.insert($0) }
     84                     )
     85                     if let id = entity.id { effects.rosterRelevant.insert(id) }
     86                 case "Moves":
     87                     if let value = RecordSerializer.parseMovesRecord(record) {
     88                         let cellsChanged = RecordSerializer.applyMovesRecord(
     89                             record,
     90                             value: value,
     91                             to: ctx,
     92                             localAuthorID: localAuthorID,
     93                             onNewAuthor: { _ in effects.rosterRelevant.insert(value.gameID) }
     94                         )
     95                         if cellsChanged { effects.movesUpdated.insert(value.gameID) }
     96                     }
     97                 case "Player":
     98                     if let (gameID, _) = RecordSerializer.parsePlayerRecordName(record.recordID.recordName) {
     99                         self.applyPlayerRecord(
    100                             record,
    101                             in: ctx,
    102                             localAuthorID: localAuthorID,
    103                             onFirstTime: { effects.playersUpdated.insert($0) },
    104                             onPresenceChange: { effects.playerPresenceChanged.insert($0) },
    105                             onReadCursor: { effects.readCursors.append(($0, $1, $2)) }
    106                         )
    107                         effects.rosterRelevant.insert(gameID)
    108                     }
    109                 case Archive.recordType:
    110                     if let id = self.applyArchiveRecord(record, in: ctx) {
    111                         effects.rosterRelevant.insert(id)
    112                     }
    113                 default:
    114                     break
    115                 }
    116             }
    117             for deletion in deletions {
    118                 self.applyDeletion(
    119                     recordID: deletion.0,
    120                     recordType: deletion.1,
    121                     in: ctx
    122                 )
    123                 if let id = self.gameID(fromRecordName: deletion.0.recordName) {
    124                     effects.rosterRelevant.insert(id)
    125                 }
    126             }
    127             for gameID in effects.movesUpdated {
    128                 effects.traces += self.replayCellCache(for: gameID, in: ctx)
    129             }
    130             if ctx.hasChanges {
    131                 do {
    132                     try ctx.save()
    133                 } catch {
    134                     let nsError = error as NSError
    135                     effects.traces.append(
    136                         "direct-push ctx.save FAILED " +
    137                         "— domain=\(nsError.domain) code=\(nsError.code) " +
    138                         "\(nsError.localizedDescription)"
    139                     )
    140                 }
    141             }
    142             // Re-mirror the App Group key directory once the batch is saved, so
    143             // a just-adopted content key is available to the NSE immediately.
    144             if !effects.contentKeysChanged.isEmpty {
    145                 GameEntity.rebuildContentKeyDirectory(in: ctx)
    146             }
    147             return effects
    148         }
    149 
    150         for message in effects.traces {
    151             await trace(message)
    152         }
    153         if let onRemoteMovesUpdated, !effects.movesUpdated.isEmpty {
    154             await onRemoteMovesUpdated(effects.movesUpdated)
    155         }
    156         if let onRemotePlayersUpdated, !effects.playersUpdated.isEmpty {
    157             await onRemotePlayersUpdated(effects.playersUpdated)
    158         }
    159         if let onRemotePlayerPresenceChanged, !effects.playerPresenceChanged.isEmpty {
    160             await onRemotePlayerPresenceChanged(effects.playerPresenceChanged)
    161         }
    162         if let onRemoteEngagementChanged, !effects.engagementChanged.isEmpty {
    163             await onRemoteEngagementChanged(effects.engagementChanged)
    164         }
    165         if let onIncomingReadCursor, !effects.readCursors.isEmpty {
    166             await onIncomingReadCursor(effects.readCursors)
    167         }
    168         // A game just learned it's complete via sync: upload this device's
    169         // journal (no-op if it logged nothing) so replay can converge. The
    170         // enqueue defers its CKSyncEngine drain via `sendChangesDetached`.
    171         if let localAuthorID, !localAuthorID.isEmpty {
    172             for id in effects.completedTransitions {
    173                 enqueueJournalUpload(gameID: id, authorID: localAuthorID)
    174             }
    175         }
    176         if let onGameCompleted {
    177             for id in effects.completedTransitions {
    178                 await onGameCompleted(id)
    179             }
    180         }
    181         let deletedPings = deletions.compactMap { deletion -> (recordName: String, gameID: UUID)? in
    182             let recordName = deletion.0.recordName
    183             guard recordName.hasPrefix("ping-"),
    184                   let gameID = gameID(fromRecordName: recordName)
    185             else { return nil }
    186             return (recordName, gameID)
    187         }
    188         if let onPingDeleted, !deletedPings.isEmpty {
    189             await onPingDeleted(deletedPings)
    190         }
    191         if !effects.rosterRelevant.isEmpty {
    192             NotificationCenter.default.post(
    193                 name: .playerRosterShouldRefresh,
    194                 object: nil,
    195                 userInfo: ["gameIDs": effects.rosterRelevant]
    196             )
    197         }
    198     }
    199 
    /// Applies one inbound `Player` record to its `PlayerEntity` row in `ctx`,
    /// creating the row (and ensuring its game) when none exists yet.
    ///
    /// The record is ignored when its record name does not parse into a
    /// game/author pair, when it has no string `name` field, or when a row
    /// already exists and the incoming record is older than the stored system
    /// fields. Otherwise the fields are adopted in two tiers:
    ///
    /// 1. Always: system fields, author, read cursor (`readAt`,
    ///    `readThrough`, `sessionSnapshot`) and — only when the record carries
    ///    the key — `timeLog`.
    /// 2. Only when the record's `updatedAt` is at least the row's current
    ///    `updatedAt`: name, `updatedAt`, selection and push address. When
    ///    this check fails the function returns with tier 1 already applied
    ///    and without invoking `onPresenceChange` / `onFirstTime`.
    ///
    /// The caller in this file invokes it from inside `performAndWait` on
    /// `ctx`; nothing is saved here.
    ///
    /// - Parameters:
    ///   - record: The inbound Player record.
    ///   - ctx: Context the row is fetched from / inserted into.
    ///   - localAuthorID: This account's author ID, or `nil` when unknown; a
    ///     `nil` value never matches the record's author.
    ///   - onFirstTime: Called with the game ID when this call created the row.
    ///   - onPresenceChange: Called with the game ID when a remote author's
    ///     selection or `updatedAt` changed and a selection exists before or
    ///     after the update.
    ///   - onReadCursor: Called for our own author's record when it carries a
    ///     `readAt` and either `readAt` or the session snapshot differs from
    ///     what the row held.
    nonisolated func applyPlayerRecord(
        _ record: CKRecord,
        in ctx: NSManagedObjectContext,
        localAuthorID: String?,
        onFirstTime: (UUID) -> Void,
        onPresenceChange: (UUID) -> Void,
        onReadCursor: (UUID, Date, Data?) -> Void
    ) {
        let ckName = record.recordID.recordName
        guard let (gameID, authorID) = RecordSerializer.parsePlayerRecordName(ckName) else {
            return
        }
        guard let renderedName = record["name"] as? String else { return }
        // Prefer the record's own `updatedAt` field, then the server
        // modification date, then "now".
        let updatedAt = record["updatedAt"] as? Date
            ?? record.modificationDate
            ?? Date()

        let req = NSFetchRequest<PlayerEntity>(entityName: "PlayerEntity")
        req.predicate = NSPredicate(format: "ckRecordName == %@", ckName)
        req.fetchLimit = 1

        let entity: PlayerEntity
        let foundExisting: Bool
        // NOTE(review): `try?` folds a fetch *failure* into "no existing row",
        // so a failed fetch falls through to inserting a new PlayerEntity for
        // a record name that may already have a row — confirm this is the
        // intended best-effort behavior.
        if let existing = try? ctx.fetch(req).first {
            entity = existing
            foundExisting = true
        } else {
            let game = RecordSerializer.ensureGameEntity(
                forGameID: gameID,
                zoneID: record.recordID.zoneID,
                in: ctx
            )
            entity = PlayerEntity(context: ctx)
            entity.game = game
            foundExisting = false
        }

        // Drop fetched snapshots older than what we already have. After a
        // successful push the writeback adopts the new etag; if a query
        // that started before the push lands later, applying its older
        // snapshot would downgrade our local etag and OpLock-fail the next
        // save (see `applyMovesRecord` for the same guard).
        if foundExisting,
           !RecordSerializer.incomingIsAtLeastAsFresh(record, existingFields: entity.ckSystemFields) {
            return
        }

        // Snapshot the pre-update selection and timestamp; they are compared
        // against the adopted values at the end to detect a presence change.
        let oldSelection = (entity.selRow, entity.selCol, entity.selDir)
        let hadSelection = oldSelection.0 != nil && oldSelection.1 != nil && oldSelection.2 != nil
        let oldUpdatedAt = entity.updatedAt

        // Adopt the server's system fields — that's etag tracking and is
        // independent of which side has the freshest data.
        entity.ckRecordName = ckName
        entity.ckSystemFields = RecordSerializer.encodeSystemFields(of: record)
        entity.authorID = authorID

        // The read cursor and session snapshot are account-scoped convergence
        // state — "what this account has already seen" — not the live cursor, so
        // they are adopted ahead of (and independent of) the selection's
        // `updatedAt` LWW below. A sibling commits the catch-up baseline on leave
        // (`handlePuzzleLeft`), which advances the read cursor and writes the
        // snapshot but does *not* bump `updatedAt`; the outbound record therefore
        // ships a stale `updatedAt`. Gating these on it would let a device with a
        // fresher local cursor drop the baseline and re-report the same moves as
        // a duplicate catch-up banner. The etag guard above already rejects
        // genuinely stale fetches. Adopting the cursor here is *not* monotonic:
        // `noteIncomingReadCursor` adopts the inbound value directly under
        // last-writer-wins, so a leaving sibling's past horizon *can* pull the
        // account horizon back below another sibling's live presence lease.
        // That collapse is bounded and self-healing: the still-present device
        // re-asserts its lease as soon as it processes the inbound close
        // (AppServices' incoming-cursor drain re-runs `publishReadCursor`
        // ahead of the 5-min refresh floor), and a foreground device marks
        // inbound peer moves read on arrival regardless. Keeping "A left
        // while C is still here" representable without the dip would need
        // per-device Player rows, which don't exist (one row per author).
        let previousReadAt = entity.readAt
        let previousSessionSnapshot = entity.sessionSnapshot
        let incomingReadAt = RecordSerializer.parsePlayerReadAt(from: record)
        entity.readAt = incomingReadAt
        let incomingReadThrough = RecordSerializer.parsePlayerReadThrough(from: record)
        entity.readThrough = incomingReadThrough
        entity.sessionSnapshot = RecordSerializer.parsePlayerSessionSnapshot(from: record)
        // `timeLog` is the device-keyed solve-time log. Touch it only when the
        // fetched record actually carries the field. A partial fetch (CloudQuery
        // restricts `desiredKeys`) or an older record omits it, and because the
        // log only ever grows, an absent field never means "cleared" — adopting
        // nil there would erase a peer's or a sibling's intervals until the next
        // full sync re-delivered them. Applied outside the `updatedAt` freshness
        // guard below: like the read cursor, a sibling's leave-write can ship a
        // stale `updatedAt`.
        if record.allKeys().contains("timeLog") {
            let incomingTimeLog = RecordSerializer.parsePlayerTimeLog(from: record)
            if authorID == localAuthorID {
                // Our own record echoing back from a sibling: merge by device,
                // keeping this device's own slot (we are its sole writer, and the
                // sibling's copy may have dropped a session we have open now).
                var local = TimeLog.decode(entity.timeLog)
                local.merge(
                    inbound: TimeLog.decode(incomingTimeLog),
                    preservingDevice: RecordSerializer.localDeviceID
                )
                entity.timeLog = local.devices.isEmpty ? nil : TimeLog.encode(local)
            } else {
                entity.timeLog = incomingTimeLog
            }
        }
        // Only surface the cursor when the row actually changed. A re-application
        // of values we already hold — e.g. a catch-up query snapshot racing this
        // device's in-flight lease save shares the old etag, so the freshness
        // guard above admits it — carries no new account-horizon information,
        // and adopting it would rewind the just-minted lease and trigger a
        // redundant re-assert save (the open-time double mint).
        if authorID == localAuthorID, let readAt = incomingReadAt,
           readAt != previousReadAt || entity.sessionSnapshot != previousSessionSnapshot {
            onReadCursor(gameID, readAt, entity.sessionSnapshot)
        }
        // Our own record coming back from another of this account's devices
        // carries that device's read watermark. Adopt it monotonically onto the
        // shared game so the unread badge clears here once any device has read,
        // mirroring how the presence lease converges via `onReadCursor` above.
        if authorID == localAuthorID,
           let incomingReadThrough,
           let game = entity.game,
           (game.readThroughAt ?? .distantPast) < incomingReadThrough {
            game.readThroughAt = incomingReadThrough
        }

        // The remaining value fields are only adopted when the incoming record
        // is at least as new as what we have locally; otherwise a stale-but-
        // current server record (e.g. our own pending writes haven't landed yet)
        // would clobber the user's live selection on every fetch.
        // A row without an `updatedAt` always accepts the incoming values.
        let localUpdatedAt = entity.updatedAt
        let incomingIsFresher = localUpdatedAt.map { updatedAt >= $0 } ?? true
        guard incomingIsFresher else { return }
        // An empty `name` is what older builds shipped from the selection publisher
        // before the fix; treat it as "no information" rather than letting it
        // clobber a previously-resolved name.
        if !renderedName.isEmpty {
            entity.name = renderedName
        }
        entity.updatedAt = updatedAt
        // A record without a parseable selection clears all three selection
        // fields together.
        if let selection = RecordSerializer.parsePlayerSelection(from: record) {
            entity.selRow = NSNumber(value: Int64(selection.row))
            entity.selCol = NSNumber(value: Int64(selection.col))
            entity.selDir = NSNumber(value: Int64(selection.direction.rawValue))
        } else {
            entity.selRow = nil
            entity.selCol = nil
            entity.selDir = nil
        }
        // Adopt the record's push address. For a peer this is how the sender
        // learns where to address pushes; for our own record synced from a
        // sibling device, it's how the account's devices converge on one
        // per-game address (the LWW winner of this record).
        entity.pushAddress = RecordSerializer.parsePlayerPushAddress(from: record)
        // Presence is only reported for other authors: neither our own author
        // ID nor CloudKit's current-user placeholder name counts as remote.
        let isRemoteAuthor = authorID != localAuthorID && authorID != CKCurrentUserDefaultName
        let hasSelection = entity.selRow != nil && entity.selCol != nil && entity.selDir != nil
        if isRemoteAuthor,
           (hadSelection || hasSelection),
           oldUpdatedAt != entity.updatedAt ||
               oldSelection.0 != entity.selRow ||
               oldSelection.1 != entity.selCol ||
               oldSelection.2 != entity.selDir {
            onPresenceChange(gameID)
        }
        if !foundExisting {
            onFirstTime(gameID)
        }
    }
    371 
    /// Merges every device's `MovesEntity` row for `gameID` and reconciles the
    /// `CellEntity` cache against the resulting grid. Must be called inside a
    /// `performAndWait` block on the same context.
    ///
    /// Cells present in the merged grid are created or updated; cached cells
    /// the grid no longer contains are blanked (empty letter, mark code 0, no
    /// author) rather than deleted. An attribute is only assigned when its
    /// value actually differs: Core Data marks a managed object as changed on
    /// any assignment, even of an equal value, so unconditional writes would
    /// dirty every cell of the game on every replay.
    ///
    /// - Parameters:
    ///   - gameID: The game whose cell cache is rebuilt.
    ///   - ctx: The batch context; nothing is saved here.
    /// - Returns: Diagnostic messages for any fetch failure (normally empty).
    ///   The caller folds them into `BatchEffects.traces` so they reach the
    ///   diagnostics log once the batch context unwinds. An unknown game
    ///   yields no messages and no changes.
    nonisolated func replayCellCache(
        for gameID: UUID,
        in ctx: NSManagedObjectContext
    ) -> [String] {
        let gameReq = NSFetchRequest<GameEntity>(entityName: "GameEntity")
        gameReq.predicate = NSPredicate(format: "id == %@", gameID as CVarArg)
        gameReq.fetchLimit = 1
        let game: GameEntity?
        do {
            game = try ctx.fetch(gameReq).first
        } catch {
            // CKSyncEngine commits the batch when the delegate returns
            // (see fetchedRecordZoneChanges save-failure note), so re-throwing
            // won't redeliver — surface the failure instead of silently
            // leaving the cell cache stale for this game.
            return [Self.syncErrorMessage("replayCellCache game fetch", gameID: gameID, error: error)]
        }
        guard let game else { return [] }

        let movesReq = NSFetchRequest<MovesEntity>(entityName: "MovesEntity")
        movesReq.predicate = NSPredicate(format: "game == %@", game)
        let movesEntities: [MovesEntity]
        do {
            movesEntities = try ctx.fetch(movesReq)
        } catch {
            return [Self.syncErrorMessage("replayCellCache moves fetch", gameID: gameID, error: error)]
        }
        // Rows that cannot be hydrated (missing required fields) are skipped.
        let values: [MovesValue] = movesEntities.compactMap { Self.movesValue(from: $0) }
        let gridState = GridStateMerger.merge(values)

        // Index the cached cells by position so each grid cell finds its row
        // without another fetch.
        let existingCells = (game.cells as? Set<CellEntity>) ?? []
        var byPosition: [GridPosition: CellEntity] = [:]
        for cell in existingCells {
            byPosition[GridPosition(row: Int(cell.row), col: Int(cell.col))] = cell
        }

        for (pos, gridCell) in gridState {
            let cell: CellEntity
            if let existing = byPosition[pos] {
                cell = existing
            } else {
                cell = CellEntity(context: ctx)
                cell.game = game
                cell.row = Int16(pos.row)
                cell.col = Int16(pos.col)
            }
            // Write only what changed, so untouched cells stay clean.
            if cell.letter != gridCell.letter {
                cell.letter = gridCell.letter
            }
            if cell.markCode != gridCell.mark.code {
                cell.markCode = gridCell.mark.code
            }
            if cell.letterAuthorID != gridCell.authorID {
                cell.letterAuthorID = gridCell.authorID
            }
        }

        // Cached cells that dropped out of the merged grid are blanked in
        // place; already-blank cells are left untouched.
        for (pos, cell) in byPosition where gridState[pos] == nil {
            if cell.letter != "" {
                cell.letter = ""
            }
            if cell.markCode != 0 {
                cell.markCode = 0
            }
            if cell.letterAuthorID != nil {
                cell.letterAuthorID = nil
            }
        }
        return []
    }
    436 
    /// Builds the `MovesValue` counterpart of a stored `MovesEntity`.
    ///
    /// - Parameter entity: The row to hydrate.
    /// - Returns: `nil` when the row lacks its game ID, author ID, device ID
    ///   or `updatedAt` (e.g. an unpopulated stub from a partial fetch).
    ///   A row whose `cells` blob is absent or fails to decode is still
    ///   returned, with an empty cell map.
    nonisolated static func movesValue(from entity: MovesEntity) -> MovesValue? {
        guard let owningGameID = entity.game?.id else { return nil }
        guard let author = entity.authorID, let device = entity.deviceID else { return nil }
        guard let stamp = entity.updatedAt else { return nil }

        // Missing and undecodable payloads both collapse to "no cells".
        let decodedCells = entity.cells.flatMap { blob in try? MovesCodec.decode(blob) }
        return MovesValue(
            gameID: owningGameID,
            authorID: author,
            deviceID: device,
            cells: decodedCells ?? [:],
            updatedAt: stamp
        )
    }
    455 
    /// Renders a sync-context fetch/save failure as one diagnostics-log line.
    ///
    /// By the time these helpers run inside performAndWait the engine's change
    /// token has already advanced, so the only remediation left is to make the
    /// drop visible — and visible means traced (the on-device diagnostics
    /// log), not printed: console output never reaches a collected log.
    ///
    /// - Parameters:
    ///   - label: Short description of the operation that failed.
    ///   - gameID: The game the operation was running for.
    ///   - error: The thrown error; reported via its `NSError` domain, code
    ///     and localized description.
    /// - Returns: `"<label> FAILED for <uuid> — domain=<d> code=<c> <text>"`.
    nonisolated static func syncErrorMessage(_ label: String, gameID: UUID, error: Error) -> String {
        let bridged = error as NSError
        let segments = [
            "\(label) FAILED for \(gameID.uuidString) ",
            "— domain=\(bridged.domain) code=\(bridged.code) ",
            "\(bridged.localizedDescription)",
        ]
        return segments.joined()
    }
    467 
    /// Applies an inbound `Archive` record.
    ///
    /// The record is inert while this device still holds a live (non-revoked)
    /// copy of the original game — the data is already here, and surfacing a
    /// second row would duplicate it. On a device without the original (fresh
    /// install / reinstall) the snapshot is hydrated into a standalone
    /// completed, owned game.
    ///
    /// A failed lookup for the live copy is treated the same as "no live
    /// copy": the `try?` discards the error and materialization proceeds.
    ///
    /// - Parameters:
    ///   - record: The inbound Archive record.
    ///   - ctx: Context used for the lookup and for materialization.
    /// - Returns: The materialized game's id when a row was created, else
    ///   `nil` (unreadable payload, live copy present, or nothing created).
    @discardableResult
    nonisolated func applyArchiveRecord(
        _ record: CKRecord,
        in ctx: NSManagedObjectContext
    ) -> UUID? {
        guard let payload = Archive.payload(from: record) else { return nil }

        let liveGameLookup = NSFetchRequest<GameEntity>(entityName: "GameEntity")
        liveGameLookup.fetchLimit = 1
        liveGameLookup.predicate = NSPredicate(
            format: "id == %@ AND isAccessRevoked == NO",
            payload.originalGameID as CVarArg
        )

        let liveGame = try? ctx.fetch(liveGameLookup).first
        guard liveGame == nil else { return nil }

        return Archive.materialize(payload, in: ctx)?.id
    }
    492 
    /// Deletes the local row that mirrors a deleted CloudKit record.
    ///
    /// The target entity is chosen from the record name's prefix (`moves-`,
    /// `player-`, `game-`); only when no prefix matches does the record type
    /// decide. Records that map to no entity are ignored. The row is looked
    /// up by `ckRecordName`; a missing row or a failed fetch leaves the
    /// context untouched. Nothing is saved here.
    ///
    /// - Parameters:
    ///   - recordID: ID of the deleted record.
    ///   - recordType: Record type reported with the deletion.
    ///   - ctx: Context the matching row is deleted from.
    nonisolated func applyDeletion(
        recordID: CKRecord.ID,
        recordType: CKRecord.RecordType,
        in ctx: NSManagedObjectContext
    ) {
        let recordName = recordID.recordName

        // Name prefix wins over record type.
        let entityByPrefix: [(prefix: String, entity: String)] = [
            ("moves-", "MovesEntity"),
            ("player-", "PlayerEntity"),
            ("game-", "GameEntity"),
        ]
        let entityByType: [String: String] = [
            "Moves": "MovesEntity",
            "Player": "PlayerEntity",
            "Game": "GameEntity",
        ]
        let prefixMatch = entityByPrefix.first(where: { recordName.hasPrefix($0.prefix) })?.entity
        guard let targetEntity = prefixMatch ?? entityByType[recordType] else { return }

        let lookup = NSFetchRequest<NSManagedObject>(entityName: targetEntity)
        lookup.predicate = NSPredicate(format: "ckRecordName == %@", recordName)
        lookup.fetchLimit = 1

        guard let doomed = try? ctx.fetch(lookup).first else { return }
        ctx.delete(doomed)
    }
    521 }