commit 773ab3b53caa322092b3ecfc4258d285c40d9375
parent a0bfc5724aa5e592d1aa337f5e8e50a78dbdb918
Author: Michael Camilleri <[email protected]>
Date: Mon, 25 May 2026 16:21:56 +0900
Handle stuck engagement handshakes
The .offered and .replied values in the handshake had no local timeout: once
createOffer enqueued a hail, the offerer state was pinned until either a
matching reply arrived or the puzzle closed. If the reply ping was lost — peer
offline, network flap, sync engine taking a break — peerPresenceMayHaveChanged
would refuse to retry because the state isn't idle, and the session would
silently lose engagement until the user closed and reopened. The .replied value
had the same shape: once we accepted an offer and sent a reply, a missed
channelOpen left the answerer stuck waiting.
Both transitional cases now carry an `at: Date` payload stamped from the
injected clock at construction. A new sweepStaleHandshakes demotes any case
older than handshakeTimeout (30s, well inside the peer-side 120s hail discard
window) back to .idle and calls host.teardown to release the JS-side
RTCPeerConnection. The sweep runs at the top of peerPresenceMayHaveChanged and
offerEngagement — the natural retry triggers — so the loop that follows
auto-issues a fresh offer when the peer is still present. A Task.sleep per
offer would have been more responsive but adds task lifecycle inside the actor
and decouples retry from the presence signal that knows when retry can succeed;
the lazy sweep matches the existing test harness's clock-injection pattern
instead.
Tests cover both timeout paths plus the negative case: a 15s-old offer survives
the sweep, a 31s-old offer is torn down and re-issued, a 31s-old reply is torn
down, and the manual offerEngagement entry-point also recovers from a stuck
.offered. A small TestClock helper threads the injected now closure so time can
advance without sleeping.
Co-Authored-By: Claude Opus 4.7 <[email protected]>
Diffstat:
2 files changed, 227 insertions(+), 10 deletions(-)
diff --git a/Crossmate/Sync/EngagementCoordinator.swift b/Crossmate/Sync/EngagementCoordinator.swift
@@ -147,16 +147,16 @@ actor EngagementCoordinator {
private enum State: Equatable {
case idle
- case offered(peerAuthorID: String, engagementID: UUID)
- case replied(peerAuthorID: String, engagementID: UUID)
+ case offered(peerAuthorID: String, engagementID: UUID, at: Date)
+ case replied(peerAuthorID: String, engagementID: UUID, at: Date)
case live(peerAuthorID: String, engagementID: UUID)
var engagementID: UUID? {
switch self {
case .idle:
nil
- case .offered(_, let engagementID),
- .replied(_, let engagementID),
+ case .offered(_, let engagementID, _),
+ .replied(_, let engagementID, _),
.live(_, let engagementID):
engagementID
}
@@ -172,6 +172,7 @@ actor EngagementCoordinator {
private let log: Log
private let now: @Sendable () -> Date
private let hailMaxAge: TimeInterval
+ private let handshakeTimeout: TimeInterval
private var states: [UUID: State] = [:]
init(
@@ -183,7 +184,8 @@ actor EngagementCoordinator {
deletePing: @escaping DeletePing,
log: @escaping Log = { _ in },
now: @escaping @Sendable () -> Date = Date.init,
- hailMaxAge: TimeInterval = 120
+ hailMaxAge: TimeInterval = 120,
+ handshakeTimeout: TimeInterval = 30
) {
self.host = host
self.localAuthorID = localAuthorID
@@ -194,10 +196,12 @@ actor EngagementCoordinator {
self.log = log
self.now = now
self.hailMaxAge = hailMaxAge
+ self.handshakeTimeout = handshakeTimeout
}
func peerPresenceMayHaveChanged(gameIDs: Set<UUID>? = nil) async {
guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else { return }
+ await sweepStaleHandshakes()
let peersByGame = await presentPeers(gameIDs)
for (gameID, peers) in peersByGame {
guard state(for: gameID) == .idle else { continue }
@@ -208,6 +212,7 @@ actor EngagementCoordinator {
func offerEngagement(gameID: UUID) async {
guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else { return }
+ await sweepStaleHandshakes()
guard state(for: gameID) == .idle else {
await log("engagement: manual offer skipped for \(gameID.uuidString), state is not idle")
return
@@ -215,6 +220,43 @@ actor EngagementCoordinator {
await createOffer(gameID: gameID, peerAuthorID: localAuthorID)
}
+ /// Demotes any `.offered`/`.replied` state that has been waiting longer
+ /// than `handshakeTimeout` back to `.idle`, tearing down the host's
+ /// peer connection so the next presence tick can retry cleanly. The
+ /// peer-side discard window (`hailMaxAge`, 120 s) is much longer than
+ /// the local handshake budget, so by the time we demote, the peer has
+ /// either already replied (and we'd be `.live` or `.replied`) or is
+ /// unreachable for reasons that a retry might fix.
+ private func sweepStaleHandshakes() async {
+ let cutoff = now()
+ var demoted: [(gameID: UUID, engagementID: UUID, kind: String, age: TimeInterval)] = []
+ for (gameID, state) in states {
+ switch state {
+ case .offered(_, let engagementID, let at):
+ let age = cutoff.timeIntervalSince(at)
+ if age > handshakeTimeout {
+ demoted.append((gameID, engagementID, "offer", age))
+ states[gameID] = .idle
+ }
+ case .replied(_, let engagementID, let at):
+ let age = cutoff.timeIntervalSince(at)
+ if age > handshakeTimeout {
+ demoted.append((gameID, engagementID, "reply", age))
+ states[gameID] = .idle
+ }
+ case .idle, .live:
+ continue
+ }
+ }
+ for entry in demoted {
+ await host.teardown(engagementID: entry.engagementID)
+ await log(
+ "engagement: \(entry.kind) timed out for \(entry.gameID.uuidString) " +
+ "after \(Int(entry.age))s, engagement \(entry.engagementID.uuidString)"
+ )
+ }
+ }
+
func handle(_ ping: Ping) async {
guard ping.kind == .hail else { return }
guard let localAuthorID = await localAuthorID(), !localAuthorID.isEmpty else {
@@ -265,8 +307,8 @@ actor EngagementCoordinator {
switch state {
case .idle:
return nil
- case .offered(let peerAuthorID, _),
- .replied(let peerAuthorID, _),
+ case .offered(let peerAuthorID, _, _),
+ .replied(let peerAuthorID, _, _),
.live(let peerAuthorID, _):
states[gameID] = .live(peerAuthorID: peerAuthorID, engagementID: engagementID)
return gameID
@@ -318,7 +360,7 @@ actor EngagementCoordinator {
private func createOffer(gameID: UUID, peerAuthorID: String) async {
let engagementID = UUID()
- states[gameID] = .offered(peerAuthorID: peerAuthorID, engagementID: engagementID)
+ states[gameID] = .offered(peerAuthorID: peerAuthorID, engagementID: engagementID, at: now())
do {
let signal = try await host.createOffer(engagementID: engagementID)
let payload = try EngagementHailPayload(
@@ -355,7 +397,7 @@ actor EngagementCoordinator {
"with offer \(payload.engagementID.uuidString)"
)
}
- states[ping.gameID] = .replied(peerAuthorID: ping.authorID, engagementID: payload.engagementID)
+ states[ping.gameID] = .replied(peerAuthorID: ping.authorID, engagementID: payload.engagementID, at: now())
do {
let reply = try await host.acceptOfferAndReply(
engagementID: payload.engagementID,
@@ -380,7 +422,7 @@ actor EngagementCoordinator {
}
private func acceptReply(ping: Ping, payload: EngagementHailPayload) async {
- guard case .offered(let peerAuthorID, payload.engagementID) = state(for: ping.gameID),
+ guard case .offered(let peerAuthorID, payload.engagementID, _) = state(for: ping.gameID),
peerAuthorID == ping.authorID else {
await deletePing(ping.recordName, ping.gameID)
await log("engagement: deleted unmatched reply \(payload.engagementID.uuidString)")
diff --git a/Tests/Unit/Sync/EngagementCoordinatorTests.swift b/Tests/Unit/Sync/EngagementCoordinatorTests.swift
@@ -627,6 +627,160 @@ struct EngagementCoordinatorTests {
#expect(secondOffer.engagementID != firstOffer.engagementID)
}
+ @Test("stale offered state demotes to idle on next presence tick and host is torn down")
+ @MainActor
+ func staleOfferDemotesOnPresenceTick() async throws {
+ let gameID = UUID(uuidString: "C0FFEE00-0000-0000-0000-000000000001")!
+ let host = MockEngagementHost()
+ let sink = EngagementCoordinatorTestSink()
+ let clock = TestClock(time: Date(timeIntervalSince1970: 10_000))
+ let coordinator = EngagementCoordinator(
+ host: host,
+ localAuthorID: { "alice" },
+ localDeviceID: "deviceA",
+ presentPeers: { _ in [gameID: ["bob"]] },
+ sendHail: { gameID, payload, addressee in
+ await sink.send(gameID: gameID, payload: payload, addressee: addressee)
+ },
+ deletePing: { recordName, gameID in
+ await sink.delete(recordName: recordName, gameID: gameID)
+ },
+ now: { clock.now },
+ handshakeTimeout: 30
+ )
+
+ await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ let firstOffer = try #require(EngagementHailPayload.decode(await sink.sentHails().first?.payload))
+ #expect(host.createdOffers == [firstOffer.engagementID])
+
+ clock.advance(by: 31)
+ await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+
+ #expect(host.tornDown == [firstOffer.engagementID])
+ let sent = await sink.sentHails()
+ #expect(sent.count == 2)
+ let secondOffer = try #require(EngagementHailPayload.decode(sent.last?.payload))
+ #expect(secondOffer.role == .offer)
+ #expect(secondOffer.engagementID != firstOffer.engagementID)
+ #expect(host.createdOffers == [firstOffer.engagementID, secondOffer.engagementID])
+ }
+
+ @Test("fresh offered state is not demoted by the sweep")
+ @MainActor
+ func freshOfferSurvivesPresenceTick() async throws {
+ let gameID = UUID(uuidString: "C0FFEE00-0000-0000-0000-000000000002")!
+ let host = MockEngagementHost()
+ let sink = EngagementCoordinatorTestSink()
+ let clock = TestClock(time: Date(timeIntervalSince1970: 10_000))
+ let coordinator = EngagementCoordinator(
+ host: host,
+ localAuthorID: { "alice" },
+ localDeviceID: "deviceA",
+ presentPeers: { _ in [gameID: ["bob"]] },
+ sendHail: { gameID, payload, addressee in
+ await sink.send(gameID: gameID, payload: payload, addressee: addressee)
+ },
+ deletePing: { recordName, gameID in
+ await sink.delete(recordName: recordName, gameID: gameID)
+ },
+ now: { clock.now },
+ handshakeTimeout: 30
+ )
+
+ await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ clock.advance(by: 15)
+ await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+
+ #expect(host.tornDown.isEmpty)
+ #expect(await sink.sentHails().count == 1)
+ #expect(host.createdOffers.count == 1)
+ }
+
+ @Test("stale replied state demotes to idle and tears down the host")
+ @MainActor
+ func staleReplyDemotesOnPresenceTick() async throws {
+ let gameID = UUID(uuidString: "C0FFEE00-0000-0000-0000-000000000003")!
+ let host = MockEngagementHost()
+ let sink = EngagementCoordinatorTestSink()
+ let clock = TestClock(time: Date(timeIntervalSince1970: 10_000))
+ let coordinator = EngagementCoordinator(
+ host: host,
+ // Local author is bob so an incoming offer from alice lands us in `.replied`
+ // (peerPresenceMayHaveChanged would not initiate because alice < bob).
+ localAuthorID: { "bob" },
+ localDeviceID: "deviceB",
+ presentPeers: { _ in [:] },
+ sendHail: { gameID, payload, addressee in
+ await sink.send(gameID: gameID, payload: payload, addressee: addressee)
+ },
+ deletePing: { recordName, gameID in
+ await sink.delete(recordName: recordName, gameID: gameID)
+ },
+ now: { clock.now },
+ handshakeTimeout: 30
+ )
+ let offer = EngagementHailPayload(
+ role: .offer,
+ engagementID: UUID(uuidString: "C0FFEE00-0000-0000-0000-0000000000A1")!,
+ signal: EngagementSignal(sdp: "offer-sdp", candidates: [])
+ )
+ await coordinator.handle(ping(
+ recordName: recordName(
+ gameID: gameID,
+ authorID: "alice",
+ deviceID: "deviceA",
+ timestampMs: Int64(clock.now.timeIntervalSince1970 * 1000)
+ ),
+ gameID: gameID,
+ authorID: "alice",
+ deviceID: "deviceA",
+ payload: try offer.encoded(),
+ addressee: "bob:deviceB"
+ ))
+ #expect(host.acceptedOffers == [offer.engagementID])
+ #expect(host.tornDown.isEmpty)
+
+ clock.advance(by: 31)
+ await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+
+ #expect(host.tornDown == [offer.engagementID])
+ }
+
+ @Test("manual offer recovers a stuck offered state")
+ @MainActor
+ func manualOfferRecoversFromStuckOffered() async throws {
+ let gameID = UUID(uuidString: "C0FFEE00-0000-0000-0000-000000000004")!
+ let host = MockEngagementHost()
+ let sink = EngagementCoordinatorTestSink()
+ let clock = TestClock(time: Date(timeIntervalSince1970: 10_000))
+ let coordinator = EngagementCoordinator(
+ host: host,
+ localAuthorID: { "alice" },
+ localDeviceID: "deviceA",
+ presentPeers: { _ in [gameID: ["bob"]] },
+ sendHail: { gameID, payload, addressee in
+ await sink.send(gameID: gameID, payload: payload, addressee: addressee)
+ },
+ deletePing: { recordName, gameID in
+ await sink.delete(recordName: recordName, gameID: gameID)
+ },
+ now: { clock.now },
+ handshakeTimeout: 30
+ )
+
+ await coordinator.peerPresenceMayHaveChanged(gameIDs: [gameID])
+ let firstOffer = try #require(EngagementHailPayload.decode(await sink.sentHails().first?.payload))
+
+ clock.advance(by: 31)
+ await coordinator.offerEngagement(gameID: gameID)
+
+ #expect(host.tornDown == [firstOffer.engagementID])
+ let sent = await sink.sentHails()
+ #expect(sent.count == 2)
+ let secondOffer = try #require(EngagementHailPayload.decode(sent.last?.payload))
+ #expect(secondOffer.engagementID != firstOffer.engagementID)
+ }
+
private func ping(
recordName: String,
gameID: UUID,
@@ -691,6 +845,27 @@ private actor EngagementCoordinatorTestSink {
}
}
+private final class TestClock: @unchecked Sendable {
+ private let lock = NSLock()
+ private var current: Date
+
+ init(time: Date) {
+ self.current = time
+ }
+
+ var now: Date {
+ lock.lock()
+ defer { lock.unlock() }
+ return current
+ }
+
+ func advance(by seconds: TimeInterval) {
+ lock.lock()
+ defer { lock.unlock() }
+ current = current.addingTimeInterval(seconds)
+ }
+}
+
@MainActor
private final class MockEngagementHost: EngagementHosting, @unchecked Sendable {
let offerSignal = EngagementSignal(sdp: "local-offer-sdp", candidates: ["local-offer-candidate"])