commit 7595556f9aabaccc7702da720321ab2416027258
parent 468919ba2913eab8c1f3effaadf348a7db1896ae
Author: Michael Camilleri <[email protected]>
Date: Fri, 29 May 2026 13:46:53 +0900
Rendezvous live-engagement rooms on the Game record
Prior this commit, live engagement was pairwise: the coordinator tracked
one peer per game, and the lower account ID was elected initiator,
minting a room and sending a directed `.hail` ping to the one higher-ID
peer, who accepted it. This doesn't extend to three or more participants
— with A < B < C, A hails B while B hails C, and each device's single
room slot thrashes as the crossing hails replace one another. It also
bakes identity into a standing host role, which is brittle when the
elected host is absent or joins late.
This commit replaces the election and the hails with a record
rendezvous. The shared room credentials (an encoded
EngagementRoomCredentials) live in a new `engagement` field on the Game
record. Every present participant connects to whatever room that field
advertises; when it's empty and a peer is present, any participant may
mint one and write it. Concurrent mints need no arbiter: the Game
record's last-writer-wins picks one credential set, it syncs to
everyone, and each device reconciles onto it — a loser that minted its
own room migrates off it. The engagement worker's room is already a
broadcast group (every socket in a roomID fans out to the others), so
this is N-party with no server change; the 1:1 limit was purely
client-side.
EngagementCoordinator drops the State's peer identity and the createRoom
/ acceptRoom / handle(hail) / offerEngagement machinery, becoming a
reconciler: reconcile(gameID:creds:hasPeer:) connects to the advertised
room, migrates when the credentials change, and tears down when no peer
is present — connecting to the new room before dropping the old so the
stale socket's close races a state that has already moved on.
AppServices.reconcileEngagement reads the credentials and the
readAt-lease presence, mints into the Game record when absent, and hands
the desired credentials to the coordinator; it's driven by presence
changes, the new onRemoteEngagementChanged callback (a peer minted or
rotated the room), the reconnect timer, open/foreground, and the manual
offer (which forces a mint without waiting to see a peer).
RecordSerializer round-trips `engagement` and signals a change so
receivers reconcile; GameStore.setEngagement writes it under
hasPendingSave so an inbound record can't clobber a fresh local mint.
The .hail ping and its legacy purge stay so old records still drain, but
nothing produces or consumes hails for engagement anymore; a stray legacy hail
is ignored rather than surfaced.
Co-Authored-By: Claude Opus 4.8 <[email protected]>
Diffstat:
8 files changed, 271 insertions(+), 477 deletions(-)
diff --git a/Crossmate/Models/CrossmateModel.xcdatamodeld/CrossmateModel.xcdatamodel/contents b/Crossmate/Models/CrossmateModel.xcdatamodeld/CrossmateModel.xcdatamodel/contents
@@ -12,6 +12,7 @@
<attribute name="completedAt" optional="YES" attributeType="Date" usesScalarValueType="NO"/>
<attribute name="completedBy" optional="YES" attributeType="String"/>
<attribute name="createdAt" attributeType="Date" usesScalarValueType="NO"/>
+ <attribute name="engagement" optional="YES" attributeType="String"/>
<attribute name="databaseScope" optional="YES" attributeType="Integer 16" defaultValueString="0" usesScalarValueType="YES"/>
<attribute name="gridHeight" optional="YES" attributeType="Integer 16" defaultValueString="0" usesScalarValueType="YES"/>
<attribute name="gridWidth" optional="YES" attributeType="Integer 16" defaultValueString="0" usesScalarValueType="YES"/>
diff --git a/Crossmate/Persistence/GameStore.swift b/Crossmate/Persistence/GameStore.swift
@@ -636,6 +636,39 @@ final class GameStore {
return true
}
+ // MARK: - Engagement room
+
+ /// The shared live-engagement room creds for `gameID` (an encoded
+ /// `EngagementRoomCredentials`), or nil if none has been minted yet.
+ func engagement(for gameID: UUID) -> String? {
+ let request = NSFetchRequest<GameEntity>(entityName: "GameEntity")
+ request.predicate = NSPredicate(format: "id == %@", gameID as CVarArg)
+ request.fetchLimit = 1
+ return (try? context.fetch(request).first)?.engagement
+ }
+
+ /// Writes the engagement room creds for `gameID` and enqueues a Game-record
+ /// push. No-op (returns false) when unchanged or the game isn't shared.
+ /// Sets `hasPendingSave` so an inbound Game record can't clobber freshly
+ /// minted creds before the push lands; record-level LWW then converges any
+ /// concurrent mint by another participant.
+ @discardableResult
+ func setEngagement(_ encoded: String?, for gameID: UUID) -> Bool {
+ let request = NSFetchRequest<GameEntity>(entityName: "GameEntity")
+ request.predicate = NSPredicate(format: "id == %@", gameID as CVarArg)
+ request.fetchLimit = 1
+ guard let entity = try? context.fetch(request).first else { return false }
+ let isShared = entity.ckShareRecordName != nil || entity.databaseScope == 1
+ guard isShared, entity.engagement != encoded else { return false }
+ entity.engagement = encoded
+ entity.hasPendingSave = true
+ saveContext("setEngagement")
+ if let ckName = entity.ckRecordName {
+ onGameUpdated(ckName)
+ }
+ return true
+ }
+
// MARK: - Reset
/// Deletes every game (and its cascaded moves, snapshots, and cells) plus
diff --git a/Crossmate/Services/AppServices.swift b/Crossmate/Services/AppServices.swift
@@ -94,20 +94,6 @@ final class AppServices {
localAuthorID: { [weak self] in
await MainActor.run { self?.identity.currentID }
},
- presentPeers: { [weak self] gameIDs in
- guard let self else { return [:] }
- return await Self.presentPeers(
- persistence: self.persistence,
- gameIDs: gameIDs,
- localAuthorID: self.identity.currentID
- )
- },
- sendHail: { [weak self] gameID, payload, addressee in
- await self?.sendHailPing(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { [weak self] recordName, gameID in
- await self?.syncEngine.deletePing(recordName: recordName, gameID: gameID)
- },
log: { [weak self] message in
await MainActor.run { self?.syncMonitor.note(message) }
}
@@ -386,7 +372,20 @@ final class AppServices {
// live co-solving path.
await syncEngine.setOnRemotePlayerPresenceChanged { [weak self] gameIDs in
await self?.playerSelectionPublisher.peerPresenceMayHaveChanged(gameIDs: gameIDs)
- await self?.engagementCoordinator.peerPresenceMayHaveChanged(gameIDs: gameIDs)
+ guard let self else { return }
+ for gameID in gameIDs {
+ await self.reconcileEngagement(gameID: gameID)
+ }
+ }
+
+ // A peer minted or rotated the shared engagement room (the Game
+ // record's `engagement` creds changed). Reconcile so this device joins
+ // — or migrates onto — whatever room the record now advertises.
+ await syncEngine.setOnRemoteEngagementChanged { [weak self] gameIDs in
+ guard let self else { return }
+ for gameID in gameIDs {
+ await self.reconcileEngagement(gameID: gameID)
+ }
}
// A sibling device of the same iCloud account has published its read
@@ -965,20 +964,38 @@ final class AppServices {
}
}
- private func sendHailPing(gameID: UUID, payload: String, addressee: String) async {
- guard preferences.isICloudSyncEnabled,
- let localAuthorID = identity.currentID,
- !localAuthorID.isEmpty
- else { return }
- guard await ensureICloudSyncStarted() else { return }
- await syncEngine.enqueuePing(
- kind: .hail,
- gameID: gameID,
- authorID: localAuthorID,
- playerName: preferences.name,
- payload: payload,
- addressee: addressee
- )
+ /// Drives one game's live connection toward the room its Game record
+ /// advertises. Reads the shared `engagement` creds and whether any peer
+ /// holds a live read lease; if a peer is present (or `force`) and no creds
+ /// exist yet, mints a room and writes it to the Game record (any present
+ /// participant may — record-level LWW converges concurrent mints). Then
+ /// hands the desired creds to the coordinator, which connects, migrates, or
+ /// tears down to match. `force` is the manual "start a room now" path,
+ /// which mints and connects without waiting to see a present peer.
+ func reconcileEngagement(gameID: UUID, force: Bool = false) async {
+ guard preferences.isICloudSyncEnabled else { return }
+ let hasPeer: Bool
+ if force {
+ hasPeer = true
+ } else {
+ hasPeer = await Self.hasPresentPeer(
+ persistence: persistence,
+ gameID: gameID,
+ localAuthorID: identity.currentID
+ )
+ }
+ var creds = EngagementRoomCredentials.decode(store.engagement(for: gameID))
+ if creds == nil, hasPeer {
+ if let minted = try? EngagementRoomCredentials.fresh(),
+ let encoded = try? minted.encoded(),
+ store.setEngagement(encoded, for: gameID) {
+ creds = minted
+ syncMonitor.note(
+ "engagement: minted room \(minted.roomID.uuidString) for \(gameID.uuidString)"
+ )
+ }
+ }
+ await engagementCoordinator.reconcile(gameID: gameID, creds: creds, hasPeer: hasPeer)
}
func offerEngagement(gameID: UUID) async {
@@ -995,14 +1012,15 @@ final class AppServices {
"engagement: manual offer requested for \(gameID.uuidString) " +
"device=\(RecordSerializer.localDeviceID.prefix(8))"
)
- await engagementCoordinator.offerEngagement(gameID: gameID)
+ await reconcileEngagement(gameID: gameID, force: true)
+ startEngagementReconnectRetry(gameID: gameID)
}
func startEngagementIfPossible(gameID: UUID) async {
cancelScheduledEngagementEnd(gameID: gameID)
guard preferences.isICloudSyncEnabled else { return }
guard await ensureICloudSyncStarted() else { return }
- await engagementCoordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ await reconcileEngagement(gameID: gameID)
startEngagementReconnectRetry(gameID: gameID)
}
@@ -1075,7 +1093,7 @@ final class AppServices {
try? await Task.sleep(for: Self.engagementReconnectInterval)
guard !Task.isCancelled, let self else { return }
await self.publishReadCursor(for: gameID, mode: .activeLease)
- await self.engagementCoordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ await self.reconcileEngagement(gameID: gameID)
}
}
}
@@ -2078,19 +2096,18 @@ final class AppServices {
let pings = await consumeStaleInvites(claimed)
guard !pings.isEmpty else { return }
applyInvitePings(pings)
- // `.friend` is the friendship-bootstrap handshake and `.hail` is
- // engagement room bootstrap. Both are system-only: no alert, no
- // notification authorization dependency. Everything else goes
- // through the alert path below.
+ // `.friend` is the friendship-bootstrap handshake. `.hail` is a
+ // retired engagement-bootstrap kind (live rooms now rendezvous on the
+ // Game record's `engagement` creds); any legacy `.hail` record is
+ // ignored here rather than surfaced as an alert. Both are system-only:
+ // no alert, no notification authorization dependency. Everything else
+ // goes through the alert path below.
let (systemPings, playerFacingPings) = pings.partitioned {
$0.kind == .friend || $0.kind == .hail
}
for ping in systemPings where ping.kind == .friend {
await friendController.applyFriendPing(ping)
}
- for ping in systemPings where ping.kind == .hail {
- await engagementCoordinator.handle(ping)
- }
guard !playerFacingPings.isEmpty else { return }
guard await canPresentNotifications() else {
syncMonitor.note("ping: local notification skipped — authorization not granted")
diff --git a/Crossmate/Sync/EngagementCoordinator.swift b/Crossmate/Sync/EngagementCoordinator.swift
@@ -139,26 +139,19 @@ protocol EngagementTransporting: AnyObject, Sendable {
}
actor EngagementCoordinator {
- typealias PresentPeers = @Sendable (Set<UUID>?) async -> [UUID: [String]]
- typealias SendHail = @Sendable (
- _ gameID: UUID,
- _ payload: String,
- _ addressee: String
- ) async -> Void
- typealias DeletePing = @Sendable (_ recordName: String, _ gameID: UUID) async -> Void
typealias Log = @Sendable (_ message: String) async -> Void
private enum State: Equatable {
case idle
- case connecting(peerAuthorID: String, engagementID: UUID, room: EngagementRoomCredentials, at: Date)
- case live(peerAuthorID: String, engagementID: UUID, room: EngagementRoomCredentials)
+ case connecting(engagementID: UUID, room: EngagementRoomCredentials, at: Date)
+ case live(engagementID: UUID, room: EngagementRoomCredentials)
var engagementID: UUID? {
switch self {
case .idle:
nil
- case .connecting(_, let engagementID, _, _),
- .live(_, let engagementID, _):
+ case .connecting(let engagementID, _, _),
+ .live(let engagementID, _):
engagementID
}
}
@@ -167,114 +160,75 @@ actor EngagementCoordinator {
switch self {
case .idle:
nil
- case .connecting(_, _, let room, _),
- .live(_, _, let room):
+ case .connecting(_, let room, _),
+ .live(_, let room):
room.roomID
}
}
-
- var peerAuthorID: String? {
- switch self {
- case .idle:
- nil
- case .connecting(let peerAuthorID, _, _, _),
- .live(let peerAuthorID, _, _):
- peerAuthorID
- }
- }
}
private let host: any EngagementTransporting
private let localAuthorID: @Sendable () async -> String?
private let localDeviceID: String
- private let presentPeers: PresentPeers
- private let sendHail: SendHail
- private let deletePing: DeletePing
private let log: Log
private let now: @Sendable () -> Date
- private let hailMaxAge: TimeInterval
private let connectionTimeout: TimeInterval
- private let roomTTL: TimeInterval
private var states: [UUID: State] = [:]
init(
host: any EngagementTransporting,
localAuthorID: @escaping @Sendable () async -> String?,
localDeviceID: String = RecordSerializer.localDeviceID,
- presentPeers: @escaping PresentPeers,
- sendHail: @escaping SendHail,
- deletePing: @escaping DeletePing,
log: @escaping Log = { _ in },
now: @escaping @Sendable () -> Date = Date.init,
- hailMaxAge: TimeInterval = 120,
- connectionTimeout: TimeInterval = 30,
- roomTTL: TimeInterval = 10 * 60
+ connectionTimeout: TimeInterval = 30
) {
self.host = host
self.localAuthorID = localAuthorID
self.localDeviceID = localDeviceID
- self.presentPeers = presentPeers
- self.sendHail = sendHail
- self.deletePing = deletePing
self.log = log
self.now = now
- self.hailMaxAge = hailMaxAge
self.connectionTimeout = connectionTimeout
- self.roomTTL = roomTTL
}
- func peerPresenceMayHaveChanged(gameIDs: Set<UUID>? = nil) async {
- guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else { return }
+ /// Drives this game's live connection toward the room the shared Game
+ /// record currently advertises. `creds` is the decoded `engagement` field
+ /// (nil if none minted yet); `hasPeer` is whether any peer holds a live
+ /// read lease. The desired state is: connected to `creds.roomID` when a
+ /// peer is present and creds exist, disconnected otherwise.
+ ///
+ /// This subsumes connect, reconnect, migrate (a peer rotated the room), and
+ /// teardown (peer left) — and the create race needs no arbiter: if two
+ /// participants mint, the Game record's LWW picks one set of creds, and
+ /// every device reconciles onto it, the loser migrating off its own room.
+ func reconcile(gameID: UUID, creds: EngagementRoomCredentials?, hasPeer: Bool) async {
await sweepStaleConnections()
- let peersByGame = await presentPeers(gameIDs)
-
- // Tear down any live channel whose connected peer no longer holds a
- // valid lease — they've left or vanished. Disconnecting the host
- // fires `.channelClose`, which drives the normal idle transition and
- // downstream cleanup, so we don't mutate state here. Collected before
- // awaiting so the `states` walk doesn't race a reentrant mutation.
- // `.connecting` is left to `sweepStaleConnections`' timeout, since an
- // in-flight hail may simply be racing the peer's first lease write.
- var expired: [(gameID: UUID, engagementID: UUID, peerAuthorID: String)] = []
- for (gameID, state) in states {
- if let gameIDs, !gameIDs.contains(gameID) { continue }
- guard case .live(let peerAuthorID, let engagementID, _) = state else { continue }
- if !(peersByGame[gameID]?.contains(peerAuthorID) ?? false) {
- expired.append((gameID, engagementID, peerAuthorID))
+ let current = state(for: gameID)
+ guard hasPeer, let creds else {
+ // No peer (or no creds) → disconnect. Leave state for `.channelClose`
+ // to clear so the normal cleanup runs downstream.
+ if let engagementID = current.engagementID {
+ await log("engagement: no present peer for \(gameID.uuidString), tearing down \(engagementID.uuidString)")
+ await host.disconnect(engagementID: engagementID)
}
- }
- for entry in expired {
- await log(
- "engagement: peer \(entry.peerAuthorID) lease expired for " +
- "\(entry.gameID.uuidString), tearing down \(entry.engagementID.uuidString)"
- )
- await host.disconnect(engagementID: entry.engagementID)
- }
-
- for (gameID, peers) in peersByGame {
- guard state(for: gameID) == .idle else { continue }
- guard let peerAuthorID = peers.sorted().first(where: { localAuthorID < $0 }) else { continue }
- await createRoom(gameID: gameID, peerAuthorID: peerAuthorID, localAuthorID: localAuthorID)
- }
- }
-
- func offerEngagement(gameID: UUID) async {
- guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else { return }
- await sweepStaleConnections()
- guard state(for: gameID) == .idle else {
- await log("engagement: manual connect skipped for \(gameID.uuidString), state is not idle")
return
}
- let peers = await presentPeers([gameID])
- let peerAuthorID = peers[gameID]?.sorted().first ?? localAuthorID
- await createRoom(gameID: gameID, peerAuthorID: peerAuthorID, localAuthorID: localAuthorID)
+ // Already connecting/live to the advertised room — nothing to do.
+ if current.roomID == creds.roomID { return }
+ // Connect to the desired room first (so the old socket's `.channelClose`
+ // sees a state that has already moved on and no-ops), then drop the old.
+ let staleEngagementID = current.engagementID
+ await connect(gameID: gameID, room: creds)
+ if let staleEngagementID {
+ await host.disconnect(engagementID: staleEngagementID)
+ }
}
private func sweepStaleConnections() async {
let cutoff = now()
var demoted: [(gameID: UUID, engagementID: UUID, age: TimeInterval)] = []
for (gameID, state) in states {
- guard case .connecting(_, let engagementID, _, let at) = state else { continue }
+ guard case .connecting(let engagementID, _, let at) = state else { continue }
let age = cutoff.timeIntervalSince(at)
if age > connectionTimeout {
demoted.append((gameID, engagementID, age))
@@ -290,43 +244,6 @@ actor EngagementCoordinator {
}
}
- func handle(_ ping: Ping) async {
- guard ping.kind == .hail else { return }
- guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else {
- await log("engagement: ignored hail \(ping.recordName), missing local author")
- return
- }
- guard ping.authorID != localAuthorID || ping.deviceID != localDeviceID else {
- await log("engagement: ignored own hail \(ping.recordName)")
- return
- }
- guard let addressee = EngagementAddressee.parse(ping.addressee) else {
- await log("engagement: ignored hail \(ping.recordName), missing addressee")
- return
- }
- guard addressee.matches(authorID: localAuthorID, deviceID: localDeviceID) else {
- await log("engagement: ignored hail \(ping.recordName), addressed to \(addressee.rawValue)")
- return
- }
- guard let room = EngagementRoomCredentials.decode(ping.payload), room.ver == 2 else {
- await log("engagement: ignored malformed hail \(ping.recordName)")
- return
- }
- if let timestamp = Self.eventTimestamp(from: ping.recordName),
- now().timeIntervalSince(timestamp) > hailMaxAge {
- await deletePing(ping.recordName, ping.gameID)
- await log("engagement: deleted stale hail \(ping.recordName)")
- return
- }
- guard room.expiresAt > now() else {
- await deletePing(ping.recordName, ping.gameID)
- await log("engagement: deleted expired hail \(ping.recordName)")
- return
- }
-
- await acceptRoom(ping: ping, room: room, localAuthorID: localAuthorID)
- }
-
func teardown(gameID: UUID) async {
let state = state(for: gameID)
states[gameID] = .idle
@@ -340,9 +257,9 @@ actor EngagementCoordinator {
switch state {
case .idle:
return nil
- case .connecting(let peerAuthorID, _, let room, _),
- .live(let peerAuthorID, _, let room):
- states[gameID] = .live(peerAuthorID: peerAuthorID, engagementID: engagementID, room: room)
+ case .connecting(_, let room, _),
+ .live(_, let room):
+ states[gameID] = .live(engagementID: engagementID, room: room)
return gameID
}
}
@@ -357,7 +274,7 @@ actor EngagementCoordinator {
}
func sendDebugMessage(gameID: UUID, text: String) async {
- guard case .live(_, let engagementID, _) = state(for: gameID) else {
+ guard case .live(let engagementID, _) = state(for: gameID) else {
await log("engagement: test message skipped for \(gameID.uuidString), channel is not live")
return
}
@@ -371,7 +288,7 @@ actor EngagementCoordinator {
}
func sendCellEdit(_ edit: RealtimeCellEdit) async {
- guard case .live(_, let engagementID, _) = state(for: edit.gameID) else { return }
+ guard case .live(let engagementID, _) = state(for: edit.gameID) else { return }
do {
let message = EngagementMessage(cellEdit: edit)
try await host.send(engagementID: engagementID, message: message.encodedData())
@@ -385,7 +302,7 @@ actor EngagementCoordinator {
}
func sendSelection(_ selection: EngagementSelectionUpdate) async {
- guard case .live(_, let engagementID, _) = state(for: selection.gameID) else { return }
+ guard case .live(let engagementID, _) = state(for: selection.gameID) else { return }
do {
let message = EngagementMessage(selection: selection)
try await host.send(engagementID: engagementID, message: message.encodedData())
@@ -398,54 +315,18 @@ actor EngagementCoordinator {
}
}
- private func createRoom(gameID: UUID, peerAuthorID: String, localAuthorID: String) async {
- let engagementID = UUID()
- do {
- let room = try EngagementRoomCredentials.fresh(now: now(), ttl: roomTTL)
- states[gameID] = .connecting(
- peerAuthorID: peerAuthorID,
- engagementID: engagementID,
- room: room,
- at: now()
- )
- await sendHail(gameID, try room.encoded(), EngagementAddressee(authorID: peerAuthorID).rawValue)
- try await host.connect(
- engagementID: engagementID,
- room: room,
- authorID: localAuthorID,
- deviceID: localDeviceID
- )
- await log("engagement: sent room hail for \(gameID.uuidString) to \(peerAuthorID)")
- } catch {
- states[gameID] = .idle
- await log("engagement: room connect failed for \(gameID.uuidString): \(error.localizedDescription)")
- }
- }
-
- private func acceptRoom(ping: Ping, room: EngagementRoomCredentials, localAuthorID: String) async {
- let currentState = state(for: ping.gameID)
- if currentState != .idle {
- if currentState.roomID == room.roomID {
- await log("engagement: ignored duplicate room hail \(room.roomID.uuidString)")
- await deletePing(ping.recordName, ping.gameID)
- return
- }
- if let engagementID = currentState.engagementID {
- await host.disconnect(engagementID: engagementID)
- }
- await log(
- "engagement: replacing \(currentState.engagementID?.uuidString ?? "unknown") " +
- "with room \(room.roomID.uuidString)"
- )
+ /// Opens a socket to `room` and parks the game in `.connecting`; the
+ /// `.channelOpen` callback promotes it to `.live`. Callers that are
+ /// migrating off a previous room disconnect the stale engagement *after*
+ /// this returns, so the old socket's close races against a state that has
+ /// already moved on.
+ private func connect(gameID: UUID, room: EngagementRoomCredentials) async {
+ guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else {
+ await log("engagement: connect skipped for \(gameID.uuidString), missing local author")
+ return
}
-
let engagementID = UUID()
- states[ping.gameID] = .connecting(
- peerAuthorID: ping.authorID,
- engagementID: engagementID,
- room: room,
- at: now()
- )
+ states[gameID] = .connecting(engagementID: engagementID, room: room, at: now())
do {
try await host.connect(
engagementID: engagementID,
@@ -453,11 +334,10 @@ actor EngagementCoordinator {
authorID: localAuthorID,
deviceID: localDeviceID
)
- await deletePing(ping.recordName, ping.gameID)
- await log("engagement: accepted room hail \(room.roomID.uuidString)")
+ await log("engagement: connecting \(gameID.uuidString) to room \(room.roomID.uuidString)")
} catch {
- states[ping.gameID] = .idle
- await log("engagement: room accept failed for \(ping.gameID.uuidString): \(error.localizedDescription)")
+ states[gameID] = .idle
+ await log("engagement: connect failed for \(gameID.uuidString): \(error.localizedDescription)")
}
}
@@ -470,12 +350,6 @@ actor EngagementCoordinator {
state.engagementID == engagementID
}
}
-
- private static func eventTimestamp(from recordName: String) -> Date? {
- guard let timestampString = recordName.split(separator: "-").last,
- let milliseconds = TimeInterval(timestampString) else { return nil }
- return Date(timeIntervalSince1970: milliseconds / 1000)
- }
}
enum EngagementCoordinatorError: LocalizedError {
@@ -484,7 +358,7 @@ enum EngagementCoordinatorError: LocalizedError {
var errorDescription: String? {
switch self {
case .invalidPayloadEncoding:
- "Unable to encode engagement hail payload."
+ "Unable to encode engagement room payload."
}
}
}
diff --git a/Crossmate/Sync/RecordApplier.swift b/Crossmate/Sync/RecordApplier.swift
@@ -8,6 +8,7 @@ struct BatchEffects {
var pings: [Ping] = []
var playersUpdated = Set<UUID>()
var playerPresenceChanged = Set<UUID>()
+ var engagementChanged = Set<UUID>()
var removed = Set<UUID>()
var readCursors: [(UUID, Date)] = []
}
@@ -27,7 +28,12 @@ extension SyncEngine {
for record in records {
switch record.recordType {
case "Game":
- let entity = RecordSerializer.applyGameRecord(record, to: ctx, databaseScope: scopeValue)
+ let entity = RecordSerializer.applyGameRecord(
+ record,
+ to: ctx,
+ databaseScope: scopeValue,
+ onEngagementChange: { effects.engagementChanged.insert($0) }
+ )
if let id = entity.id { effects.affected.insert(id) }
case "Moves":
if let value = RecordSerializer.parseMovesRecord(record) {
@@ -93,6 +99,9 @@ extension SyncEngine {
if let onRemotePlayerPresenceChanged, !effects.playerPresenceChanged.isEmpty {
await onRemotePlayerPresenceChanged(effects.playerPresenceChanged)
}
+ if let onRemoteEngagementChanged, !effects.engagementChanged.isEmpty {
+ await onRemoteEngagementChanged(effects.engagementChanged)
+ }
if let onIncomingReadCursor, !effects.readCursors.isEmpty {
await onIncomingReadCursor(effects.readCursors)
}
diff --git a/Crossmate/Sync/RecordSerializer.swift b/Crossmate/Sync/RecordSerializer.swift
@@ -164,6 +164,11 @@ enum RecordSerializer {
// Owner-side share marker. Propagated so other owner-devices can flip
// their `isShared` flag without reading the zone's CKShare directly.
record["shareRecordName"] = entity.ckShareRecordName as CKRecordValue?
+ // The shared live-engagement room credentials (encoded
+ // EngagementRoomCredentials). Any present participant may mint these
+ // when the field is empty; convergence is plain record-level LWW, and
+ // peers connect to whatever creds the field currently holds.
+ record["engagement"] = entity.engagement as CKRecordValue?
guard includePuzzleSource, let source = entity.puzzleSource else { return }
let url = FileManager.default.temporaryDirectory
.appendingPathComponent(UUID().uuidString)
@@ -438,7 +443,8 @@ enum RecordSerializer {
static func applyGameRecord(
_ record: CKRecord,
to context: NSManagedObjectContext,
- databaseScope: Int16 = 0
+ databaseScope: Int16 = 0,
+ onEngagementChange: ((UUID) -> Void)? = nil
) -> GameEntity {
let recordName = record.recordID.recordName
let entity = fetchOrCreate(
@@ -505,6 +511,16 @@ enum RecordSerializer {
entity.ckShareRecordName = shareRecordName
}
+ // Adopt the engagement creds (skipped above while a local mint is
+ // still pushing, via the hasPendingSave guard). A change here is the
+ // signal a peer minted/rotated the room, so the receiver reconciles
+ // its live connection toward the new creds.
+ let incomingEngagement = record["engagement"] as? String
+ if entity.engagement != incomingEngagement {
+ entity.engagement = incomingEngagement
+ if let id = entity.id { onEngagementChange?(id) }
+ }
+
if let asset = record["puzzleSource"] as? CKAsset,
let fileURL = asset.fileURL {
do {
diff --git a/Crossmate/Sync/SyncEngine.swift b/Crossmate/Sync/SyncEngine.swift
@@ -84,6 +84,10 @@ actor SyncEngine {
/// `onRemotePlayersUpdated`, this fires for existing Player records too,
/// so live engagement can start when a known collaborator opens a puzzle.
var onRemotePlayerPresenceChanged: (@MainActor @Sendable (Set<UUID>) async -> Void)?
+ /// Fires with the game IDs whose Game-record `engagement` creds just
+ /// changed (a peer minted or rotated the room). Drives the receiver to
+ /// reconcile its live connection toward the new creds.
+ var onRemoteEngagementChanged: (@MainActor @Sendable (Set<UUID>) async -> Void)?
var onPings: (@MainActor @Sendable ([Ping]) async -> Void)?
private var onAccountChange: (@MainActor @Sendable () async -> Void)?
private var onGameAccessRevoked: (@MainActor @Sendable (UUID) async -> Void)?
@@ -152,6 +156,10 @@ actor SyncEngine {
onRemotePlayerPresenceChanged = cb
}
+ func setOnRemoteEngagementChanged(_ cb: @MainActor @Sendable @escaping (Set<UUID>) async -> Void) {
+ onRemoteEngagementChanged = cb
+ }
+
func setOnPings(_ cb: @MainActor @Sendable @escaping ([Ping]) async -> Void) {
onPings = cb
}
@@ -1165,7 +1173,12 @@ actor SyncEngine {
let record = mod.record
switch record.recordType {
case "Game":
- let entity = RecordSerializer.applyGameRecord(record, to: ctx, databaseScope: scope)
+ let entity = RecordSerializer.applyGameRecord(
+ record,
+ to: ctx,
+ databaseScope: scope,
+ onEngagementChange: { effects.engagementChanged.insert($0) }
+ )
if let id = entity.id { effects.affected.insert(id) }
case "Moves":
if let value = RecordSerializer.parseMovesRecord(record) {
@@ -1257,6 +1270,9 @@ actor SyncEngine {
if let onRemotePlayerPresenceChanged, !effects.playerPresenceChanged.isEmpty {
await onRemotePlayerPresenceChanged(effects.playerPresenceChanged)
}
+ if let onRemoteEngagementChanged, !effects.engagementChanged.isEmpty {
+ await onRemoteEngagementChanged(effects.engagementChanged)
+ }
if let onIncomingReadCursor, !effects.readCursors.isEmpty {
await onIncomingReadCursor(effects.readCursors)
}
diff --git a/Tests/Unit/Sync/EngagementCoordinatorTests.swift b/Tests/Unit/Sync/EngagementCoordinatorTests.swift
@@ -114,94 +114,44 @@ struct EngagementCoordinatorTests {
#expect(decoded.sentAt == Date(timeIntervalSince1970: 789))
}
- @Test("present peer with greater author ID creates room hail and connects")
+ @Test("reconcile connects to the advertised room when a peer is present")
@MainActor
- func presentPeerCreatesRoom() async throws {
+ func reconcileConnectsToCreds() async throws {
let gameID = UUID(uuidString: "33333333-3333-3333-3333-333333333333")!
let host = MockEngagementHost()
- let sink = EngagementCoordinatorTestSink()
- let coordinator = EngagementCoordinator(
- host: host,
- localAuthorID: { "alice" },
- localDeviceID: "deviceA",
- presentPeers: { _ in [gameID: ["bob", "aardvark"]] },
- sendHail: { gameID, payload, addressee in
- await sink.send(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { recordName, gameID in
- await sink.delete(recordName: recordName, gameID: gameID)
- }
- )
+ let coordinator = makeCoordinator(host: host)
+ let room = roomCredentials()
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: true)
- let sent = await sink.sentHails()
- #expect(sent.count == 1)
- #expect(sent.first?.gameID == gameID)
- #expect(sent.first?.addressee == "bob")
- let room = try #require(EngagementRoomCredentials.decode(sent.first?.payload))
#expect(host.connections.count == 1)
#expect(host.connections.first?.room == room)
#expect(host.connections.first?.authorID == "alice")
#expect(host.connections.first?.deviceID == "deviceA")
}
- @Test("inbound room hail connects and deletes ping")
+ @Test("reconcile does not connect without a present peer or without creds")
@MainActor
- func inboundRoomConnects() async throws {
- let gameID = UUID(uuidString: "44444444-4444-4444-4444-444444444444")!
- let room = roomCredentials()
+ func reconcileSkipsWithoutPeerOrCreds() async throws {
+ let gameID = UUID(uuidString: "34343434-3434-3434-3434-343434343434")!
let host = MockEngagementHost()
- let sink = EngagementCoordinatorTestSink()
- let coordinator = EngagementCoordinator(
- host: host,
- localAuthorID: { "bob" },
- localDeviceID: "deviceB",
- presentPeers: { _ in [:] },
- sendHail: { gameID, payload, addressee in
- await sink.send(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { recordName, gameID in
- await sink.delete(recordName: recordName, gameID: gameID)
- }
- )
+ let coordinator = makeCoordinator(host: host)
+ let room = roomCredentials()
- await coordinator.handle(ping(
- recordName: "room-record",
- gameID: gameID,
- authorID: "alice",
- deviceID: "deviceA",
- payload: try room.encoded(),
- addressee: "bob:deviceB"
- ))
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: false)
+ await coordinator.reconcile(gameID: gameID, creds: nil, hasPeer: true)
- #expect(host.connections.count == 1)
- #expect(host.connections.first?.room == room)
- #expect(host.connections.first?.authorID == "bob")
- #expect(await sink.deletedPings() == [
- DeletedPing(recordName: "room-record", gameID: gameID)
- ])
+ #expect(host.connections.isEmpty)
}
- @Test("send is skipped until channel opens")
+ @Test("send is skipped until the channel opens")
@MainActor
func sendIsSkippedUntilChannelOpens() async throws {
let gameID = UUID(uuidString: "55555555-5555-5555-5555-555555555555")!
let host = MockEngagementHost()
- let sink = EngagementCoordinatorTestSink()
- let coordinator = EngagementCoordinator(
- host: host,
- localAuthorID: { "alice" },
- localDeviceID: "deviceA",
- presentPeers: { _ in [gameID: ["bob"]] },
- sendHail: { gameID, payload, addressee in
- await sink.send(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { recordName, gameID in
- await sink.delete(recordName: recordName, gameID: gameID)
- }
- )
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ let coordinator = makeCoordinator(host: host)
+
+ await coordinator.reconcile(gameID: gameID, creds: roomCredentials(), hasPeer: true)
let engagementID = try #require(host.connections.first?.engagementID)
await coordinator.sendDebugMessage(gameID: gameID, text: "too early")
@@ -213,228 +163,106 @@ struct EngagementCoordinatorTests {
#expect(EngagementMessage.decode(try #require(host.sentMessages.first?.message))?.text == "now ok")
}
- @Test("stale connecting state demotes to idle on next presence tick")
+ @Test("reconcile migrates to a rotated room and drops the old connection")
@MainActor
- func staleConnectingDemotesOnPresenceTick() async throws {
+ func reconcileMigratesWhenCredsChange() async throws {
let gameID = UUID(uuidString: "66666666-6666-6666-6666-666666666666")!
let host = MockEngagementHost()
- let sink = EngagementCoordinatorTestSink()
- let clock = TestClock(time: Date(timeIntervalSince1970: 10_000))
- let coordinator = EngagementCoordinator(
- host: host,
- localAuthorID: { "alice" },
- localDeviceID: "deviceA",
- presentPeers: { _ in [gameID: ["bob"]] },
- sendHail: { gameID, payload, addressee in
- await sink.send(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { recordName, gameID in
- await sink.delete(recordName: recordName, gameID: gameID)
- },
- now: { clock.now },
- connectionTimeout: 30
- )
+ let coordinator = makeCoordinator(host: host)
+ let roomA = roomCredentials(roomID: UUID(uuidString: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")!)
+ let roomB = roomCredentials(roomID: UUID(uuidString: "bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb")!)
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
- let firstEngagementID = try #require(host.connections.first?.engagementID)
+ await coordinator.reconcile(gameID: gameID, creds: roomA, hasPeer: true)
+ let engagementA = try #require(host.connections.first?.engagementID)
+ #expect(await coordinator.channelOpened(engagementID: engagementA) == gameID)
- clock.advance(by: 31)
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ await coordinator.reconcile(gameID: gameID, creds: roomB, hasPeer: true)
- #expect(host.disconnected == [firstEngagementID])
- #expect(await sink.sentHails().count == 2)
#expect(host.connections.count == 2)
+ #expect(host.connections.last?.room == roomB)
+ #expect(host.disconnected == [engagementA])
}
- @Test("live channel tears down when the peer's lease expires, and not before")
+ @Test("reconcile leaves an unchanged room alone")
@MainActor
- func livePeerLeaseExpiryTearsDown() async throws {
- let gameID = UUID(uuidString: "99999999-9999-9999-9999-999999999999")!
+ func reconcileNoOpWhenAlreadyConnected() async throws {
+ let gameID = UUID(uuidString: "77777777-7777-7777-7777-777777777777")!
let host = MockEngagementHost()
- let sink = EngagementCoordinatorTestSink()
- let presence = PresenceSource([gameID: ["bob"]])
- let coordinator = EngagementCoordinator(
- host: host,
- localAuthorID: { "alice" },
- localDeviceID: "deviceA",
- presentPeers: { await presence.current($0) },
- sendHail: { gameID, payload, addressee in
- await sink.send(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { recordName, gameID in
- await sink.delete(recordName: recordName, gameID: gameID)
- }
- )
+ let coordinator = makeCoordinator(host: host)
+ let room = roomCredentials()
- // Connect and bring the channel live.
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: true)
let engagementID = try #require(host.connections.first?.engagementID)
#expect(await coordinator.channelOpened(engagementID: engagementID) == gameID)
- // A later tick with the peer still present leaves the channel alone.
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: true)
+
+ #expect(host.connections.count == 1)
#expect(host.disconnected.isEmpty)
+ }
+
+ @Test("reconcile tears down a live channel when the peer leaves")
+ @MainActor
+ func reconcileTearsDownWhenPeerLeaves() async throws {
+ let gameID = UUID(uuidString: "88888888-8888-8888-8888-888888888888")!
+ let host = MockEngagementHost()
+ let coordinator = makeCoordinator(host: host)
+ let room = roomCredentials()
+
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: true)
+ let engagementID = try #require(host.connections.first?.engagementID)
+ #expect(await coordinator.channelOpened(engagementID: engagementID) == gameID)
+
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: false)
- // Peer's lease lapses → present set drops them → channel is torn down,
- // and the now-peerless game is not immediately re-hailed.
- await presence.set([:])
- await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
#expect(host.disconnected == [engagementID])
- #expect(await sink.sentHails().count == 1)
}
- @Test("stale and expired hails are deleted")
+ @Test("a stale connecting attempt is swept and retried on the next reconcile")
@MainActor
- func staleAndExpiredHailsAreDeleted() async throws {
- let gameID = UUID(uuidString: "77777777-7777-7777-7777-777777777777")!
+ func staleConnectingDemotesOnReconcile() async throws {
+ let gameID = UUID(uuidString: "99999999-9999-9999-9999-999999999999")!
let host = MockEngagementHost()
- let sink = EngagementCoordinatorTestSink()
- let coordinator = EngagementCoordinator(
- host: host,
- localAuthorID: { "bob" },
- localDeviceID: "deviceB",
- presentPeers: { _ in [:] },
- sendHail: { gameID, payload, addressee in
- await sink.send(gameID: gameID, payload: payload, addressee: addressee)
- },
- deletePing: { recordName, gameID in
- await sink.delete(recordName: recordName, gameID: gameID)
- },
- now: { Date(timeIntervalSince1970: 1_000) },
- hailMaxAge: 120
- )
- let stale = roomCredentials(expiresAt: Date(timeIntervalSince1970: 2_000))
- let expired = roomCredentials(expiresAt: Date(timeIntervalSince1970: 999))
+ let clock = TestClock(time: Date(timeIntervalSince1970: 10_000))
+ let coordinator = makeCoordinator(host: host, now: { clock.now }, connectionTimeout: 30)
+ let room = roomCredentials()
- await coordinator.handle(ping(
- recordName: recordName(gameID: gameID, authorID: "alice", deviceID: "deviceA", timestampMs: 800_000),
- gameID: gameID,
- authorID: "alice",
- deviceID: "deviceA",
- payload: try stale.encoded(),
- addressee: "bob:deviceB"
- ))
- await coordinator.handle(ping(
- recordName: recordName(gameID: gameID, authorID: "alice", deviceID: "deviceA", timestampMs: 999_000),
- gameID: gameID,
- authorID: "alice",
- deviceID: "deviceA",
- payload: try expired.encoded(),
- addressee: "bob:deviceB"
- ))
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: true)
+ let firstEngagementID = try #require(host.connections.first?.engagementID)
- #expect(host.connections.isEmpty)
- #expect(await sink.deletedPings() == [
- DeletedPing(
- recordName: recordName(
- gameID: gameID,
- authorID: "alice",
- deviceID: "deviceA",
- timestampMs: 800_000
- ),
- gameID: gameID
- ),
- DeletedPing(
- recordName: recordName(
- gameID: gameID,
- authorID: "alice",
- deviceID: "deviceA",
- timestampMs: 999_000
- ),
- gameID: gameID
- )
- ])
+ clock.advance(by: 31)
+ await coordinator.reconcile(gameID: gameID, creds: room, hasPeer: true)
+
+ #expect(host.disconnected == [firstEngagementID])
+ #expect(host.connections.count == 2)
+ }
+
+ @MainActor
+ private func makeCoordinator(
+ host: MockEngagementHost,
+ now: @escaping @Sendable () -> Date = Date.init,
+ connectionTimeout: TimeInterval = 30
+ ) -> EngagementCoordinator {
+ EngagementCoordinator(
+ host: host,
+ localAuthorID: { "alice" },
+ localDeviceID: "deviceA",
+ now: now,
+ connectionTimeout: connectionTimeout
+ )
}
private func roomCredentials(
+ roomID: UUID = UUID(uuidString: "88888888-8888-8888-8888-888888888888")!,
expiresAt: Date = .distantFuture
) -> EngagementRoomCredentials {
EngagementRoomCredentials(
- roomID: UUID(uuidString: "88888888-8888-8888-8888-888888888888")!,
+ roomID: roomID,
secret: Data(repeating: 3, count: 32).base64URLEncodedString(),
createdAt: Date(timeIntervalSince1970: 1_000),
expiresAt: expiresAt
)
}
-
- private func ping(
- recordName: String,
- gameID: UUID,
- authorID: String,
- deviceID: String,
- payload: String,
- addressee: String
- ) -> Ping {
- Ping(
- recordName: recordName,
- gameID: gameID,
- authorID: authorID,
- deviceID: deviceID,
- playerName: authorID,
- puzzleTitle: "Puzzle",
- kind: .hail,
- payload: payload,
- addressee: addressee
- )
- }
-
- private func recordName(
- gameID: UUID,
- authorID: String,
- deviceID: String,
- timestampMs: Int64
- ) -> String {
- "ping-\(gameID.uuidString)-\(authorID)-\(deviceID)-\(timestampMs)"
- }
-}
-
-private struct SentHail: Equatable, Sendable {
- var gameID: UUID
- var payload: String
- var addressee: String
-}
-
-private struct DeletedPing: Equatable, Sendable {
- var recordName: String
- var gameID: UUID
-}
-
-private actor PresenceSource {
- private var peersByGame: [UUID: [String]]
-
- init(_ initial: [UUID: [String]]) {
- self.peersByGame = initial
- }
-
- func set(_ next: [UUID: [String]]) {
- peersByGame = next
- }
-
- func current(_ gameIDs: Set<UUID>?) -> [UUID: [String]] {
- guard let gameIDs else { return peersByGame }
- return peersByGame.filter { gameIDs.contains($0.key) }
- }
-}
-
-private actor EngagementCoordinatorTestSink {
- private var sent: [SentHail] = []
- private var deleted: [DeletedPing] = []
-
- func send(gameID: UUID, payload: String, addressee: String) {
- sent.append(SentHail(gameID: gameID, payload: payload, addressee: addressee))
- }
-
- func delete(recordName: String, gameID: UUID) {
- deleted.append(DeletedPing(recordName: recordName, gameID: gameID))
- }
-
- func sentHails() -> [SentHail] {
- sent
- }
-
- func deletedPings() -> [DeletedPing] {
- deleted
- }
}
private final class TestClock: @unchecked Sendable {