[#3213] Hashtag-filtering functions in ActivityPub. Mix task for migrating hashtags to hashtags table.

This commit is contained in:
Ivan Tashkinov 2020-12-26 22:20:55 +03:00
parent e369b1306b
commit cbb19d0e18
3 changed files with 218 additions and 65 deletions

View file

@ -4,14 +4,18 @@
defmodule Mix.Tasks.Pleroma.Database do defmodule Mix.Tasks.Pleroma.Database do
alias Pleroma.Conversation alias Pleroma.Conversation
alias Pleroma.Hashtag
alias Pleroma.Maintenance alias Pleroma.Maintenance
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
import Ecto.Query import Ecto.Query
import Mix.Pleroma import Mix.Pleroma
use Mix.Task use Mix.Task
@shortdoc "A collection of database related tasks" @shortdoc "A collection of database related tasks"
@ -128,6 +132,66 @@ def run(["fix_likes_collections"]) do
|> Stream.run() |> Stream.run()
end end
def run(["transfer_hashtags"]) do
import Ecto.Query
start_pleroma()
from(
object in Object,
left_join: hashtag in assoc(object, :hashtags),
where: is_nil(hashtag.id),
where: fragment("(?)->>'tag' != '[]'", object.data),
select: %{
id: object.id,
inserted_at: object.inserted_at,
tag: fragment("(?)->>'tag'", object.data)
},
order_by: [desc: object.id]
)
|> Pleroma.Repo.chunk_stream(100, :batches)
|> Stream.each(fn objects ->
chunk_start = List.first(objects)
chunk_end = List.last(objects)
Logger.info(
"transfer_hashtags: " <>
"#{chunk_start.id} (#{chunk_start.inserted_at}) -- " <>
"#{chunk_end.id} (#{chunk_end.inserted_at})"
)
Enum.map(
objects,
fn object ->
hashtags =
object.tag
|> Jason.decode!()
|> Enum.filter(&is_bitstring(&1))
with {:ok, hashtag_records} <- Hashtag.get_or_create_by_names(hashtags) do
Repo.transaction(fn ->
for hashtag_record <- hashtag_records do
with {:error, _} <-
Ecto.Adapters.SQL.query(
Repo,
"insert into hashtags_objects(hashtag_id, object_id) values " <>
"(#{hashtag_record.id}, #{object.id});"
) do
Logger.warn(
"ERROR: could not link object #{object.id} and hashtag #{hashtag_record.id}"
)
end
end
end)
else
e -> Logger.warn("ERROR: could not process object #{object.id}: #{inspect(e)}")
end
end
)
end)
|> Stream.run()
end
def run(["vacuum", args]) do def run(["vacuum", args]) do
start_pleroma() start_pleroma()

View file

@ -660,33 +660,41 @@ defp restrict_since(query, %{since_id: since_id}) do
defp restrict_since(query, _), do: query defp restrict_since(query, _), do: query
defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do defp restrict_tag_reject(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
raise "Can't use the child object without preloading!" raise_on_missing_preload()
end end
defp restrict_tag_reject(query, %{tag_reject: [_ | _] = tag_reject}) do defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_list(tag_reject) do
from( from(
[_activity, object] in query, [_activity, object] in query,
where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject)
) )
end end
defp restrict_tag_reject(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
restrict_tag_reject(query, %{tag_reject: [tag_reject]})
end
defp restrict_tag_reject(query, _), do: query defp restrict_tag_reject(query, _), do: query
defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do defp restrict_tag_all(_query, %{tag_all: _tag_all, skip_preload: true}) do
raise "Can't use the child object without preloading!" raise_on_missing_preload()
end end
defp restrict_tag_all(query, %{tag_all: [_ | _] = tag_all}) do defp restrict_tag_all(query, %{tag_all: tag_all}) when is_list(tag_all) do
from( from(
[_activity, object] in query, [_activity, object] in query,
where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all)
) )
end end
defp restrict_tag_all(query, %{tag_all: tag}) when is_binary(tag) do
restrict_tag(query, %{tag: tag})
end
defp restrict_tag_all(query, _), do: query defp restrict_tag_all(query, _), do: query
defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do defp restrict_tag(_query, %{tag: _tag, skip_preload: true}) do
raise "Can't use the child object without preloading!" raise_on_missing_preload()
end end
defp restrict_tag(query, %{tag: tag}) when is_list(tag) do defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
@ -697,14 +705,80 @@ defp restrict_tag(query, %{tag: tag}) when is_list(tag) do
end end
defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do defp restrict_tag(query, %{tag: tag}) when is_binary(tag) do
from( restrict_tag(query, %{tag: [tag]})
[_activity, object] in query,
where: fragment("(?)->'tag' \\? (?)", object.data, ^tag)
)
end end
defp restrict_tag(query, _), do: query defp restrict_tag(query, _), do: query
defp restrict_hashtag_reject_any(_query, %{tag_reject: _tag_reject, skip_preload: true}) do
raise_on_missing_preload()
end
defp restrict_hashtag_reject_any(query, %{tag_reject: tags_reject}) when is_list(tags_reject) do
if has_named_binding?(query, :thread_mute) do
from(
[activity, object, thread_mute] in query,
group_by: [activity.id, object.id, thread_mute.id]
)
else
from(
[activity, object] in query,
group_by: [activity.id, object.id]
)
end
|> join(:left, [_activity, object], hashtag in assoc(object, :hashtags), as: :hashtag)
|> having(
[hashtag: hashtag],
fragment("not(array_agg(?) && (?))", hashtag.name, ^tags_reject)
)
end
defp restrict_hashtag_reject_any(query, %{tag_reject: tag_reject}) when is_binary(tag_reject) do
restrict_hashtag_reject_any(query, %{tag_reject: [tag_reject]})
end
defp restrict_hashtag_reject_any(query, _), do: query
defp restrict_hashtag_all(_query, %{tag_all: _tag, skip_preload: true}) do
raise_on_missing_preload()
end
defp restrict_hashtag_all(query, %{tag_all: tags}) when is_list(tags) do
Enum.reduce(
tags,
query,
fn tag, acc -> restrict_hashtag_any(acc, %{tag: tag}) end
)
end
defp restrict_hashtag_all(query, %{tag_all: tag}) when is_binary(tag) do
restrict_hashtag_any(query, %{tag: tag})
end
defp restrict_hashtag_all(query, _), do: query
defp restrict_hashtag_any(_query, %{tag: _tag, skip_preload: true}) do
raise_on_missing_preload()
end
defp restrict_hashtag_any(query, %{tag: tags}) when is_list(tags) do
from(
[_activity, object] in query,
join: hashtag in assoc(object, :hashtags),
where: hashtag.name in ^tags
)
end
defp restrict_hashtag_any(query, %{tag: tag}) when is_binary(tag) do
restrict_hashtag_any(query, %{tag: [tag]})
end
defp restrict_hashtag_any(query, _), do: query
defp raise_on_missing_preload do
raise "Can't use the child object without preloading!"
end
defp restrict_recipients(query, [], _user), do: query defp restrict_recipients(query, [], _user), do: query
defp restrict_recipients(query, recipients, nil) do defp restrict_recipients(query, recipients, nil) do
@ -1088,40 +1162,51 @@ def fetch_activities_query(recipients, opts \\ %{}) do
skip_thread_containment: Config.get([:instance, :skip_thread_containment]) skip_thread_containment: Config.get([:instance, :skip_thread_containment])
} }
Activity query =
|> maybe_preload_objects(opts) Activity
|> maybe_preload_bookmarks(opts) |> distinct([a], true)
|> maybe_preload_report_notes(opts) |> maybe_preload_objects(opts)
|> maybe_set_thread_muted_field(opts) |> maybe_preload_bookmarks(opts)
|> maybe_order(opts) |> maybe_preload_report_notes(opts)
|> restrict_recipients(recipients, opts[:user]) |> maybe_set_thread_muted_field(opts)
|> restrict_replies(opts) |> maybe_order(opts)
|> restrict_tag(opts) |> restrict_recipients(recipients, opts[:user])
|> restrict_tag_reject(opts) |> restrict_replies(opts)
|> restrict_tag_all(opts) |> restrict_since(opts)
|> restrict_since(opts) |> restrict_local(opts)
|> restrict_local(opts) |> restrict_actor(opts)
|> restrict_actor(opts) |> restrict_type(opts)
|> restrict_type(opts) |> restrict_state(opts)
|> restrict_state(opts) |> restrict_favorited_by(opts)
|> restrict_favorited_by(opts) |> restrict_blocked(restrict_blocked_opts)
|> restrict_blocked(restrict_blocked_opts) |> restrict_muted(restrict_muted_opts)
|> restrict_muted(restrict_muted_opts) |> restrict_filtered(opts)
|> restrict_filtered(opts) |> restrict_media(opts)
|> restrict_media(opts) |> restrict_visibility(opts)
|> restrict_visibility(opts) |> restrict_thread_visibility(opts, config)
|> restrict_thread_visibility(opts, config) |> restrict_reblogs(opts)
|> restrict_reblogs(opts) |> restrict_pinned(opts)
|> restrict_pinned(opts) |> restrict_muted_reblogs(restrict_muted_reblogs_opts)
|> restrict_muted_reblogs(restrict_muted_reblogs_opts) |> restrict_instance(opts)
|> restrict_instance(opts) |> restrict_announce_object_actor(opts)
|> restrict_announce_object_actor(opts) |> restrict_filtered(opts)
|> restrict_filtered(opts) |> Activity.restrict_deactivated_users()
|> Activity.restrict_deactivated_users() |> exclude_poll_votes(opts)
|> exclude_poll_votes(opts) |> exclude_chat_messages(opts)
|> exclude_chat_messages(opts) |> exclude_invisible_actors(opts)
|> exclude_invisible_actors(opts) |> exclude_visibility(opts)
|> exclude_visibility(opts)
if Config.get([:instance, :improved_hashtag_timeline]) do
query
|> restrict_hashtag_any(opts)
|> restrict_hashtag_all(opts)
|> restrict_hashtag_reject_any(opts)
else
query
|> restrict_tag(opts)
|> restrict_tag_reject(opts)
|> restrict_tag_all(opts)
end
end end
def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do

View file

@ -199,33 +199,37 @@ test "it fetches the appropriate tag-restricted posts" do
{:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"}) {:ok, status_two} = CommonAPI.post(user, %{status: ". #essais"})
{:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"}) {:ok, status_three} = CommonAPI.post(user, %{status: ". #test #reject"})
fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"}) for new_timeline_enabled <- [true, false] do
clear_config([:instance, :improved_hashtag_timeline], new_timeline_enabled)
fetch_two = ActivityPub.fetch_activities([], %{type: "Create", tag: ["test", "essais"]}) fetch_one = ActivityPub.fetch_activities([], %{type: "Create", tag: "test"})
fetch_three = fetch_two = ActivityPub.fetch_activities([], %{type: "Create", tag: ["test", "essais"]})
ActivityPub.fetch_activities([], %{
type: "Create",
tag: ["test", "essais"],
tag_reject: ["reject"]
})
fetch_four = fetch_three =
ActivityPub.fetch_activities([], %{ ActivityPub.fetch_activities([], %{
type: "Create", type: "Create",
tag: ["test"], tag: ["test", "essais"],
tag_all: ["test", "reject"] tag_reject: ["reject"]
}) })
[fetch_one, fetch_two, fetch_three, fetch_four] = fetch_four =
Enum.map([fetch_one, fetch_two, fetch_three, fetch_four], fn statuses -> ActivityPub.fetch_activities([], %{
Enum.map(statuses, fn s -> Repo.preload(s, object: :hashtags) end) type: "Create",
end) tag: ["test"],
tag_all: ["test", "reject"]
})
assert fetch_one == [status_one, status_three] [fetch_one, fetch_two, fetch_three, fetch_four] =
assert fetch_two == [status_one, status_two, status_three] Enum.map([fetch_one, fetch_two, fetch_three, fetch_four], fn statuses ->
assert fetch_three == [status_one, status_two] Enum.map(statuses, fn s -> Repo.preload(s, object: :hashtags) end)
assert fetch_four == [status_three] end)
assert fetch_one == [status_one, status_three]
assert fetch_two == [status_one, status_two, status_three]
assert fetch_three == [status_one, status_two]
assert fetch_four == [status_three]
end
end end
describe "insertion" do describe "insertion" do