commit 8177ebe430f8e839caef9ff38f912983de765133
parent 72261ba3f7764c20a041a7c64820b972dff046cf
Author: Michael Camilleri <[email protected]>
Date: Tue, 26 May 2026 02:12:21 +0900
Stabilize socket engagements
This commit fixes several issues found while testing WebSocket-based
engagements. Socket URLs are now required to use ws/wss endpoints, duplicate
room hails for an already-connected room are ignored, and inbound Moves records
merge per-cell timestamps so stale CloudKit snapshots do not overwrite newer
realtime edits.
This commit also changes Durable Object room expiry to be idle-based instead of
a hard room lifetime, so active rooms remain available while users are still
connected.
Co-Authored-By: Codex GPT 5.5 <[email protected]>
Diffstat:
5 files changed, 83 insertions(+), 18 deletions(-)
diff --git a/Crossmate/Sync/EngagementCoordinator.swift b/Crossmate/Sync/EngagementCoordinator.swift
@@ -162,6 +162,16 @@ actor EngagementCoordinator {
engagementID
}
}
+
+ var roomID: UUID? {
+ switch self {
+ case .idle:
+ nil
+ case .connecting(_, _, let room, _),
+ .live(_, _, let room):
+ room.roomID
+ }
+ }
}
private let host: any EngagementTransporting
@@ -381,7 +391,7 @@ actor EngagementCoordinator {
private func acceptRoom(ping: Ping, room: EngagementRoomCredentials, localAuthorID: String) async {
let currentState = state(for: ping.gameID)
if currentState != .idle {
- if case .connecting(_, _, room, _) = currentState {
+ if currentState.roomID == room.roomID {
await log("engagement: ignored duplicate room hail \(room.roomID.uuidString)")
await deletePing(ping.recordName, ping.gameID)
return
diff --git a/Crossmate/Sync/RecordSerializer.swift b/Crossmate/Sync/RecordSerializer.swift
@@ -565,16 +565,43 @@ enum RecordSerializer {
let isLocalDeviceRow = value.authorID == localAuthorID
&& value.deviceID == localDeviceID
guard !foundExisting || !isLocalDeviceRow else { return false }
- entity.updatedAt = value.updatedAt
- entity.cells = (record["cells"] as? Data) ?? Data()
+ let previousUpdatedAt = entity.updatedAt ?? .distantPast
+ let previousCells = (entity.cells.flatMap { try? MovesCodec.decode($0) }) ?? [:]
+ let mergedCells = mergeIncomingMovesCells(
+ existing: previousCells,
+ incoming: value.cells
+ )
+ let mergedUpdatedAt = max(
+ previousUpdatedAt,
+ value.updatedAt,
+ mergedCells.values.map(\.updatedAt).max() ?? .distantPast
+ )
+
+ entity.updatedAt = mergedUpdatedAt
+ entity.cells = (try? MovesCodec.encode(mergedCells)) ?? ((record["cells"] as? Data) ?? Data())
if let game = entity.game,
- game.updatedAt.map({ $0 < value.updatedAt }) ?? true {
- game.updatedAt = value.updatedAt
+ game.updatedAt.map({ $0 < mergedUpdatedAt }) ?? true {
+ game.updatedAt = mergedUpdatedAt
}
return true
}
+ private static func mergeIncomingMovesCells(
+ existing: [GridPosition: TimestampedCell],
+ incoming: [GridPosition: TimestampedCell]
+ ) -> [GridPosition: TimestampedCell] {
+ var cells = existing
+ for (position, incomingCell) in incoming {
+ if let existingCell = cells[position],
+ existingCell.updatedAt > incomingCell.updatedAt {
+ continue
+ }
+ cells[position] = incomingCell
+ }
+ return cells
+ }
+
/// Projects an inbound `Decision` record onto local Core Data. For
/// `kind == "block"` this upserts a `FriendEntity` tombstone keyed by the
/// blocked author so the block becomes authoritative across the user's own
diff --git a/Tests/Unit/Sync/EngagementCoordinatorTests.swift b/Tests/Unit/Sync/EngagementCoordinatorTests.swift
@@ -51,6 +51,7 @@ struct EngagementCoordinatorTests {
}
@Test("socket URL preserves configured WebSocket endpoint")
+ @MainActor
func socketURL() throws {
let room = EngagementRoomCredentials(
roomID: UUID(uuidString: "23232323-2323-2323-2323-232323232323")!,
@@ -59,12 +60,13 @@ struct EngagementCoordinatorTests {
expiresAt: Date(timeIntervalSince1970: 700)
)
- let url = try #require(EngagementHost.socketURL(
+ let maybeURL = try EngagementHost.socketURL(
room: room,
authorID: "alice",
deviceID: "deviceA",
baseURL: URL(string: "wss://example.org")
- ))
+ )
+ let url = try #require(maybeURL)
#expect(url.scheme == "wss")
#expect(url.host() == "example.org")
diff --git a/Tests/Unit/Sync/MovesInboundTests.swift b/Tests/Unit/Sync/MovesInboundTests.swift
@@ -188,8 +188,8 @@ struct MovesInboundTests {
#expect(row.ckSystemFields != nil)
}
- @Test("Inbound other-device record replaces cached row")
- func inboundOtherDeviceRecordReplacesCachedRow() throws {
+ @Test("Inbound other-device record preserves newer realtime cells")
+ func inboundOtherDeviceRecordPreservesNewerRealtimeCells() throws {
let persistence = makeTestPersistence()
let ctx = persistence.viewContext
@@ -207,6 +207,11 @@ struct MovesInboundTests {
updatedAt: Date(timeIntervalSince1970: 20),
authorID: "bob"
),
+ GridPosition(row: 1, col: 1): TimestampedCell(
+ letter: "", markKind: 0, checkedWrong: false,
+ updatedAt: Date(timeIntervalSince1970: 30),
+ authorID: "bob"
+ ),
]
let cached = MovesEntity(context: ctx)
cached.game = game
@@ -227,6 +232,16 @@ struct MovesInboundTests {
updatedAt: Date(timeIntervalSince1970: 10),
authorID: "bob"
),
+ GridPosition(row: 1, col: 1): TimestampedCell(
+ letter: "Z", markKind: 0, checkedWrong: false,
+ updatedAt: Date(timeIntervalSince1970: 15),
+ authorID: "bob"
+ ),
+ GridPosition(row: 2, col: 2): TimestampedCell(
+ letter: "C", markKind: 0, checkedWrong: false,
+ updatedAt: Date(timeIntervalSince1970: 40),
+ authorID: "bob"
+ ),
]
let (rec, value) = try record(
in: ctx,
@@ -244,9 +259,11 @@ struct MovesInboundTests {
)
let row = try #require(fetchAll(ctx).first)
- #expect(row.updatedAt == Date(timeIntervalSince1970: 10))
+ #expect(row.updatedAt == Date(timeIntervalSince1970: 40))
let decoded = try MovesCodec.decode(row.cells ?? Data())
- #expect(decoded == serverCells)
+ #expect(decoded[GridPosition(row: 0, col: 0)]?.letter == "B")
+ #expect(decoded[GridPosition(row: 1, col: 1)]?.letter == "")
+ #expect(decoded[GridPosition(row: 2, col: 2)]?.letter == "C")
}
@Test("Two devices for the same game produce two distinct rows")
diff --git a/Worker/engagement-worker.js b/Worker/engagement-worker.js
@@ -76,6 +76,7 @@ export class EngagementRoom {
await this.state.storage.put("createdAt", Date.now());
}
+ await this.state.storage.put("lastSeenAt", Date.now());
await this.state.storage.put(nonceKey, Date.now());
await this.pruneNonces();
await this.scheduleExpiry();
@@ -85,6 +86,8 @@ export class EngagementRoom {
async webSocketMessage(ws, message) {
const data = typeof message === "string" ? message : message.slice(0);
+ await this.state.storage.put("lastSeenAt", Date.now());
+ await this.scheduleExpiry();
for (const peer of this.state.getWebSockets()) {
if (peer === ws) continue;
peer.send(data);
@@ -92,17 +95,23 @@ export class EngagementRoom {
}
async webSocketClose(ws, code, reason) {
+ await this.state.storage.put("lastSeenAt", Date.now());
+ await this.scheduleExpiry();
ws.close(code, reason);
}
async alarm() {
await this.pruneNonces();
const ttlMs = Number(this.env.ROOM_TTL_SECONDS || "600") * 1000;
- const createdAt = await this.state.storage.get("createdAt");
- if (createdAt && Date.now() - createdAt > ttlMs) {
- for (const ws of this.state.getWebSockets()) {
- ws.close(1001, "room expired");
- }
+ const sockets = this.state.getWebSockets();
+ if (sockets.length > 0) {
+ await this.scheduleExpiry();
+ return;
+ }
+
+ const lastSeenAt = await this.state.storage.get("lastSeenAt");
+ const idleMs = Date.now() - (lastSeenAt || Date.now());
+ if (idleMs > ttlMs) {
await this.state.storage.deleteAll();
return;
}
@@ -122,8 +131,8 @@ export class EngagementRoom {
async scheduleExpiry() {
const ttlMs = Number(this.env.ROOM_TTL_SECONDS || "600") * 1000;
- const createdAt = (await this.state.storage.get("createdAt")) || Date.now();
- await this.state.storage.setAlarm(createdAt + ttlMs);
+ const lastSeenAt = (await this.state.storage.get("lastSeenAt")) || Date.now();
+ await this.state.storage.setAlarm(lastSeenAt + ttlMs);
}
}