Compare commits
15 Commits
2465f9d180
...
767aa3d66f
Author | SHA1 | Date |
---|---|---|
Ducky | 767aa3d66f | |
Ducky | 1642c4127d | |
Ducky | 9bef0d71ea | |
Ducky | ccaafd7157 | |
Ducky | f6625f49c2 | |
Ducky | fa9feece61 | |
Ducky | 7442c4fef4 | |
Ducky | 0124911b89 | |
Astra | 878d76adc5 | |
Claire | 370b8f0b81 | |
Renaud Chaput | b7bf343b26 | |
Emelia Smith | caf1c87ecd | |
Claire | eace7f9fcf | |
Emelia Smith | 2461ffbff9 | |
Claire | b5791487b1 |
|
@ -3,6 +3,14 @@ Changelog
|
||||||
|
|
||||||
All notable changes to this project will be documented in this file.
|
All notable changes to this project will be documented in this file.
|
||||||
|
|
||||||
|
## [4.1.6] - 2023-07-28
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
|
||||||
|
- Fix memory leak in streaming server ([ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26228))
|
||||||
|
- Fix wrong filters sometimes applying in streaming ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26159), [ThisIsMissEm](https://github.com/mastodon/mastodon/pull/26213), [renchap](https://github.com/mastodon/mastodon/pull/26233))
|
||||||
|
- Fix incorrect connect timeout in outgoing requests ([ClearlyClaire](https://github.com/mastodon/mastodon/pull/26116))
|
||||||
|
|
||||||
## [4.1.5] - 2023-07-21
|
## [4.1.5] - 2023-07-21
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
# syntax=docker/dockerfile:1.4
|
||||||
|
# This needs to be bullseye-slim because the Ruby image is built on bullseye-slim
|
||||||
|
ARG NODE_VERSION="16.18.1-bullseye-slim"
|
||||||
|
|
||||||
|
FROM ghcr.io/moritzheiber/ruby-jemalloc:3.0.6-slim as ruby
|
||||||
|
FROM node:${NODE_VERSION} as build
|
||||||
|
|
||||||
|
COPY --from=ruby /opt/ruby /opt/ruby
|
||||||
|
|
||||||
|
ENV DEBIAN_FRONTEND="noninteractive" \
|
||||||
|
PATH="${PATH}:/opt/ruby/bin"
|
||||||
|
|
||||||
|
#SHELL ["/bin/bash", "-o", "pipefail", "-c"]
|
||||||
|
|
||||||
|
WORKDIR /opt/mastodon
|
||||||
|
COPY Gemfile* package.json yarn.lock /opt/mastodon/
|
||||||
|
|
||||||
|
# hadolint ignore=DL3008
|
||||||
|
RUN apt-get update && \
|
||||||
|
apt-get install -y --no-install-recommends build-essential \
|
||||||
|
ca-certificates \
|
||||||
|
git \
|
||||||
|
libicu-dev \
|
||||||
|
libidn11-dev \
|
||||||
|
libpq-dev \
|
||||||
|
libjemalloc-dev \
|
||||||
|
zlib1g-dev \
|
||||||
|
libgdbm-dev \
|
||||||
|
libgmp-dev \
|
||||||
|
libssl-dev \
|
||||||
|
libyaml-0-2 \
|
||||||
|
ca-certificates \
|
||||||
|
libreadline8 \
|
||||||
|
python3 \
|
||||||
|
shared-mime-info && \
|
||||||
|
bundle config set --local deployment 'true' && \
|
||||||
|
bundle config set --local without 'development test' && \
|
||||||
|
bundle config set silence_root_warning true && \
|
||||||
|
bundle install -j"$(nproc)" && \
|
||||||
|
yarn install --pure-lockfile --network-timeout 600000
|
||||||
|
|
||||||
|
FROM node:${NODE_VERSION}
|
||||||
|
|
||||||
|
ARG UID="991"
|
||||||
|
ARG GID="991"
|
||||||
|
|
||||||
|
COPY --from=ruby /opt/ruby /opt/ruby
|
||||||
|
|
||||||
|
#SHELL ["/bin/bash", "-o", "pipefail", "-c"]
|
||||||
|
|
||||||
|
ENV DEBIAN_FRONTEND="noninteractive" \
|
||||||
|
PATH="${PATH}:/opt/ruby/bin:/opt/mastodon/bin"
|
||||||
|
|
||||||
|
# Ignoreing these here since we don't want to pin any versions and the Debian image removes apt-get content after use
|
||||||
|
# hadolint ignore=DL3008,DL3009
|
||||||
|
RUN apt-get update && \
|
||||||
|
echo "Etc/UTC" > /etc/localtime && \
|
||||||
|
groupadd -g "${GID}" mastodon && \
|
||||||
|
useradd -l -u "$UID" -g "${GID}" -m -d /opt/mastodon mastodon && \
|
||||||
|
apt-get -y --no-install-recommends install whois \
|
||||||
|
wget \
|
||||||
|
procps \
|
||||||
|
libssl1.1 \
|
||||||
|
libpq5 \
|
||||||
|
imagemagick \
|
||||||
|
ffmpeg \
|
||||||
|
libjemalloc2 \
|
||||||
|
libicu67 \
|
||||||
|
libidn11 \
|
||||||
|
libyaml-0-2 \
|
||||||
|
file \
|
||||||
|
ca-certificates \
|
||||||
|
tzdata \
|
||||||
|
libreadline8 \
|
||||||
|
tini && \
|
||||||
|
ln -s /opt/mastodon /mastodon
|
||||||
|
|
||||||
|
# Note: no, cleaning here since Debian does this automatically
|
||||||
|
# See the file /etc/apt/apt.conf.d/docker-clean within the Docker image's filesystem
|
||||||
|
|
||||||
|
COPY --chown=mastodon:mastodon . /opt/mastodon
|
||||||
|
COPY --chown=mastodon:mastodon --from=build /opt/mastodon /opt/mastodon
|
||||||
|
|
||||||
|
ENV RAILS_ENV="production" \
|
||||||
|
NODE_ENV="production" \
|
||||||
|
RAILS_SERVE_STATIC_FILES="true" \
|
||||||
|
BIND="0.0.0.0"
|
||||||
|
|
||||||
|
# Set the run user
|
||||||
|
USER mastodon
|
||||||
|
WORKDIR /opt/mastodon
|
||||||
|
|
||||||
|
# Precompile assets
|
||||||
|
RUN OTP_SECRET=precompile_placeholder SECRET_KEY_BASE=precompile_placeholder rails assets:precompile && \
|
||||||
|
yarn cache clean
|
||||||
|
|
||||||
|
# Set the work dir and the container entry point
|
||||||
|
ENTRYPOINT ["/usr/bin/tini", "--"]
|
||||||
|
EXPOSE 3000 4000
|
|
@ -0,0 +1,6 @@
|
||||||
|
@import 'contrast/variables';
|
||||||
|
@import 'application';
|
||||||
|
@import 'contrast/diff';
|
||||||
|
@import 'elephant/layout-single-column.scss';
|
||||||
|
@import 'elephant/layout-multiple-columns.scss';
|
||||||
|
@import 'gh/elephant-mods.scss';
|
|
@ -0,0 +1,6 @@
|
||||||
|
@import 'mastodon-light/variables';
|
||||||
|
@import 'application';
|
||||||
|
@import 'mastodon-light/diff';
|
||||||
|
@import 'elephant/layout-single-column.scss';
|
||||||
|
@import 'elephant/layout-multiple-columns.scss';
|
||||||
|
@import 'gh/elephant-mods.scss';
|
|
@ -0,0 +1,4 @@
|
||||||
|
@import 'application';
|
||||||
|
@import 'elephant/layout-single-column.scss';
|
||||||
|
@import 'elephant/layout-multiple-columns.scss';
|
||||||
|
@import 'gh/elephant-mods.scss';
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -285,11 +285,11 @@ class Request
|
||||||
end
|
end
|
||||||
|
|
||||||
until socks.empty?
|
until socks.empty?
|
||||||
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect])
|
_, available_socks, = IO.select(nil, socks, nil, Request::TIMEOUT[:connect_timeout])
|
||||||
|
|
||||||
if available_socks.nil?
|
if available_socks.nil?
|
||||||
socks.each(&:close)
|
socks.each(&:close)
|
||||||
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds"
|
raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect_timeout]} seconds"
|
||||||
end
|
end
|
||||||
|
|
||||||
available_socks.each do |sock|
|
available_socks.each do |sock|
|
||||||
|
|
|
@ -1,3 +1,6 @@
|
||||||
default: styles/application.scss
|
default: styles/application.scss
|
||||||
contrast: styles/contrast.scss
|
contrast: styles/contrast.scss
|
||||||
mastodon-light: styles/mastodon-light.scss
|
mastodon-light: styles/mastodon-light.scss
|
||||||
|
elephant: styles/elephant.scss
|
||||||
|
elephant-contrast: styles/elephant-contrast.scss
|
||||||
|
elephant-light: styles/elephant-light.scss
|
||||||
|
|
|
@ -56,7 +56,7 @@ services:
|
||||||
|
|
||||||
web:
|
web:
|
||||||
build: .
|
build: .
|
||||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||||
restart: always
|
restart: always
|
||||||
env_file: .env.production
|
env_file: .env.production
|
||||||
command: bash -c "rm -f /mastodon/tmp/pids/server.pid; bundle exec rails s -p 3000"
|
command: bash -c "rm -f /mastodon/tmp/pids/server.pid; bundle exec rails s -p 3000"
|
||||||
|
@ -77,7 +77,7 @@ services:
|
||||||
|
|
||||||
streaming:
|
streaming:
|
||||||
build: .
|
build: .
|
||||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||||
restart: always
|
restart: always
|
||||||
env_file: .env.production
|
env_file: .env.production
|
||||||
command: node ./streaming
|
command: node ./streaming
|
||||||
|
@ -95,7 +95,7 @@ services:
|
||||||
|
|
||||||
sidekiq:
|
sidekiq:
|
||||||
build: .
|
build: .
|
||||||
image: ghcr.io/mastodon/mastodon:v4.1.5
|
image: ghcr.io/mastodon/mastodon:v4.1.6
|
||||||
restart: always
|
restart: always
|
||||||
env_file: .env.production
|
env_file: .env.production
|
||||||
command: bundle exec sidekiq
|
command: bundle exec sidekiq
|
||||||
|
|
|
@ -13,7 +13,7 @@ module Mastodon
|
||||||
end
|
end
|
||||||
|
|
||||||
def patch
|
def patch
|
||||||
5
|
6
|
||||||
end
|
end
|
||||||
|
|
||||||
def flags
|
def flags
|
||||||
|
@ -21,7 +21,7 @@ module Mastodon
|
||||||
end
|
end
|
||||||
|
|
||||||
def suffix
|
def suffix
|
||||||
'-gh23202'
|
'-gh23210'
|
||||||
end
|
end
|
||||||
|
|
||||||
def to_a
|
def to_a
|
||||||
|
|
|
@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
|
||||||
callbacks.forEach(callback => callback(json));
|
callbacks.forEach(callback => callback(json));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @callback SubscriptionListener
|
||||||
|
* @param {ReturnType<parseJSON>} json of the message
|
||||||
|
* @returns void
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} channel
|
* @param {string} channel
|
||||||
* @param {function(string): void} callback
|
* @param {SubscriptionListener} callback
|
||||||
*/
|
*/
|
||||||
const subscribe = (channel, callback) => {
|
const subscribe = (channel, callback) => {
|
||||||
log.silly(`Adding listener for ${channel}`);
|
log.silly(`Adding listener for ${channel}`);
|
||||||
|
@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} channel
|
* @param {string} channel
|
||||||
* @param {function(Object<string, any>): void} callback
|
* @param {SubscriptionListener} callback
|
||||||
*/
|
*/
|
||||||
const unsubscribe = (channel, callback) => {
|
const unsubscribe = (channel, callback) => {
|
||||||
log.silly(`Removing listener for ${channel}`);
|
log.silly(`Removing listener for ${channel}`);
|
||||||
|
@ -623,51 +629,66 @@ const startWorker = async (workerId) => {
|
||||||
* @param {string[]} ids
|
* @param {string[]} ids
|
||||||
* @param {any} req
|
* @param {any} req
|
||||||
* @param {function(string, string): void} output
|
* @param {function(string, string): void} output
|
||||||
* @param {function(string[], function(string): void): void} attachCloseHandler
|
* @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
|
||||||
* @param {boolean=} needsFiltering
|
* @param {boolean=} needsFiltering
|
||||||
* @returns {function(object): void}
|
* @returns {SubscriptionListener}
|
||||||
*/
|
*/
|
||||||
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
|
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
|
||||||
const accountId = req.accountId || req.remoteAddress;
|
const accountId = req.accountId || req.remoteAddress;
|
||||||
|
|
||||||
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
|
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
|
||||||
|
|
||||||
// Currently message is of type string, soon it'll be Record<string, any>
|
const transmit = (event, payload) => {
|
||||||
|
// TODO: Replace "string"-based delete payloads with object payloads:
|
||||||
|
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
|
||||||
|
|
||||||
|
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
|
||||||
|
output(event, encodedPayload);
|
||||||
|
};
|
||||||
|
|
||||||
|
// The listener used to process each message off the redis subscription,
|
||||||
|
// message here is an object with an `event` and `payload` property. Some
|
||||||
|
// events also include a queued_at value, but this is being removed shortly.
|
||||||
|
/** @type {SubscriptionListener} */
|
||||||
const listener = message => {
|
const listener = message => {
|
||||||
const { event, payload, queued_at } = message;
|
const { event, payload } = message;
|
||||||
|
|
||||||
const transmit = () => {
|
// Streaming only needs to apply filtering to some channels and only to
|
||||||
const now = new Date().getTime();
|
// some events. This is because majority of the filtering happens on the
|
||||||
const delta = now - queued_at;
|
// Ruby on Rails side when producing the event for streaming.
|
||||||
const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
|
//
|
||||||
|
// The only events that require filtering from the streaming server are
|
||||||
log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
|
// `update` and `status.update`, all other events are transmitted to the
|
||||||
output(event, encodedPayload);
|
// client as soon as they're received (pass-through).
|
||||||
};
|
//
|
||||||
|
// The channels that need filtering are determined in the function
|
||||||
// Only messages that may require filtering are statuses, since notifications
|
// `channelNameToIds` defined below:
|
||||||
// are already personalized and deletes do not matter
|
if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
|
||||||
if (!needsFiltering || event !== 'update') {
|
transmit(event, payload);
|
||||||
transmit();
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const unpackedPayload = payload;
|
// The rest of the logic from here on in this function is to handle
|
||||||
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
|
// filtering of statuses:
|
||||||
const accountDomain = unpackedPayload.account.acct.split('@')[1];
|
|
||||||
|
|
||||||
if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
|
// Filter based on language:
|
||||||
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
|
if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
|
||||||
|
log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// When the account is not logged in, it is not necessary to confirm the block or mute
|
// When the account is not logged in, it is not necessary to confirm the block or mute
|
||||||
if (!req.accountId) {
|
if (!req.accountId) {
|
||||||
transmit();
|
transmit(event, payload);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pgPool.connect((err, client, done) => {
|
// Filter based on domain blocks, blocks, mutes, or custom filters:
|
||||||
|
const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
|
||||||
|
const accountDomain = payload.account.acct.split('@')[1];
|
||||||
|
|
||||||
|
// TODO: Move this logic out of the message handling loop
|
||||||
|
pgPool.connect((err, client, releasePgConnection) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(err);
|
log.error(err);
|
||||||
return;
|
return;
|
||||||
|
@ -682,40 +703,57 @@ const startWorker = async (workerId) => {
|
||||||
SELECT 1
|
SELECT 1
|
||||||
FROM mutes
|
FROM mutes
|
||||||
WHERE account_id = $1
|
WHERE account_id = $1
|
||||||
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
|
AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
|
||||||
];
|
];
|
||||||
|
|
||||||
if (accountDomain) {
|
if (accountDomain) {
|
||||||
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
|
queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!unpackedPayload.filtered && !req.cachedFilters) {
|
if (!payload.filtered && !req.cachedFilters) {
|
||||||
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
|
queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
|
||||||
}
|
}
|
||||||
|
|
||||||
Promise.all(queries).then(values => {
|
Promise.all(queries).then(values => {
|
||||||
done();
|
releasePgConnection();
|
||||||
|
|
||||||
|
// Handling blocks & mutes and domain blocks: If one of those applies,
|
||||||
|
// then we don't transmit the payload of the event to the client
|
||||||
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
|
if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!unpackedPayload.filtered && !req.cachedFilters) {
|
// If the payload already contains the `filtered` property, it means
|
||||||
|
// that filtering has been applied on the ruby on rails side, as
|
||||||
|
// such, we don't need to construct or apply the filters in streaming:
|
||||||
|
if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
|
||||||
|
transmit(event, payload);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handling for constructing the custom filters and caching them on the request
|
||||||
|
// TODO: Move this logic out of the message handling lifecycle
|
||||||
|
if (!req.cachedFilters) {
|
||||||
const filterRows = values[accountDomain ? 2 : 1].rows;
|
const filterRows = values[accountDomain ? 2 : 1].rows;
|
||||||
|
|
||||||
req.cachedFilters = filterRows.reduce((cache, row) => {
|
req.cachedFilters = filterRows.reduce((cache, filter) => {
|
||||||
if (cache[row.id]) {
|
if (cache[filter.id]) {
|
||||||
cache[row.id].keywords.push([row.keyword, row.whole_word]);
|
cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
|
||||||
} else {
|
} else {
|
||||||
cache[row.id] = {
|
cache[filter.id] = {
|
||||||
keywords: [[row.keyword, row.whole_word]],
|
keywords: [[filter.keyword, filter.whole_word]],
|
||||||
expires_at: row.expires_at,
|
expires_at: filter.expires_at,
|
||||||
repr: {
|
filter: {
|
||||||
id: row.id,
|
id: filter.id,
|
||||||
title: row.title,
|
title: filter.title,
|
||||||
context: row.context,
|
context: filter.context,
|
||||||
expires_at: row.expires_at,
|
expires_at: filter.expires_at,
|
||||||
filter_action: ['warn', 'hide'][row.filter_action],
|
// filter.filter_action is the value from the
|
||||||
|
// custom_filters.action database column, it is an integer
|
||||||
|
// representing a value in an enum defined by Ruby on Rails:
|
||||||
|
//
|
||||||
|
// enum { warn: 0, hide: 1 }
|
||||||
|
filter_action: ['warn', 'hide'][filter.filter_action],
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -723,6 +761,10 @@ const startWorker = async (workerId) => {
|
||||||
return cache;
|
return cache;
|
||||||
}, {});
|
}, {});
|
||||||
|
|
||||||
|
// Construct the regular expressions for the custom filters: This
|
||||||
|
// needs to be done in a separate loop as the database returns one
|
||||||
|
// filterRow per keyword, so we need all the keywords before
|
||||||
|
// constructing the regular expression
|
||||||
Object.keys(req.cachedFilters).forEach((key) => {
|
Object.keys(req.cachedFilters).forEach((key) => {
|
||||||
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
|
req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
|
||||||
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
|
let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
|
||||||
|
@ -742,31 +784,58 @@ const startWorker = async (workerId) => {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check filters
|
// Apply cachedFilters against the payload, constructing a
|
||||||
if (req.cachedFilters && !unpackedPayload.filtered) {
|
// `filter_results` array of FilterResult entities
|
||||||
const status = unpackedPayload;
|
if (req.cachedFilters) {
|
||||||
const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
|
const status = payload;
|
||||||
const searchIndex = JSDOM.fragment(searchContent).textContent;
|
// TODO: Calculate searchableContent in Ruby on Rails:
|
||||||
|
const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
|
||||||
|
const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
|
||||||
|
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
payload.filtered = [];
|
const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
|
||||||
Object.values(req.cachedFilters).forEach((cachedFilter) => {
|
// Check the filter hasn't expired before applying:
|
||||||
if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
|
if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
|
||||||
const keyword_matches = searchIndex.match(cachedFilter.regexp);
|
return results;
|
||||||
if (keyword_matches) {
|
|
||||||
payload.filtered.push({
|
|
||||||
filter: cachedFilter.repr,
|
|
||||||
keyword_matches,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
transmit();
|
// Just in-case JSDOM fails to find textContent in searchableContent
|
||||||
|
if (!searchableTextContent) {
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
|
||||||
|
if (keyword_matches) {
|
||||||
|
// results is an Array of FilterResult; status_matches is always
|
||||||
|
// null as we only are only applying the keyword-based custom
|
||||||
|
// filters, not the status-based custom filters.
|
||||||
|
// https://docs.joinmastodon.org/entities/FilterResult/
|
||||||
|
results.push({
|
||||||
|
filter: cachedFilter.filter,
|
||||||
|
keyword_matches,
|
||||||
|
status_matches: null
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return results;
|
||||||
|
}, []);
|
||||||
|
|
||||||
|
// Send the payload + the FilterResults as the `filtered` property
|
||||||
|
// to the streaming connection. To reach this code, the `event` must
|
||||||
|
// have been either `update` or `status.update`, meaning the
|
||||||
|
// `payload` is a Status entity, which has a `filtered` property:
|
||||||
|
//
|
||||||
|
// filtered: https://docs.joinmastodon.org/entities/Status/#filtered
|
||||||
|
transmit(event, {
|
||||||
|
...payload,
|
||||||
|
filtered: filter_results
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
transmit(event, payload);
|
||||||
|
}
|
||||||
}).catch(err => {
|
}).catch(err => {
|
||||||
|
releasePgConnection();
|
||||||
log.error(err);
|
log.error(err);
|
||||||
done();
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
@ -775,7 +844,7 @@ const startWorker = async (workerId) => {
|
||||||
subscribe(`${redisPrefix}${id}`, listener);
|
subscribe(`${redisPrefix}${id}`, listener);
|
||||||
});
|
});
|
||||||
|
|
||||||
if (attachCloseHandler) {
|
if (typeof attachCloseHandler === 'function') {
|
||||||
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
|
attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -812,12 +881,13 @@ const startWorker = async (workerId) => {
|
||||||
/**
|
/**
|
||||||
* @param {any} req
|
* @param {any} req
|
||||||
* @param {function(): void} [closeHandler]
|
* @param {function(): void} [closeHandler]
|
||||||
* @return {function(string[]): void}
|
* @returns {function(string[], SubscriptionListener): void}
|
||||||
*/
|
*/
|
||||||
const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
|
|
||||||
|
const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
|
||||||
req.on('close', () => {
|
req.on('close', () => {
|
||||||
ids.forEach(id => {
|
ids.forEach(id => {
|
||||||
unsubscribe(id);
|
unsubscribe(id, listener);
|
||||||
});
|
});
|
||||||
|
|
||||||
if (closeHandler) {
|
if (closeHandler) {
|
||||||
|
@ -1077,7 +1147,7 @@ const startWorker = async (workerId) => {
|
||||||
* @typedef WebSocketSession
|
* @typedef WebSocketSession
|
||||||
* @property {any} socket
|
* @property {any} socket
|
||||||
* @property {any} request
|
* @property {any} request
|
||||||
* @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
|
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Reference in New Issue