# Pleroma: A lightweight social networking server # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/> # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.ActivityPub do alias Pleroma.Activity alias Pleroma.Activity.Ir.Topics alias Pleroma.Config alias Pleroma.Constants alias Pleroma.Conversation alias Pleroma.Conversation.Participation alias Pleroma.Maps alias Pleroma.Notification alias Pleroma.Object alias Pleroma.Object.Containment alias Pleroma.Object.Fetcher alias Pleroma.Pagination alias Pleroma.Repo alias Pleroma.Upload alias Pleroma.User alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.Streamer alias Pleroma.Web.WebFinger alias Pleroma.Workers.BackgroundWorker import Ecto.Query import Pleroma.Web.ActivityPub.Utils import Pleroma.Web.ActivityPub.Visibility require Logger require Pleroma.Constants # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. defp get_recipients(%{"type" => "Announce"} = data) do to = Map.get(data, "to", []) cc = Map.get(data, "cc", []) bcc = Map.get(data, "bcc", []) actor = User.get_cached_by_ap_id(data["actor"]) recipients = Enum.filter(Enum.concat([to, cc, bcc]), fn recipient -> case User.get_cached_by_ap_id(recipient) do nil -> true user -> User.following?(user, actor) end end) {recipients, to, cc} end defp get_recipients(%{"type" => "Create"} = data) do to = Map.get(data, "to", []) cc = Map.get(data, "cc", []) bcc = Map.get(data, "bcc", []) actor = Map.get(data, "actor", []) recipients = [to, cc, bcc, [actor]] |> Enum.concat() |> Enum.uniq() {recipients, to, cc} end defp get_recipients(data) do to = Map.get(data, "to", []) cc = Map.get(data, "cc", []) bcc = Map.get(data, "bcc", []) recipients = Enum.concat([to, cc, bcc]) {recipients, to, cc} end defp check_actor_is_active(actor) do if not is_nil(actor) do with user <- User.get_cached_by_ap_id(actor), false <- user.deactivated do true else _e -> false end else true end end defp check_remote_limit(%{"object" => %{"content" => content}}) when not is_nil(content) do limit = Config.get([:instance, :remote_limit]) String.length(content) <= limit end defp check_remote_limit(_), do: true def increase_note_count_if_public(actor, object) do if is_public?(object), do: User.increase_note_count(actor), else: {:ok, actor} end def decrease_note_count_if_public(actor, object) do if is_public?(object), do: User.decrease_note_count(actor), else: {:ok, actor} end def increase_replies_count_if_reply(%{ "object" => %{"inReplyTo" => reply_ap_id} = object, "type" => "Create" }) do if is_public?(object) do Object.increase_replies_count(reply_ap_id) end end def increase_replies_count_if_reply(_create_data), do: :noop def decrease_replies_count_if_reply(%Object{ data: %{"inReplyTo" => reply_ap_id} = object }) do if is_public?(object) do Object.decrease_replies_count(reply_ap_id) end end def decrease_replies_count_if_reply(_object), do: :noop def increase_poll_votes_if_vote(%{ "object" => %{"inReplyTo" => reply_ap_id, "name" => name}, "type" => "Create", "actor" => actor }) do Object.increase_vote_count(reply_ap_id, name, actor) end def increase_poll_votes_if_vote(_create_data), do: :noop @object_types ["ChatMessage"] @spec persist(map(), keyword()) :: {:ok, Activity.t() | Object.t()} def persist(%{"type" => type} = object, meta) when type in @object_types do with {:ok, object} <- Object.create(object) do {:ok, object, meta} end end def persist(object, meta) do with local <- Keyword.fetch!(meta, :local), {recipients, _, _} <- get_recipients(object), {:ok, activity} <- Repo.insert(%Activity{ data: object, local: local, recipients: recipients, actor: object["actor"] }) do {:ok, activity, meta} end end @spec insert(map(), boolean(), boolean(), boolean()) :: {:ok, Activity.t()} | {:error, any()} def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when is_map(map) do with nil <- Activity.normalize(map), map <- lazy_put_activity_defaults(map, fake), true <- bypass_actor_check || check_actor_is_active(map["actor"]), {_, true} <- {:remote_limit_error, check_remote_limit(map)}, {:ok, map} <- MRF.filter(map), {recipients, _, _} = get_recipients(map), {:fake, false, map, recipients} <- {:fake, fake, map, recipients}, {:containment, :ok} <- {:containment, Containment.contain_child(map)}, {:ok, map, object} <- insert_full_object(map) do {:ok, activity} = Repo.insert(%Activity{ data: map, local: local, actor: map["actor"], recipients: recipients }) # Splice in the child object if we have one. activity = Maps.put_if_present(activity, :object, object) BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) {:ok, activity} else %Activity{} = activity -> {:ok, activity} {:fake, true, map, recipients} -> activity = %Activity{ data: map, local: local, actor: map["actor"], recipients: recipients, id: "pleroma:fakeid" } Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) {:ok, activity} error -> {:error, error} end end def notify_and_stream(activity) do Notification.create_notifications(activity) conversation = create_or_bump_conversation(activity, activity.actor) participations = get_participations(conversation) stream_out(activity) stream_out_participations(participations) end defp create_or_bump_conversation(activity, actor) do with {:ok, conversation} <- Conversation.create_or_bump_for(activity), %User{} = user <- User.get_cached_by_ap_id(actor), Participation.mark_as_read(user, conversation) do {:ok, conversation} end end defp get_participations({:ok, conversation}) do conversation |> Repo.preload(:participations, force: true) |> Map.get(:participations) end defp get_participations(_), do: [] def stream_out_participations(participations) do participations = participations |> Repo.preload(:user) Streamer.stream("participation", participations) end def stream_out_participations(%Object{data: %{"context" => context}}, user) do with %Conversation{} = conversation <- Conversation.get_for_ap_id(context), conversation = Repo.preload(conversation, :participations), last_activity_id = fetch_latest_activity_id_for_context(conversation.ap_id, %{ "user" => user, "blocking_user" => user }) do if last_activity_id do stream_out_participations(conversation.participations) end end end def stream_out_participations(_, _), do: :noop def stream_out(%Activity{data: %{"type" => data_type}} = activity) when data_type in ["Create", "Announce", "Delete"] do activity |> Topics.get_activity_topics() |> Streamer.stream(activity) end def stream_out(_activity) do :noop end @spec create(map(), boolean()) :: {:ok, Activity.t()} | {:error, any()} def create(params, fake \\ false) do with {:ok, result} <- Repo.transaction(fn -> do_create(params, fake) end) do result end end defp do_create(%{to: to, actor: actor, context: context, object: object} = params, fake) do additional = params[:additional] || %{} # only accept false as false value local = !(params[:local] == false) published = params[:published] quick_insert? = Config.get([:env]) == :benchmark with create_data <- make_create_data( %{to: to, actor: actor, published: published, context: context, object: object}, additional ), {:ok, activity} <- insert(create_data, local, fake), {:fake, false, activity} <- {:fake, fake, activity}, _ <- increase_replies_count_if_reply(create_data), _ <- increase_poll_votes_if_vote(create_data), {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity}, {:ok, _actor} <- increase_note_count_if_public(actor, activity), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} else {:quick_insert, true, activity} -> {:ok, activity} {:fake, true, activity} -> {:ok, activity} {:error, message} -> Repo.rollback(message) end end @spec listen(map()) :: {:ok, Activity.t()} | {:error, any()} def listen(%{to: to, actor: actor, context: context, object: object} = params) do additional = params[:additional] || %{} # only accept false as false value local = !(params[:local] == false) published = params[:published] with listen_data <- make_listen_data( %{to: to, actor: actor, published: published, context: context, object: object}, additional ), {:ok, activity} <- insert(listen_data, local), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} end end @spec accept(map()) :: {:ok, Activity.t()} | {:error, any()} def accept(params) do accept_or_reject("Accept", params) end @spec reject(map()) :: {:ok, Activity.t()} | {:error, any()} def reject(params) do accept_or_reject("Reject", params) end @spec accept_or_reject(String.t(), map()) :: {:ok, Activity.t()} | {:error, any()} def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do local = Map.get(params, :local, true) activity_id = Map.get(params, :activity_id, nil) with data <- %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object} |> Maps.put_if_present("id", activity_id), {:ok, activity} <- insert(data, local), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} end end @spec update(map()) :: {:ok, Activity.t()} | {:error, any()} def update(%{to: to, cc: cc, actor: actor, object: object} = params) do local = !(params[:local] == false) activity_id = params[:activity_id] with data <- %{ "to" => to, "cc" => cc, "type" => "Update", "actor" => actor, "object" => object }, data <- Maps.put_if_present(data, "id", activity_id), {:ok, activity} <- insert(data, local), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} end end @spec follow(User.t(), User.t(), String.t() | nil, boolean(), keyword()) :: {:ok, Activity.t()} | {:error, any()} def follow(follower, followed, activity_id \\ nil, local \\ true, opts \\ []) do with {:ok, result} <- Repo.transaction(fn -> do_follow(follower, followed, activity_id, local, opts) end) do result end end defp do_follow(follower, followed, activity_id, local, opts) do skip_notify_and_stream = Keyword.get(opts, :skip_notify_and_stream, false) with data <- make_follow_data(follower, followed, activity_id), {:ok, activity} <- insert(data, local), _ <- skip_notify_and_stream || notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} else {:error, error} -> Repo.rollback(error) end end @spec unfollow(User.t(), User.t(), String.t() | nil, boolean()) :: {:ok, Activity.t()} | nil | {:error, any()} def unfollow(follower, followed, activity_id \\ nil, local \\ true) do with {:ok, result} <- Repo.transaction(fn -> do_unfollow(follower, followed, activity_id, local) end) do result end end defp do_unfollow(follower, followed, activity_id, local) do with %Activity{} = follow_activity <- fetch_latest_follow(follower, followed), {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"), unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id), {:ok, activity} <- insert(unfollow_data, local), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} else nil -> nil {:error, error} -> Repo.rollback(error) end end @spec block(User.t(), User.t(), String.t() | nil, boolean()) :: {:ok, Activity.t()} | {:error, any()} def block(blocker, blocked, activity_id \\ nil, local \\ true) do with {:ok, result} <- Repo.transaction(fn -> do_block(blocker, blocked, activity_id, local) end) do result end end defp do_block(blocker, blocked, activity_id, local) do unfollow_blocked = Config.get([:activitypub, :unfollow_blocked]) if unfollow_blocked do follow_activity = fetch_latest_follow(blocker, blocked) if follow_activity, do: unfollow(blocker, blocked, nil, local) end with block_data <- make_block_data(blocker, blocked, activity_id), {:ok, activity} <- insert(block_data, local), _ <- notify_and_stream(activity), :ok <- maybe_federate(activity) do {:ok, activity} else {:error, error} -> Repo.rollback(error) end end @spec flag(map()) :: {:ok, Activity.t()} | {:error, any()} def flag( %{ actor: actor, context: _context, account: account, statuses: statuses, content: content } = params ) do # only accept false as false value local = !(params[:local] == false) forward = !(params[:forward] == false) additional = params[:additional] || %{} additional = if forward do Map.merge(additional, %{"to" => [], "cc" => [account.ap_id]}) else Map.merge(additional, %{"to" => [], "cc" => []}) end with flag_data <- make_flag_data(params, additional), {:ok, activity} <- insert(flag_data, local), {:ok, stripped_activity} <- strip_report_status_data(activity), _ <- notify_and_stream(activity), :ok <- maybe_federate(stripped_activity) do User.all_superusers() |> Enum.filter(fn user -> not is_nil(user.email) end) |> Enum.each(fn superuser -> superuser |> Pleroma.Emails.AdminEmail.report(actor, account, statuses, content) |> Pleroma.Emails.Mailer.deliver_async() end) {:ok, activity} end end @spec move(User.t(), User.t(), boolean()) :: {:ok, Activity.t()} | {:error, any()} def move(%User{} = origin, %User{} = target, local \\ true) do params = %{ "type" => "Move", "actor" => origin.ap_id, "object" => origin.ap_id, "target" => target.ap_id } with true <- origin.ap_id in target.also_known_as, {:ok, activity} <- insert(params, local), _ <- notify_and_stream(activity) do maybe_federate(activity) BackgroundWorker.enqueue("move_following", %{ "origin_id" => origin.id, "target_id" => target.id }) {:ok, activity} else false -> {:error, "Target account must have the origin in `alsoKnownAs`"} err -> err end end def fetch_activities_for_context_query(context, opts) do public = [Constants.as_public()] recipients = if opts["user"], do: [opts["user"].ap_id | User.following(opts["user"])] ++ public, else: public from(activity in Activity) |> maybe_preload_objects(opts) |> maybe_preload_bookmarks(opts) |> maybe_set_thread_muted_field(opts) |> restrict_blocked(opts) |> restrict_recipients(recipients, opts["user"]) |> where( [activity], fragment( "?->>'type' = ? and ?->>'context' = ?", activity.data, "Create", activity.data, ^context ) ) |> exclude_poll_votes(opts) |> exclude_id(opts) |> order_by([activity], desc: activity.id) end @spec fetch_activities_for_context(String.t(), keyword() | map()) :: [Activity.t()] def fetch_activities_for_context(context, opts \\ %{}) do context |> fetch_activities_for_context_query(opts) |> Repo.all() end @spec fetch_latest_activity_id_for_context(String.t(), keyword() | map()) :: FlakeId.Ecto.CompatType.t() | nil def fetch_latest_activity_id_for_context(context, opts \\ %{}) do context |> fetch_activities_for_context_query(Map.merge(%{"skip_preload" => true}, opts)) |> limit(1) |> select([a], a.id) |> Repo.one() end @spec fetch_public_or_unlisted_activities(map(), Pagination.type()) :: [Activity.t()] def fetch_public_or_unlisted_activities(opts \\ %{}, pagination \\ :keyset) do opts = Map.drop(opts, ["user"]) query = fetch_activities_query([Constants.as_public()], opts) query = if opts["restrict_unlisted"] do restrict_unlisted(query) else query end Pagination.fetch_paginated(query, opts, pagination) end @spec fetch_public_activities(map(), Pagination.type()) :: [Activity.t()] def fetch_public_activities(opts \\ %{}, pagination \\ :keyset) do opts |> Map.put("restrict_unlisted", true) |> fetch_public_or_unlisted_activities(pagination) end @valid_visibilities ~w[direct unlisted public private] defp restrict_visibility(query, %{visibility: visibility}) when is_list(visibility) do if Enum.all?(visibility, &(&1 in @valid_visibilities)) do query = from( a in query, where: fragment( "activity_visibility(?, ?, ?) = ANY (?)", a.actor, a.recipients, a.data, ^visibility ) ) query else Logger.error("Could not restrict visibility to #{visibility}") end end defp restrict_visibility(query, %{visibility: visibility}) when visibility in @valid_visibilities do from( a in query, where: fragment("activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility) ) end defp restrict_visibility(_query, %{visibility: visibility}) when visibility not in @valid_visibilities do Logger.error("Could not restrict visibility to #{visibility}") end defp restrict_visibility(query, _visibility), do: query defp exclude_visibility(query, %{"exclude_visibilities" => visibility}) when is_list(visibility) do if Enum.all?(visibility, &(&1 in @valid_visibilities)) do from( a in query, where: not fragment( "activity_visibility(?, ?, ?) = ANY (?)", a.actor, a.recipients, a.data, ^visibility ) ) else Logger.error("Could not exclude visibility to #{visibility}") query end end defp exclude_visibility(query, %{"exclude_visibilities" => visibility}) when visibility in @valid_visibilities do from( a in query, where: not fragment( "activity_visibility(?, ?, ?) = ?", a.actor, a.recipients, a.data, ^visibility ) ) end defp exclude_visibility(query, %{"exclude_visibilities" => visibility}) when visibility not in [nil | @valid_visibilities] do Logger.error("Could not exclude visibility to #{visibility}") query end defp exclude_visibility(query, _visibility), do: query defp restrict_thread_visibility(query, _, %{skip_thread_containment: true} = _), do: query defp restrict_thread_visibility( query, %{"user" => %User{skip_thread_containment: true}}, _ ), do: query defp restrict_thread_visibility(query, %{"user" => %User{ap_id: ap_id}}, _) do from( a in query, where: fragment("thread_visibility(?, (?)->>'id') = true", ^ap_id, a.data) ) end defp restrict_thread_visibility(query, _, _), do: query def fetch_user_abstract_activities(user, reading_user, params \\ %{}) do params = params |> Map.put("user", reading_user) |> Map.put("actor_id", user.ap_id) recipients = user_activities_recipients(%{ "godmode" => params["godmode"], "reading_user" => reading_user }) fetch_activities(recipients, params) |> Enum.reverse() end def fetch_user_activities(user, reading_user, params \\ %{}) do params = params |> Map.put("type", ["Create", "Announce"]) |> Map.put("user", reading_user) |> Map.put("actor_id", user.ap_id) |> Map.put("pinned_activity_ids", user.pinned_activities) params = if User.blocks?(reading_user, user) do params else params |> Map.put("blocking_user", reading_user) |> Map.put("muting_user", reading_user) end recipients = user_activities_recipients(%{ "godmode" => params["godmode"], "reading_user" => reading_user }) fetch_activities(recipients, params) |> Enum.reverse() end def fetch_statuses(reading_user, params) do params = params |> Map.put("type", ["Create", "Announce"]) recipients = user_activities_recipients(%{ "godmode" => params["godmode"], "reading_user" => reading_user }) fetch_activities(recipients, params, :offset) |> Enum.reverse() end defp user_activities_recipients(%{"godmode" => true}) do [] end defp user_activities_recipients(%{"reading_user" => reading_user}) do if reading_user do [Constants.as_public()] ++ [reading_user.ap_id | User.following(reading_user)] else [Constants.as_public()] end end defp restrict_since(query, %{"since_id" => ""}), do: query defp restrict_since(query, %{"since_id" => since_id}) do from(activity in query, where: activity.id > ^since_id) end defp restrict_since(query, _), do: query defp restrict_tag_reject(_query, %{"tag_reject" => _tag_reject, "skip_preload" => true}) do raise "Can't use the child object without preloading!" end defp restrict_tag_reject(query, %{"tag_reject" => tag_reject}) when is_list(tag_reject) and tag_reject != [] do from( [_activity, object] in query, where: fragment("not (?)->'tag' \\?| (?)", object.data, ^tag_reject) ) end defp restrict_tag_reject(query, _), do: query defp restrict_tag_all(_query, %{"tag_all" => _tag_all, "skip_preload" => true}) do raise "Can't use the child object without preloading!" end defp restrict_tag_all(query, %{"tag_all" => tag_all}) when is_list(tag_all) and tag_all != [] do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?& (?)", object.data, ^tag_all) ) end defp restrict_tag_all(query, _), do: query defp restrict_tag(_query, %{"tag" => _tag, "skip_preload" => true}) do raise "Can't use the child object without preloading!" end defp restrict_tag(query, %{"tag" => tag}) when is_list(tag) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\?| (?)", object.data, ^tag) ) end defp restrict_tag(query, %{"tag" => tag}) when is_binary(tag) do from( [_activity, object] in query, where: fragment("(?)->'tag' \\? (?)", object.data, ^tag) ) end defp restrict_tag(query, _), do: query defp restrict_recipients(query, [], _user), do: query defp restrict_recipients(query, recipients, nil) do from(activity in query, where: fragment("? && ?", ^recipients, activity.recipients)) end defp restrict_recipients(query, recipients, user) do from( activity in query, where: fragment("? && ?", ^recipients, activity.recipients), or_where: activity.actor == ^user.ap_id ) end defp restrict_local(query, %{"local_only" => true}) do from(activity in query, where: activity.local == true) end defp restrict_local(query, _), do: query defp restrict_actor(query, %{"actor_id" => actor_id}) do from(activity in query, where: activity.actor == ^actor_id) end defp restrict_actor(query, _), do: query defp restrict_type(query, %{"type" => type}) when is_binary(type) do from(activity in query, where: fragment("?->>'type' = ?", activity.data, ^type)) end defp restrict_type(query, %{"type" => type}) do from(activity in query, where: fragment("?->>'type' = ANY(?)", activity.data, ^type)) end defp restrict_type(query, _), do: query defp restrict_state(query, %{"state" => state}) do from(activity in query, where: fragment("?->>'state' = ?", activity.data, ^state)) end defp restrict_state(query, _), do: query defp restrict_favorited_by(query, %{"favorited_by" => ap_id}) do from( [_activity, object] in query, where: fragment("(?)->'likes' \\? (?)", object.data, ^ap_id) ) end defp restrict_favorited_by(query, _), do: query defp restrict_media(_query, %{"only_media" => _val, "skip_preload" => true}) do raise "Can't use the child object without preloading!" end defp restrict_media(query, %{"only_media" => val}) when val in [true, "true", "1"] do from( [_activity, object] in query, where: fragment("not (?)->'attachment' = (?)", object.data, ^[]) ) end defp restrict_media(query, _), do: query defp restrict_replies(query, %{"exclude_replies" => val}) when val in [true, "true", "1"] do from( [_activity, object] in query, where: fragment("?->>'inReplyTo' is null", object.data) ) end defp restrict_replies(query, %{ "reply_filtering_user" => user, "reply_visibility" => "self" }) do from( [activity, object] in query, where: fragment( "?->>'inReplyTo' is null OR ? = ANY(?)", object.data, ^user.ap_id, activity.recipients ) ) end defp restrict_replies(query, %{ "reply_filtering_user" => user, "reply_visibility" => "following" }) do from( [activity, object] in query, where: fragment( "?->>'inReplyTo' is null OR ? && array_remove(?, ?) OR ? = ?", object.data, ^[user.ap_id | User.get_cached_user_friends_ap_ids(user)], activity.recipients, activity.actor, activity.actor, ^user.ap_id ) ) end defp restrict_replies(query, _), do: query defp restrict_reblogs(query, %{"exclude_reblogs" => val}) when val in [true, "true", "1"] do from(activity in query, where: fragment("?->>'type' != 'Announce'", activity.data)) end defp restrict_reblogs(query, _), do: query defp restrict_muted(query, %{"with_muted" => val}) when val in [true, "true", "1"], do: query defp restrict_muted(query, %{"muting_user" => %User{} = user} = opts) do mutes = opts["muted_users_ap_ids"] || User.muted_users_ap_ids(user) query = from([activity] in query, where: fragment("not (? = ANY(?))", activity.actor, ^mutes), where: fragment("not (?->'to' \\?| ?)", activity.data, ^mutes) ) unless opts["skip_preload"] do from([thread_mute: tm] in query, where: is_nil(tm.user_id)) else query end end defp restrict_muted(query, _), do: query defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do blocked_ap_ids = opts["blocked_users_ap_ids"] || User.blocked_users_ap_ids(user) domain_blocks = user.domain_blocks || [] following_ap_ids = User.get_friends_ap_ids(user) query = if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query) from( [activity, object: o] in query, where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids), where: fragment("not (? && ?)", activity.recipients, ^blocked_ap_ids), where: fragment( "recipients_contain_blocked_domains(?, ?) = false", activity.recipients, ^domain_blocks ), where: fragment( "not (?->>'type' = 'Announce' and ?->'to' \\?| ?)", activity.data, activity.data, ^blocked_ap_ids ), where: fragment( "(not (split_part(?, '/', 3) = ANY(?))) or ? = ANY(?)", activity.actor, ^domain_blocks, activity.actor, ^following_ap_ids ), where: fragment( "(not (split_part(?->>'actor', '/', 3) = ANY(?))) or (?->>'actor') = ANY(?)", o.data, ^domain_blocks, o.data, ^following_ap_ids ) ) end defp restrict_blocked(query, _), do: query defp restrict_unlisted(query) do from( activity in query, where: fragment( "not (coalesce(?->'cc', '{}'::jsonb) \\?| ?)", activity.data, ^[Constants.as_public()] ) ) end # TODO: when all endpoints migrated to OpenAPI compare `pinned` with `true` (boolean) only, # the same for `restrict_media/2`, `restrict_replies/2`, 'restrict_reblogs/2' # and `restrict_muted/2` defp restrict_pinned(query, %{"pinned" => pinned, "pinned_activity_ids" => ids}) when pinned in [true, "true", "1"] do from(activity in query, where: activity.id in ^ids) end defp restrict_pinned(query, _), do: query defp restrict_muted_reblogs(query, %{"muting_user" => %User{} = user} = opts) do muted_reblogs = opts["reblog_muted_users_ap_ids"] || User.reblog_muted_users_ap_ids(user) from( activity in query, where: fragment( "not ( ?->>'type' = 'Announce' and ? = ANY(?))", activity.data, activity.actor, ^muted_reblogs ) ) end defp restrict_muted_reblogs(query, _), do: query defp restrict_instance(query, %{"instance" => instance}) do users = from( u in User, select: u.ap_id, where: fragment("? LIKE ?", u.nickname, ^"%@#{instance}") ) |> Repo.all() from(activity in query, where: activity.actor in ^users) end defp restrict_instance(query, _), do: query defp exclude_poll_votes(query, %{"include_poll_votes" => true}), do: query defp exclude_poll_votes(query, _) do if has_named_binding?(query, :object) do from([activity, object: o] in query, where: fragment("not(?->>'type' = ?)", o.data, "Answer") ) else query end end defp exclude_chat_messages(query, %{"include_chat_messages" => true}), do: query defp exclude_chat_messages(query, _) do if has_named_binding?(query, :object) do from([activity, object: o] in query, where: fragment("not(?->>'type' = ?)", o.data, "ChatMessage") ) else query end end defp exclude_invisible_actors(query, %{"invisible_actors" => true}), do: query defp exclude_invisible_actors(query, _opts) do invisible_ap_ids = User.Query.build(%{invisible: true, select: [:ap_id]}) |> Repo.all() |> Enum.map(fn %{ap_id: ap_id} -> ap_id end) from([activity] in query, where: activity.actor not in ^invisible_ap_ids) end defp exclude_id(query, %{"exclude_id" => id}) when is_binary(id) do from(activity in query, where: activity.id != ^id) end defp exclude_id(query, _), do: query defp maybe_preload_objects(query, %{"skip_preload" => true}), do: query defp maybe_preload_objects(query, _) do query |> Activity.with_preloaded_object() end defp maybe_preload_bookmarks(query, %{"skip_preload" => true}), do: query defp maybe_preload_bookmarks(query, opts) do query |> Activity.with_preloaded_bookmark(opts["user"]) end defp maybe_preload_report_notes(query, %{"preload_report_notes" => true}) do query |> Activity.with_preloaded_report_notes() end defp maybe_preload_report_notes(query, _), do: query defp maybe_set_thread_muted_field(query, %{"skip_preload" => true}), do: query defp maybe_set_thread_muted_field(query, opts) do query |> Activity.with_set_thread_muted_field(opts["muting_user"] || opts["user"]) end defp maybe_order(query, %{order: :desc}) do query |> order_by(desc: :id) end defp maybe_order(query, %{order: :asc}) do query |> order_by(asc: :id) end defp maybe_order(query, _), do: query defp fetch_activities_query_ap_ids_ops(opts) do source_user = opts["muting_user"] ap_id_relationships = if source_user, do: [:mute, :reblog_mute], else: [] ap_id_relationships = ap_id_relationships ++ if opts["blocking_user"] && opts["blocking_user"] == source_user do [:block] else [] end preloaded_ap_ids = User.outgoing_relationships_ap_ids(source_user, ap_id_relationships) restrict_blocked_opts = Map.merge(%{"blocked_users_ap_ids" => preloaded_ap_ids[:block]}, opts) restrict_muted_opts = Map.merge(%{"muted_users_ap_ids" => preloaded_ap_ids[:mute]}, opts) restrict_muted_reblogs_opts = Map.merge(%{"reblog_muted_users_ap_ids" => preloaded_ap_ids[:reblog_mute]}, opts) {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} end def fetch_activities_query(recipients, opts \\ %{}) do {restrict_blocked_opts, restrict_muted_opts, restrict_muted_reblogs_opts} = fetch_activities_query_ap_ids_ops(opts) config = %{ skip_thread_containment: Config.get([:instance, :skip_thread_containment]) } Activity |> maybe_preload_objects(opts) |> maybe_preload_bookmarks(opts) |> maybe_preload_report_notes(opts) |> maybe_set_thread_muted_field(opts) |> maybe_order(opts) |> restrict_recipients(recipients, opts["user"]) |> restrict_replies(opts) |> restrict_tag(opts) |> restrict_tag_reject(opts) |> restrict_tag_all(opts) |> restrict_since(opts) |> restrict_local(opts) |> restrict_actor(opts) |> restrict_type(opts) |> restrict_state(opts) |> restrict_favorited_by(opts) |> restrict_blocked(restrict_blocked_opts) |> restrict_muted(restrict_muted_opts) |> restrict_media(opts) |> restrict_visibility(opts) |> restrict_thread_visibility(opts, config) |> restrict_reblogs(opts) |> restrict_pinned(opts) |> restrict_muted_reblogs(restrict_muted_reblogs_opts) |> restrict_instance(opts) |> Activity.restrict_deactivated_users() |> exclude_poll_votes(opts) |> exclude_chat_messages(opts) |> exclude_invisible_actors(opts) |> exclude_visibility(opts) end def fetch_activities(recipients, opts \\ %{}, pagination \\ :keyset) do list_memberships = Pleroma.List.memberships(opts["user"]) fetch_activities_query(recipients ++ list_memberships, opts) |> Pagination.fetch_paginated(opts, pagination) |> Enum.reverse() |> maybe_update_cc(list_memberships, opts["user"]) end @doc """ Fetch favorites activities of user with order by sort adds to favorites """ @spec fetch_favourites(User.t(), map(), Pagination.type()) :: list(Activity.t()) def fetch_favourites(user, params \\ %{}, pagination \\ :keyset) do user.ap_id |> Activity.Queries.by_actor() |> Activity.Queries.by_type("Like") |> Activity.with_joined_object() |> Object.with_joined_activity() |> select([_like, object, activity], %{activity | object: object}) |> order_by([like, _, _], desc_nulls_last: like.id) |> Pagination.fetch_paginated( Map.merge(params, %{"skip_order" => true}), pagination, :object_activity ) end defp maybe_update_cc(activities, list_memberships, %User{ap_id: user_ap_id}) when is_list(list_memberships) and length(list_memberships) > 0 do Enum.map(activities, fn %{data: %{"bcc" => bcc}} = activity when is_list(bcc) and length(bcc) > 0 -> if Enum.any?(bcc, &(&1 in list_memberships)) do update_in(activity.data["cc"], &[user_ap_id | &1]) else activity end activity -> activity end) end defp maybe_update_cc(activities, _, _), do: activities def fetch_activities_bounded_query(query, recipients, recipients_with_public) do from(activity in query, where: fragment("? && ?", activity.recipients, ^recipients) or (fragment("? && ?", activity.recipients, ^recipients_with_public) and ^Constants.as_public() in activity.recipients) ) end def fetch_activities_bounded( recipients, recipients_with_public, opts \\ %{}, pagination \\ :keyset ) do fetch_activities_query([], opts) |> fetch_activities_bounded_query(recipients, recipients_with_public) |> Pagination.fetch_paginated(opts, pagination) |> Enum.reverse() end @spec upload(Upload.source(), keyword()) :: {:ok, Object.t()} | {:error, any()} def upload(file, opts \\ []) do with {:ok, data} <- Upload.store(file, opts) do obj_data = Maps.put_if_present(data, "actor", opts[:actor]) Repo.insert(%Object{data: obj_data}) end end @spec get_actor_url(any()) :: binary() | nil defp get_actor_url(url) when is_binary(url), do: url defp get_actor_url(%{"href" => href}) when is_binary(href), do: href defp get_actor_url(url) when is_list(url) do url |> List.first() |> get_actor_url() end defp get_actor_url(_url), do: nil defp object_to_user_data(data) do avatar = data["icon"]["url"] && %{ "type" => "Image", "url" => [%{"href" => data["icon"]["url"]}] } banner = data["image"]["url"] && %{ "type" => "Image", "url" => [%{"href" => data["image"]["url"]}] } fields = data |> Map.get("attachment", []) |> Enum.filter(fn %{"type" => t} -> t == "PropertyValue" end) |> Enum.map(fn fields -> Map.take(fields, ["name", "value"]) end) emojis = data |> Map.get("tag", []) |> Enum.filter(fn %{"type" => "Emoji"} -> true _ -> false end) |> Enum.reduce(%{}, fn %{"icon" => %{"url" => url}, "name" => name}, acc -> Map.put(acc, String.trim(name, ":"), url) end) locked = data["manuallyApprovesFollowers"] || false data = Transmogrifier.maybe_fix_user_object(data) discoverable = data["discoverable"] || false invisible = data["invisible"] || false actor_type = data["type"] || "Person" public_key = if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do data["publicKey"]["publicKeyPem"] else nil end shared_inbox = if is_map(data["endpoints"]) && is_binary(data["endpoints"]["sharedInbox"]) do data["endpoints"]["sharedInbox"] else nil end user_data = %{ ap_id: data["id"], uri: get_actor_url(data["url"]), ap_enabled: true, banner: banner, fields: fields, emoji: emojis, locked: locked, discoverable: discoverable, invisible: invisible, avatar: avatar, name: data["name"], follower_address: data["followers"], following_address: data["following"], bio: data["summary"], actor_type: actor_type, also_known_as: Map.get(data, "alsoKnownAs", []), public_key: public_key, inbox: data["inbox"], shared_inbox: shared_inbox } # nickname can be nil because of virtual actors user_data = if data["preferredUsername"] do Map.put( user_data, :nickname, "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}" ) else Map.put(user_data, :nickname, nil) end {:ok, user_data} end def fetch_follow_information_for_user(user) do with {:ok, following_data} <- Fetcher.fetch_and_contain_remote_object_from_id(user.following_address), {:ok, hide_follows} <- collection_private(following_data), {:ok, followers_data} <- Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address), {:ok, hide_followers} <- collection_private(followers_data) do {:ok, %{ hide_follows: hide_follows, follower_count: normalize_counter(followers_data["totalItems"]), following_count: normalize_counter(following_data["totalItems"]), hide_followers: hide_followers }} else {:error, _} = e -> e e -> {:error, e} end end defp normalize_counter(counter) when is_integer(counter), do: counter defp normalize_counter(_), do: 0 def maybe_update_follow_information(user_data) do with {:enabled, true} <- {:enabled, Config.get([:instance, :external_user_synchronization])}, {_, true} <- {:user_type_check, user_data[:type] in ["Person", "Service"]}, {_, true} <- {:collections_available, !!(user_data[:following_address] && user_data[:follower_address])}, {:ok, info} <- fetch_follow_information_for_user(user_data) do info = Map.merge(user_data[:info] || %{}, info) user_data |> Map.put(:info, info) else {:user_type_check, false} -> user_data {:collections_available, false} -> user_data {:enabled, false} -> user_data e -> Logger.error( "Follower/Following counter update for #{user_data.ap_id} failed.\n" <> inspect(e) ) user_data end end defp collection_private(%{"first" => %{"type" => type}}) when type in ["CollectionPage", "OrderedCollectionPage"], do: {:ok, false} defp collection_private(%{"first" => first}) do with {:ok, %{"type" => type}} when type in ["CollectionPage", "OrderedCollectionPage"] <- Fetcher.fetch_and_contain_remote_object_from_id(first) do {:ok, false} else {:error, {:ok, %{status: code}}} when code in [401, 403] -> {:ok, true} {:error, _} = e -> e e -> {:error, e} end end defp collection_private(_data), do: {:ok, true} def user_data_from_user_object(data) do with {:ok, data} <- MRF.filter(data), {:ok, data} <- object_to_user_data(data) do {:ok, data} else e -> {:error, e} end end def fetch_and_prepare_user_from_ap_id(ap_id) do with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id), {:ok, data} <- user_data_from_user_object(data), data <- maybe_update_follow_information(data) do {:ok, data} else {:error, "Object has been deleted"} = e -> Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}") {:error, e} e -> Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}") {:error, e} end end def make_user_from_ap_id(ap_id) do user = User.get_cached_by_ap_id(ap_id) if user && !User.ap_enabled?(user) do Transmogrifier.upgrade_user_from_ap_id(ap_id) else with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do if user do user |> User.remote_user_changeset(data) |> User.update_and_set_cache() else data |> User.remote_user_changeset() |> Repo.insert() |> User.set_cache() end else e -> {:error, e} end end end def make_user_from_nickname(nickname) do with {:ok, %{"ap_id" => ap_id}} when not is_nil(ap_id) <- WebFinger.finger(nickname) do make_user_from_ap_id(ap_id) else _e -> {:error, "No AP id in WebFinger"} end end # filter out broken threads def contain_broken_threads(%Activity{} = activity, %User{} = user) do entire_thread_visible_for_user?(activity, user) end # do post-processing on a specific activity def contain_activity(%Activity{} = activity, %User{} = user) do contain_broken_threads(activity, user) end def fetch_direct_messages_query do Activity |> restrict_type(%{"type" => "Create"}) |> restrict_visibility(%{visibility: "direct"}) |> order_by([activity], asc: activity.id) end end