Add limit CLI flags to prune jobs #655

Merged
floatingghost merged 11 commits from Oneric/akkoma:prune-batch into develop 2024-06-17 20:47:53 +00:00
13 changed files with 346 additions and 178 deletions

View file

@ -106,6 +106,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Akkoma API is now documented - Akkoma API is now documented
- ability to auto-approve follow requests from users you are already following - ability to auto-approve follow requests from users you are already following
- The SimplePolicy MRF can now strip user backgrounds from selected remote hosts - The SimplePolicy MRF can now strip user backgrounds from selected remote hosts
- New standalone `prune_orphaned_activities` mix task with configurable batch limit
- The `prune_objects` mix task now accepts a `--limit` parameter for initial object pruning
## Changed ## Changed
- OTP builds are now built on erlang OTP26 - OTP builds are now built on erlang OTP26

View file

@ -50,9 +50,39 @@ This will prune remote posts older than 90 days (configurable with [`config :ple
- `--keep-threads` - Don't prune posts when they are part of a thread where at least one post has seen local interaction (e.g. one of the posts is a local post, or is favourited by a local user, or has been repeated by a local user...). It also wont delete posts when at least one of the posts in that thread is kept (e.g. because one of the posts has seen recent activity). - `--keep-threads` - Don't prune posts when they are part of a thread where at least one post has seen local interaction (e.g. one of the posts is a local post, or is favourited by a local user, or has been repeated by a local user...). It also wont delete posts when at least one of the posts in that thread is kept (e.g. because one of the posts has seen recent activity).
- `--keep-non-public` - Keep non-public posts like DM's and followers-only, even if they are remote. - `--keep-non-public` - Keep non-public posts like DM's and followers-only, even if they are remote.
- `--limit` - limits how many remote posts get pruned. This limit does **not** apply to any of the follow-up jobs. To keep the database load in check, it is thus advisable to run the standalone `prune_orphaned_activities` task with a limit afterwards instead of passing `--prune-orphaned-activities` to this task.
- `--prune-orphaned-activities` - Also prune orphaned activities afterwards. Activities are things like Like, Create, Announce, Flag (aka reports)... They can significantly help reduce the database size. - `--prune-orphaned-activities` - Also prune orphaned activities afterwards. Activities are things like Like, Create, Announce, Flag (aka reports)... They can significantly help reduce the database size.
- `--vacuum` - Run `VACUUM FULL` after the objects are pruned. This should not be used on a regular basis, but is useful if your instance has been running for a long time before pruning. - `--vacuum` - Run `VACUUM FULL` after the objects are pruned. This should not be used on a regular basis, but is useful if your instance has been running for a long time before pruning.
## Prune orphaned activities from the database
This will prune activities which are no longer referenced by anything.
Such activities might be the result of running `prune_objects` without `--prune-orphaned-activities`.
The same notes and warnings apply as for `prune_objects`.
The task will print out how many rows were freed in total in its last
line of output in the form `Deleted 345 rows`.
When running the job in limited batches this can be used to determine
when all orphaned activities have been deleted.
=== "OTP"
```sh
./bin/pleroma_ctl database prune_orphaned_activities [option ...]
```
=== "From Source"
```sh
mix pleroma.database prune_orphaned_activities [option ...]
```
### Options
- `--limit n` - Only delete up to `n` activities in each query making up this job, i.e. if this job runs two queries, at most `2n` activities will be deleted. Running this task repeatedly in limited batches can help maintain the instance's responsiveness while still freeing up some space.
- `--no-singles` - Do not delete activities referencing single objects
- `--no-arrays` - Do not delete activities referencing an array of objects
## Create a conversation for all existing DMs ## Create a conversation for all existing DMs
Can be safely re-run Can be safely re-run

View file

@ -112,18 +112,26 @@ def shell_prompt(prompt, defval \\ nil, defname \\ nil) do
end end
end end
def shell_info(message) do def shell_info(message) when is_binary(message) or is_list(message) do
if mix_shell?(), if mix_shell?(),
do: Mix.shell().info(message), do: Mix.shell().info(message),
else: IO.puts(message) else: IO.puts(message)
end end
def shell_error(message) do def shell_info(message) do
shell_info("#{inspect(message)}")
end
def shell_error(message) when is_binary(message) or is_list(message) do
if mix_shell?(), if mix_shell?(),
do: Mix.shell().error(message), do: Mix.shell().error(message),
else: IO.puts(:stderr, message) else: IO.puts(:stderr, message)
end end
def shell_error(message) do
shell_error("#{inspect(message)}")
end
@doc "Performs a safe check whether `Mix.shell/0` is available (does not raise if Mix is not loaded)" @doc "Performs a safe check whether `Mix.shell/0` is available (does not raise if Mix is not loaded)"
def mix_shell?, do: :erlang.function_exported(Mix, :shell, 0) def mix_shell?, do: :erlang.function_exported(Mix, :shell, 0)

View file

@ -8,7 +8,6 @@ defmodule Mix.Tasks.Pleroma.Activity do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Pagination alias Pleroma.Pagination
require Logger
import Mix.Pleroma import Mix.Pleroma
import Ecto.Query import Ecto.Query
@ -17,7 +16,7 @@ def run(["get", id | _rest]) do
id id
|> Activity.get_by_id() |> Activity.get_by_id()
|> IO.inspect() |> shell_info()
end end
def run(["delete_by_keyword", user, keyword | _rest]) do def run(["delete_by_keyword", user, keyword | _rest]) do
@ -35,7 +34,7 @@ def run(["delete_by_keyword", user, keyword | _rest]) do
) )
|> Enum.map(fn x -> CommonAPI.delete(x.id, u) end) |> Enum.map(fn x -> CommonAPI.delete(x.id, u) end)
|> Enum.count() |> Enum.count()
|> IO.puts() |> shell_info()
end end
defp query_with(q, search_query) do defp query_with(q, search_query) do

View file

@ -20,6 +20,102 @@ defmodule Mix.Tasks.Pleroma.Database do
@shortdoc "A collection of database related tasks" @shortdoc "A collection of database related tasks"
@moduledoc File.read!("docs/docs/administration/CLI_tasks/database.md") @moduledoc File.read!("docs/docs/administration/CLI_tasks/database.md")
# Applies an Ecto `limit` to `query` only when `limit_cnt` is a positive number;
# any other value (zero, negative, nil, non-numeric) leaves the query untouched.
defp maybe_limit(query, limit_cnt) when is_number(limit_cnt) and limit_cnt > 0 do
  limit(query, [], ^limit_cnt)
end

defp maybe_limit(query, _limit_cnt), do: query
# Builds the SQL `LIMIT` fragment for raw queries.
# A positive number yields "LIMIT <n>"; zero or a negative number yields an
# empty string, i.e. no limit. Non-numeric input matches no clause.
defp limit_statement(limit) when is_number(limit) and limit > 0, do: "LIMIT #{limit}"
defp limit_statement(limit) when is_number(limit), do: ""
# Deletes non-local activities whose `object` field is a single string that
# matches no object's `data.id`, no other activity's `data.id` and no user's
# `ap_id` (i.e. the referenced thing is unknown to this database).
#
# `limit` is rendered by `limit_statement/1` and appended to the inner select
# of candidate ids, so a positive value caps how many rows this call deletes;
# zero or a negative value adds no LIMIT clause.
#
# Returns the number of deleted rows as reported in the query result.
defp prune_orphaned_activities_singles(limit) do
%{:num_rows => del_single} =
"""
delete from public.activities
where id in (
select a.id from public.activities a
left join public.objects o on a.data ->> 'object' = o.data ->> 'id'
left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id'
left join public.users u on a.data ->> 'object' = u.ap_id
where not a.local
and jsonb_typeof(a."data" -> 'object') = 'string'
and o.id is null
and a2.id is null
and u.id is null
#{limit_statement(limit)}
)
"""
# Query timeout is disabled; presumably because this can run long on big tables.
|> Repo.query!([], timeout: :infinity)
Logger.info("Prune activity singles: deleted #{del_single} rows...")
del_single
end
# Deletes `Flag` activities, whose `object` field is expanded as a JSON array,
# when none of the array elements matches an object's `data.id`, another
# activity's `data.id` or a user's `ap_id`. Rows are grouped per activity and
# all three `max(...) is null` conditions must hold for it to be selected.
#
# `limit` is rendered by `limit_statement/1` and appended to the inner select
# of candidate ids, so a positive value caps how many rows this call deletes;
# zero or a negative value adds no LIMIT clause.
#
# Returns the number of deleted rows as reported in the query result.
defp prune_orphaned_activities_array(limit) do
%{:num_rows => del_array} =
"""
delete from public.activities
where id in (
select a.id from public.activities a
join json_array_elements_text((a."data" -> 'object')::json) as j
on a.data->>'type' = 'Flag'
left join public.objects o on j.value = o.data ->> 'id'
left join public.activities a2 on j.value = a2.data ->> 'id'
left join public.users u on j.value = u.ap_id
group by a.id
having max(o.data ->> 'id') is null
and max(a2.data ->> 'id') is null
and max(u.ap_id) is null
#{limit_statement(limit)}
)
"""
# Query timeout is disabled; presumably because this can run long on big tables.
|> Repo.query!([], timeout: :infinity)
Logger.info("Prune activity arrays: deleted #{del_array} rows...")
del_array
end
@doc """
Prunes activities whose referenced object(s) no longer exist.

`limit` caps the rows deleted by *each* of the underlying queries (a value of
zero or less means unlimited). `opts` may set `:arrays` and/or `:singles` to a
falsy value to skip the respective query; both default to `true`.

Returns the total number of deleted rows.
"""
def prune_orphaned_activities(limit \\ 0, opts \\ []) when is_number(limit) do
  # An activity either refers to a single object id, to an array of object ids,
  # or contains an inlined object (at least after going through our normalisation).
  #
  # Flag is the only type we support with an array (and it always has arrays).
  # Update is the only one with inlined objects.
  #
  # Old Delete, Undo, Update and Remove activities, as well as rejected Follow
  # requests, are already purged regularly; no need to deal with them here.
  #
  # Since there's an index on types and there are typically only a few Flag
  # activities, it's _much_ faster to utilise the index. To avoid accidentally
  # deleting useful activities should more types be added, the singles query
  # keeps its typeof check.

  # Activities linking to an array of objects; runs first.
  pruned_arrays =
    if Keyword.get(opts, :arrays, true),
      do: prune_orphaned_activities_array(limit),
      else: 0

  # Activities linking to a single object.
  pruned_singles =
    if Keyword.get(opts, :singles, true),
      do: prune_orphaned_activities_singles(limit),
      else: 0

  pruned_singles + pruned_arrays
end
def run(["remove_embedded_objects" | args]) do def run(["remove_embedded_objects" | args]) do
{options, [], []} = {options, [], []} =
OptionParser.parse( OptionParser.parse(
@ -62,6 +158,37 @@ def run(["update_users_following_followers_counts"]) do
) )
end end
# Standalone `prune_orphaned_activities` task.
# Accepts `--limit n`, `--[no-]singles` and `--[no-]arrays`; any unknown switch
# or positional argument fails the match on the parse result.
def run(["prune_orphaned_activities" | args]) do
  switches = [limit: :integer, singles: :boolean, arrays: :boolean]
  {parsed, [], []} = OptionParser.parse(args, strict: switches)

  start_pleroma()

  # The remaining options (:singles / :arrays) are passed through unchanged.
  {limit, prune_opts} = Keyword.pop(parsed, :limit, 0)

  limit_note = if limit > 0, do: ", limiting deletion to #{limit} rows", else: ""
  Logger.info("Pruning orphaned activities" <> limit_note)

  deleted = prune_orphaned_activities(limit, prune_opts)

  # The documentation for this task refers to this final line's exact format.
  Logger.info("Deleted #{deleted} rows")
end
def run(["prune_objects" | args]) do def run(["prune_objects" | args]) do
{options, [], []} = {options, [], []} =
OptionParser.parse( OptionParser.parse(
@ -70,7 +197,8 @@ def run(["prune_objects" | args]) do
vacuum: :boolean, vacuum: :boolean,
keep_threads: :boolean, keep_threads: :boolean,
keep_non_public: :boolean, keep_non_public: :boolean,
prune_orphaned_activities: :boolean prune_orphaned_activities: :boolean,
limit: :integer
] ]
) )
@ -79,6 +207,8 @@ def run(["prune_objects" | args]) do
deadline = Pleroma.Config.get([:instance, :remote_post_retention_days]) deadline = Pleroma.Config.get([:instance, :remote_post_retention_days])
time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400)) time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400))
limit_cnt = Keyword.get(options, :limit, 0)
log_message = "Pruning objects older than #{deadline} days" log_message = "Pruning objects older than #{deadline} days"
log_message = log_message =
@ -110,8 +240,16 @@ def run(["prune_objects" | args]) do
log_message log_message
end end
log_message =
if limit_cnt > 0 do
log_message <> ", limiting to #{limit_cnt} rows"
else
log_message
end
Logger.info(log_message) Logger.info(log_message)
{del_obj, _} =
if Keyword.get(options, :keep_threads) do if Keyword.get(options, :keep_threads) do
# We want to delete objects from threads where # We want to delete objects from threads where
# 1. the newest post is still old # 1. the newest post is still old
@ -143,11 +281,13 @@ def run(["prune_objects" | args]) do
|> having([a], max(a.updated_at) < ^time_deadline) |> having([a], max(a.updated_at) < ^time_deadline)
|> having([a], not fragment("bool_or(?)", a.local)) |> having([a], not fragment("bool_or(?)", a.local))
|> having([_, b], fragment("max(?::text) is null", b.id)) |> having([_, b], fragment("max(?::text) is null", b.id))
|> maybe_limit(limit_cnt)
|> select([a], fragment("? ->> 'context'::text", a.data)) |> select([a], fragment("? ->> 'context'::text", a.data))
Pleroma.Object Pleroma.Object
|> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context)) |> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context))
else else
deletable =
if Keyword.get(options, :keep_non_public) do if Keyword.get(options, :keep_non_public) do
Pleroma.Object Pleroma.Object
|> where( |> where(
@ -168,12 +308,20 @@ def run(["prune_objects" | args]) do
[o], [o],
fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host()) fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
) )
|> maybe_limit(limit_cnt)
|> select([o], o.id)
Pleroma.Object
|> where([o], o.id in subquery(deletable))
end end
|> Repo.delete_all(timeout: :infinity) |> Repo.delete_all(timeout: :infinity)
Logger.info("Deleted #{del_obj} objects...")
if !Keyword.get(options, :keep_threads) do if !Keyword.get(options, :keep_threads) do
# Without the --keep-threads option, it's possible that bookmarked # Without the --keep-threads option, it's possible that bookmarked
# objects have been deleted. We remove the corresponding bookmarks. # objects have been deleted. We remove the corresponding bookmarks.
%{:num_rows => del_bookmarks} =
""" """
delete from public.bookmarks delete from public.bookmarks
where id in ( where id in (
@ -183,56 +331,33 @@ def run(["prune_objects" | args]) do
where o.id is null where o.id is null
) )
""" """
|> Repo.query([], timeout: :infinity) |> Repo.query!([], timeout: :infinity)
Logger.info("Deleted #{del_bookmarks} orphaned bookmarks...")
end end
if Keyword.get(options, :prune_orphaned_activities) do if Keyword.get(options, :prune_orphaned_activities) do
# Prune activities who link to a single object del_activities = prune_orphaned_activities()
""" Logger.info("Deleted #{del_activities} orphaned activities...")
delete from public.activities
where id in (
select a.id from public.activities a
left join public.objects o on a.data ->> 'object' = o.data ->> 'id'
left join public.activities a2 on a.data ->> 'object' = a2.data ->> 'id'
left join public.users u on a.data ->> 'object' = u.ap_id
where not a.local
and jsonb_typeof(a."data" -> 'object') = 'string'
and o.id is null
and a2.id is null
and u.id is null
)
"""
|> Repo.query([], timeout: :infinity)
# Prune activities who link to an array of objects
"""
delete from public.activities
where id in (
select a.id from public.activities a
join json_array_elements_text((a."data" -> 'object')::json) as j on jsonb_typeof(a."data" -> 'object') = 'array'
left join public.objects o on j.value = o.data ->> 'id'
left join public.activities a2 on j.value = a2.data ->> 'id'
left join public.users u on j.value = u.ap_id
group by a.id
having max(o.data ->> 'id') is null
and max(a2.data ->> 'id') is null
and max(u.ap_id) is null
)
"""
|> Repo.query([], timeout: :infinity)
end end
%{:num_rows => del_hashtags} =
""" """
DELETE FROM hashtags AS ht DELETE FROM hashtags AS ht
WHERE NOT EXISTS ( WHERE NOT EXISTS (
SELECT 1 FROM hashtags_objects hto SELECT 1 FROM hashtags_objects hto
WHERE ht.id = hto.hashtag_id) WHERE ht.id = hto.hashtag_id)
""" """
|> Repo.query() |> Repo.query!()
Logger.info("Deleted #{del_hashtags} no longer used hashtags...")
if Keyword.get(options, :vacuum) do if Keyword.get(options, :vacuum) do
Logger.info("Starting vacuum...")
Maintenance.vacuum("full") Maintenance.vacuum("full")
end end
Logger.info("All done!")
end end
def run(["prune_task"]) do def run(["prune_task"]) do

View file

@ -3,7 +3,6 @@ defmodule Mix.Tasks.Pleroma.Diagnostics do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
require Logger
require Pleroma.Constants require Pleroma.Constants
import Mix.Pleroma import Mix.Pleroma
@ -14,7 +13,7 @@ def run(["http", url]) do
start_pleroma() start_pleroma()
Pleroma.HTTP.get(url) Pleroma.HTTP.get(url)
|> IO.inspect() |> shell_info()
end end
def run(["fetch_object", url]) do def run(["fetch_object", url]) do
@ -27,7 +26,7 @@ def run(["fetch_object", url]) do
def run(["home_timeline", nickname]) do def run(["home_timeline", nickname]) do
start_pleroma() start_pleroma()
user = Repo.get_by!(User, nickname: nickname) user = Repo.get_by!(User, nickname: nickname)
Logger.info("Home timeline query #{user.nickname}") shell_info("Home timeline query #{user.nickname}")
followed_hashtags = followed_hashtags =
user user
@ -56,14 +55,14 @@ def run(["home_timeline", nickname]) do
|> limit(20) |> limit(20)
Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity) Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity)
|> IO.puts() |> shell_info()
end end
def run(["user_timeline", nickname, reading_nickname]) do def run(["user_timeline", nickname, reading_nickname]) do
start_pleroma() start_pleroma()
user = Repo.get_by!(User, nickname: nickname) user = Repo.get_by!(User, nickname: nickname)
reading_user = Repo.get_by!(User, nickname: reading_nickname) reading_user = Repo.get_by!(User, nickname: reading_nickname)
Logger.info("User timeline query #{user.nickname}") shell_info("User timeline query #{user.nickname}")
params = params =
%{limit: 20} %{limit: 20}
@ -87,7 +86,7 @@ def run(["user_timeline", nickname, reading_nickname]) do
|> limit(20) |> limit(20)
Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity) Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity)
|> IO.puts() |> shell_info()
end end
def run(["notifications", nickname]) do def run(["notifications", nickname]) do
@ -103,7 +102,7 @@ def run(["notifications", nickname]) do
|> limit(20) |> limit(20)
Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity) Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity)
|> IO.puts() |> shell_info()
end end
def run(["known_network", nickname]) do def run(["known_network", nickname]) do
@ -129,6 +128,6 @@ def run(["known_network", nickname]) do
|> limit(20) |> limit(20)
Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity) Ecto.Adapters.SQL.explain(Repo, :all, query, analyze: true, timeout: :infinity)
|> IO.puts() |> shell_info()
end end
end end

View file

@ -27,11 +27,11 @@ def run(["ls-packs" | args]) do
] ]
for {param, value} <- to_print do for {param, value} <- to_print do
IO.puts(IO.ANSI.format([:bright, param, :normal, ": ", value])) shell_info(IO.ANSI.format([:bright, param, :normal, ": ", value]))
end end
# A newline # A newline
IO.puts("") shell_info("")
end) end)
end end
@ -49,7 +49,7 @@ def run(["get-packs" | args]) do
pack = manifest[pack_name] pack = manifest[pack_name]
src = pack["src"] src = pack["src"]
IO.puts( shell_info(
IO.ANSI.format([ IO.ANSI.format([
"Downloading ", "Downloading ",
:bright, :bright,
@ -67,9 +67,9 @@ def run(["get-packs" | args]) do
sha_status_text = ["SHA256 of ", :bright, pack_name, :normal, " source file is ", :bright] sha_status_text = ["SHA256 of ", :bright, pack_name, :normal, " source file is ", :bright]
if archive_sha == String.upcase(pack["src_sha256"]) do if archive_sha == String.upcase(pack["src_sha256"]) do
IO.puts(IO.ANSI.format(sha_status_text ++ [:green, "OK"])) shell_info(IO.ANSI.format(sha_status_text ++ [:green, "OK"]))
else else
IO.puts(IO.ANSI.format(sha_status_text ++ [:red, "BAD"])) shell_info(IO.ANSI.format(sha_status_text ++ [:red, "BAD"]))
raise "Bad SHA256 for #{pack_name}" raise "Bad SHA256 for #{pack_name}"
end end
@ -80,7 +80,7 @@ def run(["get-packs" | args]) do
|> Path.dirname() |> Path.dirname()
|> Path.join(pack["files"]) |> Path.join(pack["files"])
IO.puts( shell_info(
IO.ANSI.format([ IO.ANSI.format([
"Fetching the file list for ", "Fetching the file list for ",
:bright, :bright,
@ -94,7 +94,7 @@ def run(["get-packs" | args]) do
files = fetch_and_decode!(files_loc) files = fetch_and_decode!(files_loc)
IO.puts(IO.ANSI.format(["Unpacking ", :bright, pack_name])) shell_info(IO.ANSI.format(["Unpacking ", :bright, pack_name]))
pack_path = pack_path =
Path.join([ Path.join([
@ -115,7 +115,7 @@ def run(["get-packs" | args]) do
file_list: files_to_unzip file_list: files_to_unzip
) )
IO.puts(IO.ANSI.format(["Writing pack.json for ", :bright, pack_name])) shell_info(IO.ANSI.format(["Writing pack.json for ", :bright, pack_name]))
pack_json = %{ pack_json = %{
pack: %{ pack: %{
@ -132,7 +132,7 @@ def run(["get-packs" | args]) do
File.write!(Path.join(pack_path, "pack.json"), Jason.encode!(pack_json, pretty: true)) File.write!(Path.join(pack_path, "pack.json"), Jason.encode!(pack_json, pretty: true))
Pleroma.Emoji.reload() Pleroma.Emoji.reload()
else else
IO.puts(IO.ANSI.format([:bright, :red, "No pack named \"#{pack_name}\" found"])) shell_info(IO.ANSI.format([:bright, :red, "No pack named \"#{pack_name}\" found"]))
end end
end end
end end
@ -180,14 +180,14 @@ def run(["gen-pack" | args]) do
custom_exts custom_exts
end end
IO.puts("Using #{Enum.join(exts, " ")} extensions") shell_info("Using #{Enum.join(exts, " ")} extensions")
IO.puts("Downloading the pack and generating SHA256") shell_info("Downloading the pack and generating SHA256")
{:ok, %{body: binary_archive}} = Pleroma.HTTP.get(src) {:ok, %{body: binary_archive}} = Pleroma.HTTP.get(src)
archive_sha = :crypto.hash(:sha256, binary_archive) |> Base.encode16() archive_sha = :crypto.hash(:sha256, binary_archive) |> Base.encode16()
IO.puts("SHA256 is #{archive_sha}") shell_info("SHA256 is #{archive_sha}")
pack_json = %{ pack_json = %{
name => %{ name => %{
@ -208,7 +208,7 @@ def run(["gen-pack" | args]) do
File.write!(files_name, Jason.encode!(emoji_map, pretty: true)) File.write!(files_name, Jason.encode!(emoji_map, pretty: true))
IO.puts(""" shell_info("""
#{files_name} has been created and contains the list of all found emojis in the pack. #{files_name} has been created and contains the list of all found emojis in the pack.
Please review the files in the pack and remove those not needed. Please review the files in the pack and remove those not needed.
@ -230,11 +230,11 @@ def run(["gen-pack" | args]) do
) )
) )
IO.puts("#{pack_file} has been updated with the #{name} pack") shell_info("#{pack_file} has been updated with the #{name} pack")
else else
File.write!(pack_file, Jason.encode!(pack_json, pretty: true)) File.write!(pack_file, Jason.encode!(pack_json, pretty: true))
IO.puts("#{pack_file} has been created with the #{name} pack") shell_info("#{pack_file} has been created with the #{name} pack")
end end
Pleroma.Emoji.reload() Pleroma.Emoji.reload()
@ -243,7 +243,7 @@ def run(["gen-pack" | args]) do
def run(["reload"]) do def run(["reload"]) do
start_pleroma() start_pleroma()
Pleroma.Emoji.reload() Pleroma.Emoji.reload()
IO.puts("Emoji packs have been reloaded.") shell_info("Emoji packs have been reloaded.")
end end
defp fetch_and_decode!(from) do defp fetch_and_decode!(from) do

View file

@ -11,7 +11,6 @@ defmodule Mix.Tasks.Pleroma.RefreshCounterCache do
alias Pleroma.CounterCache alias Pleroma.CounterCache
alias Pleroma.Repo alias Pleroma.Repo
require Logger
import Ecto.Query import Ecto.Query
def run([]) do def run([]) do

View file

@ -48,7 +48,7 @@ def run(["index"]) do
] ]
) )
IO.puts("Created indices. Starting to insert posts.") shell_info("Created indices. Starting to insert posts.")
chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size]) chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size])
@ -65,7 +65,7 @@ def run(["index"]) do
) )
count = query |> Pleroma.Repo.aggregate(:count, :data) count = query |> Pleroma.Repo.aggregate(:count, :data)
IO.puts("Entries to index: #{count}") shell_info("Entries to index: #{count}")
Pleroma.Repo.stream( Pleroma.Repo.stream(
query, query,
@ -92,10 +92,10 @@ def run(["index"]) do
with {:ok, res} <- result do with {:ok, res} <- result do
if not Map.has_key?(res, "indexUid") do if not Map.has_key?(res, "indexUid") do
IO.puts("\nFailed to index: #{inspect(result)}") shell_info("\nFailed to index: #{inspect(result)}")
end end
else else
e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}") e -> shell_error("\nFailed to index due to network error: #{inspect(e)}")
end end
end) end)
|> Stream.run() |> Stream.run()
@ -128,13 +128,13 @@ def run(["show-keys", master_key]) do
if decoded["results"] do if decoded["results"] do
Enum.each(decoded["results"], fn Enum.each(decoded["results"], fn
%{"name" => name, "key" => key} -> %{"name" => name, "key" => key} ->
IO.puts("#{name}: #{key}") shell_info("#{name}: #{key}")
%{"description" => desc, "key" => key} -> %{"description" => desc, "key" => key} ->
IO.puts("#{desc}: #{key}") shell_info("#{desc}: #{key}")
end) end)
else else
IO.puts("Error fetching the keys, check the master key is correct: #{inspect(decoded)}") shell_error("Error fetching the keys, check the master key is correct: #{inspect(decoded)}")
end end
end end
@ -142,7 +142,7 @@ def run(["stats"]) do
start_pleroma() start_pleroma()
{:ok, result} = meili_get("/indexes/objects/stats") {:ok, result} = meili_get("/indexes/objects/stats")
IO.puts("Number of entries: #{result["numberOfDocuments"]}") shell_info("Number of entries: #{result["numberOfDocuments"]}")
IO.puts("Indexing? #{result["isIndexing"]}") shell_info("Indexing? #{result["isIndexing"]}")
end end
end end

View file

@ -38,7 +38,7 @@ def run(["spoof-uploaded"]) do
Logger.put_process_level(self(), :notice) Logger.put_process_level(self(), :notice)
start_pleroma() start_pleroma()
IO.puts(""" shell_info("""
+------------------------+ +------------------------+
| SPOOF SEARCH UPLOADS | | SPOOF SEARCH UPLOADS |
+------------------------+ +------------------------+
@ -55,7 +55,7 @@ def run(["spoof-inserted"]) do
Logger.put_process_level(self(), :notice) Logger.put_process_level(self(), :notice)
start_pleroma() start_pleroma()
IO.puts(""" shell_info("""
+----------------------+ +----------------------+
| SPOOF SEARCH NOTES | | SPOOF SEARCH NOTES |
+----------------------+ +----------------------+
@ -77,7 +77,7 @@ defp do_spoof_uploaded() do
uploads_search_spoofs_local_dir(Config.get!([Pleroma.Uploaders.Local, :uploads])) uploads_search_spoofs_local_dir(Config.get!([Pleroma.Uploaders.Local, :uploads]))
_ -> _ ->
IO.puts(""" shell_info("""
NOTE: NOTE:
Not using local uploader; thus not affected by this exploit. Not using local uploader; thus not affected by this exploit.
It's impossible to check for files, but in case local uploader was used before It's impossible to check for files, but in case local uploader was used before
@ -98,13 +98,13 @@ defp do_spoof_uploaded() do
orphaned_attachs = upload_search_orphaned_attachments(not_orphaned_urls) orphaned_attachs = upload_search_orphaned_attachments(not_orphaned_urls)
IO.puts("\nSearch concluded; here are the results:") shell_info("\nSearch concluded; here are the results:")
pretty_print_list_with_title(emoji, "Emoji") pretty_print_list_with_title(emoji, "Emoji")
pretty_print_list_with_title(files, "Uploaded Files") pretty_print_list_with_title(files, "Uploaded Files")
pretty_print_list_with_title(post_attachs, "(Not Deleted) Post Attachments") pretty_print_list_with_title(post_attachs, "(Not Deleted) Post Attachments")
pretty_print_list_with_title(orphaned_attachs, "Orphaned Uploads") pretty_print_list_with_title(orphaned_attachs, "Orphaned Uploads")
IO.puts(""" shell_info("""
In total found In total found
#{length(emoji)} emoji #{length(emoji)} emoji
#{length(files)} uploads #{length(files)} uploads
@ -116,7 +116,7 @@ defp do_spoof_uploaded() do
defp uploads_search_spoofs_local_dir(dir) do defp uploads_search_spoofs_local_dir(dir) do
local_dir = String.replace_suffix(dir, "/", "") local_dir = String.replace_suffix(dir, "/", "")
IO.puts("Searching for suspicious files in #{local_dir}...") shell_info("Searching for suspicious files in #{local_dir}...")
glob_ext = "{" <> Enum.join(@activity_exts, ",") <> "}" glob_ext = "{" <> Enum.join(@activity_exts, ",") <> "}"
@ -128,7 +128,7 @@ defp uploads_search_spoofs_local_dir(dir) do
end end
defp uploads_search_spoofs_notes() do defp uploads_search_spoofs_notes() do
IO.puts("Now querying DB for posts with spoofing attachments. This might take a while...") shell_info("Now querying DB for posts with spoofing attachments. This might take a while...")
patterns = [local_id_pattern() | activity_ext_url_patterns()] patterns = [local_id_pattern() | activity_ext_url_patterns()]
@ -153,7 +153,7 @@ defp uploads_search_spoofs_notes() do
end end
defp upload_search_orphaned_attachments(not_orphaned_urls) do defp upload_search_orphaned_attachments(not_orphaned_urls) do
IO.puts(""" shell_info("""
Now querying DB for orphaned spoofing attachment (i.e. their post was deleted, Now querying DB for orphaned spoofing attachment (i.e. their post was deleted,
but if :cleanup_attachments was not enabled traces remain in the database) but if :cleanup_attachments was not enabled traces remain in the database)
This might take a bit... This might take a bit...
@ -184,7 +184,7 @@ defp upload_search_orphaned_attachments(not_orphaned_urls) do
# | S P O O F - I N S E R T E D | # | S P O O F - I N S E R T E D |
# +-----------------------------+ # +-----------------------------+
defp do_spoof_inserted() do defp do_spoof_inserted() do
IO.puts(""" shell_info("""
Searching for local posts whose Create activity has no ActivityPub id... Searching for local posts whose Create activity has no ActivityPub id...
This is a pretty good indicator, but only for spoofs of local actors This is a pretty good indicator, but only for spoofs of local actors
and only if the spoofing happened after around late 2021. and only if the spoofing happened after around late 2021.
@ -194,9 +194,9 @@ defp do_spoof_inserted() do
search_local_notes_without_create_id() search_local_notes_without_create_id()
|> Enum.sort() |> Enum.sort()
IO.puts("Done.\n") shell_info("Done.\n")
IO.puts(""" shell_info("""
Now trying to weed out other poorly hidden spoofs. Now trying to weed out other poorly hidden spoofs.
This can't detect all and may have some false positives. This can't detect all and may have some false positives.
""") """)
@ -207,9 +207,9 @@ defp do_spoof_inserted() do
search_sus_notes_by_id_patterns() search_sus_notes_by_id_patterns()
|> Enum.filter(fn r -> !(r in likely_spoofed_posts_set) end) |> Enum.filter(fn r -> !(r in likely_spoofed_posts_set) end)
IO.puts("Done.\n") shell_info("Done.\n")
IO.puts(""" shell_info("""
Finally, searching for spoofed, local user accounts. Finally, searching for spoofed, local user accounts.
(It's impossible to detect spoofed remote users) (It's impossible to detect spoofed remote users)
""") """)
@ -220,7 +220,7 @@ defp do_spoof_inserted() do
pretty_print_list_with_title(idless_create, "Likely Spoofed Posts") pretty_print_list_with_title(idless_create, "Likely Spoofed Posts")
pretty_print_list_with_title(spoofed_users, "Spoofed local user accounts") pretty_print_list_with_title(spoofed_users, "Spoofed local user accounts")
IO.puts(""" shell_info("""
In total found: In total found:
#{length(spoofed_users)} bogus users #{length(spoofed_users)} bogus users
#{length(idless_create)} likely spoofed posts #{length(idless_create)} likely spoofed posts
@ -289,27 +289,27 @@ defp search_bogus_local_users() do
defp pretty_print_list_with_title(list, title) do defp pretty_print_list_with_title(list, title) do
title_len = String.length(title) title_len = String.length(title)
title_underline = String.duplicate("=", title_len) title_underline = String.duplicate("=", title_len)
IO.puts(title) shell_info(title)
IO.puts(title_underline) shell_info(title_underline)
pretty_print_list(list) pretty_print_list(list)
end end
defp pretty_print_list([]), do: IO.puts("") defp pretty_print_list([]), do: shell_info("")
defp pretty_print_list([{a, o} | rest]) defp pretty_print_list([{a, o} | rest])
when (is_binary(a) or is_number(a)) and is_binary(o) do when (is_binary(a) or is_number(a)) and is_binary(o) do
IO.puts(" {#{a}, #{o}}") shell_info(" {#{a}, #{o}}")
pretty_print_list(rest) pretty_print_list(rest)
end end
defp pretty_print_list([{u, a, o} | rest]) defp pretty_print_list([{u, a, o} | rest])
when is_binary(a) and is_binary(u) and is_binary(o) do when is_binary(a) and is_binary(u) and is_binary(o) do
IO.puts(" {#{u}, #{a}, #{o}}") shell_info(" {#{u}, #{a}, #{o}}")
pretty_print_list(rest) pretty_print_list(rest)
end end
defp pretty_print_list([e | rest]) when is_binary(e) do defp pretty_print_list([e | rest]) when is_binary(e) do
IO.puts(" #{e}") shell_info(" #{e}")
pretty_print_list(rest) pretty_print_list(rest)
end end

View file

@ -114,7 +114,7 @@ def run(["reset_password", nickname]) do
{:ok, token} <- Pleroma.PasswordResetToken.create_token(user) do {:ok, token} <- Pleroma.PasswordResetToken.create_token(user) do
shell_info("Generated password reset token for #{user.nickname}") shell_info("Generated password reset token for #{user.nickname}")
IO.puts("URL: #{~p[/api/v1/pleroma/password_reset/#{token.token}]}") shell_info("URL: #{~p[/api/v1/pleroma/password_reset/#{token.token}]}")
else else
_ -> _ ->
shell_error("No local user #{nickname}") shell_error("No local user #{nickname}")
@ -301,7 +301,7 @@ def run(["invite" | rest]) do
shell_info("Generated user invite token " <> String.replace(invite.invite_type, "_", " ")) shell_info("Generated user invite token " <> String.replace(invite.invite_type, "_", " "))
url = url(~p[/registration/#{invite.token}]) url = url(~p[/registration/#{invite.token}])
IO.puts(url) shell_info(url)
else else
error -> error ->
shell_error("Could not create invite token: #{inspect(error)}") shell_error("Could not create invite token: #{inspect(error)}")
@ -373,7 +373,7 @@ def run(["show", nickname]) do
nickname nickname
|> User.get_cached_by_nickname() |> User.get_cached_by_nickname()
shell_info("#{inspect(user)}") shell_info(user)
end end
def run(["send_confirmation", nickname]) do def run(["send_confirmation", nickname]) do
@ -457,7 +457,7 @@ def run(["blocking", nickname]) do
with %User{local: true} = user <- User.get_cached_by_nickname(nickname) do with %User{local: true} = user <- User.get_cached_by_nickname(nickname) do
blocks = User.following_ap_ids(user) blocks = User.following_ap_ids(user)
IO.puts("#{inspect(blocks)}") shell_info(blocks)
end end
end end
@ -516,12 +516,12 @@ def run(["fix_follow_state", local_user, remote_user]) do
{:follow_data, Pleroma.Web.ActivityPub.Utils.fetch_latest_follow(local, remote)} do {:follow_data, Pleroma.Web.ActivityPub.Utils.fetch_latest_follow(local, remote)} do
calculated_state = User.following?(local, remote) calculated_state = User.following?(local, remote)
IO.puts( shell_info(
"Request state is #{request_state}, vs calculated state of following=#{calculated_state}" "Request state is #{request_state}, vs calculated state of following=#{calculated_state}"
) )
if calculated_state == false && request_state == "accept" do if calculated_state == false && request_state == "accept" do
IO.puts("Discrepancy found, fixing") shell_info("Discrepancy found, fixing")
Pleroma.Web.CommonAPI.reject_follow_request(local, remote) Pleroma.Web.CommonAPI.reject_follow_request(local, remote)
shell_info("Relationship fixed") shell_info("Relationship fixed")
else else
@ -551,14 +551,14 @@ defp refetch_public_keys(query) do
|> Stream.each(fn users -> |> Stream.each(fn users ->
users users
|> Enum.each(fn user -> |> Enum.each(fn user ->
IO.puts("Re-Resolving: #{user.ap_id}") shell_info("Re-Resolving: #{user.ap_id}")
with {:ok, user} <- Pleroma.User.fetch_by_ap_id(user.ap_id), with {:ok, user} <- Pleroma.User.fetch_by_ap_id(user.ap_id),
changeset <- Pleroma.User.update_changeset(user), changeset <- Pleroma.User.update_changeset(user),
{:ok, _user} <- Pleroma.User.update_and_set_cache(changeset) do {:ok, _user} <- Pleroma.User.update_and_set_cache(changeset) do
:ok :ok
else else
error -> IO.puts("Could not resolve: #{user.ap_id}, #{inspect(error)}") error -> shell_info("Could not resolve: #{user.ap_id}, #{inspect(error)}")
end end
end) end)
end) end)

View file

@ -470,7 +470,7 @@ test "it prunes orphaned activities with the --prune-orphaned-activities" do
assert length(activities) == 4 assert length(activities) == 4
end end
test "it prunes orphaned activities with the --prune-orphaned-activities when the objects are referenced from an array" do test "it prunes orphaned activities with prune_orphaned_activities when the objects are referenced from an array" do
%Object{} |> Map.merge(%{data: %{"id" => "existing_object"}}) |> Repo.insert() %Object{} |> Map.merge(%{data: %{"id" => "existing_object"}}) |> Repo.insert()
%User{} |> Map.merge(%{ap_id: "existing_actor"}) |> Repo.insert() %User{} |> Map.merge(%{ap_id: "existing_actor"}) |> Repo.insert()
@ -478,6 +478,7 @@ test "it prunes orphaned activities with the --prune-orphaned-activities when th
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_existing_object", "id" => "remote_activity_existing_object",
"object" => ["non_ existing_object", "existing_object"] "object" => ["non_ existing_object", "existing_object"]
} }
@ -488,6 +489,7 @@ test "it prunes orphaned activities with the --prune-orphaned-activities when th
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_existing_actor", "id" => "remote_activity_existing_actor",
"object" => ["non_ existing_object", "existing_actor"] "object" => ["non_ existing_object", "existing_actor"]
} }
@ -498,6 +500,7 @@ test "it prunes orphaned activities with the --prune-orphaned-activities when th
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_existing_activity", "id" => "remote_activity_existing_activity",
"object" => ["non_ existing_object", "remote_activity_existing_actor"] "object" => ["non_ existing_object", "remote_activity_existing_actor"]
} }
@ -508,6 +511,7 @@ test "it prunes orphaned activities with the --prune-orphaned-activities when th
|> Map.merge(%{ |> Map.merge(%{
local: false, local: false,
data: %{ data: %{
"type" => "Flag",
"id" => "remote_activity_without_existing_referenced_object", "id" => "remote_activity_without_existing_referenced_object",
"object" => ["owo", "whats_this"] "object" => ["owo", "whats_this"]
} }
@ -517,7 +521,7 @@ test "it prunes orphaned activities with the --prune-orphaned-activities when th
assert length(Repo.all(Activity)) == 4 assert length(Repo.all(Activity)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects"]) Mix.Tasks.Pleroma.Database.run(["prune_objects"])
assert length(Repo.all(Activity)) == 4 assert length(Repo.all(Activity)) == 4
Mix.Tasks.Pleroma.Database.run(["prune_objects", "--prune-orphaned-activities"]) Mix.Tasks.Pleroma.Database.run(["prune_orphaned_activities"])
activities = Repo.all(Activity) activities = Repo.all(Activity)
assert length(activities) == 3 assert length(activities) == 3

View file

@ -280,12 +280,13 @@ test "no user to set status" do
test "password reset token is generated" do test "password reset token is generated" do
user = insert(:user) user = insert(:user)
assert capture_io(fn ->
Mix.Tasks.Pleroma.User.run(["reset_password", user.nickname]) Mix.Tasks.Pleroma.User.run(["reset_password", user.nickname])
end) =~ "URL:"
assert_receive {:mix_shell, :info, [message]} assert_receive {:mix_shell, :info, [message]}
assert message =~ "Generated" assert message =~ "Generated"
assert_receive {:mix_shell, :info, [url]}
assert url =~ "URL:"
end end
test "no user to reset password" do test "no user to reset password" do
@ -327,12 +328,13 @@ test "no user to reset MFA" do
describe "running invite" do describe "running invite" do
test "invite token is generated" do test "invite token is generated" do
assert capture_io(fn ->
Mix.Tasks.Pleroma.User.run(["invite"]) Mix.Tasks.Pleroma.User.run(["invite"])
end) =~ "http"
assert_receive {:mix_shell, :info, [message]} assert_receive {:mix_shell, :info, [message]}
assert message =~ "Generated user invite token one time" assert message =~ "Generated user invite token one time"
assert_receive {:mix_shell, :info, [invite_token]}
assert invite_token =~ "http"
end end
test "token is generated with expires_at" do test "token is generated with expires_at" do