[#3213] HashtagsTableMigrator state management refactoring & improvements (proper stats serialization etc.).

This commit is contained in:
Ivan Tashkinov 2021-02-16 23:14:15 +03:00
parent 1dac7d1462
commit 938823c730
3 changed files with 122 additions and 67 deletions

View file

@ -10,6 +10,7 @@ defmodule Pleroma.DataMigration do
alias Pleroma.Repo alias Pleroma.Repo
import Ecto.Changeset import Ecto.Changeset
import Ecto.Query
schema "data_migrations" do schema "data_migrations" do
field(:name, :string) field(:name, :string)
@ -28,14 +29,12 @@ def changeset(data_migration, params \\ %{}) do
|> unique_constraint(:name) |> unique_constraint(:name)
end end
def update(data_migration, params \\ %{}) do def update_one_by_id(id, params \\ %{}) do
data_migration with {1, _} <-
|> changeset(params) from(dm in DataMigration, where: dm.id == ^id)
|> Repo.update() |> Repo.update_all(set: params) do
:ok
end end
def update_state(data_migration, new_state) do
update(data_migration, %{state: new_state})
end end
def get_by_name(name) do def get_by_name(name) do

View file

@ -11,16 +11,16 @@ defmodule Pleroma.Migrators.HashtagsTableMigrator do
alias __MODULE__.State alias __MODULE__.State
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.DataMigration
alias Pleroma.Hashtag alias Pleroma.Hashtag
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
defdelegate state(), to: State, as: :get defdelegate data_migration(), to: State
defdelegate put_stat(key, value), to: State, as: :put
defdelegate increment_stat(key, increment), to: State, as: :increment
defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table defdelegate state(), to: State
defdelegate get_stat(key, value), to: State, as: :get_data_key
defdelegate put_stat(key, value), to: State, as: :put_data_key
defdelegate increment_stat(key, increment), to: State, as: :increment_data_key
@reg_name {:global, __MODULE__} @reg_name {:global, __MODULE__}
@ -45,7 +45,7 @@ def init(_) do
def handle_continue(:init_state, _state) do def handle_continue(:init_state, _state) do
{:ok, _} = State.start_link(nil) {:ok, _} = State.start_link(nil)
update_status(:init) update_status(:pending)
data_migration = data_migration() data_migration = data_migration()
manual_migrations = Config.get([:instance, :manual_data_migrations], []) manual_migrations = Config.get([:instance, :manual_data_migrations], [])
@ -55,13 +55,13 @@ def handle_continue(:init_state, _state) do
update_status(:noop) update_status(:noop)
is_nil(data_migration) -> is_nil(data_migration) ->
update_status(:halt, "Data migration does not exist.") update_status(:failed, "Data migration does not exist.")
data_migration.state == :manual or data_migration.name in manual_migrations -> data_migration.state == :manual or data_migration.name in manual_migrations ->
update_status(:noop, "Data migration is in manual execution state.") update_status(:manual, "Data migration is in manual execution state.")
data_migration.state == :complete -> data_migration.state == :complete ->
handle_success(data_migration) on_complete(data_migration)
true -> true ->
send(self(), :migrate_hashtags) send(self(), :migrate_hashtags)
@ -72,20 +72,15 @@ def handle_continue(:init_state, _state) do
@impl true @impl true
def handle_info(:migrate_hashtags, state) do def handle_info(:migrate_hashtags, state) do
State.clear() State.reinit()
update_status(:running) update_status(:running)
put_stat(:started_at, NaiveDateTime.utc_now()) put_stat(:started_at, NaiveDateTime.utc_now())
data_migration = data_migration() %{id: data_migration_id} = data_migration()
persistent_data = Map.take(data_migration.data, ["max_processed_id"]) max_processed_id = get_stat(:max_processed_id, 0)
{:ok, data_migration} = Logger.info("Transferring embedded hashtags to `hashtags` (from oid: #{max_processed_id})...")
DataMigration.update(data_migration, %{state: :running, data: persistent_data})
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
max_processed_id = data_migration.data["max_processed_id"] || 0
query() query()
|> where([object], object.id > ^max_processed_id) |> where([object], object.id > ^max_processed_id)
@ -104,7 +99,7 @@ def handle_info(:migrate_hashtags, state) do
Repo.query( Repo.query(
"INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <> "INSERT INTO data_migration_failed_ids(data_migration_id, record_id) " <>
"VALUES ($1, $2) ON CONFLICT DO NOTHING;", "VALUES ($1, $2) ON CONFLICT DO NOTHING;",
[data_migration.id, failed_id] [data_migration_id, failed_id]
) )
end end
@ -112,7 +107,7 @@ def handle_info(:migrate_hashtags, state) do
Repo.query( Repo.query(
"DELETE FROM data_migration_failed_ids " <> "DELETE FROM data_migration_failed_ids " <>
"WHERE data_migration_id = $1 AND record_id = ANY($2)", "WHERE data_migration_id = $1 AND record_id = ANY($2)",
[data_migration.id, object_ids -- failed_ids] [data_migration_id, object_ids -- failed_ids]
) )
max_object_id = Enum.at(object_ids, -1) max_object_id = Enum.at(object_ids, -1)
@ -120,14 +115,8 @@ def handle_info(:migrate_hashtags, state) do
put_stat(:max_processed_id, max_object_id) put_stat(:max_processed_id, max_object_id)
increment_stat(:processed_count, length(object_ids)) increment_stat(:processed_count, length(object_ids))
increment_stat(:failed_count, length(failed_ids)) increment_stat(:failed_count, length(failed_ids))
put_stat(:records_per_second, records_per_second())
put_stat( _ = State.persist_to_db()
:records_per_second,
state()[:processed_count] /
Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
)
persist_stats(data_migration)
# A quick and dirty approach to controlling the load this background migration imposes # A quick and dirty approach to controlling the load this background migration imposes
sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0) sleep_interval = Config.get([:populate_hashtags_table, :sleep_interval_ms], 0)
@ -135,22 +124,25 @@ def handle_info(:migrate_hashtags, state) do
end) end)
|> Stream.run() |> Stream.run()
with 0 <- failures_count(data_migration.id) do with 0 <- failures_count(data_migration_id) do
_ = delete_non_create_activities_hashtags() _ = delete_non_create_activities_hashtags()
set_complete()
{:ok, data_migration} = DataMigration.update_state(data_migration, :complete)
handle_success(data_migration)
else else
_ -> _ ->
_ = DataMigration.update_state(data_migration, :failed)
update_status(:failed, "Please check data_migration_failed_ids records.") update_status(:failed, "Please check data_migration_failed_ids records.")
end end
{:noreply, state} {:noreply, state}
end end
# Average throughput of the current run: processed record count divided by
# the elapsed running time, with the divisor clamped to at least 1 so the
# division is always defined.
defp records_per_second do
  processed = get_stat(:processed_count, 0)
  elapsed = max(running_time(), 1)

  processed / elapsed
end
# Time elapsed since the `:started_at` stat, as returned by
# `NaiveDateTime.diff/2` (seconds by default).
defp running_time do
  now = NaiveDateTime.utc_now()
  # When no start time has been recorded, fall back to the current time.
  started_at = get_stat(:started_at, NaiveDateTime.utc_now())

  NaiveDateTime.diff(now, started_at)
end
@hashtags_objects_cleanup_query """ @hashtags_objects_cleanup_query """
DELETE FROM hashtags_objects WHERE object_id IN DELETE FROM hashtags_objects WHERE object_id IN
(SELECT DISTINCT objects.id FROM objects (SELECT DISTINCT objects.id FROM objects
@ -169,6 +161,10 @@ def handle_info(:migrate_hashtags, state) do
WHERE hashtags_objects.hashtag_id IS NULL); WHERE hashtags_objects.hashtag_id IS NULL);
""" """
@doc """
Deletes `hashtags_objects` for legacy objects not associated with Create activity.
Also deletes unreferenced `hashtags` records (might occur after deletion of `hashtags_objects`).
"""
def delete_non_create_activities_hashtags do def delete_non_create_activities_hashtags do
{:ok, %{num_rows: hashtags_objects_count}} = {:ok, %{num_rows: hashtags_objects_count}} =
Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity) Repo.query(@hashtags_objects_cleanup_query, [], timeout: :infinity)
@ -256,14 +252,7 @@ def count(force \\ false, timeout \\ :infinity) do
end end
end end
defp persist_stats(data_migration) do defp on_complete(data_migration) do
runner_state = Map.drop(state(), [:status])
_ = DataMigration.update(data_migration, %{data: runner_state})
end
defp handle_success(data_migration) do
update_status(:complete)
cond do cond do
data_migration.feature_lock -> data_migration.feature_lock ->
:noop :noop
@ -321,18 +310,18 @@ def force_continue do
end end
def force_restart do def force_restart do
{:ok, _} = DataMigration.update(data_migration(), %{state: :pending, data: %{}}) :ok = State.reset()
force_continue() force_continue()
end end
def force_complete do def set_complete do
{:ok, data_migration} = DataMigration.update_state(data_migration(), :complete) update_status(:complete)
_ = State.persist_to_db()
handle_success(data_migration) on_complete(data_migration())
end end
defp update_status(status, message \\ nil) do defp update_status(status, message \\ nil) do
put_stat(:status, status) put_stat(:state, status)
put_stat(:message, message) put_stat(:message, message)
end end
end end

View file

@ -5,31 +5,98 @@
defmodule Pleroma.Migrators.HashtagsTableMigrator.State do defmodule Pleroma.Migrators.HashtagsTableMigrator.State do
use Agent use Agent
@init_state %{} alias Pleroma.DataMigration
defdelegate data_migration(), to: DataMigration, as: :populate_hashtags_table
@reg_name {:global, __MODULE__} @reg_name {:global, __MODULE__}
def start_link(_) do def start_link(_) do
Agent.start_link(fn -> @init_state end, name: @reg_name) Agent.start_link(fn -> load_state_from_db() end, name: @reg_name)
end end
def clear do defp load_state_from_db do
Agent.update(@reg_name, fn _state -> @init_state end) data_migration = data_migration()
data =
if data_migration do
Map.new(data_migration.data, fn {k, v} -> {String.to_atom(k), v} end)
else
%{}
end end
def get do %{
data_migration_id: data_migration && data_migration.id,
data: data
}
end
@doc """
Writes the in-memory `data` map to the data migration record.

Returns the result of `DataMigration.update_one_by_id/2`, or
`{:error, :nil_data_migration_id}` when the held state has no migration id.
"""
def persist_to_db do
  %{data_migration_id: migration_id, data: stats} = state()

  case migration_id do
    # Same truthiness test as a plain `if`: both nil and false count as "no id".
    missing when missing in [nil, false] ->
      {:error, :nil_data_migration_id}

    id ->
      DataMigration.update_one_by_id(id, data: stats)
  end
end
@doc """
Resets the data migration so it can be run again from scratch.

Sets the persisted record's `state` to `:pending` and its `data` to `%{}`
via `DataMigration.update_one_by_id/2`, then reloads the in-memory state
with `reinit/0`.

Returns:

  * the result of `reinit/0` when the record was updated;
  * `{:error, :nil_data_migration_id}` when the held state has no migration id;
  * the unmodified result of the update call when it is anything but `:ok`.
"""
def reset do
  %{data_migration_id: migration_id} = state()

  if is_nil(migration_id) do
    {:error, :nil_data_migration_id}
  else
    # Branch directly on the update result rather than decoding it in a
    # shared `else`, so a failed update can never be mistaken for a missing id.
    case DataMigration.update_one_by_id(migration_id, state: :pending, data: %{}) do
      :ok -> reinit()
      error -> error
    end
  end
end
@doc """
Discards the in-memory state and replaces it with whatever
`load_state_from_db/0` returns.
"""
def reinit do
  Agent.update(@reg_name, fn _previous -> load_state_from_db() end)
end
def state do
Agent.get(@reg_name, & &1) Agent.get(@reg_name, & &1)
end end
def put(key, value) do def get_data_key(key, default \\ nil) do
get_in(state(), [:data, key]) || default
end
def put_data_key(key, value) do
_ = persist_non_data_change(key, value)
Agent.update(@reg_name, fn state -> Agent.update(@reg_name, fn state ->
Map.put(state, key, value) put_in(state, [:data, key], value)
end) end)
end end
def increment(key, increment \\ 1) do def increment_data_key(key, increment \\ 1) do
Agent.update(@reg_name, fn state -> Agent.update(@reg_name, fn state ->
updated_value = (state[key] || 0) + increment initial_value = get_in(state, [:data, key]) || 0
Map.put(state, key, updated_value) updated_value = initial_value + increment
put_in(state, [:data, key], updated_value)
end) end)
end end
# Mirrors a change of the `:state` key onto the data migration record's
# `state` column.
#
# Nothing is written (and `:ok` is returned) when the value equals the
# currently held `:state` or is not one of
# `Pleroma.DataMigration.State.__valid_values__/0`. When a write is due but
# no migration id is held, `{:error, :nil_data_migration_id}` is returned.
defp persist_non_data_change(:state, value) do
  changed? = get_data_key(:state) != value

  # `and` short-circuits, so the valid-values lookup only runs on a change.
  if changed? and (value in Pleroma.DataMigration.State.__valid_values__()) do
    case state() do
      %{data_migration_id: migration_id} when not is_nil(migration_id) ->
        DataMigration.update_one_by_id(migration_id, state: value)

      _ ->
        {:error, :nil_data_migration_id}
    end
  else
    :ok
  end
end

# Every other key lives only in the `data` map and needs no separate write.
defp persist_non_data_change(_, _), do: nil
end end