| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -226,9 +226,15 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    callbacks.forEach(callback => callback(json));
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  };
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  /**
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @callback SubscriptionListener
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {ReturnType<parseJSON>} json of the message
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @returns void
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  /**
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {string} channel
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {function(string): void} callback
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {SubscriptionListener} callback
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  const subscribe = (channel, callback) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    log.silly(`Adding listener for ${channel}`);
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -245,7 +251,7 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  /**
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {string} channel
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {function(Object<string, any>): void} callback
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {SubscriptionListener} callback
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  const unsubscribe = (channel, callback) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    log.silly(`Removing listener for ${channel}`);
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -623,51 +629,66 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {string[]} ids
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {any} req
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {function(string, string): void} output
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {function(string[], function(string): void): void} attachCloseHandler
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {undefined | function(string[], SubscriptionListener): void} attachCloseHandler
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {boolean=} needsFiltering
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @returns {function(object): void}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @returns {SubscriptionListener}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    const accountId = req.accountId || req.remoteAddress;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    // Currently message is of type string, soon it'll be Record<string, any>
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    const transmit = (event, payload) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // TODO: Replace "string"-based delete payloads with object payloads:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload}`);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      output(event, encodedPayload);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    };
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    // The listener used to process each message off the redis subscription,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    // message here is an object with an `event` and `payload` property. Some
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    // events also include a queued_at value, but this is being removed shortly.
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    /** @type {SubscriptionListener} */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    const listener = message => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const { event, payload, queued_at } = message;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const { event, payload } = message;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const transmit = () => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        const now = new Date().getTime();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        const delta = now - queued_at;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        output(event, encodedPayload);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      };
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // Only messages that may require filtering are statuses, since notifications
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // are already personalized and deletes do not matter
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      if (!needsFiltering || event !== 'update') {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        transmit();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // Streaming only needs to apply filtering to some channels and only to
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // some events. This is because majority of the filtering happens on the
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // Ruby on Rails side when producing the event for streaming.
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      //
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // The only events that require filtering from the streaming server are
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // `update` and `status.update`, all other events are transmitted to the
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // client as soon as they're received (pass-through).
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      //
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // The channels that need filtering are determined in the function
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // `channelNameToIds` defined below:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      if (!needsFiltering || (event !== 'update' && event !== 'status.update')) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        transmit(event, payload);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        return;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const unpackedPayload = payload;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const accountDomain = unpackedPayload.account.acct.split('@')[1];
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // The rest of the logic from here on in this function is to handle
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // filtering of statuses:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // Filter based on language:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      if (Array.isArray(req.chosenLanguages) && payload.language !== null && req.chosenLanguages.indexOf(payload.language) === -1) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        log.silly(req.requestId, `Message ${payload.id} filtered by language (${payload.language})`);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        return;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // When the account is not logged in, it is not necessary to confirm the block or mute
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      if (!req.accountId) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        transmit();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        transmit(event, payload);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        return;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      pgPool.connect((err, client, done) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // Filter based on domain blocks, blocks, mutes, or custom filters:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const targetAccountIds = [payload.account.id].concat(payload.mentions.map(item => item.id));
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      const accountDomain = payload.account.acct.split('@')[1];
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      // TODO: Move this logic out of the message handling loop
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      pgPool.connect((err, client, releasePgConnection) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        if (err) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          log.error(err);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          return;
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -682,40 +703,57 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                        SELECT 1
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                        FROM mutes
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                        WHERE account_id = $1
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                          AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                          AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, payload.account.id].concat(targetAccountIds)),
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        ];
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        if (accountDomain) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        if (!unpackedPayload.filtered && !req.cachedFilters) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        if (!payload.filtered && !req.cachedFilters) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          queries.push(client.query('SELECT filter.id AS id, filter.phrase AS title, filter.context AS context, filter.expires_at AS expires_at, filter.action AS filter_action, keyword.keyword AS keyword, keyword.whole_word AS whole_word FROM custom_filter_keywords keyword JOIN custom_filters filter ON keyword.custom_filter_id = filter.id WHERE filter.account_id = $1 AND (filter.expires_at IS NULL OR filter.expires_at > NOW())', [req.accountId]));
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        Promise.all(queries).then(values => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          done();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          releasePgConnection();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // Handling blocks & mutes and domain blocks: If one of those applies,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // then we don't transmit the payload of the event to the client
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          if (values[0].rows.length > 0 || (accountDomain && values[1].rows.length > 0)) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            return;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          if (!unpackedPayload.filtered && !req.cachedFilters) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // If the payload already contains the `filtered` property, it means
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // that filtering has been applied on the ruby on rails side, as
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // such, we don't need to construct or apply the filters in streaming:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          if (Object.prototype.hasOwnProperty.call(payload, "filtered")) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            transmit(event, payload);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            return;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // Handling for constructing the custom filters and caching them on the request
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // TODO: Move this logic out of the message handling lifecycle
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          if (!req.cachedFilters) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const filterRows = values[accountDomain ? 2 : 1].rows;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            req.cachedFilters = filterRows.reduce((cache, row) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              if (cache[row.id]) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                cache[row.id].keywords.push([row.keyword, row.whole_word]);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            req.cachedFilters = filterRows.reduce((cache, filter) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              if (cache[filter.id]) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                cache[filter.id].keywords.push([filter.keyword, filter.whole_word]);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              } else {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                cache[row.id] = {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  keywords: [[row.keyword, row.whole_word]],
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  expires_at: row.expires_at,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  repr: {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    id: row.id,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    title: row.title,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    context: row.context,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    expires_at: row.expires_at,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    filter_action: ['warn', 'hide'][row.filter_action],
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                cache[filter.id] = {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  keywords: [[filter.keyword, filter.whole_word]],
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  expires_at: filter.expires_at,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  filter: {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    id: filter.id,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    title: filter.title,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    context: filter.context,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    expires_at: filter.expires_at,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    // filter.filter_action is the value from the
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    // custom_filters.action database column, it is an integer
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    // representing a value in an enum defined by Ruby on Rails:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    //
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    // enum { warn: 0, hide: 1 }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    filter_action: ['warn', 'hide'][filter.filter_action],
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  },
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                };
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              }
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -723,6 +761,10 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              return cache;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            }, {});
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // Construct the regular expressions for the custom filters: This
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // needs to be done in a separate loop as the database returns one
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // filterRow per keyword, so we need all the keywords before
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // constructing the regular expression
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            Object.keys(req.cachedFilters).forEach((key) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              req.cachedFilters[key].regexp = new RegExp(req.cachedFilters[key].keywords.map(([keyword, whole_word]) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                let expr = keyword.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -742,31 +784,58 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // Check filters
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          if (req.cachedFilters && !unpackedPayload.filtered) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const status = unpackedPayload;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const searchContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const searchIndex = JSDOM.fragment(searchContent).textContent;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // Apply cachedFilters against the payload, constructing a
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          // `filter_results` array of FilterResult entities
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          if (req.cachedFilters) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const status = payload;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // TODO: Calculate searchableContent in Ruby on Rails:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const searchableContent = ([status.spoiler_text || '', status.content].concat((status.poll && status.poll.options) ? status.poll.options.map(option => option.title) : [])).concat(status.media_attachments.map(att => att.description)).join('\n\n').replace(/<br\s*\/?>/g, '\n').replace(/<\/p><p>/g, '\n\n');
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const searchableTextContent = JSDOM.fragment(searchableContent).textContent;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const now = new Date();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            payload.filtered = [];
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            Object.values(req.cachedFilters).forEach((cachedFilter) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              if ((cachedFilter.expires_at === null || cachedFilter.expires_at > now)) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                const keyword_matches = searchIndex.match(cachedFilter.regexp);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                if (keyword_matches) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  payload.filtered.push({
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    filter: cachedFilter.repr,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                    keyword_matches,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            const filter_results = Object.values(req.cachedFilters).reduce((results, cachedFilter) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              // Check the filter hasn't expired before applying:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              if (cachedFilter.expires_at !== null && cachedFilter.expires_at < now) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                return results;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          transmit();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              // Just in-case JSDOM fails to find textContent in searchableContent
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              if (!searchableTextContent) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                return results;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              const keyword_matches = searchableTextContent.match(cachedFilter.regexp);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              if (keyword_matches) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                // results is an Array of FilterResult; status_matches is always
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                // null as we only are only applying the keyword-based custom
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                // filters, not the status-based custom filters.
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                // https://docs.joinmastodon.org/entities/FilterResult/
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                results.push({
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  filter: cachedFilter.filter,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  keyword_matches,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                  status_matches: null
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				                });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              return results;
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            }, []);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // Send the payload + the FilterResults as the `filtered` property
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // to the streaming connection. To reach this code, the `event` must
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // have been either `update` or `status.update`, meaning the
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // `payload` is a Status entity, which has a `filtered` property:
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            //
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            // filtered: https://docs.joinmastodon.org/entities/Status/#filtered
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            transmit(event, {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              ...payload,
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				              filtered: filter_results
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          } else {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				            transmit(event, payload);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        }).catch(err => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          releasePgConnection();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          log.error(err);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				          done();
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    };
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -775,7 +844,7 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      subscribe(`${redisPrefix}${id}`, listener);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    if (attachCloseHandler) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    if (typeof attachCloseHandler === 'function') {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      attachCloseHandler(ids.map(id => `${redisPrefix}${id}`), listener);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    }
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -812,12 +881,13 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  /**
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {any} req
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @param {function(): void} [closeHandler]
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @return {function(string[]): void}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @returns {function(string[], SubscriptionListener): void}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  const streamHttpEnd = (req, closeHandler = undefined) => (ids) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  const streamHttpEnd = (req, closeHandler = undefined) => (ids, listener) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				    req.on('close', () => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      ids.forEach(id => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        unsubscribe(id);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				        unsubscribe(id, listener);
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      });
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				      if (closeHandler) {
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@ -1077,7 +1147,7 @@ const startWorker = async (workerId) => {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @typedef WebSocketSession
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @property {any} socket
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @property {any} request
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @property {Object.<string, { listener: function(string): void, stopHeartbeat: function(): void }>} subscriptions
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   * @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				   */
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				  /**
 | 
			
		
		
	
	
		
			
				
					| 
						
					 | 
				
			
			 | 
			 | 
			
				
 
 |