[#3213] ActivityPub: implemented subqueries-based hashtags filtering, removed aggregation-based hashtags filtering.

This commit is contained in:
Ivan Tashkinov 2021-01-31 20:37:33 +03:00
parent 9948ff3356
commit 6fd4163ab6
3 changed files with 81 additions and 154 deletions

View file

@ -940,7 +940,7 @@
key: :improved_hashtag_timeline, key: :improved_hashtag_timeline,
type: :keyword, type: :keyword,
description: description:
"If `true` / `:prefer_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes." "If `true`, hashtags will be fetched from `hashtags` table for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes."
} }
] ]
}, },

View file

@ -669,24 +669,6 @@ defp restrict_since(query, %{since_id: since_id}) do
defp restrict_since(query, _), do: query defp restrict_since(query, _), do: query
defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
raise_on_missing_preload()
end
defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
from(
[_activity, object] in query,
where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
)
end
defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
when is_binary(tag_reject) do
restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
end
defp restrict_embedded_tag_reject(query, _), do: query
defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
raise_on_missing_preload() raise_on_missing_preload()
end end
@ -699,139 +681,65 @@ defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all)
end end
defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
restrict_embedded_tag(query, %{tag: tag}) restrict_embedded_tag_any(query, %{tag: tag})
end end
defp restrict_embedded_tag_all(query, _), do: query defp restrict_embedded_tag_all(query, _), do: query
defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
raise_on_missing_preload() raise_on_missing_preload()
end end
defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do defp restrict_embedded_tag_any(query, %{tag: tag}) when is_list(tag) do
from( from(
[_activity, object] in query, [_activity, object] in query,
where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
) )
end end
defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do defp restrict_embedded_tag_any(query, %{tag: tag}) when is_binary(tag) do
restrict_embedded_tag(query, %{tag: [tag]}) restrict_embedded_tag_any(query, %{tag: [tag]})
end end
defp restrict_embedded_tag(query, _), do: query defp restrict_embedded_tag_any(query, _), do: query
defp hashtag_conditions(opts) do defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
[:tag, :tag_all, :tag_reject]
|> Enum.map(&opts[&1])
|> Enum.map(&List.wrap(&1))
end
# Note: times out on larger instances (with default timeout), intended for complex queries
defp restrict_hashtag_agg(query, opts) do
[tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))
cond do
!has_conditions ->
query
opts[:skip_preload] ->
raise_on_missing_preload()
true ->
query
|> group_by_all_bindings()
|> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
|> maybe_restrict_hashtag_any(tag_any)
|> maybe_restrict_hashtag_all(tag_all)
|> maybe_restrict_hashtag_reject_any(tag_reject)
end
end
# Groups by all bindings to allow aggregation on hashtags
defp group_by_all_bindings(query) do
# Expecting named bindings: :object, :bookmark, :thread_mute, :report_note
cond do
Enum.count(query.aliases) == 4 ->
from([a, o, b3, b4, b5] in query, group_by: [a.id, o.id, b3.id, b4.id, b5.id])
Enum.count(query.aliases) == 3 ->
from([a, o, b3, b4] in query, group_by: [a.id, o.id, b3.id, b4.id])
Enum.count(query.aliases) == 2 ->
from([a, o, b3] in query, group_by: [a.id, o.id, b3.id])
true ->
from([a, o] in query, group_by: [a.id, o.id])
end
end
defp maybe_restrict_hashtag_any(query, []) do
query
end
defp maybe_restrict_hashtag_any(query, tags) do
having(
query,
[hashtag: hashtag],
fragment("array_agg(?) && (?)", hashtag.name, ^tags)
)
end
defp maybe_restrict_hashtag_all(query, []) do
query
end
defp maybe_restrict_hashtag_all(query, tags) do
having(
query,
[hashtag: hashtag],
fragment("array_agg(?) @> (?)", hashtag.name, ^tags)
)
end
defp maybe_restrict_hashtag_reject_any(query, []) do
query
end
defp maybe_restrict_hashtag_reject_any(query, tags) do
having(
query,
[hashtag: hashtag],
fragment("not(array_agg(?) && (?))", hashtag.name, ^tags)
)
end
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
raise_on_missing_preload() raise_on_missing_preload()
end end
defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
query when is_list(tag_reject) do
|> group_by_all_bindings() from(
|> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag) [_activity, object] in query,
|> having( where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
[hashtag: hashtag],
fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
) )
end end
defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do defp restrict_embedded_tag_reject_any(query, %{tag_reject: tag_reject})
restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]}) when is_binary(tag_reject) do
restrict_embedded_tag_reject_any(query, %{tag_reject: [tag_reject]})
end end
defp restrict_hashtag_reject_any(query, _), do: query defp restrict_embedded_tag_reject_any(query, _), do: query
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
raise_on_missing_preload() raise_on_missing_preload()
end end
defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
Enum.reduce( from(
tags, [_activity, object] in query,
query, where:
fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end fragment(
"""
(SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
AND hashtags_objects.object_id = ?) @> ?
""",
^tags,
object.id,
^tags
)
) )
end end
@ -846,18 +754,19 @@ defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
end end
defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
query =
from( from(
[_activity, object] in query, [_activity, object] in query,
join: hashtag in assoc(object, :hashtags), where:
where: hashtag.name in ^tags fragment(
"""
EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
AND hashtags_objects.object_id = ? LIMIT 1)
""",
^tags,
object.id
)
) )
if length(tags) > 1 do
distinct(query, [activity], true)
else
query
end
end end
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
@ -866,6 +775,32 @@ defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
defp restrict_hashtag_any(query, _), do: query defp restrict_hashtag_any(query, _), do: query
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
raise_on_missing_preload()
end
defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
from(
[_activity, object] in query,
where:
fragment(
"""
NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
AND hashtags_objects.object_id = ? LIMIT 1)
""",
^tags_reject,
object.id
)
)
end
defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end
defp restrict_hashtag_reject_any(query, _), do: query
defp raise_on_missing_preload do defp raise_on_missing_preload do
raise "Can't use the child object without preloading!" raise "Can't use the child object without preloading!"
end end
@ -1286,23 +1221,16 @@ def fetch_activities_query(recipients, opts \\ %{}) do
|> exclude_invisible_actors(opts) |> exclude_invisible_actors(opts)
|> exclude_visibility(opts) |> exclude_visibility(opts)
hashtag_timeline_strategy = Config.improved_hashtag_timeline() if Config.improved_hashtag_timeline() do
cond do
!hashtag_timeline_strategy ->
query
|> restrict_embedded_tag(opts)
|> restrict_embedded_tag_reject(opts)
|> restrict_embedded_tag_all(opts)
hashtag_timeline_strategy == :prefer_aggregation ->
restrict_hashtag_agg(query, opts)
true ->
query query
|> restrict_hashtag_any(opts) |> restrict_hashtag_any(opts)
|> restrict_hashtag_all(opts) |> restrict_hashtag_all(opts)
|> restrict_hashtag_reject_any(opts) |> restrict_hashtag_reject_any(opts)
else
query
|> restrict_embedded_tag_any(opts)
|> restrict_embedded_tag_all(opts)
|> restrict_embedded_tag_reject_any(opts)
end end
end end

View file

@ -220,7 +220,7 @@ test "it fetches the appropriate tag-restricted posts" do
{:ok, status_four} = CommonAPI.post(user, %{status: ". #any1 #any2"}) {:ok, status_four} = CommonAPI.post(user, %{status: ". #any1 #any2"})
{:ok, status_five} = CommonAPI.post(user, %{status: ". #any2 #any1"}) {:ok, status_five} = CommonAPI.post(user, %{status: ". #any2 #any1"})
for hashtag_timeline_strategy <- [true, :prefer_aggregation, false] do for hashtag_timeline_strategy <- [true, false] do
clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy) clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy)
fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"}) fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})
@ -241,8 +241,7 @@ test "it fetches the appropriate tag-restricted posts" do
tag_all: ["test", "reject"] tag_all: ["test", "reject"]
}) })
# This test would fail if JOIN with 2+ terms in "any" clause is done without DISTINCT. # Testing that deduplication (if needed) is done on DB (not Ecto) level; :limit is important
# The :limit is important (w/o DISTINCT 2 records are deduped by Ecto to 1 b/c of preload).
fetch_five = fetch_five =
ActivityPub.fetch_activities([], %{ ActivityPub.fetch_activities([], %{
type: "Create", type: "Create",