Compare commits
21 Commits
gh/v4.1.5-
...
gh/stable
Author | SHA1 | Date |
---|---|---|
Ducky | 1c2f0f97e8 | |
Ducky | fa47e96fa0 | |
Ducky | 73938a75b2 | |
Ducky | f117479a75 | |
Ducky | cd2f2b2fe2 | |
Ducky | f14ac87522 | |
Ducky | 767aa3d66f | |
Ducky | 1642c4127d | |
Ducky | 9bef0d71ea | |
Ducky | ccaafd7157 | |
Ducky | f6625f49c2 | |
Ducky | fa9feece61 | |
Ducky | 7442c4fef4 | |
Ducky | 0124911b89 | |
Astra | 878d76adc5 | |
Claire | 370b8f0b81 | |
Renaud Chaput | b7bf343b26 | |
Emelia Smith | caf1c87ecd | |
Claire | eace7f9fcf | |
Emelia Smith | 2461ffbff9 | |
Claire | b5791487b1 |
|
@ -3,6 +3,14 @@ Changelog
|
|||
|
||||
All notable changes to this project will be documented in this file.
|
||||
|
||||
## [4.1.6] - 2023-07-28
|
||||
|
||||
### Fixed
|
||||
|
||||
- Fix memory leak in streaming server ([ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26228))
|
||||
- Fix wrong filters sometimes applying in streaming ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26159), [ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26213), [renchap](https://github.com/mastodon/mastodon/pull/26233))
|
||||
- Fix incorrect connect timeout in outgoing requests ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26116))
|
||||
|
||||
## [4.1.5] - 2023-07-21
|
||||
|
||||
### Added
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
# syntax=docker/dockerfile:1.4
|
||||
# This needs to be bullseye-slim because the Ruby image is built on bullseye-slim
|
||||
ARG NODE_VERSION="16.18.1-bullseye-slim"
|
||||
|
||||
FROM ghcr.io/moritzheiber/ruby-jemalloc:3.0.6-slim as ruby
|
||||
FROM node:${NODE_VERSION} as build
|
||||
|
||||
COPY --from=ruby /opt/ruby /opt/ruby
|
||||
|
||||
ENV DEBIAN_FRONTEND="noninteractive" \
|
||||
PATH="${PATH}:/opt/ruby/bin"
|
||||
|
||||
#SHELL ["/bin/bash", "-o", "pipefail", "-c"]
|
||||
|
||||
WORKDIR /opt/mastodon
|
||||
COPY Gemfile* package.json yarn.lock /opt/mastodon/
|
||||
|
||||
# hadolint ignore=DL3008
|
||||
RUN apt-get update && \
|
||||
apt-get install -y --no-install-recommends build-essential \
|
||||
ca-certificates \
|
||||
git \
|
||||
libicu-dev \
|
||||
libidn11-dev \
|
||||
libpq-dev \
|
||||
libjemalloc-dev \
|
||||
zlib1g-dev \
|
||||
libgdbm-dev \
|
||||
libgmp-dev \
|
||||
libssl-dev \
|
||||
libyaml-0-2 \
|
||||
ca-certificates \
|
||||
libreadline8 \
|
||||
python3 \
|
||||
shared-mime-info && \
|
||||
bundle config set --local deployment 'true' && \
|
||||
bundle config set --local without 'development test' && \
|
||||
bundle config set silence_root_warning true && \
|
||||
bundle install -j"$(nproc)" && \
|
||||
yarn install --pure-lockfile --network-timeout 600000
|
||||
|
||||
FROM node:${NODE_VERSION}
|
||||
|
||||
ARG UID="991"
|
||||
ARG GID="991"
|
||||
|
||||
COPY --from=ruby /opt/ruby /opt/ruby
|
||||
|
||||
#SHELL ["/bin/bash", "-o", "pipefail", "-c"]
|
||||
|
||||
ENV DEBIAN_FRONTEND="noninteractive" \
|
||||
PATH="${PATH}:/opt/ruby/bin:/opt/mastodon/bin"
|
||||
|
||||
# Ignoreing these here since we don't want to pin any versions and the Debian image removes apt-get content after use
|
||||
# hadolint ignore=DL3008,DL3009
|
||||
RUN apt-get update && \
|
||||
echo "Etc/UTC" > /etc/localtime && \
|
||||
groupadd -g "${GID}" mastodon && \
|
||||
useradd -l -u "$UID" -g "${GID}" -m -d /opt/mastodon mastodon && \
|
||||
apt-get -y --no-install-recommends install whois \
|
||||
wget \
|
||||
procps \
|
||||
libssl1.1 \
|
||||
libpq5 \
|
||||
imagemagick \
|
||||
ffmpeg \
|
||||
libjemalloc2 \
|
||||
libicu67 \
|
||||
libidn11 \
|
||||
libyaml-0-2 \
|
||||
file \
|
||||
ca-certificates \
|
||||
tzdata \
|
||||
libreadline8 \
|
||||
tini && \
|
||||
ln -s /opt/mastodon /mastodon
|
||||
|
||||
# Note: no, cleaning here since Debian does this automatically
|
||||
# See the file /etc/apt/apt.conf.d/docker-clean within the Docker image's filesystem
|
||||
|
||||
COPY --chown=mastodon:mastodon . /opt/mastodon
|
||||
COPY --chown=mastodon:mastodon --from=build /opt/mastodon /opt/mastodon
|
||||
|
||||
ENV RAILS_ENV="production" \
|
||||
NODE_ENV="production" \
|
||||
RAILS_SERVE_STATIC_FILES="true" \
|
||||
BIND="0.0.0.0"
|
||||
|
||||
# Set the run user
|
||||
USER mastodon
|
||||
WORKDIR /opt/mastodon
|
||||
|
||||
# Precompile assets
|
||||
RUN OTP_SECRET=precompile_placeholder SECRET_KEY_BASE=precompile_placeholder rails assets:precompile && \
|
||||
yarn cache clean
|
||||
|
||||
# Set the work dir and the container entry point
|
||||
ENTRYPOINT ["/usr/bin/tini", "--"]
|
||||
EXPOSE 3000 4000
|
|
@ -0,0 +1,6 @@
|
|||
@import 'contrast/variables';
|
||||
@import 'application';
|
||||
@import 'contrast/diff';
|
||||
@import 'elephant/layout-single-column.scss';
|
||||
@import 'elephant/layout-multiple-columns.scss';
|
||||
@import 'gh/elephant-mods.scss';
|
|
@ -0,0 +1,6 @@
|
|||
@import 'mastodon-light/variables';
|
||||
@import 'application';
|
||||
@import 'mastodon-light/diff';
|
||||
@import 'elephant/layout-single-column.scss';
|
||||
@import 'elephant/layout-multiple-columns.scss';
|
||||
@import 'gh/elephant-mods.scss';
|
|
@ -0,0 +1,4 @@
|
|||
@import 'application';
|
||||
@import 'elephant/layout-single-column.scss';
|
||||
@import 'elephant/layout-multiple-columns.scss';
|
||||
@import 'gh/elephant-mods.scss';
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -285,11 +285,11 @@ class Request
|
|||
end
|
||||
|
||||
until socks.empty?
|
||||
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect])
|
||||
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect_timeout])
|
||||
|
||||
if available_socks.nil?
|
||||
socks.each(&:close)
|
||||
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds"
|
||||
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect_timeout]} seconds"
|
||||
end
|
||||
|
||||
available_socks.each do |sock|
|
||||
|
|
|
@ -1597,6 +1597,9 @@ en:
|
|||
themes:
|
||||
contrast: Mastodon (High contrast)
|
||||
default: Mastodon (Dark)
|
||||
elephant: Elephant (Dark)
|
||||
elephant-contrast: Elephant (Contrast)
|
||||
elephant-light: Elephant (Light)
|
||||
mastodon-light: Mastodon (Light)
|
||||
time:
|
||||
formats:
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
default: styles/application.scss
|
||||
contrast: styles/contrast.scss
|
||||
elephant: styles/elephant.scss
|
||||
elephant-light: styles/elephant-light.scss
|
||||
elephant-contrast: styles/elephant-contrast.scss
|
||||
mastodon-light: styles/mastodon-light.scss
|
||||
contrast: styles/contrast.scss
|
||||
|
|
|
@ -56,7 +56,7 @@ services:
|
|||
|
||||
web:
|
||||
build: .
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||
restart: always
|
||||
env_file: .env.production
|
||||
command: bash -c "rm -f /mastodon/tmp/pids/server.pid; bundle exec rails s -p 3000"
|
||||
|
@ -77,7 +77,7 @@ services:
|
|||
|
||||
streaming:
|
||||
build: .
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||
restart: always
|
||||
env_file: .env.production
|
||||
command: node ./streaming
|
||||
|
@ -95,7 +95,7 @@ services:
|
|||
|
||||
sidekiq:
|
||||
build: .
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
||||
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||
restart: always
|
||||
env_file: .env.production
|
||||
command: bundle exec sidekiq
|
||||
|
|
|
@ -13,7 +13,7 @@ module Mastodon
|
|||
end
|
||||
|
||||
def patch
|
||||
5
|
||||
6
|
||||
end
|
||||
|
||||
def flags
|
||||
|
@ -21,7 +21,7 @@ module Mastodon
|
|||
end
|
||||
|
||||
def suffix
|
||||
'-gh23202'
|
||||
'-gh23240'
|
||||
end
|
||||
|
||||
def to_a
|
||||
|
|
|
@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
|
|||
callbacks.forEach(callback => callback(json));
|
||||
};
|
||||
|
||||
/**
|
||||
* @callback SubscriptionListener
|
||||
* @param {ReturnType<parseJSON>} json of the message
|
||||
* @returns void
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {string} channel
|
||||
* @param {function(string): void} callback
|
||||
* @param {SubscriptionListener} callback
|
||||
*/
|
||||
const subscribe = (channel, callback) => {
|
||||
log.silly(`Adding listener for ${channel}`);
|
||||
|
@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
|
|||
|
||||
/**
|
||||
* @param {string} channel
|
||||
* @param {function(Object<string, any>): void} callback
|
||||
* @param {SubscriptionListener} callback
|
||||
*/
|
||||
const unsubscribe = (channel, callback) => {
|
||||
log.silly(`Removing listener for ${channel}`);
|
||||
|
@ -623,51 +629,66 @@ const startWorker = async (workerId) => {
|
|||
* @param {string[]} ids
|
||||
* @param {any} req
|
||||
* @param {function(string, string): void} output
|
||||
* @param {function(string[], function(string): void): void} attachCloseHandler
|
||||
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
|
||||
* @param {boolean=} needsFiltering
|
||||
* @returns {function(object): void}
|
||||
* @returns {SubscriptionListener}
|
||||
*/
|
||||
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
|
||||
const accountId = req.accountId || req.remoteAddress;
|
||||
|
||||
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
|
||||
|
||||
// Currently message is of type string, soon it'll be Record<string, any>
|
||||
const listener = message => {
|
||||
const { event, payload, queued_at } = message;
|
||||
|
||||
const transmit = () => {
|
||||
const now = new Date().getTime();
|
||||
const delta = now - queued_at;
|
||||
const transmit = (event, payload) => {
|
||||
// TODO: Replace "string"-based delete payloads with object payloads:
|
||||
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
|
||||
|
||||
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
|
||||
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
|
||||
output(event, encodedPayload);
|
||||
};
|
||||
|
||||
// Only messages that may require filtering are statuses, since notifications
|
||||
// are already personalized and deletes do not matter
|
||||
if (!needsFiltering || event !== 'update') {
|
||||
transmit();
|
||||
// The listener used to process each message off the redis subscription,
|
||||
// message here is an object with an `event` and `payload` property. Some
|
||||
// events also include a queued_at value, but this is being removed shortly.
|
||||
/** @type {SubscriptionListener} */
|
||||
const listener = message => {
|
||||
const { event, payload } = message;
|
||||
|
||||
// Streaming only needs to apply filtering to some channels and only to
|
||||
// some events. This is because majority of the filtering happens on the
|
||||
// Ruby on Rails side when producing the event for streaming.
|
||||
//
|
||||
// The only events that require filtering from the streaming server are
|
||||
// `update` and `status.update`, all other events are transmitted to the
|
||||
// client as soon as they're received (pass-through).
|
||||
//
|
||||
// The channels that need filtering are determined in the function
|
||||
// `channelNameToIds` defined below:
|
||||
if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
|
||||
transmit(event, payload);
|
||||
return;
|
||||
}
|
||||
|
||||
const unpackedPayload = payload;
|
||||
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
|
||||
const accountDomain = unpackedPayload.account.acct.split('@')[1];
|
||||
// The rest of the logic from here on in this function is to handle
|
||||
// filtering of statuses:
|
||||
|
||||
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
|
||||
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
|
||||
// Filter based on language:
|
||||
if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
|
||||
log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
|
||||
return;
|
||||
}
|
||||
|
||||
// When the account is not logged in, it is not necessary to confirm the block or mute
|
||||
if (!req.accountId) {
|
||||
transmit();
|
||||
transmit(event, payload);
|
||||
return;
|
||||
}
|
||||
|
||||
pgPool.connect((err, client, done) => {
|
||||
// Filter based on domain blocks, blocks, mutes, or custom filters:
|
||||
const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
|
||||
const accountDomain = payload.account.acct.split('@')[1];
|
||||
|
||||
// TODO: Move this logic out of the message handling loop
|
||||
pgPool.connect((err, client, releasePgConnection) => {
|
||||
if (err) {
|
||||
log.error(err);
|
||||
return;
|
||||
|
@ -682,40 +703,57 @@ const startWorker = async (workerId) => {
|
|||
SELECT 1
|
||||
FROM mutes
|
||||
WHERE account_id = $1
|
||||
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
|
||||
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
|
||||
];
|
||||
|
||||
if (accountDomain) {
|
||||
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
|
||||
}
|
||||
|
||||
if (!unpackedPayload.filtered && !req.cachedFilters) {
|
||||
if (!payload.filtered && !req.cachedFilters) {
|
||||
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
|
||||
}
|
||||
|
||||
Promise.all(queries).then(values => {
|
||||
done();
|
||||
releasePgConnection();
|
||||
|
||||
// Handling blocks & mutes and domain blocks: If one of those applies,
|
||||
// then we don't transmit the payload of the event to the client
|
||||
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!unpackedPayload.filtered && !req.cachedFilters) {
|
||||
// If the payload already contains the `filtered` property, it means
|
||||
// that filtering has been applied on the ruby on rails side, as
|
||||
// such, we don't need to construct or apply the filters in streaming:
|
||||
if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
|
||||
transmit(event, payload);
|
||||
return;
|
||||
}
|
||||
|
||||
// Handling for constructing the custom filters and caching them on the request
|
||||
// TODO: Move this logic out of the message handling lifecycle
|
||||
if (!req.cachedFilters) {
|
||||
const filterRows = values[accountDomain ? 2 : 1].rows;
|
||||
|
||||
req.cachedFilters = filterRows.reduce((cache, row) => {
|
||||
if (cache[row.id]) {
|
||||
cache[row.id].keywords.push([row.keyword, row.whole_word]);
|
||||
req.cachedFilters = filterRows.reduce((cache, filter) => {
|
||||
if (cache[filter.id]) {
|
||||
cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
|
||||
} else {
|
||||
cache[row.id] = {
|
||||
keywords: [[row.keyword, row.whole_word]],
|
||||
expires_at: row.expires_at,
|
||||
repr: {
|
||||
id: row.id,
|
||||
title: row.title,
|
||||
context: row.context,
|
||||
expires_at: row.expires_at,
|
||||
filter_action: ['warn', 'hide'][row.filter_action],
|
||||
cache[filter.id] = {
|
||||
keywords: [[filter.keyword, filter.whole_word]],
|
||||
expires_at: filter.expires_at,
|
||||
filter: {
|
||||
id: filter.id,
|
||||
title: filter.title,
|
||||
context: filter.context,
|
||||
expires_at: filter.expires_at,
|
||||
// filter.filter_action is the value from the
|
||||
// custom_filters.action database column, it is an integer
|
||||
// representing a value in an enum defined by Ruby on Rails:
|
||||
//
|
||||
// enum { warn: 0, hide: 1 }
|
||||
filter_action: ['warn', 'hide'][filter.filter_action],
|
||||
},
|
||||
};
|
||||
}
|
||||
|
@ -723,6 +761,10 @@ const startWorker = async (workerId) => {
|
|||
return cache;
|
||||
}, {});
|
||||
|
||||
// Construct the regular expressions for the custom filters: This
|
||||
// needs to be done in a separate loop as the database returns one
|
||||
// filterRow per keyword, so we need all the keywords before
|
||||
// constructing the regular expression
|
||||
Object.keys(req.cachedFilters).forEach((key) => {
|
||||
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
|
||||
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
|
||||
|
@ -742,31 +784,58 @@ const startWorker = async (workerId) => {
|
|||
});
|
||||
}
|
||||
|
||||
// Check filters
|
||||
if (req.cachedFilters && !unpackedPayload.filtered) {
|
||||
const status = unpackedPayload;
|
||||
const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
|
||||
const searchIndex = JSDOM.fragment(searchContent).textContent;
|
||||
// Apply cachedFilters against the payload, constructing a
|
||||
// `filter_results` array of FilterResult entities
|
||||
if (req.cachedFilters) {
|
||||
const status = payload;
|
||||
// TODO: Calculate searchableContent in Ruby on Rails:
|
||||
const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
|
||||
const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
|
||||
|
||||
const now = new Date();
|
||||
payload.filtered = [];
|
||||
Object.values(req.cachedFilters).forEach((cachedFilter) => {
|
||||
if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
|
||||
const keyword_matches = searchIndex.match(cachedFilter.regexp);
|
||||
const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
|
||||
// Check the filter hasn't expired before applying:
|
||||
if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
|
||||
return results;
|
||||
}
|
||||
|
||||
// Just in-case JSDOM fails to find textContent in searchableContent
|
||||
if (!searchableTextContent) {
|
||||
return results;
|
||||
}
|
||||
|
||||
const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
|
||||
if (keyword_matches) {
|
||||
payload.filtered.push({
|
||||
filter: cachedFilter.repr,
|
||||
// results is an Array of FilterResult; status_matches is always
|
||||
// null as we only are only applying the keyword-based custom
|
||||
// filters, not the status-based custom filters.
|
||||
// https://docs.joinmastodon.org/entities/FilterResult/
|
||||
results.push({
|
||||
filter: cachedFilter.filter,
|
||||
keyword_matches,
|
||||
});
|
||||
}
|
||||
}
|
||||
status_matches: null
|
||||
});
|
||||
}
|
||||
|
||||
transmit();
|
||||
return results;
|
||||
}, []);
|
||||
|
||||
// Send the payload + the FilterResults as the `filtered` property
|
||||
// to the streaming connection. To reach this code, the `event` must
|
||||
// have been either `update` or `status.update`, meaning the
|
||||
// `payload` is a Status entity, which has a `filtered` property:
|
||||
//
|
||||
// filtered: https://docs.joinmastodon.org/entities/Status/#filtered
|
||||
transmit(event, {
|
||||
...payload,
|
||||
filtered: filter_results
|
||||
});
|
||||
} else {
|
||||
transmit(event, payload);
|
||||
}
|
||||
}).catch(err => {
|
||||
releasePgConnection();
|
||||
log.error(err);
|
||||
done();
|
||||
});
|
||||
});
|
||||
};
|
||||
|
@ -775,7 +844,7 @@ const startWorker = async (workerId) => {
|
|||
subscribe(`${redisPrefix}${id}`, listener);
|
||||
});
|
||||
|
||||
if (attachCloseHandler) {
|
||||
if (typeof attachCloseHandler === 'function') {
|
||||
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
|
||||
}
|
||||
|
||||
|
@ -812,12 +881,13 @@ const startWorker = async (workerId) => {
|
|||
/**
|
||||
* @param {any} req
|
||||
* @param {function(): void} [closeHandler]
|
||||
* @return {function(string[]): void}
|
||||
* @returns {function(string[], SubscriptionListener): void}
|
||||
*/
|
||||
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
|
||||
|
||||
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
|
||||
req.on('close', () => {
|
||||
ids.forEach(id => {
|
||||
unsubscribe(id);
|
||||
unsubscribe(id, listener);
|
||||
});
|
||||
|
||||
if (closeHandler) {
|
||||
|
@ -1077,7 +1147,7 @@ const startWorker = async (workerId) => {
|
|||
* @typedef WebSocketSession
|
||||
* @property {any} socket
|
||||
* @property {any} request
|
||||
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
|
||||
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
|
||||
*/
|
||||
|
||||
/**
|
||||
|
|
Reference in New Issue