From 6fd4163ab60be07b1a20ac8911e105ddca8e2095 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 31 Jan 2021 20:37:33 +0300 Subject: [PATCH] [#3213] ActivityPub: implemented subqueries-based hashtags filtering, removed aggregation-based hashtags filtering. --- config/description.exs | 2 +- lib/pleroma/web/activity_pub/activity_pub.ex | 228 ++++++------------ .../web/activity_pub/activity_pub_test.exs | 5 +- 3 files changed, 81 insertions(+), 154 deletions(-) diff --git a/config/description.exs b/config/description.exs index 147c1930c..ead541724 100644 --- a/config/description.exs +++ b/config/description.exs @@ -940,7 +940,7 @@ key: :improved_hashtag_timeline, type: :keyword, description: - "If `true` / `:prefer_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes." + "If `true`, hashtags will be fetched from `hashtags` table for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes." } ] }, diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 0a21ac2f2..cda8d3f54 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -669,24 +669,6 @@ defp restrict_since(query, %{since_id: since_id}) do defp restrict_since(query, _), do: query - defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do - raise_on_missing_preload() - end - - defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do - from( - [_activity, object] in query, - where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) - ) - end - - defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) - when is_binary(tag_reject) do - restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]}) - end - - defp restrict_embedded_tag_reject(query, _), do: query - defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do raise_on_missing_preload() end @@ -699,139 +681,65 @@ defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) end defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do - restrict_embedded_tag(query, %{tag: tag}) + restrict_embedded_tag_any(query, %{tag: tag}) end defp restrict_embedded_tag_all(query, _), do: query - defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do + defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do + defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) ) end - defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do - restrict_embedded_tag(query, %{tag: [tag]}) + defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do + restrict_embedded_tag_any(query, %{tag: [tag]}) end - defp restrict_embedded_tag(query, _), do: query + defp restrict_embedded_tag_any(query, _), do: query - defp hashtag_conditions(opts) do - [:tag, :tag_all, :tag_reject] - |> Enum.map(&opts[&1]) - |> Enum.map(&List.wrap(&1)) - end - - # Note: times out on larger instances (with default timeout), intended for complex queries - defp restrict_hashtag_agg(query, opts) do - [tag_any, tag_all, tag_reject] = hashtag_conditions(opts) - has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1)) - - cond do - !has_conditions -> - query - - opts[:skip_preload] -> - raise_on_missing_preload() - - true -> - query - |> group_by_all_bindings() - |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) - |> maybe_restrict_hashtag_any(tag_any) - |> maybe_restrict_hashtag_all(tag_all) - |> maybe_restrict_hashtag_reject_any(tag_reject) - end - end - - # Groups by all bindings to allow aggregation on hashtags - defp group_by_all_bindings(query) do - # Expecting named bindings: :object, :bookmark, :thread_mute, :report_note - cond do - Enum.count(query.aliases) == 4 -> - from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id]) - - Enum.count(query.aliases) == 3 -> - from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id]) - - Enum.count(query.aliases) == 2 -> - from([a, o, b3] in query, group_by: [a.id, o.id, b3.id]) - - true -> - from([a, o] in query, group_by: [a.id, o.id]) - end - end - - defp maybe_restrict_hashtag_any(query, []) do - query - end - - defp maybe_restrict_hashtag_any(query, tags) do - having( - query, - [hashtag: hashtag], - fragment("array_agg(?) && (?)", hashtag.name, ^tags) - ) - end - - defp maybe_restrict_hashtag_all(query, []) do - query - end - - defp maybe_restrict_hashtag_all(query, tags) do - having( - query, - [hashtag: hashtag], - fragment("array_agg(?) @> (?)", hashtag.name, ^tags) - ) - end - - defp maybe_restrict_hashtag_reject_any(query, []) do - query - end - - defp maybe_restrict_hashtag_reject_any(query, tags) do - having( - query, - [hashtag: hashtag], - fragment("not(array_agg(?) && (?))", hashtag.name, ^tags) - ) - end - - defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do raise_on_missing_preload() end - defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do - query - |> group_by_all_bindings() - |> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) - |> having( - [hashtag: hashtag], - fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject) + defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject}) + when is_list(tag_reject) do + from( + [_activity, object] in query, + where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end - defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do - restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject}) + when is_binary(tag_reject) do + restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]}) end - defp restrict_hashtag_reject_any(query, _), do: query + defp restrict_embedded_tag_reject_any(query, _), do: query defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do raise_on_missing_preload() end defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do - Enum.reduce( - tags, - query, - fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end + from( + [_activity, object] in query, + where: + fragment( + """ + (SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ?) @> ? + """, + ^tags, + object.id, + ^tags + ) ) end @@ -846,18 +754,19 @@ defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do end defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do - query = - from( - [_activity, object] in query, - join: hashtag in assoc(object, :hashtags), - where: hashtag.name in ^tags - ) - - if length(tags) > 1 do - distinct(query, [activity], true) - else - query - end + from( + [_activity, object] in query, + where: + fragment( + """ + EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags, + object.id + ) + ) end defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do @@ -866,6 +775,32 @@ defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do defp restrict_hashtag_any(query, _), do: query + defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do + raise_on_missing_preload() + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do + from( + [_activity, object] in query, + where: + fragment( + """ + NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects + ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?) + AND hashtags_objects.object_id = ? LIMIT 1) + """, + ^tags_reject, + object.id + ) + ) + end + + defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do + restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) + end + + defp restrict_hashtag_reject_any(query, _), do: query + defp raise_on_missing_preload do raise "Can't use the child object without preloading!" end @@ -1286,23 +1221,16 @@ def fetch_activities_query(recipients, opts \\ %{}) do |> exclude_invisible_actors(opts) |> exclude_visibility(opts) - hashtag_timeline_strategy = Config.improved_hashtag_timeline() - - cond do - !hashtag_timeline_strategy -> - query - |> restrict_embedded_tag(opts) - |> restrict_embedded_tag_reject(opts) - |> restrict_embedded_tag_all(opts) - - hashtag_timeline_strategy == :prefer_aggregation -> - restrict_hashtag_agg(query, opts) - - true -> - query - |> restrict_hashtag_any(opts) - |> restrict_hashtag_all(opts) - |> restrict_hashtag_reject_any(opts) + if Config.improved_hashtag_timeline() do + query + |> restrict_hashtag_any(opts) + |> restrict_hashtag_all(opts) + |> restrict_hashtag_reject_any(opts) + else + query + |> restrict_embedded_tag_any(opts) + |> restrict_embedded_tag_all(opts) + |> restrict_embedded_tag_reject_any(opts) end end diff --git a/test/pleroma/web/activity_pub/activity_pub_test.exs b/test/pleroma/web/activity_pub/activity_pub_test.exs index 0b18269cd..c2cc5a9af 100644 --- a/test/pleroma/web/activity_pub/activity_pub_test.exs +++ b/test/pleroma/web/activity_pub/activity_pub_test.exs @@ -220,7 +220,7 @@ test "it fetches the appropriate tag-restricted posts" do {:ok, status_four} = CommonAPI.post(user, %{status: ". #any1 #any2"}) {:ok, status_five} = CommonAPI.post(user, %{status: ". #any2 #any1"}) - for hashtag_timeline_strategy <- [true, :prefer_aggregation, false] do + for hashtag_timeline_strategy <- [true, false] do clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy) fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"}) @@ -241,8 +241,7 @@ test "it fetches the appropriate tag-restricted posts" do tag_all: ["test", "reject"] }) - # This test would fail if JOIN with 2+ terms in "any" clause is done without DISTINCT. - # The :limit is important (w/o DISTINCT 2 records are deduped by Ecto to 1 b/c of preload). + # Testing that deduplication (if needed) is done on DB (not Ecto) level; :limit is important fetch_five = ActivityPub.fetch_activities([], %{ type: "Create",