From a25c1e8ec0b6f4ef2e9f68c4ad5e48e18f5f01a7 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Wed, 30 Dec 2020 14:35:19 +0300 Subject: [PATCH] [#3213] Improved `database.transfer_hashtags` mix task: proper rollback, speedup. --- lib/mix/tasks/pleroma/database.ex | 46 +++++++++++++++++-------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/lib/mix/tasks/pleroma/database.ex b/lib/mix/tasks/pleroma/database.ex index 093c7dd30..d44bd3478 100644 --- a/lib/mix/tasks/pleroma/database.ex +++ b/lib/mix/tasks/pleroma/database.ex @@ -137,6 +137,8 @@ def run(["transfer_hashtags"]) do start_pleroma() + Logger.info("Starting transferring object embedded hashtags to `hashtags` table...") + from( object in Object, left_join: hashtag in assoc(object, :hashtags), @@ -144,21 +146,12 @@ def run(["transfer_hashtags"]) do where: fragment("(?)->>'tag' != '[]'", object.data), select: %{ id: object.id, - inserted_at: object.inserted_at, tag: fragment("(?)->>'tag'", object.data) - }, - order_by: [desc: object.id] + } ) |> Pleroma.Repo.chunk_stream(100, :batches) |> Stream.each(fn objects -> - chunk_start = List.first(objects) - chunk_end = List.last(objects) - - Logger.info( - "transfer_hashtags: " <> - "#{chunk_start.id} (#{chunk_start.inserted_at}) -- " <> - "#{chunk_end.id} (#{chunk_end.inserted_at})" - ) + Logger.info("Processing #{length(objects)} objects...") Enum.map( objects, @@ -168,28 +161,39 @@ def run(["transfer_hashtags"]) do |> Jason.decode!() |> Enum.filter(&is_bitstring(&1)) - with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do - Repo.transaction(fn -> + Repo.transaction(fn -> + with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do for hashtag_record <- hashtag_records do - with {:error, _} <- + with {:ok, _} <- Ecto.Adapters.SQL.query( Repo, "insert into hashtags_objects(hashtag_id, object_id) values " <> "(#{hashtag_record.id}, #{object.id});" ) do - Logger.warn( - "ERROR: could not link object #{object.id} and hashtag #{hashtag_record.id}" - ) + :noop + else + {:error, e} -> + error = + "ERROR: could not link object #{object.id} and hashtag " <> + "#{hashtag_record.id}: #{inspect(e)}" + + Logger.error(error) + Repo.rollback(error) end end - end) - else - e -> Logger.warn("ERROR: could not process object #{object.id}: #{inspect(e)}") - end + else + e -> + error = "ERROR: could not create hashtags for object #{object.id}: #{inspect(e)}" + Logger.error(error) + Repo.rollback(error) + end + end) end ) end) |> Stream.run() + + Logger.info("Done transferring hashtags. Please check logs to ensure no errors.") end def run(["vacuum", args]) do