forked from AkkomaGang/akkoma
[#3213] hashtags
: altered name
type to text
. hashtags_objects
: removed unused index. HashtagsTableMigrator: records_per_second calculation fix. ActivityPub: hashtags-related options normalization.
This commit is contained in:
parent
998437d4a4
commit
6531eddf36
6 changed files with 76 additions and 53 deletions
|
@ -21,10 +21,14 @@ defmodule Pleroma.Hashtag do
|
||||||
timestamps()
|
timestamps()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def normalize_name(name) do
|
||||||
|
name
|
||||||
|
|> String.downcase()
|
||||||
|
|> String.trim()
|
||||||
|
end
|
||||||
|
|
||||||
def get_by_name(name) do
|
def get_by_name(name) do
|
||||||
from(h in Hashtag)
|
Repo.get_by(Hashtag, name: normalize_name(name))
|
||||||
|> where([h], fragment("name = ?::citext", ^String.downcase(name)))
|
|
||||||
|> Repo.one()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_or_create_by_name(name) when is_bitstring(name) do
|
def get_or_create_by_name(name) when is_bitstring(name) do
|
||||||
|
@ -39,7 +43,7 @@ def get_or_create_by_name(name) when is_bitstring(name) do
|
||||||
end
|
end
|
||||||
|
|
||||||
def get_or_create_by_names(names) when is_list(names) do
|
def get_or_create_by_names(names) when is_list(names) do
|
||||||
names = Enum.map(names, &String.downcase/1)
|
names = Enum.map(names, &normalize_name/1)
|
||||||
timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
|
timestamp = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
|
||||||
|
|
||||||
structs =
|
structs =
|
||||||
|
@ -53,10 +57,12 @@ def get_or_create_by_names(names) when is_list(names) do
|
||||||
try do
|
try do
|
||||||
with {:ok, %{query_op: hashtags}} <-
|
with {:ok, %{query_op: hashtags}} <-
|
||||||
Multi.new()
|
Multi.new()
|
||||||
|> Multi.insert_all(:insert_all_op, Hashtag, structs, on_conflict: :nothing)
|
|> Multi.insert_all(:insert_all_op, Hashtag, structs,
|
||||||
|
on_conflict: :nothing,
|
||||||
|
conflict_target: :name
|
||||||
|
)
|
||||||
|> Multi.run(:query_op, fn _repo, _changes ->
|
|> Multi.run(:query_op, fn _repo, _changes ->
|
||||||
{:ok,
|
{:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
|
||||||
Repo.all(from(ht in Hashtag, where: ht.name in fragment("?::citext[]", ^names)))}
|
|
||||||
end)
|
end)
|
||||||
|> Repo.transaction() do
|
|> Repo.transaction() do
|
||||||
{:ok, hashtags}
|
{:ok, hashtags}
|
||||||
|
@ -71,7 +77,7 @@ def get_or_create_by_names(names) when is_list(names) do
|
||||||
def changeset(%Hashtag{} = struct, params) do
|
def changeset(%Hashtag{} = struct, params) do
|
||||||
struct
|
struct
|
||||||
|> cast(params, [:name])
|
|> cast(params, [:name])
|
||||||
|> update_change(:name, &String.downcase/1)
|
|> update_change(:name, &normalize_name/1)
|
||||||
|> validate_required([:name])
|
|> validate_required([:name])
|
||||||
|> unique_constraint(:name)
|
|> unique_constraint(:name)
|
||||||
end
|
end
|
||||||
|
|
|
@ -82,6 +82,7 @@ def handle_info(:migrate_hashtags, state) do
|
||||||
State.reinit()
|
State.reinit()
|
||||||
|
|
||||||
update_status(:running)
|
update_status(:running)
|
||||||
|
put_stat(:iteration_processed_count, 0)
|
||||||
put_stat(:started_at, NaiveDateTime.utc_now())
|
put_stat(:started_at, NaiveDateTime.utc_now())
|
||||||
|
|
||||||
data_migration_id = data_migration_id()
|
data_migration_id = data_migration_id()
|
||||||
|
@ -127,6 +128,7 @@ def handle_info(:migrate_hashtags, state) do
|
||||||
max_object_id = Enum.at(object_ids, -1)
|
max_object_id = Enum.at(object_ids, -1)
|
||||||
|
|
||||||
put_stat(:max_processed_id, max_object_id)
|
put_stat(:max_processed_id, max_object_id)
|
||||||
|
increment_stat(:iteration_processed_count, length(object_ids))
|
||||||
increment_stat(:processed_count, length(object_ids))
|
increment_stat(:processed_count, length(object_ids))
|
||||||
increment_stat(:failed_count, length(failed_ids))
|
increment_stat(:failed_count, length(failed_ids))
|
||||||
increment_stat(:affected_count, chunk_affected_count)
|
increment_stat(:affected_count, chunk_affected_count)
|
||||||
|
@ -176,7 +178,7 @@ def fault_rate do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp records_per_second do
|
defp records_per_second do
|
||||||
get_stat(:processed_count, 0) / Enum.max([running_time(), 1])
|
get_stat(:iteration_processed_count, 0) / Enum.max([running_time(), 1])
|
||||||
end
|
end
|
||||||
|
|
||||||
defp running_time do
|
defp running_time do
|
||||||
|
|
|
@ -10,6 +10,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
|
||||||
alias Pleroma.Conversation
|
alias Pleroma.Conversation
|
||||||
alias Pleroma.Conversation.Participation
|
alias Pleroma.Conversation.Participation
|
||||||
alias Pleroma.Filter
|
alias Pleroma.Filter
|
||||||
|
alias Pleroma.Hashtag
|
||||||
alias Pleroma.Maps
|
alias Pleroma.Maps
|
||||||
alias Pleroma.Notification
|
alias Pleroma.Notification
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
|
@ -698,8 +699,6 @@ defp restrict_embedded_tag_all(_query, %{tag_all: _tag_all, skip_preload: true})
|
||||||
end
|
end
|
||||||
|
|
||||||
defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
|
defp restrict_embedded_tag_all(query, %{tag_all: [_ | _] = tag_all}) do
|
||||||
tag_all = Enum.map(tag_all, &String.downcase/1)
|
|
||||||
|
|
||||||
from(
|
from(
|
||||||
[_activity, object] in query,
|
[_activity, object] in query,
|
||||||
where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
|
where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
|
||||||
|
@ -717,8 +716,6 @@ defp restrict_embedded_tag_any(_query, %{tag: _tag, skip_preload: true}) do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
|
defp restrict_embedded_tag_any(query, %{tag: [_ | _] = tag_any}) do
|
||||||
tag_any = Enum.map(tag_any, &String.downcase/1)
|
|
||||||
|
|
||||||
from(
|
from(
|
||||||
[_activity, object] in query,
|
[_activity, object] in query,
|
||||||
where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
|
where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag_any)
|
||||||
|
@ -736,8 +733,6 @@ defp restrict_embedded_tag_reject_any(_query, %{tag_reject: _tag_reject, skip_pr
|
||||||
end
|
end
|
||||||
|
|
||||||
defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
|
defp restrict_embedded_tag_reject_any(query, %{tag_reject: [_ | _] = tag_reject}) do
|
||||||
tag_reject = Enum.map(tag_reject, &String.downcase/1)
|
|
||||||
|
|
||||||
from(
|
from(
|
||||||
[_activity, object] in query,
|
[_activity, object] in query,
|
||||||
where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
|
where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
|
||||||
|
@ -766,7 +761,7 @@ defp restrict_hashtag_all(query, %{tag_all: [_ | _] = tags}) do
|
||||||
fragment(
|
fragment(
|
||||||
"""
|
"""
|
||||||
(SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
|
(SELECT array_agg(hashtags.name) FROM hashtags JOIN hashtags_objects
|
||||||
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
|
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
|
||||||
AND hashtags_objects.object_id = ?) @> ?
|
AND hashtags_objects.object_id = ?) @> ?
|
||||||
""",
|
""",
|
||||||
^tags,
|
^tags,
|
||||||
|
@ -787,35 +782,13 @@ defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
|
defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
|
||||||
# TODO: refactor: debug / experimental feature
|
|
||||||
if Config.get([:database, :improved_hashtag_timeline]) == :preselect_hashtag_ids do
|
|
||||||
hashtag_ids =
|
|
||||||
from(ht in Pleroma.Hashtag,
|
|
||||||
where: fragment("name = ANY(?::citext[])", ^tags),
|
|
||||||
select: ht.id
|
|
||||||
)
|
|
||||||
|> Repo.all()
|
|
||||||
|
|
||||||
from(
|
|
||||||
[_activity, object] in query,
|
|
||||||
where:
|
|
||||||
fragment(
|
|
||||||
"""
|
|
||||||
EXISTS (
|
|
||||||
SELECT 1 FROM hashtags_objects WHERE hashtag_id = ANY(?) AND object_id = ? LIMIT 1)
|
|
||||||
""",
|
|
||||||
^hashtag_ids,
|
|
||||||
object.id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
else
|
|
||||||
from(
|
from(
|
||||||
[_activity, object] in query,
|
[_activity, object] in query,
|
||||||
where:
|
where:
|
||||||
fragment(
|
fragment(
|
||||||
"""
|
"""
|
||||||
EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
|
EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
|
||||||
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
|
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
|
||||||
AND hashtags_objects.object_id = ? LIMIT 1)
|
AND hashtags_objects.object_id = ? LIMIT 1)
|
||||||
""",
|
""",
|
||||||
^tags,
|
^tags,
|
||||||
|
@ -823,7 +796,6 @@ defp restrict_hashtag_any(query, %{tag: [_ | _] = tags}) do
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
|
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
|
||||||
restrict_hashtag_any(query, %{tag: [tag]})
|
restrict_hashtag_any(query, %{tag: [tag]})
|
||||||
|
@ -842,7 +814,7 @@ defp restrict_hashtag_reject_any(query, %{tag_reject: [_ | _] = tags_reject}) do
|
||||||
fragment(
|
fragment(
|
||||||
"""
|
"""
|
||||||
NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
|
NOT EXISTS (SELECT 1 FROM hashtags JOIN hashtags_objects
|
||||||
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?::citext[])
|
ON hashtags_objects.hashtag_id = hashtags.id WHERE hashtags.name = ANY(?)
|
||||||
AND hashtags_objects.object_id = ? LIMIT 1)
|
AND hashtags_objects.object_id = ? LIMIT 1)
|
||||||
""",
|
""",
|
||||||
^tags_reject,
|
^tags_reject,
|
||||||
|
@ -1220,6 +1192,21 @@ defp maybe_order(query, %{order: :asc}) do
|
||||||
|
|
||||||
defp maybe_order(query, _), do: query
|
defp maybe_order(query, _), do: query
|
||||||
|
|
||||||
|
defp normalize_fetch_activities_query_opts(opts) do
|
||||||
|
Enum.reduce([:tag, :tag_all, :tag_reject], opts, fn key, opts ->
|
||||||
|
case opts[key] do
|
||||||
|
value when is_bitstring(value) ->
|
||||||
|
Map.put(opts, key, Hashtag.normalize_name(value))
|
||||||
|
|
||||||
|
value when is_list(value) ->
|
||||||
|
Map.put(opts, key, Enum.map(value, &Hashtag.normalize_name/1))
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
opts
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
defp fetch_activities_query_ap_ids_ops(opts) do
|
defp fetch_activities_query_ap_ids_ops(opts) do
|
||||||
source_user = opts[:muting_user]
|
source_user = opts[:muting_user]
|
||||||
ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
|
ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: []
|
||||||
|
@ -1243,6 +1230,8 @@ defp fetch_activities_query_ap_ids_ops(opts) do
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_activities_query(recipients, opts \\ %{}) do
|
def fetch_activities_query(recipients, opts \\ %{}) do
|
||||||
|
opts = normalize_fetch_activities_query_opts(opts)
|
||||||
|
|
||||||
{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
|
{restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} =
|
||||||
fetch_activities_query_ap_ids_ops(opts)
|
fetch_activities_query_ap_ids_ops(opts)
|
||||||
|
|
||||||
|
|
|
@ -7,7 +7,7 @@ def change do
|
||||||
add(:object_id, references(:objects), null: false, primary_key: true)
|
add(:object_id, references(:objects), null: false, primary_key: true)
|
||||||
end
|
end
|
||||||
|
|
||||||
create_if_not_exists(unique_index(:hashtags_objects, [:hashtag_id, :object_id]))
|
# Note: PK index: "hashtags_objects_pkey" PRIMARY KEY, btree (hashtag_id, object_id)
|
||||||
create_if_not_exists(index(:hashtags_objects, [:object_id]))
|
create_if_not_exists(index(:hashtags_objects, [:object_id]))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.RemoveHashtagsObjectsDuplicateIndex do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
@moduledoc "Removes `hashtags_objects_hashtag_id_object_id_index` index (duplicate of PK index)."
|
||||||
|
|
||||||
|
def up do
|
||||||
|
drop_if_exists(unique_index(:hashtags_objects, [:hashtag_id, :object_id]))
|
||||||
|
end
|
||||||
|
|
||||||
|
def down, do: nil
|
||||||
|
end
|
|
@ -0,0 +1,15 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.ChangeHashtagsNameToText do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
alter table(:hashtags) do
|
||||||
|
modify(:name, :text)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
alter table(:hashtags) do
|
||||||
|
modify(:name, :citext)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in a new issue