From 7f07909a7b56eb368b3f8aab4752def1551c12fe Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Tue, 19 Jan 2021 21:13:32 +0300 Subject: [PATCH] [#3213] Added `HashtagsTableMigrator.count/1`. --- .../migrators/hashtags_table_migrator.ex | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/lib/pleroma/migrators/hashtags_table_migrator.ex b/lib/pleroma/migrators/hashtags_table_migrator.ex index 47de5e134..048f3c8ee 100644 --- a/lib/pleroma/migrators/hashtags_table_migrator.ex +++ b/lib/pleroma/migrators/hashtags_table_migrator.ex @@ -85,19 +85,8 @@ def handle_info(:migrate_hashtags, state) do max_processed_id = data_migration.data["max_processed_id"] || 0 - # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) - from( - object in Object, - left_join: hashtag in assoc(object, :hashtags), - where: object.id > ^max_processed_id, - where: is_nil(hashtag.id), - where: - fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), - select: %{ - id: object.id, - tag: fragment("(?)->'tag'", object.data) - } - ) + query() + |> where([object], object.id > ^max_processed_id) |> Repo.chunk_stream(100, :batches, timeout: :infinity) |> Stream.each(fn objects -> object_ids = Enum.map(objects, & &1.id) @@ -155,6 +144,21 @@ def handle_info(:migrate_hashtags, state) do {:noreply, state} end + defp query do + # Note: most objects have Mention-type AS2 tags and no hashtags (but we can't filter them out) + from( + object in Object, + left_join: hashtag in assoc(object, :hashtags), + where: is_nil(hashtag.id), + where: + fragment("(?)->'tag' IS NOT NULL AND (?)->'tag' != '[]'::jsonb", object.data, object.data), + select: %{ + id: object.id, + tag: fragment("(?)->'tag'", object.data) + } + ) + end + defp transfer_object_hashtags(object) do hashtags = Object.object_data_hashtags(%{"tag" => object.tag}) @@ -188,6 +192,18 @@ defp transfer_object_hashtags(object) do end) end + def count(force \\ false) do + stored_count = state()[:count] + + if stored_count && !force do + stored_count + else + count = Repo.aggregate(query(), :count, :id) + put_stat(:count, count) + count + end + end + defp persist_stats(data_migration) do runner_state = Map.drop(state(), [:status]) _ = DataMigration.update(data_migration, %{data: runner_state})