Cleanup Pleroma.Activity and Pleroma.Web.ActivityPub.Utils

This commit is contained in:
Egor Kislitsyn 2019-09-03 21:58:30 +07:00
parent 5ca643b80d
commit 39dc9b470c
5 changed files with 128 additions and 268 deletions

View file

@ -6,6 +6,7 @@ defmodule Pleroma.Activity do
use Ecto.Schema use Ecto.Schema
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Activity.Queries
alias Pleroma.ActivityExpiration alias Pleroma.ActivityExpiration
alias Pleroma.Bookmark alias Pleroma.Bookmark
alias Pleroma.Notification alias Pleroma.Notification
@ -65,8 +66,8 @@ defmodule Pleroma.Activity do
timestamps() timestamps()
end end
def with_joined_object(query) do def with_joined_object(query, join_type \\ :inner) do
join(query, :inner, [activity], o in Object, join(query, join_type, [activity], o in Object,
on: on:
fragment( fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')", "(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
@ -78,10 +79,10 @@ def with_joined_object(query) do
) )
end end
def with_preloaded_object(query) do def with_preloaded_object(query, join_type \\ :inner) do
query query
|> has_named_binding?(:object) |> has_named_binding?(:object)
|> if(do: query, else: with_joined_object(query)) |> if(do: query, else: with_joined_object(query, join_type))
|> preload([activity, object: object], object: object) |> preload([activity, object: object], object: object)
end end
@ -107,12 +108,9 @@ def with_set_thread_muted_field(query, %User{} = user) do
def with_set_thread_muted_field(query, _), do: query def with_set_thread_muted_field(query, _), do: query
def get_by_ap_id(ap_id) do def get_by_ap_id(ap_id) do
Repo.one( ap_id
from( |> Queries.by_ap_id()
activity in Activity, |> Repo.one()
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
)
)
end end
def get_bookmark(%Activity{} = activity, %User{} = user) do def get_bookmark(%Activity{} = activity, %User{} = user) do
@ -133,21 +131,10 @@ def change(struct, params \\ %{}) do
end end
def get_by_ap_id_with_object(ap_id) do def get_by_ap_id_with_object(ap_id) do
Repo.one( ap_id
from( |> Queries.by_ap_id()
activity in Activity, |> with_preloaded_object(:left)
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id)), |> Repo.one()
left_join: o in Object,
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
o.data,
activity.data,
activity.data
),
preload: [object: o]
)
)
end end
def get_by_id(id) do def get_by_id(id) do
@ -158,18 +145,9 @@ def get_by_id(id) do
end end
def get_by_id_with_object(id) do def get_by_id_with_object(id) do
from(activity in Activity, Activity
where: activity.id == ^id, |> where(id: ^id)
inner_join: o in Object, |> with_preloaded_object()
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
o.data,
activity.data,
activity.data
),
preload: [object: o]
)
|> Repo.one() |> Repo.one()
end end
@ -180,51 +158,21 @@ def all_by_ids_with_object(ids) do
|> Repo.all() |> Repo.all()
end end
def by_object_ap_id(ap_id) do @doc """
from( Accepts `ap_id` or list of `ap_id`.
activity in Activity, Returns a query.
where: """
fragment( @spec create_by_object_ap_id(String.t() | [String.t()]) :: Ecto.Queryable.t()
"coalesce((?)->'object'->>'id', (?)->>'object') = ?", def create_by_object_ap_id(ap_id) do
activity.data, ap_id
activity.data, |> Queries.by_object_id()
^to_string(ap_id) |> Queries.by_type("Create")
)
)
end end
def create_by_object_ap_id(ap_ids) when is_list(ap_ids) do
from(
activity in Activity,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
activity.data,
activity.data,
^ap_ids
),
where: fragment("(?)->>'type' = 'Create'", activity.data)
)
end
def create_by_object_ap_id(ap_id) when is_binary(ap_id) do
from(
activity in Activity,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
activity.data,
activity.data,
^to_string(ap_id)
),
where: fragment("(?)->>'type' = 'Create'", activity.data)
)
end
def create_by_object_ap_id(_), do: nil
def get_all_create_by_object_ap_id(ap_id) do def get_all_create_by_object_ap_id(ap_id) do
Repo.all(create_by_object_ap_id(ap_id)) ap_id
|> create_by_object_ap_id()
|> Repo.all()
end end
def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
@ -235,54 +183,17 @@ def get_create_by_object_ap_id(ap_id) when is_binary(ap_id) do
def get_create_by_object_ap_id(_), do: nil def get_create_by_object_ap_id(_), do: nil
def create_by_object_ap_id_with_object(ap_ids) when is_list(ap_ids) do @doc """
from( Accepts `ap_id` or list of `ap_id`.
activity in Activity, Returns a query.
where: """
fragment( @spec create_by_object_ap_id_with_object(String.t() | [String.t()]) :: Ecto.Queryable.t()
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)", def create_by_object_ap_id_with_object(ap_id) do
activity.data, ap_id
activity.data, |> create_by_object_ap_id()
^ap_ids |> with_preloaded_object()
),
where: fragment("(?)->>'type' = 'Create'", activity.data),
inner_join: o in Object,
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
o.data,
activity.data,
activity.data
),
preload: [object: o]
)
end end
def create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
from(
activity in Activity,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
activity.data,
activity.data,
^to_string(ap_id)
),
where: fragment("(?)->>'type' = 'Create'", activity.data),
inner_join: o in Object,
on:
fragment(
"(?->>'id') = COALESCE(?->'object'->>'id', ?->>'object')",
o.data,
activity.data,
activity.data
),
preload: [object: o]
)
end
def create_by_object_ap_id_with_object(_), do: nil
def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do def get_create_by_object_ap_id_with_object(ap_id) when is_binary(ap_id) do
ap_id ap_id
|> create_by_object_ap_id_with_object() |> create_by_object_ap_id_with_object()
@ -306,7 +217,8 @@ def normalize(ap_id) when is_binary(ap_id), do: get_by_ap_id_with_object(ap_id)
def normalize(_), do: nil def normalize(_), do: nil
def delete_by_ap_id(id) when is_binary(id) do def delete_by_ap_id(id) when is_binary(id) do
by_object_ap_id(id) id
|> Queries.by_object_id()
|> select([u], u) |> select([u], u)
|> Repo.delete_all() |> Repo.delete_all()
|> elem(1) |> elem(1)
@ -350,31 +262,10 @@ def all_by_actor_and_id(actor, status_ids) do
end end
def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do def follow_requests_for_actor(%Pleroma.User{ap_id: ap_id}) do
from( ap_id
a in Activity, |> Queries.by_object_id()
where: |> Queries.by_type("Follow")
fragment( |> where([a], fragment("? ->> 'state' = 'pending'", a.data))
"? ->> 'type' = 'Follow'",
a.data
),
where:
fragment(
"? ->> 'state' = 'pending'",
a.data
),
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
a.data,
a.data,
^ap_id
)
)
end
@spec query_by_actor(actor()) :: Ecto.Query.t()
def query_by_actor(actor) do
from(a in Activity, where: a.actor == ^actor)
end end
def restrict_deactivated_users(query) do def restrict_deactivated_users(query) do

View file

@ -13,6 +13,14 @@ defmodule Pleroma.Activity.Queries do
alias Pleroma.Activity alias Pleroma.Activity
@spec by_ap_id(query, String.t()) :: query
def by_ap_id(query \\ Activity, ap_id) do
from(
activity in query,
where: fragment("(?)->>'id' = ?", activity.data, ^to_string(ap_id))
)
end
@spec by_actor(query, String.t()) :: query @spec by_actor(query, String.t()) :: query
def by_actor(query \\ Activity, actor) do def by_actor(query \\ Activity, actor) do
from( from(
@ -21,8 +29,23 @@ def by_actor(query \\ Activity, actor) do
) )
end end
@spec by_object_id(query, String.t()) :: query @spec by_object_id(query, String.t() | [String.t()]) :: query
def by_object_id(query \\ Activity, object_id) do def by_object_id(query \\ Activity, object_id)
def by_object_id(query, object_ids) when is_list(object_ids) do
from(
activity in query,
where:
fragment(
"coalesce((?)->'object'->>'id', (?)->>'object') = ANY(?)",
activity.data,
activity.data,
^object_ids
)
)
end
def by_object_id(query, object_id) when is_binary(object_id) do
from(activity in query, from(activity in query,
where: where:
fragment( fragment(
@ -41,9 +64,4 @@ def by_type(query \\ Activity, activity_type) do
where: fragment("(?)->>'type' = ?", activity.data, ^activity_type) where: fragment("(?)->>'type' = ?", activity.data, ^activity_type)
) )
end end
@spec limit(query, pos_integer()) :: query
def limit(query \\ Activity, limit) do
from(activity in query, limit: ^limit)
end
end end

View file

@ -1219,7 +1219,7 @@ def follow_import(%User{} = follower, followed_identifiers) when is_list(followe
def delete_user_activities(%User{ap_id: ap_id} = user) do def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id ap_id
|> Activity.query_by_actor() |> Activity.Queries.by_actor()
|> RepoStreamer.chunk_stream(50) |> RepoStreamer.chunk_stream(50)
|> Stream.each(fn activities -> |> Stream.each(fn activities ->
Enum.each(activities, &delete_activity(&1)) Enum.each(activities, &delete_activity(&1))

View file

@ -85,15 +85,13 @@ defp extract_list(lst) when is_list(lst), do: lst
defp extract_list(_), do: [] defp extract_list(_), do: []
def maybe_splice_recipient(ap_id, params) do def maybe_splice_recipient(ap_id, params) do
need_splice = need_splice? =
!recipient_in_collection(ap_id, params["to"]) && !recipient_in_collection(ap_id, params["to"]) &&
!recipient_in_collection(ap_id, params["cc"]) !recipient_in_collection(ap_id, params["cc"])
if need_splice? do
cc_list = extract_list(params["cc"]) cc_list = extract_list(params["cc"])
Map.put(params, "cc", [ap_id | cc_list])
if need_splice do
params
|> Map.put("cc", [ap_id | cc_list])
else else
params params
end end
@ -139,7 +137,7 @@ def get_notified_from_object(%{"type" => type} = object) when type in @supported
"object" => object "object" => object
} }
Notification.get_notified_from_activity(%Activity{data: fake_create_activity}, false) get_notified_from_object(fake_create_activity)
end end
def get_notified_from_object(object) do def get_notified_from_object(object) do
@ -188,9 +186,9 @@ def maybe_federate(_), do: :ok
Adds an id and a published data if they aren't there, Adds an id and a published data if they aren't there,
also adds it to an included object also adds it to an included object
""" """
def lazy_put_activity_defaults(map, fake \\ false) do def lazy_put_activity_defaults(map, fake? \\ false) do
map = map =
unless fake do if not fake? do
%{data: %{"id" => context}, id: context_id} = create_context(map["context"]) %{data: %{"id" => context}, id: context_id} = create_context(map["context"])
map map
@ -207,7 +205,7 @@ def lazy_put_activity_defaults(map, fake \\ false) do
end end
if is_map(map["object"]) do if is_map(map["object"]) do
object = lazy_put_object_defaults(map["object"], map, fake) object = lazy_put_object_defaults(map["object"], map, fake?)
%{map | "object" => object} %{map | "object" => object}
else else
map map
@ -217,9 +215,9 @@ def lazy_put_activity_defaults(map, fake \\ false) do
@doc """ @doc """
Adds an id and published date if they aren't there. Adds an id and published date if they aren't there.
""" """
def lazy_put_object_defaults(map, activity \\ %{}, fake) def lazy_put_object_defaults(map, activity \\ %{}, fake?)
def lazy_put_object_defaults(map, activity, true = _fake) do def lazy_put_object_defaults(map, activity, true = _fake?) do
map map
|> Map.put_new_lazy("published", &make_date/0) |> Map.put_new_lazy("published", &make_date/0)
|> Map.put_new("id", "pleroma:fake_object_id") |> Map.put_new("id", "pleroma:fake_object_id")
@ -228,7 +226,7 @@ def lazy_put_object_defaults(map, activity, true = _fake) do
|> Map.put_new("context_id", activity["context_id"]) |> Map.put_new("context_id", activity["context_id"])
end end
def lazy_put_object_defaults(map, activity, _fake) do def lazy_put_object_defaults(map, activity, _fake?) do
map map
|> Map.put_new_lazy("id", &generate_object_id/0) |> Map.put_new_lazy("id", &generate_object_id/0)
|> Map.put_new_lazy("published", &make_date/0) |> Map.put_new_lazy("published", &make_date/0)
@ -242,9 +240,7 @@ def lazy_put_object_defaults(map, activity, _fake) do
def insert_full_object(%{"object" => %{"type" => type} = object_data} = map) def insert_full_object(%{"object" => %{"type" => type} = object_data} = map)
when is_map(object_data) and type in @supported_object_types do when is_map(object_data) and type in @supported_object_types do
with {:ok, object} <- Object.create(object_data) do with {:ok, object} <- Object.create(object_data) do
map = map = Map.put(map, "object", object.data["id"])
map
|> Map.put("object", object.data["id"])
{:ok, map, object} {:ok, map, object}
end end
@ -263,7 +259,7 @@ def get_existing_like(actor, %{data: %{"id" => id}}) do
|> Activity.Queries.by_actor() |> Activity.Queries.by_actor()
|> Activity.Queries.by_object_id(id) |> Activity.Queries.by_object_id(id)
|> Activity.Queries.by_type("Like") |> Activity.Queries.by_type("Like")
|> Activity.Queries.limit(1) |> limit(1)
|> Repo.one() |> Repo.one()
end end
@ -380,12 +376,11 @@ def update_follow_state(
%Activity{data: %{"actor" => actor, "object" => object}} = activity, %Activity{data: %{"actor" => actor, "object" => object}} = activity,
state state
) do ) do
with new_data <- new_data = Map.put(activity.data, "state", state)
activity.data changeset = Changeset.change(activity, data: new_data)
|> Map.put("state", state),
changeset <- Changeset.change(activity, data: new_data), with {:ok, activity} <- Repo.update(changeset) do
{:ok, activity} <- Repo.update(changeset), User.set_follow_state_cache(actor, object, state)
_ <- User.set_follow_state_cache(actor, object, state) do
{:ok, activity} {:ok, activity}
end end
end end
@ -410,28 +405,14 @@ def make_follow_data(
end end
def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
query = "Follow"
from( |> Activity.Queries.by_type()
activity in Activity, |> where(actor: ^follower_id)
where:
fragment(
"? ->> 'type' = 'Follow'",
activity.data
),
where: activity.actor == ^follower_id,
# this is to use the index # this is to use the index
where: |> Activity.Queries.by_object_id(followed_id)
fragment( |> order_by([activity], fragment("? desc nulls last", activity.id))
"coalesce((?)->'object'->>'id', (?)->>'object') = ?", |> limit(1)
activity.data, |> Repo.one()
activity.data,
^followed_id
),
order_by: [fragment("? desc nulls last", activity.id)],
limit: 1
)
Repo.one(query)
end end
#### Announce-related helpers #### Announce-related helpers
@ -439,23 +420,13 @@ def fetch_latest_follow(%User{ap_id: follower_id}, %User{ap_id: followed_id}) do
@doc """ @doc """
Retruns an existing announce activity if the notice has already been announced Retruns an existing announce activity if the notice has already been announced
""" """
def get_existing_announce(actor, %{data: %{"id" => id}}) do def get_existing_announce(actor, %{data: %{"id" => ap_id}}) do
query = "Announce"
from( |> Activity.Queries.by_type()
activity in Activity, |> where(actor: ^actor)
where: activity.actor == ^actor,
# this is to use the index # this is to use the index
where: |> Activity.Queries.by_object_id(ap_id)
fragment( |> Repo.one()
"coalesce((?)->'object'->>'id', (?)->>'object') = ?",
activity.data,
activity.data,
^id
),
where: fragment("(?)->>'type' = 'Announce'", activity.data)
)
Repo.one(query)
end end
@doc """ @doc """
@ -538,11 +509,13 @@ def add_announce_to_object(
object object
) do ) do
announcements = announcements =
if is_list(object.data["announcements"]), do: object.data["announcements"], else: [] if is_list(object.data["announcements"]) do
Enum.uniq([actor | object.data["announcements"]])
with announcements <- [actor | announcements] |> Enum.uniq() do else
update_element_in_object("announcement", announcements, object) [actor]
end end
update_element_in_object("announcement", announcements, object)
end end
def add_announce_to_object(_, object), do: {:ok, object} def add_announce_to_object(_, object), do: {:ok, object}
@ -570,28 +543,14 @@ def make_unfollow_data(follower, followed, follow_activity, activity_id) do
#### Block-related helpers #### Block-related helpers
def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do def fetch_latest_block(%User{ap_id: blocker_id}, %User{ap_id: blocked_id}) do
query = "Block"
from( |> Activity.Queries.by_type()
activity in Activity, |> where(actor: ^blocker_id)
where:
fragment(
"? ->> 'type' = 'Block'",
activity.data
),
where: activity.actor == ^blocker_id,
# this is to use the index # this is to use the index
where: |> Activity.Queries.by_object_id(blocked_id)
fragment( |> order_by([activity], fragment("? desc nulls last", activity.id))
"coalesce((?)->'object'->>'id', (?)->>'object') = ?", |> limit(1)
activity.data, |> Repo.one()
activity.data,
^blocked_id
),
order_by: [fragment("? desc nulls last", activity.id)],
limit: 1
)
Repo.one(query)
end end
def make_block_data(blocker, blocked, activity_id) do def make_block_data(blocker, blocked, activity_id) do
@ -695,11 +654,11 @@ def fetch_ordered_collection(from, pages_left, acc \\ []) do
#### Report-related helpers #### Report-related helpers
def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do def update_report_state(%Activity{} = activity, state) when state in @supported_report_states do
with new_data <- Map.put(activity.data, "state", state), new_data = Map.put(activity.data, "state", state)
changeset <- Changeset.change(activity, data: new_data),
{:ok, activity} <- Repo.update(changeset) do activity
{:ok, activity} |> Changeset.change(data: new_data)
end |> Repo.update()
end end
def update_report_state(_, _), do: {:error, "Unsupported state"} def update_report_state(_, _), do: {:error, "Unsupported state"}
@ -766,21 +725,13 @@ defp get_updated_targets(
end end
def get_existing_votes(actor, %{data: %{"id" => id}}) do def get_existing_votes(actor, %{data: %{"id" => id}}) do
query = actor
from( |> Activity.Queries.by_actor()
[activity, object: object] in Activity.with_preloaded_object(Activity), |> Activity.Queries.by_type("Create")
where: fragment("(?)->>'type' = 'Create'", activity.data), |> Activity.with_preloaded_object()
where: fragment("(?)->>'actor' = ?", activity.data, ^actor), |> where([a, object: o], fragment("(?)->>'inReplyTo' = ?", o.data, ^to_string(id)))
where: |> where([a, object: o], fragment("(?)->>'type' = 'Answer'", o.data))
fragment( |> Repo.all()
"(?)->>'inReplyTo' = ?",
object.data,
^to_string(id)
),
where: fragment("(?)->>'type' = 'Answer'", object.data)
)
Repo.all(query)
end end
defp maybe_put(map, _key, nil), do: map defp maybe_put(map, _key, nil), do: map

View file

@ -1081,7 +1081,7 @@ test "it deletes a user, all follow relationships and all activities", %{user: u
user_activities = user_activities =
user.ap_id user.ap_id
|> Activity.query_by_actor() |> Activity.Queries.by_actor()
|> Repo.all() |> Repo.all()
|> Enum.map(fn act -> act.data["type"] end) |> Enum.map(fn act -> act.data["type"] end)