[#3213] Performance-related stat in HashtagsTableMigrator. Reworked count/_
to indicate approximate total count for current iteration.
This commit is contained in:
parent
f0f0f2af00
commit
b830605577
1 changed files with 13 additions and 1 deletions
|
@ -80,6 +80,7 @@ def handle_info(:migrate_hashtags, state) do
|
||||||
DataMigration.update(data_migration, %{state: :running, data: persistent_data})
|
DataMigration.update(data_migration, %{state: :running, data: persistent_data})
|
||||||
|
|
||||||
update_status(:running)
|
update_status(:running)
|
||||||
|
put_stat(:started_at, NaiveDateTime.utc_now())
|
||||||
|
|
||||||
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
|
Logger.info("Starting transferring object embedded hashtags to `hashtags` table...")
|
||||||
|
|
||||||
|
@ -118,6 +119,12 @@ def handle_info(:migrate_hashtags, state) do
|
||||||
increment_stat(:processed_count, length(object_ids))
|
increment_stat(:processed_count, length(object_ids))
|
||||||
increment_stat(:failed_count, length(failed_ids))
|
increment_stat(:failed_count, length(failed_ids))
|
||||||
|
|
||||||
|
put_stat(
|
||||||
|
:records_per_second,
|
||||||
|
state()[:processed_count] /
|
||||||
|
Enum.max([NaiveDateTime.diff(NaiveDateTime.utc_now(), state()[:started_at]), 1])
|
||||||
|
)
|
||||||
|
|
||||||
persist_stats(data_migration)
|
persist_stats(data_migration)
|
||||||
|
|
||||||
# A quick and dirty approach to controlling the load this background migration imposes
|
# A quick and dirty approach to controlling the load this background migration imposes
|
||||||
|
@ -192,13 +199,18 @@ defp transfer_object_hashtags(object) do
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@doc "Approximate count for current iteration (including processed records count)"
|
||||||
def count(force \\ false, timeout \\ :infinity) do
|
def count(force \\ false, timeout \\ :infinity) do
|
||||||
stored_count = state()[:count]
|
stored_count = state()[:count]
|
||||||
|
|
||||||
if stored_count && !force do
|
if stored_count && !force do
|
||||||
stored_count
|
stored_count
|
||||||
else
|
else
|
||||||
count = Repo.aggregate(query(), :count, :id, timeout: timeout)
|
processed_count = state()[:processed_count] || 0
|
||||||
|
max_processed_id = data_migration().data["max_processed_id"] || 0
|
||||||
|
query = where(query(), [object], object.id > ^max_processed_id)
|
||||||
|
|
||||||
|
count = Repo.aggregate(query, :count, :id, timeout: timeout) + processed_count
|
||||||
put_stat(:count, count)
|
put_stat(:count, count)
|
||||||
count
|
count
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue