
4.1.3-gh23188

Ducky 2023-07-07 16:24:49 +01:00
commit 5812b02c24
62 changed files with 860 additions and 216 deletions

@@ -20,6 +20,7 @@ module Admin
authorize :webhook, :create?
@webhook = Webhook.new(resource_params)
@webhook.current_account = current_account
if @webhook.save
redirect_to admin_webhook_path(@webhook)
@@ -39,10 +40,12 @@ module Admin
def update
authorize @webhook, :update?
@webhook.current_account = current_account
if @webhook.update(resource_params)
redirect_to admin_webhook_path(@webhook)
else
render :show
render :edit
end
end

@@ -11,7 +11,7 @@ class Api::V1::ConversationsController < Api::BaseController
def index
@conversations = paginated_conversations
render json: @conversations, each_serializer: REST::ConversationSerializer
render json: @conversations, each_serializer: REST::ConversationSerializer, relationships: StatusRelationshipsPresenter.new(@conversations.map(&:last_status), current_user&.account_id)
end
def read
@@ -32,6 +32,19 @@ class Api::V1::ConversationsController < Api::BaseController
def paginated_conversations
AccountConversation.where(account: current_account)
.includes(
account: :account_stat,
last_status: [
:media_attachments,
:preview_cards,
:status_stat,
:tags,
{
active_mentions: [account: :account_stat],
account: :account_stat,
},
]
)
.to_a_paginated_by_id(limit_param(LIMIT), params_slice(:max_id, :since_id, :min_id))
end
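
Passing a StatusRelationshipsPresenter built from all of the page's last statuses lets the serializer answer favourited/reblogged flags from a few batched queries rather than one query per conversation, which is also what the expanded includes above aim at. A rough sketch of the batching idea, not the presenter's actual internals:

status_ids = @conversations.filter_map(&:last_status).map(&:id)
# one IN-query for the whole page rather than one query per status
favourited = Favourite.where(account_id: current_user.account_id, status_id: status_ids)
                      .pluck(:status_id).to_set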

@@ -7,11 +7,15 @@ class Api::V1::Statuses::HistoriesController < Api::BaseController
before_action :set_status
def show
render json: @status.edits.includes(:account, status: [:account]), each_serializer: REST::StatusEditSerializer
render json: status_edits, each_serializer: REST::StatusEditSerializer
end
private
def status_edits
@status.edits.includes(:account, status: [:account]).to_a.presence || [@status.build_snapshot(at_time: @status.edited_at || @status.created_at)]
end
def set_status
@status = Status.find(params[:status_id])
authorize @status, :show?

@@ -2,6 +2,8 @@
class Api::V1::Statuses::ReblogsController < Api::BaseController
include Authorization
include Redisable
include Lockable
before_action -> { doorkeeper_authorize! :write, :'write:statuses' }
before_action :require_user!
@@ -10,7 +12,9 @@ class Api::V1::Statuses::ReblogsController < Api::BaseController
override_rate_limit_headers :create, family: :statuses
def create
@status = ReblogService.new.call(current_account, @reblog, reblog_params)
with_lock("reblog:#{current_account.id}:#{@reblog.id}") do
@status = ReblogService.new.call(current_account, @reblog, reblog_params)
end
render json: @status, serializer: REST::StatusSerializer
end
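
with_lock comes from the newly included Lockable concern; holding a per-(account, status) lock while ReblogService runs closes a race in which two concurrent requests could create duplicate reblogs. A minimal sketch of a Redis-backed lock of this shape, assuming a plain SET NX EX primitive rather than Mastodon's actual implementation:

require 'securerandom'

def with_lock(key, ttl: 15)
  token = SecureRandom.hex(8)
  # SET lock:<key> <token> NX EX <ttl> succeeds only if the key is absent
  raise 'failed to acquire lock' unless redis.set("lock:#{key}", token, nx: true, ex: ttl)
  begin
    yield
  ensure
    # release only if we still own the lock (token unchanged)
    redis.del("lock:#{key}") if redis.get("lock:#{key}") == token
  end
end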

@@ -18,6 +18,14 @@ class Api::V2::Admin::AccountsController < Api::V1::Admin::AccountsController
private
def next_path
api_v2_admin_accounts_url(pagination_params(max_id: pagination_max_id)) if records_continue?
end
def prev_path
api_v2_admin_accounts_url(pagination_params(min_id: pagination_since_id)) unless @accounts.empty?
end
def filtered_accounts
AccountFilter.new(translated_filter_params).results
end

@@ -13,7 +13,7 @@ class BackupsController < ApplicationController
when :s3
redirect_to @backup.dump.expiring_url(10)
when :fog
if Paperclip::Attachment.default_options.dig(:storage, :fog_credentials, :openstack_temp_url_key).present?
if Paperclip::Attachment.default_options.dig(:fog_credentials, :openstack_temp_url_key).present?
redirect_to @backup.dump.expiring_url(Time.now.utc + 10)
else
redirect_to full_asset_url(@backup.dump.url)

@@ -46,6 +46,6 @@ class MediaController < ApplicationController
end
def allow_iframing
response.headers['X-Frame-Options'] = 'ALLOWALL'
response.headers.delete('X-Frame-Options')
end
end

@@ -8,6 +8,8 @@ class Oauth::AuthorizedApplicationsController < Doorkeeper::AuthorizedApplicationsController
before_action :require_not_suspended!, only: :destroy
before_action :set_body_classes
before_action :set_last_used_at_by_app, only: :index, unless: -> { request.format == :json }
skip_before_action :require_functional!
include Localized
@@ -30,4 +32,14 @@ class Oauth::AuthorizedApplicationsController < Doorkeeper::AuthorizedApplicationsController
def require_not_suspended!
forbidden if current_account.suspended?
end
def set_last_used_at_by_app
@last_used_at_by_app = Doorkeeper::AccessToken
.select('DISTINCT ON (application_id) application_id, last_used_at')
.where(resource_owner_id: current_resource_owner.id)
.where.not(last_used_at: nil)
.order(application_id: :desc, last_used_at: :desc)
.pluck(:application_id, :last_used_at)
.to_h
end
end

@@ -43,7 +43,7 @@ class StatusesController < ApplicationController
return not_found if @status.hidden? || @status.reblog?
expires_in 180, public: true
response.headers['X-Frame-Options'] = 'ALLOWALL'
response.headers.delete('X-Frame-Options')
render layout: 'embedded'
end

@@ -18,7 +18,14 @@ module WellKnown
private
def set_account
@account = Account.find_local!(username_from_resource)
username = username_from_resource
@account = begin
if username == Rails.configuration.x.local_domain
Account.representative
else
Account.find_local!(username)
end
end
end
def username_from_resource
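
With this branch, a WebFinger query for the bare domain now resolves to the instance's representative actor instead of raising ActiveRecord::RecordNotFound. An illustrative request shape (assumed, not part of the diff):

# GET /.well-known/webfinger?resource=acct:example.com@example.com
# username_from_resource # => "example.com"
# (matches Rails.configuration.x.local_domain, so Account.representative is returned)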

@@ -58,6 +58,10 @@ module FormattingHelper
end
def account_field_value_format(field, with_rel_me: true)
html_aware_format(field.value, field.account.local?, with_rel_me: with_rel_me, with_domains: true, multiline: false)
if field.verified? && !field.account.local?
TextFormatter.shortened_link(field.value_for_verification)
else
html_aware_format(field.value, field.account.local?, with_rel_me: with_rel_me, with_domains: true, multiline: false)
end
end
end

@@ -6,7 +6,7 @@ class AccountReachFinder
end
def inboxes
(followers_inboxes + reporters_inboxes + relay_inboxes).uniq
(followers_inboxes + reporters_inboxes + recently_mentioned_inboxes + relay_inboxes).uniq
end
private
@@ -19,6 +19,13 @@ class AccountReachFinder
Account.where(id: @account.targeted_reports.select(:account_id)).inboxes
end
def recently_mentioned_inboxes
cutoff_id = Mastodon::Snowflake.id_at(2.days.ago, with_random: false)
recent_statuses = @account.statuses.recent.where(id: cutoff_id...).limit(200)
Account.joins(:mentions).where(mentions: { status: recent_statuses }).inboxes.take(2000)
end
def relay_inboxes
Relay.enabled.pluck(:inbox_url)
end
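
The cutoff works because Mastodon's snowflake IDs embed a millisecond timestamp in their high bits, so an ID generated for 2.days.ago lower-bounds the query to recent statuses. A rough illustration of the idea (the real layout lives in Mastodon::Snowflake):

# Illustrative only: a time-sortable ID with the timestamp in the high bits
def id_at(time)
  (time.to_i * 1000) << 16 # milliseconds, low 16 bits left at zero
end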

@@ -9,10 +9,6 @@ module ApplicationExtension
validates :redirect_uri, length: { maximum: 2_000 }
end
def most_recently_used_access_token
@most_recently_used_access_token ||= access_tokens.where.not(last_used_at: nil).order(last_used_at: :desc).first
end
def confirmation_redirect_uri
redirect_uri.lines.first.strip
end

@@ -140,7 +140,7 @@ class LinkDetailsExtractor
end
def html
player_url.present? ? content_tag(:iframe, nil, src: player_url, width: width, height: height, allowtransparency: 'true', scrolling: 'no', frameborder: '0') : nil
player_url.present? ? content_tag(:iframe, nil, src: player_url, width: width, height: height, allowfullscreen: 'true', allowtransparency: 'true', scrolling: 'no', frameborder: '0') : nil
end
def width

@@ -7,11 +7,48 @@ require 'resolv'
# Monkey-patch the HTTP.rb timeout class to avoid using a timeout block
# around the Socket#open method, since we use our own timeout blocks inside
# that method
#
# Also changes how the read timeout behaves so that it is cumulative (closer
# to HTTP::Timeout::Global, but still having distinct timeouts for other
# operation types)
class HTTP::Timeout::PerOperation
def connect(socket_class, host, port, nodelay = false)
@socket = socket_class.open(host, port)
@socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) if nodelay
end
# Reset deadline when the connection is re-used for different requests
def reset_counter
@deadline = nil
end
# Read data from the socket
def readpartial(size, buffer = nil)
@deadline ||= Process.clock_gettime(Process::CLOCK_MONOTONIC) + @read_timeout
timeout = false
loop do
result = @socket.read_nonblock(size, buffer, exception: false)
return :eof if result.nil?
remaining_time = @deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
raise HTTP::TimeoutError, "Read timed out after #{@read_timeout} seconds" if timeout || remaining_time <= 0
return result if result != :wait_readable
# Mark the socket for timeout instead of raising right away. Why is the
# timeout not raised immediately? There seems to be a race condition at the
# network level between calling #read_nonblock and #wait_readable:
# #read_nonblock signals that it would wait for reads, but #wait_readable can
# return nil before its full wait time has elapsed. Normally that would be a
# timeout on wait/read, but it can also mean the server has closed the
# socket. Therefore we "mark" the socket for timeout and try to read more
# bytes: if that returns :eof, all is well and there was no timeout;
# otherwise, the first timeout was a real one.
# This hack is necessary because io/wait#wait_readable does not report
# whether the socket was closed by the server, and HTTP::Parser does not
# bound the chunk sizes.
timeout = true unless @socket.to_io.wait_readable(remaining_time)
end
end
end
class Request
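
The net effect of the patch is that the read timeout becomes a cumulative deadline across all reads of a response, rather than a fresh allowance per read, so a slowloris-style peer can no longer keep a connection alive indefinitely by trickling bytes. Illustrative timing, assuming read_timeout = 10:

# per-operation (old): each readpartial call gets a fresh 10s allowance, so a
#   body trickled in 1-byte chunks may hold the connection for 10s per chunk
# cumulative (new): @deadline is fixed at first-read time + 10s; every later
#   wait_readable only gets (deadline - now), so the whole body must arrive
#   within 10s total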

@@ -1,7 +1,7 @@
# frozen_string_literal: true
class ScopeParser < Parslet::Parser
rule(:term) { match('[a-z]').repeat(1).as(:term) }
rule(:term) { match('[a-z_]').repeat(1).as(:term) }
rule(:colon) { str(':') }
rule(:access) { (str('write') | str('read')).as(:access) }
rule(:namespace) { str('admin').as(:namespace) }
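
Widening the term rule to [a-z_] lets multi-word scopes parse. A quick example of the kind of scope this unblocks, using Parslet's standard #parse (scope name assumed from Mastodon's admin API):

ScopeParser.new.parse('admin:read:canonical_email_blocks')
# before: raises Parslet::ParseFailed (underscores not matched by [a-z])
# after:  parses with namespace 'admin', access 'read', term 'canonical_email_blocks'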

@@ -48,6 +48,26 @@ class TextFormatter
html.html_safe # rubocop:disable Rails/OutputSafety
end
class << self
include ERB::Util
def shortened_link(url, rel_me: false)
url = Addressable::URI.parse(url).to_s
rel = rel_me ? (DEFAULT_REL + %w(me)) : DEFAULT_REL
prefix = url.match(URL_PREFIX_REGEX).to_s
display_url = url[prefix.length, 30]
suffix = url[prefix.length + 30..-1]
cutoff = url[prefix.length..-1].length > 30
<<~HTML.squish
<a href="#{h(url)}" target="_blank" rel="#{rel.join(' ')}"><span class="invisible">#{h(prefix)}</span><span class="#{cutoff ? 'ellipsis' : ''}">#{h(display_url)}</span><span class="invisible">#{h(suffix)}</span></a>
HTML
rescue Addressable::URI::InvalidURIError, IDN::Idna::IdnaError
h(url)
end
end
private
def rewrite
@@ -70,19 +90,7 @@ class TextFormatter
end
def link_to_url(entity)
url = Addressable::URI.parse(entity[:url]).to_s
rel = with_rel_me? ? (DEFAULT_REL + %w(me)) : DEFAULT_REL
prefix = url.match(URL_PREFIX_REGEX).to_s
display_url = url[prefix.length, 30]
suffix = url[prefix.length + 30..-1]
cutoff = url[prefix.length..-1].length > 30
<<~HTML.squish
<a href="#{h(url)}" target="_blank" rel="#{rel.join(' ')}"><span class="invisible">#{h(prefix)}</span><span class="#{cutoff ? 'ellipsis' : ''}">#{h(display_url)}</span><span class="invisible">#{h(suffix)}</span></a>
HTML
rescue Addressable::URI::InvalidURIError, IDN::Idna::IdnaError
h(entity[:url])
TextFormatter.shortened_link(entity[:url], rel_me: with_rel_me?)
end
def link_to_hashtag(entity)
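
shortened_link keeps the scheme prefix and anything beyond the first 30 visible characters inside invisible spans, flagging truncation with the ellipsis class. A worked run of that arithmetic with an illustrative URL:

url     = 'https://example.com/a/very/long/path/that/keeps/going'
prefix  = 'https://'                 # URL_PREFIX_REGEX match
display = url[prefix.length, 30]     # => "example.com/a/very/long/path/t"
suffix  = url[prefix.length + 30..]  # => "hat/keeps/going"
cutoff  = url[prefix.length..].length > 30 # => true, so the 'ellipsis' class is set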

@@ -9,10 +9,12 @@ class Vacuum::AccessTokensVacuum
private
def vacuum_revoked_access_tokens!
Doorkeeper::AccessToken.where.not(revoked_at: nil).where('revoked_at < NOW()').delete_all
Doorkeeper::AccessToken.where.not(expires_in: nil).where('created_at + make_interval(secs => expires_in) < NOW()').in_batches.delete_all
Doorkeeper::AccessToken.where.not(revoked_at: nil).where('revoked_at < NOW()').in_batches.delete_all
end
def vacuum_revoked_access_grants!
Doorkeeper::AccessGrant.where.not(revoked_at: nil).where('revoked_at < NOW()').delete_all
Doorkeeper::AccessGrant.where.not(expires_in: nil).where('created_at + make_interval(secs => expires_in) < NOW()').in_batches.delete_all
Doorkeeper::AccessGrant.where.not(revoked_at: nil).where('revoked_at < NOW()').in_batches.delete_all
end
end
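
Besides switching to batched deletes, the vacuum now also removes tokens and grants that merely expired rather than being explicitly revoked. An illustrative Ruby equivalent of the new SQL expiry condition, for a single token:

token = Doorkeeper::AccessToken.last
expired = token.expires_in.present? &&
          token.created_at + token.expires_in.seconds < Time.now.utc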

@@ -16,34 +16,44 @@
class AccountConversation < ApplicationRecord
include Redisable
attr_writer :participant_accounts
before_validation :set_last_status
after_commit :push_to_streaming_api
belongs_to :account
belongs_to :conversation
belongs_to :last_status, class_name: 'Status'
before_validation :set_last_status
def participant_account_ids=(arr)
self[:participant_account_ids] = arr.sort
@participant_accounts = nil
end
def participant_accounts
if participant_account_ids.empty?
[account]
else
participants = Account.where(id: participant_account_ids)
participants.empty? ? [account] : participants
end
@participant_accounts ||= Account.where(id: participant_account_ids).to_a
@participant_accounts.presence || [account]
end
class << self
def to_a_paginated_by_id(limit, options = {})
if options[:min_id]
paginate_by_min_id(limit, options[:min_id], options[:max_id]).reverse
else
paginate_by_max_id(limit, options[:max_id], options[:since_id]).to_a
array = begin
if options[:min_id]
paginate_by_min_id(limit, options[:min_id], options[:max_id]).reverse
else
paginate_by_max_id(limit, options[:max_id], options[:since_id]).to_a
end
end
# Preload participants
participant_ids = array.flat_map(&:participant_account_ids)
accounts_by_id = Account.where(id: participant_ids).index_by(&:id)
array.each do |conversation|
conversation.participant_accounts = conversation.participant_account_ids.filter_map { |id| accounts_by_id[id] }
end
array
end
def paginate_by_min_id(limit, min_id = nil, max_id = nil)
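
Collecting all participant IDs for the page and injecting the loaded accounts through the new participant_accounts writer removes an N+1 (previously one Account query per conversation). A hedged usage sketch (parameters illustrative):

page = AccountConversation.to_a_paginated_by_id(20, max_id: params[:max_id])
page.first.participant_accounts # served from the preloaded hash, no extra query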

@@ -22,15 +22,14 @@ module Attachmentable
included do
def self.has_attached_file(name, options = {}) # rubocop:disable Naming/PredicateName
options = { validate_media_type: false }.merge(options)
super(name, options)
send(:"before_#{name}_post_process") do
send(:"before_#{name}_validate") do
attachment = send(name)
check_image_dimension(attachment)
set_file_content_type(attachment)
obfuscate_file_name(attachment)
set_file_extension(attachment)
Paperclip::Validators::MediaTypeSpoofDetectionValidator.new(attributes: [name]).validate(self)
end
end
end

@@ -123,7 +123,18 @@ class Form::AccountBatch
account: current_account,
action: :suspend
)
Admin::SuspensionWorker.perform_async(account.id)
# Suspending a single account closes its associated reports, so resolving
# reports here keeps mass-suspension consistent with that behaviour.
Report.where(target_account: account).unresolved.find_each do |report|
authorize(report, :update?)
log_action(:resolve, report)
report.resolve!(current_account)
rescue Mastodon::NotPermittedError
# This should not happen, but just in case, do not fail early
end
end
def approve_account(account)

@@ -12,7 +12,7 @@
#
class Identity < ApplicationRecord
belongs_to :user, dependent: :destroy
belongs_to :user
validates :uid, presence: true, uniqueness: { scope: :provider }
validates :provider, presence: true

@@ -20,6 +20,8 @@ class Webhook < ApplicationRecord
report.created
).freeze
attr_writer :current_account
scope :enabled, -> { where(enabled: true) }
validates :url, presence: true, url: true
@@ -27,6 +29,7 @@ class Webhook < ApplicationRecord
validates :events, presence: true
validate :validate_events
validate :validate_permissions
before_validation :strip_events
before_validation :generate_secret
@@ -43,12 +46,29 @@ class Webhook < ApplicationRecord
update!(enabled: false)
end
def required_permissions
events.map { |event| Webhook.permission_for_event(event) }
end
def self.permission_for_event(event)
case event
when 'account.approved', 'account.created', 'account.updated'
:manage_users
when 'report.created'
:manage_reports
end
end
private
def validate_events
errors.add(:events, :invalid) if events.any? { |e| !EVENTS.include?(e) }
end
def validate_permissions
errors.add(:events, :invalid_permissions) if defined?(@current_account) && required_permissions.any? { |permission| !@current_account.user_role.can?(permission) }
end
def strip_events
self.events = events.map { |str| str.strip.presence }.compact if events.present?
end
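
The event-to-permission mapping is consulted both by the validation above and by WebhookPolicy below. For example, per the case statement:

Webhook.permission_for_event('report.created')   # => :manage_reports
Webhook.permission_for_event('account.approved') # => :manage_users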

@@ -14,7 +14,7 @@ class WebhookPolicy < ApplicationPolicy
end
def update?
role.can?(:manage_webhooks)
role.can?(:manage_webhooks) && record.required_permissions.all? { |permission| role.can?(permission) }
end
def enable?
@@ -30,6 +30,6 @@ class WebhookPolicy < ApplicationPolicy
end
def destroy?
role.can?(:manage_webhooks)
role.can?(:manage_webhooks) && record.required_permissions.all? { |permission| role.can?(permission) }
end
end

@@ -11,4 +11,8 @@ class REST::PreviewCardSerializer < ActiveModel::Serializer
def image
object.image? ? full_asset_url(object.image.url(:original)) : nil
end
def html
Sanitize.fragment(object.html, Sanitize::Config::MASTODON_OEMBED)
end
end
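
Sanitizing the stored oEmbed markup on output confines it to the allowed MASTODON_OEMBED element set. An illustrative call (the exact allowed elements and attributes are defined in the sanitizer config):

Sanitize.fragment('<iframe src="https://example.com/embed" onload="alert(1)"></iframe>',
                  Sanitize::Config::MASTODON_OEMBED)
# => iframe kept, the onload handler stripped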

@@ -12,6 +12,7 @@ class RemoveStatusService < BaseService
# @option [Boolean] :immediate
# @option [Boolean] :preserve
# @option [Boolean] :original_removed
# @option [Boolean] :skip_streaming
def call(status, **options)
@payload = Oj.dump(event: :delete, payload: status.id.to_s)
@status = status
@@ -52,6 +53,9 @@ class RemoveStatusService < BaseService
private
# None of the following FeedManager calls results in a redis publish for
# streaming, as the `:update` option is false
def remove_from_self
FeedManager.instance.unpush_from_home(@account, @status)
end
@@ -75,6 +79,8 @@ class RemoveStatusService < BaseService
# followers. Here we send a delete to actively mentioned accounts
# that may not follow the account
return if skip_streaming?
@status.active_mentions.find_each do |mention|
redis.publish("timeline:#{mention.account_id}", @payload)
end
@@ -103,7 +109,7 @@ class RemoveStatusService < BaseService
# without us being able to do all the fancy stuff
@status.reblogs.rewhere(deleted_at: [nil, @status.deleted_at]).includes(:account).reorder(nil).find_each do |reblog|
RemoveStatusService.new.call(reblog, original_removed: true)
RemoveStatusService.new.call(reblog, original_removed: true, skip_streaming: skip_streaming?)
end
end
@@ -114,6 +120,8 @@ class RemoveStatusService < BaseService
return unless @status.public_visibility?
return if skip_streaming?
@status.tags.map(&:name).each do |hashtag|
redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", @payload)
redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", @payload) if @status.local?
@@ -123,6 +131,8 @@ class RemoveStatusService < BaseService
def remove_from_public
return unless @status.public_visibility?
return if skip_streaming?
redis.publish('timeline:public', @payload)
redis.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', @payload)
end
@@ -130,6 +140,8 @@ class RemoveStatusService < BaseService
def remove_from_media
return unless @status.public_visibility?
return if skip_streaming?
redis.publish('timeline:public:media', @payload)
redis.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', @payload)
end
@@ -143,4 +155,8 @@ class RemoveStatusService < BaseService
def permanently?
@options[:immediate] || !(@options[:preserve] || @status.reported?)
end
def skip_streaming?
!!@options[:skip_streaming]
end
end
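
skip_streaming? simply coerces the new option to a boolean; callers that purge old data in bulk can pass it to avoid publishing thousands of pointless delete events, as the user-cleanup scheduler at the end of this commit does. Illustrative call:

RemoveStatusService.new.call(status, immediate: true, skip_streaming: true)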

@@ -89,13 +89,28 @@ class ResolveURLService < BaseService
def process_local_url
recognized_params = Rails.application.routes.recognize_path(@url)
return unless recognized_params[:action] == 'show'
case recognized_params[:controller]
when 'statuses'
return unless recognized_params[:action] == 'show'
if recognized_params[:controller] == 'statuses'
status = Status.find_by(id: recognized_params[:id])
check_local_status(status)
elsif recognized_params[:controller] == 'accounts'
when 'accounts'
return unless recognized_params[:action] == 'show'
Account.find_local(recognized_params[:username])
when 'home'
return unless recognized_params[:action] == 'index' && recognized_params[:username_with_domain].present?
if recognized_params[:any]&.match?(/\A[0-9]+\Z/)
status = Status.find_by(id: recognized_params[:any])
check_local_status(status)
elsif recognized_params[:any].blank?
username, domain = recognized_params[:username_with_domain].gsub(/\A@/, '').split('@')
return unless username.present? && domain.present?
Account.find_remote(username, domain)
end
end
end
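
The new home branch lets the service resolve URLs handled by the web app's catch-all route. Illustrative inputs (shapes assumed from those route params):

# https://example.com/@alice/110123456789012345 -> params[:any] is numeric  -> local status lookup
# https://example.com/@alice@remote.example     -> username_with_domain set -> remote account lookup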

@@ -3,8 +3,8 @@
class VoteValidator < ActiveModel::Validator
def validate(vote)
vote.errors.add(:base, I18n.t('polls.errors.expired')) if vote.poll.expired?
vote.errors.add(:base, I18n.t('polls.errors.invalid_choice')) if invalid_choice?(vote)
vote.errors.add(:base, I18n.t('polls.errors.self_vote')) if self_vote?(vote)
if vote.poll.multiple? && vote.poll.votes.where(account: vote.account, choice: vote.choice).exists?
vote.errors.add(:base, I18n.t('polls.errors.already_voted'))
@ -18,4 +18,8 @@ class VoteValidator < ActiveModel::Validator
def invalid_choice?(vote)
vote.choice.negative? || vote.choice >= vote.poll.options.size
end
def self_vote?(vote)
vote.account_id == vote.poll.account_id
end
end

@@ -5,7 +5,7 @@
= f.input :url, wrapper: :with_block_label, input_html: { placeholder: 'https://' }
.fields-group
= f.input :events, collection: Webhook::EVENTS, wrapper: :with_block_label, include_blank: false, as: :check_boxes, collection_wrapper_tag: 'ul', item_wrapper_tag: 'li'
= f.input :events, collection: Webhook::EVENTS, wrapper: :with_block_label, include_blank: false, as: :check_boxes, collection_wrapper_tag: 'ul', item_wrapper_tag: 'li', disabled: Webhook::EVENTS.filter { |event| !current_user.role.can?(Webhook.permission_for_event(event)) }
.actions
= f.button :button, @webhook.new_record? ? t('admin.webhooks.add_new') : t('generic.save_changes'), type: :submit

@@ -18,8 +18,8 @@
.announcements-list__item__action-bar
.announcements-list__item__meta
- if application.most_recently_used_access_token
= t('doorkeeper.authorized_applications.index.last_used_at', date: l(application.most_recently_used_access_token.last_used_at.to_date))
- if @last_used_at_by_app[application.id]
= t('doorkeeper.authorized_applications.index.last_used_at', date: l(@last_used_at_by_app[application.id].to_date))
- else
= t('doorkeeper.authorized_applications.index.never_used')

@@ -7,28 +7,30 @@ class Scheduler::AccountsStatusesCleanupScheduler
# This limit is mostly to be nice to the fediverse at large and not
# generate too much traffic.
# This also helps limit the running time of the scheduler itself.
MAX_BUDGET = 150
MAX_BUDGET = 300
# This is an attempt to spread the load across instances, as various
# accounts are likely to have various followers.
# This is an attempt to spread the load across remote servers, as
# spreading deletions across diverse accounts is likely to spread
# the deletion across diverse followers. It also helps each individual
# user see some effect sooner.
PER_ACCOUNT_BUDGET = 5
# This is an attempt to limit the workload generated by status removal
# jobs to something the particular instance can handle.
PER_THREAD_BUDGET = 6
# jobs to something the particular server can handle.
PER_THREAD_BUDGET = 5
# These limits avoid further burdening an instance that is already under load
MAX_DEFAULT_SIZE = 200
MAX_DEFAULT_LATENCY = 5
MAX_PUSH_SIZE = 500
MAX_PUSH_LATENCY = 10
# The 'pull' queue has lower priority jobs, and it's unlikely that pushing
# deletes would cause many issues with this queue if it didn't cause issues
# with default and push. Yet, do not enqueue deletes if the instance is
# lagging behind too much.
MAX_PULL_SIZE = 10_000
MAX_PULL_LATENCY = 5.minutes.to_i
# These are latency limits on various queues above which a server is
# considered to be under load, causing the auto-deletion to be entirely
# skipped for that run.
LOAD_LATENCY_THRESHOLDS = {
default: 5,
push: 10,
# The `pull` queue has lower priority jobs, and it's unlikely that
# pushing deletes would cause many issues with this queue if it didn't
# cause issues with `default` and `push`. Yet, do not enqueue deletes
# if the instance is lagging behind too much.
pull: 5.minutes.to_i,
}.freeze
sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i
@@ -36,17 +38,37 @@ class Scheduler::AccountsStatusesCleanupScheduler
return if under_load?
budget = compute_budget
first_policy_id = last_processed_id
# If the budget allows it, we want to consider all accounts with enabled
# auto cleanup at least once.
#
# We start from `first_policy_id` (the last processed id in the previous
# run) and process each policy until we loop to `first_policy_id`,
# recording into `affected_policies` any policy that caused posts to be
# deleted.
#
# After that, we set `full_iteration` to `false` and continue looping on
# policies from `affected_policies`.
first_policy_id = last_processed_id || 0
first_iteration = true
full_iteration = true
affected_policies = []
loop do
num_processed_accounts = 0
scope = AccountStatusesCleanupPolicy.where(enabled: true)
scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
scope.find_each(order: :asc) do |policy|
num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
num_processed_accounts += 1 unless num_deleted.zero?
budget -= num_deleted
unless num_deleted.zero?
num_processed_accounts += 1
affected_policies << policy.id if full_iteration
end
full_iteration = false if !first_iteration && policy.id >= first_policy_id
if budget.zero?
save_last_processed_id(policy.id)
break
@@ -55,36 +77,55 @@ class Scheduler::AccountsStatusesCleanupScheduler
# The idea here is to loop through all policies at least once until the budget is exhausted
# and start back after the last processed account otherwise
break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
first_policy_id = nil
break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)
full_iteration = false unless first_iteration
first_iteration = false
end
end
def compute_budget
threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
# Each post deletion is a `RemovalWorker` job (on `default` queue), each
# potentially spawning many `ActivityPub::DeliveryWorker` jobs (on the `push` queue).
threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
[PER_THREAD_BUDGET * threads, MAX_BUDGET].min
end
def under_load?
queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
end
private
def queue_under_load?(name, max_size, max_latency)
queue = Sidekiq::Queue.new(name)
queue.size > max_size || queue.latency > max_latency
def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
scope = AccountStatusesCleanupPolicy.where(enabled: true)
if full_iteration
# If we are doing a full iteration, examine all policies we have not examined yet
if first_iteration
scope.where(id: first_policy_id...)
else
scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
end
else
# Otherwise, examine only policies that previously yielded posts to delete
scope.where(id: affected_policies)
end
end
def queue_under_load?(name, max_latency)
Sidekiq::Queue.new(name).latency > max_latency
end
def last_processed_id
redis.get('account_statuses_cleanup_scheduler:last_account_id')
redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
end
def save_last_processed_id(id)
if id.nil?
redis.del('account_statuses_cleanup_scheduler:last_account_id')
redis.del('account_statuses_cleanup_scheduler:last_policy_id')
else
redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
redis.set('account_statuses_cleanup_scheduler:last_policy_id', id, ex: 1.hour.seconds)
end
end
end
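
Worked example of the compute_budget math, for a hypothetical deployment running two Sidekiq processes on the push queue with 25 threads each:

threads = 25 + 25 # from the ProcessSet scan
[PER_THREAD_BUDGET * threads, MAX_BUDGET].min # => [250, 300].min => 250 deletions this run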

@@ -6,17 +6,19 @@ class Scheduler::IndexingScheduler
sidekiq_options retry: 0
IMPORT_BATCH_SIZE = 1000
SCAN_BATCH_SIZE = 10 * IMPORT_BATCH_SIZE
def perform
return unless Chewy.enabled?
indexes.each do |type|
with_redis do |redis|
ids = redis.smembers("chewy:queue:#{type.name}")
type.import!(ids)
redis.pipelined do |pipeline|
ids.each { |id| pipeline.srem("chewy:queue:#{type.name}", id) }
redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE).each_slice(IMPORT_BATCH_SIZE) do |ids|
type.import!(ids)
redis.pipelined do |pipeline|
pipeline.srem("chewy:queue:#{type.name}", ids)
end
end
end
end
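
Replacing one big SMEMBERS with SSCAN pages keeps memory bounded on large backlogs while importing in fixed-size slices. A hedged sketch of the same access pattern outside the scheduler (index name illustrative):

redis.sscan_each('chewy:queue:StatusesIndex', count: 10_000).each_slice(1_000) do |ids|
  StatusesIndex.import!(ids)
  redis.srem('chewy:queue:StatusesIndex', ids) # remove only what was imported
end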

@@ -24,7 +24,7 @@ class Scheduler::UserCleanupScheduler
def clean_discarded_statuses!
Status.unscoped.discarded.where('deleted_at <= ?', 30.days.ago).find_in_batches do |statuses|
RemovalWorker.push_bulk(statuses) do |status|
[status.id, { 'immediate' => true }]
[status.id, { 'immediate' => true, 'skip_streaming' => true }]
end
end
end