Add limit CLI flags to prune jobs #655

Open
Oneric wants to merge 10 commits from Oneric/akkoma:prune-batch into develop
4 changed files with 267 additions and 105 deletions

View File

@ -92,6 +92,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Akkoma API is now documented - Akkoma API is now documented
- ability to auto-approve follow requests from users you are already following - ability to auto-approve follow requests from users you are already following
- The SimplePolicy MRF can now strip user backgrounds from selected remote hosts - The SimplePolicy MRF can now strip user backgrounds from selected remote hosts
- New standalone `prune_orphaned_activities` mix task with configurable batch limit
- The `prune_objects` mix task now accepts a `--limit` parameter for initial object pruning
## Changed ## Changed
- OTP builds are now built on erlang OTP26 - OTP builds are now built on erlang OTP26

View File

@ -50,9 +50,39 @@ This will prune remote posts older than 90 days (configurable with [`config :ple
- `--keep-threads` - Don't prune posts when they are part of a thread where at least one post has seen local interaction (e.g. one of the posts is a local post, or is favourited by a local user, or has been repeated by a local user...). It also wont delete posts when at least one of the posts in that thread is kept (e.g. because one of the posts has seen recent activity). - `--keep-threads` - Don't prune posts when they are part of a thread where at least one post has seen local interaction (e.g. one of the posts is a local post, or is favourited by a local user, or has been repeated by a local user...). It also wont delete posts when at least one of the posts in that thread is kept (e.g. because one of the posts has seen recent activity).
- `--keep-non-public` - Keep non-public posts like DM's and followers-only, even if they are remote. - `--keep-non-public` - Keep non-public posts like DM's and followers-only, even if they are remote.
- `--limit` - limits how many remote posts get pruned. This limit does **not** apply to any of the follow up jobs. If wanting to keep the database load in check it is thus advisable to run the standalone `prune_orphaned_activities` task with a limit afterwards instead of passing `--prune-orphaned-activities` to this task.
- `--prune-orphaned-activities` - Also prune orphaned activities afterwards. Activities are things like Like, Create, Announce, Flag (aka reports)... They can significantly help reduce the database size. - `--prune-orphaned-activities` - Also prune orphaned activities afterwards. Activities are things like Like, Create, Announce, Flag (aka reports)... They can significantly help reduce the database size.
- `--vacuum` - Run `VACUUM FULL` after the objects are pruned. This should not be used on a regular basis, but is useful if your instance has been running for a long time before pruning. - `--vacuum` - Run `VACUUM FULL` after the objects are pruned. This should not be used on a regular basis, but is useful if your instance has been running for a long time before pruning.
## Prune orphaned activities from the database
This will prune activities which are no longer referenced by anything.
Such activities might be the result of running `prune_objects` without `--prune-orphaned-activities`.
The same notes and warnings apply as for `prune_objects`.
The task will print out how many rows were freed in total in its last
line of output in the form `Deleted 345 rows`.
When running the job in limited batches this can be used to determine
when all orphaned activities have been deleted.
=== "OTP"
```sh
./bin/pleroma_ctl database prune_orphaned_activities [option ...]
```
=== "From Source"
```sh
mix pleroma.database prune_orphaned_activities [option ...]
```
### Options
- `--limit n` - Only delete up to `n` activities in each query making up this job, i.e. if this job runs two queries at most `2n` activities will be deleted. Running this task repeatedly in limited batches can help maintain the instances responsiveness while still freeing up some space.
- `--no-singles` - Do not delete activites referencing single objects
- `--no-arrays` - Do not delete activites referencing an array of objects
## Create a conversation for all existing DMs ## Create a conversation for all existing DMs
Can be safely re-run Can be safely re-run

View File

@ -20,6 +20,103 @@ defmodule Mix.Tasks.Pleroma.Database do
@shortdoc "A collection of database related tasks" @shortdoc "A collection of database related tasks"
@moduledoc File.read!("docs/docs/administration/CLI_tasks/database.md") @moduledoc File.read!("docs/docs/administration/CLI_tasks/database.md")
defp maybe_limit(query, limit_cnt) do
if is_number(limit_cnt) and limit_cnt > 0 do
limit(query, [], ^limit_cnt)
else
query
end
end
defp limit_statement(limit) when is_number(limit) do
if limit > 0 do
"LIMIT #{limit}"
else
""
end
end
defp prune_orphaned_activities_singles(limit) do
%{:num_rows => del_single} =
"""
delete from public.activities
where id in (
select a.id from public.activities a
left join public.objects o on a.data ->> 'object' = o.data ->> 'id'
left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id'
left join public.users u on a.data ->> 'object' = u.ap_id
where not a.local
and jsonb_typeof(a."data" -> 'object') = 'string'
and o.id is null
and a2.id is null
and u.id is null
#{limit_statement(limit)}
)
"""
|> Repo.query!([], timeout: :infinity)
Logger.info("Prune activity singles: deteleted #{del_single} rows...")
del_single
end
defp prune_orphaned_activities_array(limit) do
%{:num_rows => del_array} =
"""
delete from public.activities
where id in (
select a.id from public.activities a
join json_array_elements_text((a."data" -> 'object')::json) as j
on a.data->>'type' = 'Flag'
left join public.objects o on j.value = o.data ->> 'id'
left join public.activities a2 on j.value = a2.data ->> 'id'
left join public.users u on j.value = u.ap_id
group by a.id
having max(o.data ->> 'id') is null
and max(a2.data ->> 'id') is null
and max(u.ap_id) is null
#{limit_statement(limit)}
)
"""
|> Repo.query!([], timeout: :infinity)
Logger.info("Prune activity arrays: deteleted #{del_array} rows...")
del_array
end
def prune_orphaned_activities(limit \\ 0, opts \\ []) when is_number(limit) do
# Activities can either refer to a single object id, and array of object ids
# or contain an inlined object (at least after going through our normalisation)
#
# Flag is the only type we support with an array (and always has arrays).
# Update the only one with inlined objects, but old Update activities are
#
# We already regularly purge old Delte, Undo, Update and Remove and if
# rejected Follow requests anyway; no need to explicitly deal with those here.
#
# Since theres an index on types and there are typically only few Flag
# activites, its _much_ faster to utilise the index. To avoid accidentally
# deleting useful activities should more types be added, keep typeof for singles.
# Prune activities who link to an array of objects
del_array =
if Keyword.get(opts, :arrays, true) do
prune_orphaned_activities_array(limit)
else
0
end
# Prune activities who link to a single object
del_single =
if Keyword.get(opts, :singles, true) do
prune_orphaned_activities_singles(limit)
else
0
end
del_single + del_array
end
def run(["remove_embedded_objects" | args]) do def run(["remove_embedded_objects" | args]) do
{options, [], []} = {options, [], []} =
OptionParser.parse( OptionParser.parse(
@ -62,6 +159,37 @@ defmodule Mix.Tasks.Pleroma.Database do
) )
end end
def run(["prune_orphaned_activities" | args]) do
{options, [], []} =
OptionParser.parse(
args,
strict: [
limit: :integer,
singles: :boolean,
arrays: :boolean,
]
)
start_pleroma()
{limit, options} = Keyword.pop(options, :limit, 0)
log_message = "Pruning orphaned activities"
log_message =
if limit > 0 do
log_message <> ", limiting deletion to #{limit} rows"
else
log_message
end
Logger.info(log_message)
deleted = prune_orphaned_activities(limit, options)
Logger.info("Deleted #{deleted} rows")
end
def run(["prune_objects" | args]) do def run(["prune_objects" | args]) do
{options, [], []} = {options, [], []} =
OptionParser.parse( OptionParser.parse(
@ -70,7 +198,8 @@ defmodule Mix.Tasks.Pleroma.Database do
vacuum: :boolean, vacuum: :boolean,
keep_threads: :boolean, keep_threads: :boolean,
keep_non_public: :boolean, keep_non_public: :boolean,
prune_orphaned_activities: :boolean prune_orphaned_activities: :boolean,
limit: :integer
] ]
) )
@ -79,6 +208,8 @@ defmodule Mix.Tasks.Pleroma.Database do
deadline = Pleroma.Config.get([:instance, :remote_post_retention_days]) deadline = Pleroma.Config.get([:instance, :remote_post_retention_days])
time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400)) time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400))
limit_cnt = Keyword.get(options, :limit, 0)
log_message = "Pruning objects older than #{deadline} days" log_message = "Pruning objects older than #{deadline} days"
log_message = log_message =
@ -110,129 +241,124 @@ defmodule Mix.Tasks.Pleroma.Database do
log_message log_message
end end
log_message =
if limit_cnt > 0 do
log_message <> ", limiting to #{limit_cnt} rows"
else
log_message
end
Logger.info(log_message) Logger.info(log_message)
if Keyword.get(options, :keep_threads) do {del_obj, _} =
# We want to delete objects from threads where if Keyword.get(options, :keep_threads) do
# 1. the newest post is still old # We want to delete objects from threads where
# 2. none of the activities is local # 1. the newest post is still old
# 3. none of the activities is bookmarked # 2. none of the activities is local
# 4. optionally none of the posts is non-public # 3. none of the activities is bookmarked
deletable_context = # 4. optionally none of the posts is non-public
if Keyword.get(options, :keep_non_public) do deletable_context =
Pleroma.Activity if Keyword.get(options, :keep_non_public) do
|> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id) Pleroma.Activity
|> group_by([a], fragment("? ->> 'context'::text", a.data)) |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
|> having( |> group_by([a], fragment("? ->> 'context'::text", a.data))
[a], |> having(
not fragment( [a],
# Posts (checked on Create Activity) is non-public not fragment(
"bool_or((not(?->'to' \\? ? OR ?->'cc' \\? ?)) and ? ->> 'type' = 'Create')", # Posts (checked on Create Activity) is non-public
a.data, "bool_or((not(?->'to' \\? ? OR ?->'cc' \\? ?)) and ? ->> 'type' = 'Create')",
^Pleroma.Constants.as_public(), a.data,
a.data, ^Pleroma.Constants.as_public(),
^Pleroma.Constants.as_public(), a.data,
a.data ^Pleroma.Constants.as_public(),
a.data
)
) )
) else
else Pleroma.Activity
Pleroma.Activity |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
|> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id) |> group_by([a], fragment("? ->> 'context'::text", a.data))
|> group_by([a], fragment("? ->> 'context'::text", a.data)) end
end |> having([a], max(a.updated_at) < ^time_deadline)
|> having([a], max(a.updated_at) < ^time_deadline) |> having([a], not fragment("bool_or(?)", a.local))
|> having([a], not fragment("bool_or(?)", a.local)) |> having([_, b], fragment("max(?::text) is null", b.id))
|> having([_, b], fragment("max(?::text) is null", b.id)) |> maybe_limit(limit_cnt)
|> select([a], fragment("? ->> 'context'::text", a.data)) |> select([a], fragment("? ->> 'context'::text", a.data))
Pleroma.Object
|> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context))
else
if Keyword.get(options, :keep_non_public) do
Pleroma.Object Pleroma.Object
|> where( |> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context))
[o],
fragment(
"?->'to' \\? ? OR ?->'cc' \\? ?",
o.data,
^Pleroma.Constants.as_public(),
o.data,
^Pleroma.Constants.as_public()
)
)
else else
deletable =
if Keyword.get(options, :keep_non_public) do
Pleroma.Object
|> where(
[o],
fragment(
"?->'to' \\? ? OR ?->'cc' \\? ?",
o.data,
^Pleroma.Constants.as_public(),
o.data,
^Pleroma.Constants.as_public()
)
)
else
Pleroma.Object
end
|> where([o], o.updated_at < ^time_deadline)
|> where(
[o],
fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
)
|> maybe_limit(limit_cnt)
|> select([o], o.id)
Pleroma.Object Pleroma.Object
|> where([o], o.id in subquery(deletable))
end end
|> where([o], o.updated_at < ^time_deadline) |> Repo.delete_all(timeout: :infinity)
|> where(
[o], Logger.info("Deleted #{del_obj} objects...")
fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
)
end
|> Repo.delete_all(timeout: :infinity)
if !Keyword.get(options, :keep_threads) do if !Keyword.get(options, :keep_threads) do
# Without the --keep-threads option, it's possible that bookmarked # Without the --keep-threads option, it's possible that bookmarked
# objects have been deleted. We remove the corresponding bookmarks. # objects have been deleted. We remove the corresponding bookmarks.
""" %{:num_rows => del_bookmarks} =
delete from public.bookmarks """
where id in ( delete from public.bookmarks
select b.id from public.bookmarks b where id in (
left join public.activities a on b.activity_id = a.id select b.id from public.bookmarks b
left join public.objects o on a."data" ->> 'object' = o.data ->> 'id' left join public.activities a on b.activity_id = a.id
where o.id is null left join public.objects o on a."data" ->> 'object' = o.data ->> 'id'
) where o.id is null
""" )
|> Repo.query([], timeout: :infinity) """
|> Repo.query!([], timeout: :infinity)
Logger.info("Deleted #{del_bookmarks} orphaned bookmarks...")
end end
if Keyword.get(options, :prune_orphaned_activities) do if Keyword.get(options, :prune_orphaned_activities) do
# Prune activities who link to a single object del_activities = prune_orphaned_activities()
""" Logger.info("Deleted #{del_activities} orphaned activities...")
delete from public.activities
where id in (
select a.id from public.activities a
left join public.objects o on a.data ->> 'object' = o.data ->> 'id'
left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id'
left join public.users u on a.data ->> 'object' = u.ap_id
where not a.local
and jsonb_typeof(a."data" -> 'object') = 'string'
and o.id is null
and a2.id is null
and u.id is null
)
"""
|> Repo.query([], timeout: :infinity)
# Prune activities who link to an array of objects
"""
delete from public.activities
where id in (
select a.id from public.activities a
join json_array_elements_text((a."data" -> 'object')::json) as j on jsonb_typeof(a."data" -> 'object') = 'array'
left join public.objects o on j.value = o.data ->> 'id'
left join public.activities a2 on j.value = a2.data ->> 'id'
left join public.users u on j.value = u.ap_id
group by a.id
having max(o.data ->> 'id') is null
and max(a2.data ->> 'id') is null
and max(u.ap_id) is null
)
"""
|> Repo.query([], timeout: :infinity)
end end
""" %{:num_rows => del_hashtags} =
DELETE FROM hashtags AS ht """
WHERE NOT EXISTS ( DELETE FROM hashtags AS ht
SELECT 1 FROM hashtags_objects hto WHERE NOT EXISTS (
WHERE ht.id = hto.hashtag_id) SELECT 1 FROM hashtags_objects hto
""" WHERE ht.id = hto.hashtag_id)
|> Repo.query() """
|> Repo.query!()
Logger.info("Deleted #{del_hashtags} no longer used hashtags...")
if Keyword.get(options, :vacuum) do if Keyword.get(options, :vacuum) do
Logger.info("Starting vacuum...")
Maintenance.vacuum("full") Maintenance.vacuum("full")
end end
Logger.info("All done!")
end end
def run(["prune_task"]) do def run(["prune_task"]) do

View File

@ -470,7 +470,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
assert length(activities) == 4 assert length(activities) == 4
end end
test "it prunes orphaned activities with the --prune-orphaned-activities when the objects are referenced from an array" do test "it prunes orphaned activities with prune_orphaned_activities when the objects are referenced from an array" do
%Object{} |> Map.merge(%{data: %{"id" => "existing_object"}}) |> Repo.insert() %Object{} |> Map.merge(%{data: %{"id" => "existing_object"}}) |> Repo.insert()
%User{} |> Map.merge(%{ap_id: "existing_actor"}) |> Repo.insert() %User{} |> Map.merge(%{ap_id: "existing_actor"}) |> Repo.insert()
@ -478,6 +478,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_existing_object", "id" => "remote_activity_existing_object",
"object" => ["non_ existing_object", "existing_object"] "object" => ["non_ existing_object", "existing_object"]
} }
@ -488,6 +489,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_existing_actor", "id" => "remote_activity_existing_actor",
"object" => ["non_ existing_object", "existing_actor"] "object" => ["non_ existing_object", "existing_actor"]
} }
@ -498,6 +500,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_existing_activity", "id" => "remote_activity_existing_activity",
"object" => ["non_ existing_object", "remote_activity_existing_actor"] "object" => ["non_ existing_object", "remote_activity_existing_actor"]
} }
@ -508,6 +511,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_without_existing_referenced_object", "id" => "remote_activity_without_existing_referenced_object",
"object" => ["owo", "whats_this"] "object" => ["owo", "whats_this"]
} }
@ -517,7 +521,7 @@ defmodule Mix.Tasks.Pleroma.DatabaseTest do
assert length(Repo.all(Activity)) == 4 assert length(Repo.all(Activity)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects"]) Mix.Tasks.Pleroma.Database.run(["prune_objects"])
assert length(Repo.all(Activity)) == 4 assert length(Repo.all(Activity)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--prune-orphaned-activities"]) Mix.Tasks.Pleroma.Database.run(["prune_orphaned_activities"])
activities = Repo.all(Activity) activities = Repo.all(Activity)
assert length(activities) == 3 assert length(activities) == 3