[#3213] Refactoring of HashtagsTableMigrator. Hashtag timeline performance optimization (auto switch to non-aggregate join strategy when efficient).
parent f5f267fa76
commit 48b399cedb
6 changed files with 86 additions and 53 deletions
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 - Search: When using Postgres 11+, Pleroma will use the `websearch_to_tsvector` function to parse search queries.
 - Emoji: Support the full Unicode 13.1 set of Emoji for reactions, plus regional indicators.
 - Admin API: Reports now ordered by newest
+- Extracted object hashtags into separate table in order to improve hashtag timeline performance (via background migration in `Pleroma.Migrators.HashtagsTableMigrator`).

 ### Added

@@ -941,6 +941,12 @@
           key: :show_reactions,
           type: :boolean,
           description: "Let favourites and emoji reactions be viewed through the API."
+        },
+        %{
+          key: :improved_hashtag_timeline,
+          type: :keyword,
+          description:
+            "If `true` / `:prefer_aggregation` / `:avoid_aggregation`, hashtags table and selected strategy will be used for hashtags timeline. When `false`, object-embedded hashtags will be used (slower). Is auto-set to `true` (unless overridden) when HashtagsTableMigrator completes."
         }
       ]
     },
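Note: the `improved_hashtag_timeline` option described above lives under the `:instance` config group (the key path `[:instance, :improved_hashtag_timeline]` also appears in the test change at the bottom of this diff). A minimal sketch of setting it by hand, assuming a standard runtime config file:

```elixir
import Config

# Sketch only: pick a hashtag timeline strategy explicitly instead of letting the
# migrator auto-enable it. Valid values per the description above: `true`,
# `:prefer_aggregation`, `:avoid_aggregation`, or `false` (embedded hashtags, slower).
config :pleroma, :instance,
  improved_hashtag_timeline: :avoid_aggregation
```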
@@ -45,25 +45,23 @@ def init(_) do
   def handle_continue(:init_state, _state) do
     {:ok, _} = State.start_link(nil)

-    put_stat(:status, :init)
+    update_status(:init)

-    dm = data_migration()
+    data_migration = data_migration()
     manual_migrations = Config.get([:instance, :manual_data_migrations], [])

     cond do
       Config.get(:env) == :test ->
-        put_stat(:status, :noop)
+        update_status(:noop)

-      is_nil(dm) ->
-        put_stat(:status, :halt)
-        put_stat(:message, "Data migration does not exist.")
+      is_nil(data_migration) ->
+        update_status(:halt, "Data migration does not exist.")

-      dm.state == :manual or dm.name in manual_migrations ->
-        put_stat(:status, :noop)
-        put_stat(:message, "Data migration is in manual execution state.")
+      data_migration.state == :manual or data_migration.name in manual_migrations ->
+        update_status(:noop, "Data migration is in manual execution state.")

-      dm.state == :complete ->
-        handle_success()
+      data_migration.state == :complete ->
+        handle_success(data_migration)

       true ->
         send(self(), :migrate_hashtags)
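Aside: the `:manual_data_migrations` lookup above lets an operator take the migration out of automatic execution entirely. A hedged example (the migration name `"populate_hashtags_table"` is an assumption, not taken from this diff):

```elixir
import Config

# Assumed example: a data migration listed here is treated as "manual", so the
# GenServer above stops at the `:noop` branch instead of migrating on boot.
config :pleroma, :instance,
  manual_data_migrations: ["populate_hashtags_table"]
```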
@@ -81,7 +79,7 @@ def handle_info(:migrate_hashtags, state) do
     {:ok, data_migration} =
       DataMigration.update(data_migration, %{state: :running, data: persistent_data})

-    put_stat(:status, :running)
+    update_status(:running)

     Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")

@@ -146,13 +144,12 @@ def handle_info(:migrate_hashtags, state) do
          ) do
       _ = DataMigration.update_state(data_migration, :complete)

-      handle_success()
+      handle_success(data_migration)
     else
       _ ->
         _ = DataMigration.update_state(data_migration, :failed)

-        put_stat(:status, :failed)
-        put_stat(:message, "Please check data_migration_failed_ids records.")
+        update_status(:failed, "Please check data_migration_failed_ids records.")
     end

     {:noreply, state}
@@ -196,16 +193,25 @@ defp persist_stats(data_migration) do
     _ = DataMigration.update(data_migration, %{data: runner_state})
   end

-  defp handle_success do
-    put_stat(:status, :complete)
+  defp handle_success(data_migration) do
+    update_status(:complete)

-    unless Config.improved_hashtag_timeline() do
+    unless data_migration.feature_lock || Config.improved_hashtag_timeline() do
       Config.put(Config.improved_hashtag_timeline_path(), true)
     end

     :ok
   end

+  def failed_objects_query do
+    from(o in Object)
+    |> join(:inner, [o], dmf in fragment("SELECT * FROM data_migration_failed_ids"),
+      on: dmf.record_id == o.id
+    )
+    |> where([_o, dmf], dmf.data_migration_id == ^data_migration().id)
+    |> order_by([o], asc: o.id)
+  end
+
   def force_continue do
     send(whereis(), :migrate_hashtags)
   end
@@ -214,4 +220,9 @@ def force_restart do
     {:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}})
     force_continue()
   end
+
+  defp update_status(status, message \\ nil) do
+    put_stat(:status, status)
+    put_stat(:message, message)
+  end
 end
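The public helpers touched in this file (`failed_objects_query/0`, `force_continue/0`, `force_restart/0`) hint at the intended operator workflow. A rough IEx sketch, assuming `Pleroma.Repo` as the Ecto repo; nothing below is part of the diff itself:

```elixir
alias Pleroma.Migrators.HashtagsTableMigrator
alias Pleroma.Repo

# Objects whose hashtag extraction failed (joined via data_migration_failed_ids):
failed_objects = HashtagsTableMigrator.failed_objects_query() |> Repo.all()

# Nudge a stalled run forward, or wipe the persisted progress and start over:
HashtagsTableMigrator.force_continue()
HashtagsTableMigrator.force_restart()
```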
@@ -2,23 +2,24 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
   use Agent

   @init_state %{}
+  @reg_name {:global, __MODULE__}

   def start_link(_) do
-    Agent.start_link(fn -> @init_state end, name: __MODULE__)
+    Agent.start_link(fn -> @init_state end, name: @reg_name)
   end

   def get do
-    Agent.get(__MODULE__, & &1)
+    Agent.get(@reg_name, & &1)
   end

   def put(key, value) do
-    Agent.update(__MODULE__, fn state ->
+    Agent.update(@reg_name, fn state ->
       Map.put(state, key, value)
     end)
   end

   def increment(key, increment \\ 1) do
-    Agent.update(__MODULE__, fn state ->
+    Agent.update(@reg_name, fn state ->
       updated_value = (state[key] || 0) + increment
       Map.put(state, key, updated_value)
     end)
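Registering the Agent under `{:global, __MODULE__}` makes the stats map addressable by name across connected nodes rather than only in the local process registry. A small usage sketch of the functions above (the `:processed_count` key is illustrative; only `:status`/`:message` are visible in this diff):

```elixir
alias Pleroma.Migrators.HashtagsTableMigrator.State

# The migrator normally starts this Agent itself; started here only to keep the sketch standalone.
{:ok, _pid} = State.start_link(nil)

State.put(:status, :running)
State.increment(:processed_count, 25)

State.get()
#=> %{status: :running, processed_count: 25}
```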
@@ -669,63 +669,66 @@ defp restrict_since(query, %{since_id: since_id}) do
   defp restrict_since(query, _), do: query

-  defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
+  defp restrict_embedded_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
     raise_on_missing_preload()
   end

-  defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
+  defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
     from(
       [_activity, object] in query,
       where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
     )
   end

-  defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
-    restrict_tag_reject(query, %{tag_reject: [tag_reject]})
+  defp restrict_embedded_tag_reject(query, %{tag_reject: tag_reject})
+       when is_binary(tag_reject) do
+    restrict_embedded_tag_reject(query, %{tag_reject: [tag_reject]})
   end

-  defp restrict_tag_reject(query, _), do: query
+  defp restrict_embedded_tag_reject(query, _), do: query

-  defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
+  defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
     raise_on_missing_preload()
   end

-  defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
+  defp restrict_embedded_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
     from(
       [_activity, object] in query,
       where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
     )
   end

-  defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
-    restrict_tag(query, %{tag: tag})
+  defp restrict_embedded_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
+    restrict_embedded_tag(query, %{tag: tag})
   end

-  defp restrict_tag_all(query, _), do: query
+  defp restrict_embedded_tag_all(query, _), do: query

-  defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
+  defp restrict_embedded_tag(_query, %{tag: _tag, skip_preload: true}) do
     raise_on_missing_preload()
   end

-  defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
+  defp restrict_embedded_tag(query, %{tag: tag}) when is_list(tag) do
     from(
       [_activity, object] in query,
       where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag)
     )
   end

-  defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
-    restrict_tag(query, %{tag: [tag]})
+  defp restrict_embedded_tag(query, %{tag: tag}) when is_binary(tag) do
+    restrict_embedded_tag(query, %{tag: [tag]})
   end

-  defp restrict_tag(query, _), do: query
+  defp restrict_embedded_tag(query, _), do: query

-  defp restrict_hashtag(query, opts) do
-    [tag_any, tag_all, tag_reject] =
+  defp hashtag_conditions(opts) do
     [:tag, :tag_all, :tag_reject]
     |> Enum.map(&opts[&1])
     |> Enum.map(&List.wrap(&1))
+  end

+  defp restrict_hashtag_agg(query, opts) do
+    [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
     has_conditions = Enum.any?([tag_any, tag_all, tag_reject], &Enum.any?(&1))

     cond do
@@ -1275,15 +1278,19 @@ def fetch_activities_query(recipients, opts \\ %{}) do
     |> exclude_invisible_actors(opts)
     |> exclude_visibility(opts)

-    cond do
-      Config.object_embedded_hashtags?() ->
-        query
-        |> restrict_tag(opts)
-        |> restrict_tag_reject(opts)
-        |> restrict_tag_all(opts)
+    hashtag_timeline_strategy = Config.improved_hashtag_timeline()

-      # TODO: benchmark (initial approach preferring non-aggregate ops when possible)
-      Config.improved_hashtag_timeline() == :join ->
+    cond do
+      !hashtag_timeline_strategy ->
+        query
+        |> restrict_embedded_tag(opts)
+        |> restrict_embedded_tag_reject(opts)
+        |> restrict_embedded_tag_all(opts)
+
+      hashtag_timeline_strategy == :prefer_aggregation ->
+        restrict_hashtag_agg(query, opts)
+
+      hashtag_timeline_strategy == :avoid_aggregation or avoid_hashtags_aggregation?(opts) ->
         query
         |> distinct([activity], true)
         |> restrict_hashtag_any(opts)
@@ -1291,10 +1298,17 @@ def fetch_activities_query(recipients, opts \\ %{}) do
         |> restrict_hashtag_reject_any(opts)

       true ->
-        restrict_hashtag(query, opts)
+        restrict_hashtag_agg(query, opts)
     end
   end

+  defp avoid_hashtags_aggregation?(opts) do
+    [tag_any, tag_all, tag_reject] = hashtag_conditions(opts)
+
+    joins_count = length(tag_all) + if Enum.any?(tag_any), do: 1, else: 0
+    Enum.empty?(tag_reject) and joins_count <= 2
+  end
+
   def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do
     list_memberships = Pleroma.List.memberships(opts[:user])
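To make the join-vs-aggregation heuristic concrete: aggregation is avoided only when no hashtags are rejected and at most two hashtag joins would be needed. A standalone sketch of the same decision (the module name is invented for illustration), assuming the `List.wrap/1` normalization performed by `hashtag_conditions/1`:

```elixir
defmodule HashtagStrategySketch do
  # Mirrors avoid_hashtags_aggregation?/1: count one join per required tag in
  # :tag_all, plus one join for the :tag "any of" set, and skip aggregation only
  # when there are no rejected tags and at most two joins would be needed.
  def avoid_aggregation?(opts) do
    [tag_any, tag_all, tag_reject] =
      [:tag, :tag_all, :tag_reject]
      |> Enum.map(&opts[&1])
      |> Enum.map(&List.wrap/1)

    joins_count = length(tag_all) + if(Enum.any?(tag_any), do: 1, else: 0)

    Enum.empty?(tag_reject) and joins_count <= 2
  end
end

# HashtagStrategySketch.avoid_aggregation?(%{tag: "test"})                    #=> true
# HashtagStrategySketch.avoid_aggregation?(%{tag: ["a"], tag_all: ~w(b c)})   #=> false
# HashtagStrategySketch.avoid_aggregation?(%{tag: "a", tag_reject: ["spam"]}) #=> false
```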
@@ -217,8 +217,8 @@ test "it fetches the appropriate tag-restricted posts" do
       {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"})
       {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"})

-      for new_timeline_enabled <- [true, false] do
-        clear_config([:instance, :improved_hashtag_timeline], new_timeline_enabled)
+      for hashtag_timeline_strategy <- [true, :prefer_aggregation, :avoid_aggregation, false] do
+        clear_config([:instance, :improved_hashtag_timeline], hashtag_timeline_strategy)

         fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})