[#3213] Reorganized hashtags cleanup. Transaction-wrapped Hashtag.get_or_create_by_names/1. Misc. improvements.

This commit is contained in:
Ivan Tashkinov 2021-02-11 19:30:21 +03:00
parent d1c6dd97aa
commit a996ab46a5
6 changed files with 112 additions and 100 deletions

View file

@@ -560,7 +560,6 @@
],
plugins: [Oban.Plugins.Pruner],
crontab: [
{"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
]

View file

@@ -1964,7 +1964,6 @@
type: {:list, :tuple},
description: "Settings for cron background jobs",
suggestions: [
{"0 1 * * *", Pleroma.Workers.Cron.HashtagsCleanupWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker},
{"0 0 * * *", Pleroma.Workers.Cron.NewUsersDigestWorker}
]

View file

@@ -6,14 +6,17 @@ defmodule Pleroma.Hashtag do
use Ecto.Schema
import Ecto.Changeset
import Ecto.Query
alias Ecto.Multi
alias Pleroma.Hashtag
alias Pleroma.Object
alias Pleroma.Repo
schema "hashtags" do
field(:name, :string)
many_to_many(:objects, Pleroma.Object, join_through: "hashtags_objects", on_replace: :delete)
many_to_many(:objects, Object, join_through: "hashtags_objects", on_replace: :delete)
timestamps()
end
@@ -34,15 +37,27 @@ def get_or_create_by_name(name) when is_bitstring(name) do
end
def get_or_create_by_names(names) when is_list(names) do
Enum.reduce_while(names, {:ok, []}, fn name, {:ok, list} ->
case get_or_create_by_name(name) do
{:ok, %Hashtag{} = hashtag} ->
{:cont, {:ok, list ++ [hashtag]}}
timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
error ->
{:halt, error}
end
end)
structs =
Enum.map(names, fn name ->
%Hashtag{}
|> changeset(%{name: name})
|> Map.get(:changes)
|> Map.merge(%{inserted_at: timestamp, updated_at: timestamp})
end)
with {:ok, %{query_op: hashtags}} <-
Multi.new()
|> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing)
|> Multi.run(:query_op, fn _repo, _changes ->
{:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
end)
|> Repo.transaction() do
{:ok, hashtags}
else
{:error, _name, value, _changes_so_far} -> {:error, value}
end
end
def changeset(%Hashtag{} = struct, params) do
@@ -52,4 +67,29 @@ def changeset(%Hashtag{} = struct, params) do
|> validate_required([:name])
|> unique_constraint(:name)
end
# Deletes every hashtags_objects join row for the given object, then removes
# any hashtags that ended up with no references at all.
def unlink(%Object{id: object_id}) do
  links_query =
    from(assoc in "hashtags_objects",
      where: assoc.object_id == ^object_id,
      select: assoc.hashtag_id
    )

  # Repo.delete_all/1 with a :select returns {count, selected_values}; the
  # selected hashtag ids are the cleanup candidates.
  {_deleted_count, hashtag_ids} = Repo.delete_all(links_query)

  delete_unreferenced(hashtag_ids)
end
# Raw SQL: among the ids bound to $1, delete the hashtags that no
# hashtags_objects row references anymore.
@delete_unreferenced_query """
DELETE FROM hashtags WHERE id IN
  (SELECT hashtags.id FROM hashtags
    LEFT OUTER JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id
    WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.id = ANY($1));
"""

# Deletes the unreferenced hashtags among `ids`.
# Returns {:ok, deleted_count} on success, or the Repo.query/2 error as-is.
def delete_unreferenced(ids) do
  case Repo.query(@delete_unreferenced_query, [ids]) do
    {:ok, %{num_rows: deleted_count}} -> {:ok, deleted_count}
    error -> error
  end
end
end

View file

@@ -74,16 +74,15 @@ def handle_continue(:init_state, _state) do
def handle_info(:migrate_hashtags, state) do
State.clear()
data_migration = data_migration()
update_status(:running)
put_stat(:started_at, NaiveDateTime.utc_now())
data_migration = data_migration()
persistent_data = Map.take(data_migration.data, ["max_processed_id"])
{:ok, data_migration} =
DataMigration.update(data_migration, %{state: :running, data: persistent_data})
update_status(:running)
put_stat(:started_at, NaiveDateTime.utc_now())
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
max_processed_id = data_migration.data["max_processed_id"] || 0
@@ -137,6 +136,8 @@ def handle_info(:migrate_hashtags, state) do
|> Stream.run()
with 0 <- failures_count(data_migration.id) do
_ = delete_non_create_activities_hashtags()
{:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
handle_success(data_migration)
@@ -150,9 +151,37 @@ def handle_info(:migrate_hashtags, state) do
{:noreply, state}
end
# Raw SQL: drop hashtags_objects rows whose object has no matching Create
# activity (hashtags attached to non-Create objects are not needed).
@hashtags_objects_cleanup_query """
DELETE FROM hashtags_objects WHERE object_id IN
  (SELECT DISTINCT objects.id FROM objects
    JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
      ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
        (objects.data->>'id')
      AND activities.data->>'type' = 'Create'
    WHERE activities.id IS NULL);
"""

# Raw SQL: drop hashtags that no hashtags_objects row references anymore.
@hashtags_cleanup_query """
DELETE FROM hashtags WHERE id IN
  (SELECT hashtags.id FROM hashtags
    LEFT OUTER JOIN hashtags_objects
      ON hashtags_objects.hashtag_id = hashtags.id
    WHERE hashtags_objects.hashtag_id IS NULL);
"""

# Runs both cleanup queries with no timeout (full-table scans can be slow).
# Returns {:ok, deleted_links_count, deleted_hashtags_count}; a failed query
# raises via the assertive match.
def delete_non_create_activities_hashtags do
  {:ok, %{num_rows: deleted_links_count}} =
    Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)

  {:ok, %{num_rows: deleted_hashtags_count}} =
    Repo.query(@hashtags_cleanup_query, [], timeout: :infinity)

  {:ok, deleted_links_count, deleted_hashtags_count}
end
defp query do
# Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out)
# Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
# Note: not checking activity type, expecting remove_non_create_objects_hashtags/_ to clean up
from(
object in Object,
where:
@@ -182,25 +211,20 @@ defp transfer_object_hashtags(object) do
defp transfer_object_hashtags(object, hashtags) do
Repo.transaction(fn ->
with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
for hashtag_record <- hashtag_records do
with {:ok, _} <-
Repo.query(
"insert into hashtags_objects(hashtag_id, object_id) values ($1, $2);",
[hashtag_record.id, object.id]
) do
nil
else
{:error, e} ->
error =
"ERROR: could not link object #{object.id} and hashtag " <>
"#{hashtag_record.id}: #{inspect(e)}"
maps = Enum.map(hashtag_records, &%{hashtag_id: &1.id, object_id: object.id})
expected_rows = length(hashtag_records)
Logger.error(error)
Repo.rollback(object.id)
end
with {^expected_rows, _} <- Repo.insert_all("hashtags_objects", maps) do
object.id
else
e ->
error =
"ERROR when inserting #{expected_rows} hashtags_objects " <>
"for object #{object.id}: #{inspect(e)}"
Logger.error(error)
Repo.rollback(object.id)
end
object.id
else
e ->
error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}"

View file

@@ -62,27 +62,30 @@ def change(struct, params \\ %{}) do
|> cast(params, [:data])
|> validate_required([:data])
|> unique_constraint(:ap_id, name: :objects_unique_apid_index)
# Expecting `maybe_handle_hashtags_change/1` to run last:
|> maybe_handle_hashtags_change(struct)
end
# Note: not checking activity type; HashtagsCleanupWorker should clean up unused records later
# Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
defp maybe_handle_hashtags_change(changeset, struct) do
with data_hashtags_change = get_change(changeset, :data),
true <- hashtags_changed?(struct, data_hashtags_change),
with %Ecto.Changeset{valid?: true} <- changeset,
data_hashtags_change = get_change(changeset, :data),
{_, true} <- {:changed, hashtags_changed?(struct, data_hashtags_change)},
{:ok, hashtag_records} <-
data_hashtags_change
|> object_data_hashtags()
|> Hashtag.get_or_create_by_names() do
put_assoc(changeset, :hashtags, hashtag_records)
else
false ->
%{valid?: false} ->
changeset
{:error, hashtag_changeset} ->
failed_hashtag = get_field(hashtag_changeset, :name)
{:changed, false} ->
changeset
{:error, _} ->
validate_change(changeset, :data, fn _, _ ->
[data: "error referencing hashtag: #{failed_hashtag}"]
[data: "error referencing hashtags"]
end)
end
end
@@ -221,9 +224,13 @@ def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ Date
def swap_object_with_tombstone(object) do
tombstone = make_tombstone(object)
object
|> Object.change(%{data: tombstone})
|> Repo.update()
with {:ok, object} <-
object
|> Object.change(%{data: tombstone})
|> Repo.update() do
Hashtag.unlink(object)
{:ok, object}
end
end
def delete(%Object{data: %{"id" => id}} = object) do

View file

@@ -1,57 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.HashtagsCleanupWorker do
  @moduledoc """
  The worker to clean up unused hashtags_objects and hashtags.
  """

  use Oban.Worker, queue: "hashtags_cleanup"

  alias Pleroma.Repo

  require Logger

  # Raw SQL: drop hashtags_objects rows whose object has no matching Create
  # activity.
  @hashtags_objects_query """
  DELETE FROM hashtags_objects WHERE object_id IN
    (SELECT DISTINCT objects.id FROM objects
      JOIN hashtags_objects ON hashtags_objects.object_id = objects.id LEFT JOIN activities
        ON COALESCE(activities.data->'object'->>'id', activities.data->>'object') =
          (objects.data->>'id')
        AND activities.data->>'type' = 'Create'
      WHERE activities.id IS NULL);
  """

  # Raw SQL: drop unreferenced hashtags, but only those inserted before the
  # cutoff timestamp bound to $1 (see the note in perform/1).
  @hashtags_query """
  DELETE FROM hashtags WHERE id IN
    (SELECT hashtags.id FROM hashtags
      LEFT OUTER JOIN hashtags_objects
        ON hashtags_objects.hashtag_id = hashtags.id
      WHERE hashtags_objects.hashtag_id IS NULL AND hashtags.inserted_at < $1);
  """

  @impl Oban.Worker
  def perform(_job) do
    Logger.info("Cleaning up unused `hashtags_objects` records...")

    {:ok, %{num_rows: orphaned_links_count}} =
      Repo.query(@hashtags_objects_query, [], timeout: :infinity)

    Logger.info("Deleted #{orphaned_links_count} unused `hashtags_objects` records.")

    Logger.info("Cleaning up unused `hashtags` records...")

    # Note: ignoring recently created hashtags since references are added after hashtag is created
    cutoff = NaiveDateTime.add(NaiveDateTime.utc_now(), -3600 * 24)

    {:ok, %{num_rows: orphaned_hashtags_count}} =
      Repo.query(@hashtags_query, [cutoff], timeout: :infinity)

    Logger.info("Deleted #{orphaned_hashtags_count} unused `hashtags` records.")

    Logger.info("HashtagsCleanupWorker complete.")

    :ok
  end
end