From 81cdc0f972ce2008f8daab8100cdfd5e7f3906b4 Mon Sep 17 00:00:00 2001 From: Emelia Smith Date: Fri, 28 Jul 2023 12:06:29 +0200 Subject: [PATCH] Fix: Streaming server memory leak in HTTP EventSource cleanup (#26228) --- streaming/index.js | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/streaming/index.js b/streaming/index.js index 3adf37c19..9db23b876 100644 --- a/streaming/index.js +++ b/streaming/index.js @@ -222,9 +222,15 @@ const startServer = async () => { callbacks.forEach(callback => callback(json)); }; + /** + * @callback SubscriptionListener + * @param {ReturnType} json of the message + * @returns void + */ + /** * @param {string} channel - * @param {function(string): void} callback + * @param {SubscriptionListener} callback */ const subscribe = (channel, callback) => { log.silly(`Adding listener for ${channel}`); @@ -241,7 +247,7 @@ const startServer = async () => { /** * @param {string} channel - * @param {function(Object): void} callback + * @param {SubscriptionListener} callback */ const unsubscribe = (channel, callback) => { log.silly(`Removing listener for ${channel}`); @@ -613,9 +619,9 @@ const startServer = async () => { * @param {string[]} ids * @param {any} req * @param {function(string, string): void} output - * @param {function(string[], function(string): void): void} attachCloseHandler + * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler * @param {boolean=} needsFiltering - * @returns {function(object): void} + * @returns {SubscriptionListener} */ const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const accountId = req.accountId || req.remoteAddress; @@ -633,6 +639,7 @@ const startServer = async () => { // The listener used to process each message off the redis subscription, // message here is an object with an `event` and `payload` property. Some // events also include a queued_at value, but this is being removed shortly. + /** @type {SubscriptionListener} */ const listener = message => { const { event, payload } = message; @@ -825,7 +832,7 @@ const startServer = async () => { subscribe(`${redisPrefix}${id}`, listener); }); - if (attachCloseHandler) { + if (typeof attachCloseHandler === 'function') { attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener); } @@ -862,12 +869,13 @@ const startServer = async () => { /** * @param {any} req * @param {function(): void} [closeHandler] - * @returns {function(string[]): void} + * @returns {function(string[], SubscriptionListener): void} */ - const streamHttpEnd = (req, closeHandler = undefined) => (ids) => { + + const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => { req.on('close', () => { ids.forEach(id => { - unsubscribe(id); + unsubscribe(id, listener); }); if (closeHandler) { @@ -1131,7 +1139,7 @@ const startServer = async () => { * @typedef WebSocketSession * @property {any} socket * @property {any} request - * @property {Object.} subscriptions + * @property {Object.} subscriptions */ /**