masto-fe/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
Claire 4ac78e2a06
Add feature to automatically delete old toots (#16529)
* Add account statuses cleanup policy model

* Record last inspected toot to delete to speed up successive calls to statuses_to_delete

* Add service to cleanup a given account's statuses within a budget

* Add worker to go through account policies and delete old toots

* Fix last inspected status id logic

All existing statuses older or equal to last inspected status id must be
kept by the current policy. This is an invariant that must be kept so that
resuming deletion from the last inspected status remains sound.

* Add tests

* Refactor scheduler and add tests

* Add user interface

* Add support for discriminating based on boosts/favs

* Add UI support for min_reblogs and min_favs, rework UI

* Address first round of review comments

* Replace Snowflake#id_at_start with with_random parameter

* Add tests

* Add tests for StatusesCleanupController

* Rework settings page

* Adjust load-avoiding mechanisms

* Please CodeClimate
2021-08-09 23:11:50 +02:00

96 lines
3.4 KiB
Ruby

# frozen_string_literal: true
class Scheduler::AccountsStatusesCleanupScheduler
include Sidekiq::Worker
# This limit is mostly to be nice to the fediverse at large and not
# generate too much traffic.
# This also helps limiting the running time of the scheduler itself.
MAX_BUDGET = 50
# This is an attempt to spread the load across instances, as various
# accounts are likely to have various followers.
PER_ACCOUNT_BUDGET = 5
# This is an attempt to limit the workload generated by status removal
# jobs to something the particular instance can handle.
PER_THREAD_BUDGET = 5
# Those avoid loading an instance that is already under load
MAX_DEFAULT_SIZE = 2
MAX_DEFAULT_LATENCY = 5
MAX_PUSH_SIZE = 5
MAX_PUSH_LATENCY = 10
# 'pull' queue has lower priority jobs, and it's unlikely that pushing
# deletes would cause much issues with this queue if it didn't cause issues
# with default and push. Yet, do not enqueue deletes if the instance is
# lagging behind too much.
MAX_PULL_SIZE = 500
MAX_PULL_LATENCY = 300
# This is less of an issue in general, but deleting old statuses is likely
# to cause delivery errors, and thus increase the number of jobs to be retried.
# This doesn't directly translate to load, but connection errors and a high
# number of dead instances may lead to this spiraling out of control if
# unchecked.
MAX_RETRY_SIZE = 50_000
sidekiq_options retry: 0, lock: :until_executed
def perform
return if under_load?
budget = compute_budget
first_policy_id = last_processed_id
loop do
num_processed_accounts = 0
scope = AccountStatusesCleanupPolicy.where(enabled: true)
scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
scope.find_each(order: :asc) do |policy|
num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
num_processed_accounts += 1 unless num_deleted.zero?
budget -= num_deleted
if budget.zero?
save_last_processed_id(policy.id)
break
end
end
# The idea here is to loop through all policies at least once until the budget is exhausted
# and start back after the last processed account otherwise
break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
first_policy_id = nil
end
end
def compute_budget
threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
[PER_THREAD_BUDGET * threads, MAX_BUDGET].min
end
def under_load?
return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE
queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
end
private
def queue_under_load?(name, max_size, max_latency)
queue = Sidekiq::Queue.new(name)
queue.size > max_size || queue.latency > max_latency
end
def last_processed_id
Redis.current.get('account_statuses_cleanup_scheduler:last_account_id')
end
def save_last_processed_id(id)
if id.nil?
Redis.current.del('account_statuses_cleanup_scheduler:last_account_id')
else
Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
end
end
end