diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 3f5cd9d..3da56b3 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -414,7 +414,6 @@ export class YSocketIO { if (nsp?.sockets.size === 0 && stream) { this.cleanupNamespace(ns, stream, DEFAULT_CLEAR_TIMEOUT) if (this.namespaceDocMap.has(ns)) this.debouncedPersist(ns, true) - this.persistentLeaderOf.delete(ns) } logSocketIO(`disconnecting socket in ${ns}, ${nsp?.sockets.size || 0} remaining`) } @@ -475,7 +474,8 @@ export class YSocketIO { const nsp = this.namespaceMap.get(namespace) if (!nsp) return if (nsp.sockets.size === 0 && this.subscriber) { - this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT) + const isLeader = this.persistentLeaderOf.has(namespace) + this.cleanupNamespace(namespace, stream, DEFAULT_CLEAR_TIMEOUT, isLeader) } /** @type {Uint8Array[]} */ @@ -708,13 +708,19 @@ export class YSocketIO { * @param {string} namespace * @param {string} stream * @param {number=} removeAfterWait + * @param {boolean=} extendExisting */ - cleanupNamespace (namespace, stream, removeAfterWait) { + cleanupNamespace (namespace, stream, removeAfterWait, extendExisting = false) { if (!removeAfterWait) { this.awaitingCleanupNamespace.delete(namespace) return this.cleanupNamespaceImpl(namespace, stream) } - if (this.awaitingCleanupNamespace.has(namespace)) return + + const existingTimer = this.awaitingCleanupNamespace.get(namespace) + if (existingTimer) { + if (!extendExisting) return + clearTimeout(existingTimer) + } const timer = setTimeout(async () => { const awaitingPersist = this.awaitingPersistMap.get(namespace) @@ -739,6 +745,15 @@ export class YSocketIO { this.namespaceDocMap.get(namespace)?.ydoc.destroy() this.namespaceDocMap.delete(namespace) this.namespacePersistentMap.delete(namespace) + this.persistentLeaderOf.delete(namespace) + + // clear leader key in redis when we fully clean up + if (this.client) { + const key = this.getLeaderKeyOf(namespace) + this.client.redis + .del(key) + .catch((e) => console.error(e)) + } } async waitUntilWorkerReady () { @@ -818,8 +833,11 @@ export class YSocketIO { const curLeader = await redis.get(key) // remove orphaned if exist - const aliveClients = this.namespaceMap.get(namespace)?.sockets.size || 0 - if (aliveClients === 0) { + const nsp = this.namespaceMap.get(namespace) + const hasDoc = this.namespaceDocMap.has(namespace) + const hasCleanupTimer = this.awaitingCleanupNamespace.has(namespace) + const isActiveHere = Boolean(nsp || hasDoc || hasCleanupTimer) + if (!isActiveHere) { logSocketIO(`clearing leader heartbeat for [${namespace}] (SID: ${this.serverId})`) this.persistentLeaderOf.delete(namespace) continue