Compare commits

...

40 commits

Author SHA1 Message Date
29d3be800e Merge branch 'develop' of https://akkoma.dev/AkkomaGang/akkoma into akko.wtf 2025-02-22 12:04:44 -05:00
355263858c Merge pull request 'Expose Port IO stats via Prometheus' (#869) from Oneric/akkoma:io-telemetry into develop
Reviewed-on: AkkomaGang/akkoma#869
2025-02-21 15:28:09 +00:00
4c41f8c286 Merge pull request 'Improve stat queries and ReceiverWorker logic' (#862) from Oneric/akkoma:perf_tweaks_stats+jobs into develop
Reviewed-on: AkkomaGang/akkoma#862
2025-02-14 19:22:35 +00:00
Oneric
f0a99b4595 article_note_validator: fix handling of Mastodon-style replies collections
The first collection page is (sometimes?) inlined,
which caused crashes when attempting to log the fetch failure.
But there's no need to fetch it; we can treat it like the other inline collection case.
2025-02-14 18:49:51 +01:00
Oneric
a1c841a122 federation.md: list FEP-dc88 (Formatting Mathematics)
Implemented by AkkomaGang/akkoma#642
2025-02-14 18:49:51 +01:00
Oneric
1b09b9fc22 static_fe: fix HTML quotation for upload alt text
Reported by riley on IRC
2025-02-14 18:49:51 +01:00
Oneric
46148c0825 Don't return garbage on failed collection fetches
And for now treat partial fetches as a success, since for all
current users partial collection data is better than no data at all.

If an error occurred while fetching a page, this previously
returned a bogus {:ok, {:error, _}} success, causing the error
to be attached to the object as a reply list, subsequently
leading to the whole post getting rejected during validation.

Also the pinned collection caller did not actually handle
the preexisting error case, resulting in process crashes.
2025-02-14 18:49:51 +01:00
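
For illustration, a minimal sketch of the contract after this change (module and function names here are hypothetical; the {:ok}/{:partial}/{:error} shapes come from the Collections.Fetcher diff below):

defmodule RepliesFetchSketch do
  # fetch_collection/1 now returns {:ok, items} (partial results are folded
  # into :ok via partial_as_success/1) or a plain {:error, reason};
  # never the old bogus {:ok, {:error, _}} nesting.
  def safe_replies(first_page_id) do
    case Akkoma.Collections.Fetcher.fetch_collection(first_page_id) do
      {:ok, items} when is_list(items) -> items
      {:error, _reason} -> []
    end
  end
end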
Oneric
4701aa2a38 receiver_worker: log process crashes
Oban catches crashes to handle job failure and retry,
thus they never bubble up all the way and nothing is logged by default.
For better debugging, catch and log any crashes.
2025-02-14 18:46:19 +01:00
Oneric
fb3de8045a Expose Port IO stats via Prometheus 2025-01-27 20:00:30 +01:00
Oneric
8fa51700d4 changelog: summarise preceding perf tweaks 2025-01-07 20:27:28 +01:00
Oneric
2ddff7e386 transmogrifier: gracefully ignore Delete of unknown objects
It's quite common to receive spurious Deletes,
so we neither want to waste resources on retrying
nor spam "invalid AP" logs.
2025-01-07 20:27:28 +01:00
Oneric
cd8e6a4235 transmogrifier: gracefully ignore duplicated object deletes
The object lookup is later repeated in the validator, but due to
caching shouldn't incur any noticeable performance impact.
It’s actually preferable to check here, since it avoids the otherwise
occurring user lookup and overhead from starting and aborting a
transaction.
2025-01-07 20:27:28 +01:00
Oneric
ac2327c8fc transmogrifier: be more selective about Delete retry
If something else renders the Delete invalid,
there's no point in retrying anyway.
2025-01-07 20:27:28 +01:00
Oneric
92bf93a4f7 transmogrifier: avoid crashes on non-validation Delete errors
Happens e.g. for duplicated Deletes.
The remaining tombstone object no longer has an actor,
leading to an error response during side-effect handling.
2025-01-07 20:27:28 +01:00
Oneric
7ad5f8d3c0 object_validators: only query relevant table for object
Most of them actually only accept either activities or a
non-activity object later; querying both is then a waste
of resources and may create false positives.
2025-01-07 20:27:28 +01:00
Oneric
b0387dee14 Gracefully ignore Undo activities referring to unknown objects 2025-01-07 20:27:28 +01:00
Oneric
caa4fbe326 user: avoid database work on superfluous pin
The only thing this does is change the updated_at field of the user.
Afaict this came to be because prior to pins federating this was split
into two functions, one of which created a changeset, the other applying
a given changeset. When this was merged the bits were just copied into
place.
2025-01-07 20:27:28 +01:00
Oneric
09736431e0 Don't spam logs about deleted users
User.get_or_fetch_by_(apid|nickname) are the only external users of fetch_and_prepare_user_from_ap_id,
thus there's no point in duplicating logging, especially not at error level.
Currently (duplicated) _not_found errors for users make up the bulk of my logs
and are created almost every second. Deleted users are a common occurrence and not
worth logging outside of debug.
2025-01-07 20:27:28 +01:00
Oneric
bcf3e101f6 rich_media: lower log level of update 2025-01-07 20:27:28 +01:00
Oneric
05bbdbf388 nodeinfo: lower log level of regular actions to debug 2025-01-07 20:27:28 +01:00
Oneric
2c75600532 federation/incoming: improve link_resolve retry decision
To facilitate this ObjectValidator.fetch_actor_and_object is adapted to
return an informative error. Otherwise we’d be unable to make an
informed decision on retrying or not later. There’s no point in
retrying to fetch MRF-blocked stuff or private posts for example.
2025-01-07 20:27:28 +01:00
Oneric
0cd4040db6 Error out earlier on missing mandatory reference
This is the only user of fetch_actor_and_object, which previously just
always pretended to be successful. For all the activity types handled
here, we absolutely need the referenced object to be able to process it.
(Other than for Announce, whether processing those activity types for
unknown remote objects is desirable in the first place is up for debate.)

All other users of the similar fetch_actor already properly check success.

Note: this currently lumps all resolve failure reasons together,
so even e.g. boosts of MRF rejected posts will still exhaust all
retries. The following commit improves on this.
2025-01-07 20:27:28 +01:00
Oneric
0ba5c3649d federator: don't nest {:error, _} tuples
It makes decisions based on error sources harder, since all possible
nesting levels need to be checked for. As shown by the return values
handled in the receiver worker, something else still nests those,
but this is a first start.
2025-01-07 20:27:28 +01:00
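
A sketch of the flattening this starts (module and helper name are hypothetical; the federator diff below applies the same idea inline):

defmodule ErrorShapeSketch do
  # Unwrap one level of {:error, {:error, reason}} so callers only ever
  # have to match a single nesting depth.
  def normalize({:error, {:error, _} = inner}), do: normalize(inner)
  def normalize({:error, _} = e), do: e
  def normalize(other), do: {:error, other}
end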
Oneric
8e5defe6ca stats: estimate remote user count
This value is currently only used by Prometheus metrics,
but was (after optimising the peer query in the preceding commit)
the most costly part of instance stats.
2025-01-07 20:27:28 +01:00
Oneric
138b1aea2f stats: use cheaper peers query
This query is one of the top cost offenders during an instance's
lifetime. For small instances it was shown to take up 30-50% of
the total database query time, while for bigger instances it still held
a spot in the top 3 — almost as or even more expensive overall than
timeline queries!

The good news is, there’s a cheaper way using the instance table:
no need to process each entry, no need to filter NULLs
and no need to dedupe. EXPLAIN estimates the cost of the
old query as 13272.39 and the cost of the new query as 395.74
for me; i.e. a 33-fold reduction.

Results can differ slightly. E.g. we might have an old user
predating the instance table's existence with no interaction since,
or no instance table entry due to a failure to query nodeinfo.
Conversely, we might have an instance entry but all known users got
deleted since.
However, this seems unproblematic in practice
and well worth the perf improvement.

Given the previous query didn't exclude unreachable instances,
neither does the new query.
2025-01-07 20:27:28 +01:00
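
For reference, the switch boils down to the following (both queries appear in the Stats diff below; aliases as in Pleroma.Stats):

import Ecto.Query
alias Pleroma.{Repo, User}
alias Pleroma.Instances.Instance

# Old: scan every remote user row, split each nickname, filter NULLs, dedupe
peers_old =
  from(u in User,
    select: fragment("distinct split_part(?, '@', 2)", u.nickname),
    where: u.local != ^true
  )
  |> Repo.all()
  |> Enum.filter(& &1)

# New: the instance table already holds one unique host per known peer
peers_new = from(i in Instance, select: i.host) |> Repo.all()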
Oneric
8b5183cb74 stats: fix stat spec 2025-01-07 20:27:28 +01:00
Oneric
cbb0d4b0a8 receiver_worker: log unexpected errors
This can't handle process crash errors,
but I hope those get a stacktrace logged by default.
2025-01-07 20:27:28 +01:00
Oneric
be2c857845 receiver_worker: don't reattempt invalid documents
Ideally we’d like to split this up more and count most invalid documents
as an error, but silently drop e.g. Deletes for unknown objects.
However, this is hard to extract from the changeset and jobs canceled
with :discard don't count as exceptions and I'm not aware of an idiomatic
way to cancel further retries while retaining the exception status.

Thus at least keep a log, but since superfluous "Delete"s
seem kinda frequent, don't log at error, only info level.
2025-01-07 20:27:28 +01:00
Oneric
9f4d3a936f cosmetic/receiver_worker: reformat error cases
The next commit adds a multi-statement case,
and mix format will then enforce this anyway.
2025-01-07 20:27:28 +01:00
Oneric
f9724b5879 Don’t reattempt insertion of already known objects
Might happen if we receive e.g. a Like before the Note arrives
in our inbox and we thus already queried the Note ourselves.
2025-01-07 20:27:27 +01:00
Oneric
041dedb86e Don't reattempt RichMediaBackfill by default
Retrying seems unlikely to be helpful:
 - if it timed out, chances are the short delay before reattempting
   won't give the remote enough time to recover from its outage and
   a longer delay makes the job pointless as users likely scrolled
further already. (Likely this is already the case after the
   first 20s timeout)
 - if the remote data is so borked we couldn't even parse it far enough
   for an "invalid metadata" error, chances are it will remain borked
   upon reattempt
2025-01-07 20:27:27 +01:00
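
The diff below only shows the queue concurrency drop (3 to 1); as a hedged sketch, the standard Oban knob for "no retries" is max_attempts: 1 on the worker (whether Akkoma sets it exactly like this is not part of the visible diff):

defmodule RichMediaBackfillSketch do
  # max_attempts: 1 means a failed job is never reattempted
  use Oban.Worker, queue: :rich_media_backfill, max_attempts: 1

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"url" => _url}}), do: :ok
end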
Oneric
280652651c rich_media: don't reattempt parsing on rejected URLs 2025-01-07 20:27:27 +01:00
Oneric
92544e8f99 Don't enqueue a plethora of unnecessary NodeInfoFetcher jobs
There were two issues leading to needless effort:
Most importantly, the use of AP IDs as "source_url" meant multiple
simultaneous jobs got scheduled for the same instance even with the
default unique settings.
Also, jobs were scheduled unconditionally for each processed AP object,
meaning we incurred overhead from managing Oban jobs even if we knew it
wasn't necessary. By comparison, the single query to check if an update
is needed should be cheaper overall.
2025-01-07 20:27:27 +01:00
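
The dedupe hinges on every job for a host sharing the same "source_url"; roughly (the example hostname is hypothetical, the reduction mirrors the NodeInfoFetcherWorker diff below):

# Reduce an AP id to its base instance URL so Oban's unique check
# (keys: [:op, :source_url]) can collapse duplicates per host.
"https://example.com/users/alice"
|> URI.parse()
|> URI.merge("/")
|> URI.to_string()
#=> "https://example.com/"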
Oneric
d283ac52c3 Don't create noop SearchIndexingWorker jobs for passive index 2025-01-07 20:27:27 +01:00
Oneric
ed4019e7a3 workers: make custom filtering ahead of enqueue possible 2025-01-07 20:27:27 +01:00
Oneric
25d24cc5f6 validators/add_remove: don't crash on failure to resolve reference
It allows for informed error handling and retry/discard job
decisions later on, which a future commit will add.
2025-01-07 20:27:27 +01:00
Oneric
ead44c6671 federator: don't fetch the user for no reason
The return value is never used here; later stages which actually need it
fetch the user themselves and it doesn't matter whether we wait for the
fetch here or later (if needed at all).

Even more, this early fetch always fails if the user was already deleted
or never known to begin with, but we get something referencing it; e.g.
the very Delete action carrying out the user deletion.
This prevents processing of the Delete, but before that it will be
reattempted several times, each time attempting to fetch the
non-existing profile, wasting resources.
2025-01-07 20:27:27 +01:00
Oneric
4859f38624 add_remove_validator: limit refetch rate to 1 per 5s
This matches the maximum_age used when processing Move activities
2025-01-07 20:27:27 +01:00
Oneric
0f4a7a185f Drop ap_enabled indicator from atom feeds 2025-01-07 20:27:27 +01:00
Haelwenn (lanodan) Monnier
c17681ae1e Purge obsolete ap_enabled indicator
It was used to migrate OStatus connections to ActivityPub if possible,
but support for OStatus was dropped long ago, all new actors are always AP,
and if anything wasn't migrated before, their instance is already marked
as unreachable anyway.

The associated logic was also buggy in several ways, and deleted users
got set to ap_enabled=false, also causing some issues.

This patch is a pretty direct port of the original Pleroma MR;
follow-up commits will further fix and clean up remaining issues.
Changes made (other than trivial merge conflict resolutions):
  - converted CHANGELOG format
  - adapted migration id for Akkoma’s timeline
  - removed ap_enabled from additional tests

Ported-from: https://git.pleroma.social/pleroma/pleroma/-/merge_requests/3880
2025-01-07 20:27:26 +01:00
51 changed files with 480 additions and 334 deletions

View file

@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## Unreleased
## Added
## Fixed
## Changed
- Dropped obsolete `ap_enabled` indicator from user table and associated buggy logic
- The remote user count in prometheus metrics is now an estimate instead of an exact number
since the latter proved unreasonably costly to obtain for a merely nice-to-have statistic
- Various other tweaks improving stat query performance and avoiding unnecessary work on received AP documents
## 2025.01.01
Hotfix: Federation could break if a null value found its way into `should_federate?/1`

View file

@ -10,6 +10,7 @@
## Supported FEPs
- [FEP-67ff: FEDERATION](https://codeberg.org/fediverse/fep/src/branch/main/fep/67ff/fep-67ff.md)
- [FEP-dc88: Formatting Mathematics](https://codeberg.org/fediverse/fep/src/branch/main/fep/dc88/fep-dc88.md)
- [FEP-f1d5: NodeInfo in Fediverse Software](https://codeberg.org/fediverse/fep/src/branch/main/fep/f1d5/fep-f1d5.md)
- [FEP-fffd: Proxy Objects](https://codeberg.org/fediverse/fep/src/branch/main/fep/fffd/fep-fffd.md)

View file

@ -602,7 +602,7 @@ config :pleroma, :workers,
federator_incoming: 5,
federator_outgoing: 5,
search_indexing: 2,
rich_media_backfill: 3
rich_media_backfill: 1
],
timeout: [
activity_expiration: :timer.seconds(5),

View file

@ -14,7 +14,7 @@ defmodule Akkoma.Collections.Fetcher do
@spec fetch_collection(String.t() | map()) :: {:ok, [Pleroma.Object.t()]} | {:error, any()}
def fetch_collection(ap_id) when is_binary(ap_id) do
with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
{:ok, objects_from_collection(page)}
partial_as_success(objects_from_collection(page))
else
e ->
Logger.error("Could not fetch collection #{ap_id} - #{inspect(e)}")
@ -24,9 +24,12 @@ defmodule Akkoma.Collections.Fetcher do
def fetch_collection(%{"type" => type} = page)
when type in ["Collection", "OrderedCollection", "CollectionPage", "OrderedCollectionPage"] do
{:ok, objects_from_collection(page)}
partial_as_success(objects_from_collection(page))
end
defp partial_as_success({:partial, items}), do: {:ok, items}
defp partial_as_success(res), do: res
defp items_in_page(%{"type" => type, "orderedItems" => items})
when is_list(items) and type in ["OrderedCollection", "OrderedCollectionPage"],
do: items
@ -53,11 +56,11 @@ defmodule Akkoma.Collections.Fetcher do
fetch_page_items(id)
end
defp objects_from_collection(_page), do: []
defp objects_from_collection(_page), do: {:ok, []}
defp fetch_page_items(id, items \\ []) do
if Enum.count(items) >= Config.get([:activitypub, :max_collection_objects]) do
items
{:ok, items}
else
with {:ok, page} <- Fetcher.fetch_and_contain_remote_object_from_id(id) do
objects = items_in_page(page)
@ -65,18 +68,22 @@ defmodule Akkoma.Collections.Fetcher do
if Enum.count(objects) > 0 do
maybe_next_page(page, items ++ objects)
else
items
{:ok, items}
end
else
{:error, :not_found} ->
items
{:ok, items}
{:error, :forbidden} ->
items
{:ok, items}
{:error, error} ->
Logger.error("Could not fetch page #{id} - #{inspect(error)}")
{:error, error}
case items do
[] -> {:error, error}
_ -> {:partial, items}
end
end
end
end
@ -85,5 +92,5 @@ defmodule Akkoma.Collections.Fetcher do
fetch_page_items(id, items)
end
defp maybe_next_page(_, items), do: items
defp maybe_next_page(_, items), do: {:ok, items}
end

View file

@ -158,6 +158,14 @@ defmodule Pleroma.Instances.Instance do
NaiveDateTime.diff(now, metadata_updated_at) > 86_400
end
def needs_update(%URI{host: host}) do
with %Instance{} = instance <- Repo.get_by(Instance, %{host: host}) do
needs_update(instance)
else
_ -> true
end
end
def local do
%Instance{
host: Pleroma.Web.Endpoint.host(),
@ -180,7 +188,7 @@ defmodule Pleroma.Instances.Instance do
defp do_update_metadata(%URI{host: host} = uri, existing_record) do
if existing_record do
if needs_update(existing_record) do
Logger.info("Updating metadata for #{host}")
Logger.debug("Updating metadata for #{host}")
favicon = scrape_favicon(uri)
nodeinfo = scrape_nodeinfo(uri)
@ -199,7 +207,7 @@ defmodule Pleroma.Instances.Instance do
favicon = scrape_favicon(uri)
nodeinfo = scrape_nodeinfo(uri)
Logger.info("Creating metadata for #{host}")
Logger.debug("Creating metadata for #{host}")
%Instance{}
|> changeset(%{

View file

@ -215,6 +215,11 @@ defmodule Pleroma.Object do
end
end
# Intentionally accepts non-Object arguments!
@spec is_tombstone_object?(term()) :: boolean()
def is_tombstone_object?(%Object{data: %{"type" => "Tombstone"}}), do: true
def is_tombstone_object?(_), do: false
def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
%ObjectTombstone{
id: id,

View file

@ -40,12 +40,6 @@ defmodule Pleroma.Search.DatabaseSearch do
end
end
@impl true
def add_to_index(_activity), do: nil
@impl true
def remove_from_index(_object), do: nil
def maybe_restrict_author(query, %User{} = author) do
Activity.Queries.by_author(query, author)
end

View file

@ -14,4 +14,6 @@ defmodule Pleroma.Search.SearchBackend do
from index.
"""
@callback remove_from_index(object :: Pleroma.Object.t()) :: {:ok, any()} | {:error, any()}
@optional_callbacks add_to_index: 1, remove_from_index: 1
end

View file

@ -10,6 +10,7 @@ defmodule Pleroma.Stats do
alias Pleroma.CounterCache
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Instances.Instance
@interval :timer.seconds(300)
@ -39,7 +40,8 @@ defmodule Pleroma.Stats do
@spec get_stats() :: %{
domain_count: non_neg_integer(),
status_count: non_neg_integer(),
user_count: non_neg_integer()
user_count: non_neg_integer(),
remote_user_count: non_neg_integer()
}
def get_stats do
%{stats: stats} = GenServer.call(__MODULE__, :get_state)
@ -60,41 +62,39 @@ defmodule Pleroma.Stats do
stats: %{
domain_count: non_neg_integer(),
status_count: non_neg_integer(),
user_count: non_neg_integer()
user_count: non_neg_integer(),
remote_user_count: non_neg_integer()
}
}
def calculate_stat_data do
# instances table has a unique constraint on the host column
peers =
from(
u in User,
select: fragment("distinct split_part(?, '@', 2)", u.nickname),
where: u.local != ^true
i in Instance,
select: i.host
)
|> Repo.all()
|> Enum.filter(& &1)
domain_count = Enum.count(peers)
status_count = Repo.aggregate(User.Query.build(%{local: true}), :sum, :note_count)
users_query =
# there are few enough local users for postgres to use an index scan
# (also here an exact count is a bit more important)
user_count =
from(u in User,
where: u.is_active == true,
where: u.local == true,
where: not is_nil(u.nickname),
where: not u.invisible
)
|> Repo.aggregate(:count, :id)
remote_users_query =
from(u in User,
where: u.is_active == true,
where: u.local == false,
where: not is_nil(u.nickname),
where: not u.invisible
)
user_count = Repo.aggregate(users_query, :count, :id)
remote_user_count = Repo.aggregate(remote_users_query, :count, :id)
# but mostly numerous remote users leading to a full table scan
# (ecto currently doesn't allow building queries without explicit table)
%{rows: [[remote_user_count]]} =
"SELECT estimate_remote_user_count();"
|> Pleroma.Repo.query!()
%{
peers: peers,

View file

@ -127,7 +127,6 @@ defmodule Pleroma.User do
field(:domain_blocks, {:array, :string}, default: [])
field(:is_active, :boolean, default: true)
field(:no_rich_text, :boolean, default: false)
field(:ap_enabled, :boolean, default: false)
field(:is_moderator, :boolean, default: false)
field(:is_admin, :boolean, default: false)
field(:show_role, :boolean, default: true)
@ -473,7 +472,6 @@ defmodule Pleroma.User do
:shared_inbox,
:nickname,
:avatar,
:ap_enabled,
:banner,
:background,
:is_locked,
@ -1006,11 +1004,7 @@ defmodule Pleroma.User do
end
def maybe_direct_follow(%User{} = follower, %User{} = followed) do
if not ap_enabled?(followed) do
follow(follower, followed)
else
{:ok, follower, followed}
end
{:ok, follower, followed}
end
@doc "A mass follow for local users. Respects blocks in both directions but does not create activities."
@ -1826,7 +1820,6 @@ defmodule Pleroma.User do
confirmation_token: nil,
domain_blocks: [],
is_active: false,
ap_enabled: false,
is_moderator: false,
is_admin: false,
mastofe_settings: nil,
@ -2006,8 +1999,20 @@ defmodule Pleroma.User do
{%User{} = user, _} ->
{:ok, user}
e ->
{_, {:error, {:reject, :mrf}}} ->
Logger.debug("Rejected to fetch user due to MRF: #{ap_id}")
{:error, {:reject, :mrf}}
{_, {:error, :not_found}} ->
Logger.debug("User doesn't exist (anymore): #{ap_id}")
{:error, :not_found}
{_, {:error, e}} ->
Logger.error("Could not fetch user #{ap_id}, #{inspect(e)}")
{:error, e}
e ->
Logger.error("Unexpected error condition while fetching user #{ap_id}, #{inspect(e)}")
{:error, :not_found}
end
end
@ -2073,10 +2078,6 @@ defmodule Pleroma.User do
end
end
def ap_enabled?(%User{local: true}), do: true
def ap_enabled?(%User{ap_enabled: ap_enabled}), do: ap_enabled
def ap_enabled?(_), do: false
@doc "Gets or fetch a user by uri or nickname."
@spec get_or_fetch(String.t()) :: {:ok, User.t()} | {:error, String.t()}
def get_or_fetch("http://" <> _host = uri), do: get_or_fetch_by_ap_id(uri)
@ -2580,10 +2581,10 @@ defmodule Pleroma.User do
[pinned_objects: "You have already pinned the maximum number of statuses"]
end
end)
|> update_and_set_cache()
else
change(user)
{:ok, user}
end
|> update_and_set_cache()
end
@spec remove_pinned_object_id(User.t(), String.t()) :: {:ok, t()} | {:error, term()}

View file

@ -1626,7 +1626,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
%{
ap_id: data["id"],
uri: get_actor_url(data["url"]),
ap_enabled: true,
banner: normalize_image(data["image"]),
background: normalize_image(data["backgroundUrl"]),
fields: fields,
@ -1743,7 +1742,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
end
end
def fetch_and_prepare_user_from_ap_id(ap_id, additional \\ []) do
defp fetch_and_prepare_user_from_ap_id(ap_id, additional) do
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
{:valid, {:ok, _, _}} <- {:valid, UserValidator.validate(data, [])},
{:ok, data} <- user_data_from_user_object(data, additional) do
@ -1751,19 +1750,16 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
else
# If this has been deleted, only log a debug and not an error
{:error, {"Object has been deleted", _, _} = e} ->
Logger.debug("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
{:error, e}
Logger.debug("User was explicitly deleted #{ap_id}, #{inspect(e)}")
{:error, :not_found}
{:reject, reason} = e ->
Logger.debug("Rejected user #{ap_id}: #{inspect(reason)}")
{:reject, _reason} = e ->
{:error, e}
{:valid, reason} ->
Logger.debug("Data is not a valid user #{ap_id}: #{inspect(reason)}")
{:error, "Not a user"}
{:error, {:validate, reason}}
{:error, e} ->
Logger.error("Could not decode user at fetch #{ap_id}, #{inspect(e)}")
{:error, e}
end
end
@ -1801,7 +1797,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
else
e ->
Logger.error("Could not decode featured collection at fetch #{first}, #{inspect(e)}")
{:ok, %{}}
%{}
end
end
@ -1811,14 +1807,18 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
} = collection
)
when type in ["OrderedCollection", "Collection"] do
{:ok, objects} = Collections.Fetcher.fetch_collection(collection)
# Items can either be a map _or_ a string
objects
|> Map.new(fn
ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
%{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
end)
with {:ok, objects} <- Collections.Fetcher.fetch_collection(collection) do
# Items can either be a map _or_ a string
objects
|> Map.new(fn
ap_id when is_binary(ap_id) -> {ap_id, NaiveDateTime.utc_now()}
%{"id" => object_ap_id} -> {object_ap_id, NaiveDateTime.utc_now()}
end)
else
e ->
Logger.warning("Failed to fetch featured collection #{collection}, #{inspect(e)}")
%{}
end
end
def pin_data_from_featured_collection(obj) do
@ -1857,31 +1857,27 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
def make_user_from_ap_id(ap_id, additional \\ []) do
user = User.get_cached_by_ap_id(ap_id)
if user && !User.ap_enabled?(user) do
Transmogrifier.upgrade_user_from_ap_id(ap_id)
else
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
user =
if data.ap_id != ap_id do
User.get_cached_by_ap_id(data.ap_id)
else
user
end
if user do
user
|> User.remote_user_changeset(data)
|> User.update_and_set_cache()
|> tap(fn _ -> enqueue_pin_fetches(data) end)
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
user =
if data.ap_id != ap_id do
User.get_cached_by_ap_id(data.ap_id)
else
maybe_handle_clashing_nickname(data)
data
|> User.remote_user_changeset()
|> Repo.insert()
|> User.set_cache()
|> tap(fn _ -> enqueue_pin_fetches(data) end)
user
end
if user do
user
|> User.remote_user_changeset(data)
|> User.update_and_set_cache()
|> tap(fn _ -> enqueue_pin_fetches(data) end)
else
maybe_handle_clashing_nickname(data)
data
|> User.remote_user_changeset()
|> Repo.insert()
|> User.set_cache()
|> tap(fn _ -> enqueue_pin_fetches(data) end)
end
end
end

View file

@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidator do
alias Pleroma.EctoType.ActivityPub.ObjectValidators
alias Pleroma.Object
alias Pleroma.Object.Containment
alias Pleroma.Object.Fetcher
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ObjectValidators.AcceptRejectValidator
alias Pleroma.Web.ActivityPub.ObjectValidators.AddRemoveValidator
@ -253,9 +254,28 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidator do
end
def fetch_actor_and_object(object) do
fetch_actor(object)
Object.normalize(object["object"], fetch: true)
:ok
# Fetcher.fetch_object_from_id already first does a local db lookup
with {:ok, %User{}} <- fetch_actor(object),
{:ap_id, id} when is_binary(id) <-
{:ap_id, Pleroma.Web.ActivityPub.Utils.get_ap_id(object["object"])},
{:ok, %Object{}} <- Fetcher.fetch_object_from_id(id) do
:ok
else
{:ap_id, id} ->
{:error, {:validate, "Invalid AP id: #{inspect(id)}"}}
# if actor: late post from a previously unknown, deleted profile
# if object: private post we're not allowed to access
# (other HTTP replies might just indicate a temporary network failure though!)
{:error, e} when e in [:not_found, :forbidden] ->
{:error, :ignore}
{:error, _} = e ->
e
e ->
{:error, e}
end
end
defp for_each_history_item(

View file

@ -9,6 +9,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.AddRemoveValidator do
import Pleroma.Web.ActivityPub.ObjectValidators.CommonValidations
require Pleroma.Constants
require Logger
alias Pleroma.User
@ -27,14 +28,21 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.AddRemoveValidator do
end
def cast_and_validate(data) do
{:ok, actor} = User.get_or_fetch_by_ap_id(data["actor"])
with {_, {:ok, actor}} <- {:user, User.get_or_fetch_by_ap_id(data["actor"])},
{_, {:ok, actor}} <- {:feataddr, maybe_refetch_user(actor)} do
data
|> maybe_fix_data_for_mastodon(actor)
|> cast_data()
|> validate_data(actor)
else
{:feataddr, _} ->
{:error,
{:validate,
"Actor doesn't provide featured collection address to verify against: #{data["id"]}"}}
{:ok, actor} = maybe_refetch_user(actor)
data
|> maybe_fix_data_for_mastodon(actor)
|> cast_data()
|> validate_data(actor)
{:user, _} ->
{:error, :link_resolve_failed}
end
end
defp maybe_fix_data_for_mastodon(data, actor) do
@ -73,6 +81,9 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.AddRemoveValidator do
end
defp maybe_refetch_user(%User{ap_id: ap_id}) do
Pleroma.Web.ActivityPub.Transmogrifier.upgrade_user_from_ap_id(ap_id)
# If the user didn't expose a featured collection before,
# recheck now so we can verify perms for add/remove.
# But wait at least 5s to avoid rapid refetches in edge cases
User.get_or_fetch_by_ap_id(ap_id, maximum_age: 5)
end
end

View file

@ -71,7 +71,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.ArticleNotePageValidator do
defp fix_replies(%{"replies" => replies} = data) when is_list(replies), do: data
defp fix_replies(%{"replies" => %{"first" => first}} = data) do
defp fix_replies(%{"replies" => %{"first" => first}} = data) when is_binary(first) do
with {:ok, replies} <- Akkoma.Collections.Fetcher.fetch_collection(first) do
Map.put(data, "replies", replies)
else
@ -81,6 +81,10 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.ArticleNotePageValidator do
end
end
defp fix_replies(%{"replies" => %{"first" => %{"items" => replies}}} = data)
when is_list(replies),
do: Map.put(data, "replies", replies)
defp fix_replies(%{"replies" => %{"items" => replies}} = data) when is_list(replies),
do: Map.put(data, "replies", replies)

View file

@ -54,10 +54,14 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.CommonValidations do
def validate_object_presence(cng, options \\ []) do
field_name = Keyword.get(options, :field_name, :object)
allowed_types = Keyword.get(options, :allowed_types, false)
allowed_categories = Keyword.get(options, :allowed_object_categores, [:object, :activity])
cng
|> validate_change(field_name, fn field_name, object_id ->
object = Object.get_cached_by_ap_id(object_id) || Activity.get_by_ap_id(object_id)
object =
(:object in allowed_categories && Object.get_cached_by_ap_id(object_id)) ||
(:activity in allowed_categories && Activity.get_by_ap_id(object_id)) ||
nil
cond do
!object ->

View file

@ -61,7 +61,10 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.DeleteValidator do
|> validate_inclusion(:type, ["Delete"])
|> validate_delete_actor(:actor)
|> validate_modification_rights()
|> validate_object_or_user_presence(allowed_types: @deletable_types)
|> validate_object_or_user_presence(
allowed_types: @deletable_types,
allowed_object_categories: [:object]
)
|> add_deleted_activity_id()
end

View file

@ -129,7 +129,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.EmojiReactValidator do
|> validate_inclusion(:type, ["EmojiReact"])
|> validate_required([:id, :type, :object, :actor, :context, :to, :cc, :content])
|> validate_actor_presence()
|> validate_object_presence()
|> validate_object_presence(allowed_object_categories: [:object])
|> validate_emoji()
|> maybe_validate_tag_presence()
end

View file

@ -66,7 +66,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.LikeValidator do
|> validate_inclusion(:type, ["Like"])
|> validate_required([:id, :type, :object, :actor, :context, :to, :cc])
|> validate_actor_presence()
|> validate_object_presence()
|> validate_object_presence(allowed_object_categories: [:object])
|> validate_existing_like()
end

View file

@ -44,7 +44,7 @@ defmodule Pleroma.Web.ActivityPub.ObjectValidators.UndoValidator do
|> validate_inclusion(:type, ["Undo"])
|> validate_required([:id, :type, :object, :actor, :to, :cc])
|> validate_undo_actor(:actor)
|> validate_object_presence()
|> validate_object_presence(allowed_object_categories: [:activity])
|> validate_undo_rights()
end

View file

@ -219,7 +219,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
inboxes =
recipients
|> Enum.filter(&User.ap_enabled?/1)
|> Enum.map(fn actor -> actor.inbox end)
|> Enum.filter(fn inbox -> should_federate?(inbox) end)
|> Instances.filter_reachable()
@ -261,7 +260,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
json = Jason.encode!(data)
recipients(actor, activity)
|> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %User{} = user ->
determine_inbox(activity, user)
end)

View file

@ -21,7 +21,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.ActivityPub.ObjectValidators.CommonFixes
alias Pleroma.Web.Federator
alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query
@ -520,10 +519,22 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
defp handle_incoming_normalised(%{"type" => type} = data, _options)
when type in ~w{Like EmojiReact Announce Add Remove} do
with :ok <- ObjectValidator.fetch_actor_and_object(data),
with {_, :ok} <- {:link, ObjectValidator.fetch_actor_and_object(data)},
{:ok, activity, _meta} <- Pipeline.common_pipeline(data, local: false) do
{:ok, activity}
else
{:link, {:error, :ignore}} ->
{:error, :ignore}
{:link, {:error, {:validate, _}} = e} ->
e
{:link, {:error, {:reject, _}} = e} ->
e
{:link, _} ->
{:error, :link_resolve_failed}
e ->
{:error, e}
end
@ -545,22 +556,45 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
%{"type" => "Delete"} = data,
_options
) do
with {:ok, activity, _} <-
Pipeline.common_pipeline(data, local: false) do
oid_result = ObjectValidators.ObjectID.cast(data["object"])
with {_, {:ok, object_id}} <- {:object_id, oid_result},
object <- Object.get_cached_by_ap_id(object_id),
{_, false} <- {:tombstone, Object.is_tombstone_object?(object) && !data["actor"]},
{:ok, activity, _} <- Pipeline.common_pipeline(data, local: false) do
{:ok, activity}
else
{:error, {:validate, _}} = e ->
# Check if we have a create activity for this
with {:ok, object_id} <- ObjectValidators.ObjectID.cast(data["object"]),
%Activity{data: %{"actor" => actor}} <-
Activity.create_by_object_ap_id(object_id) |> Repo.one(),
# We have one, insert a tombstone and retry
{:ok, tombstone_data, _} <- Builder.tombstone(actor, object_id),
{:ok, _tombstone} <- Object.create(tombstone_data) do
handle_incoming(data)
{:object_id, _} ->
{:error, {:validate, "Invalid object id: #{data["object"]}"}}
{:tombstone, true} ->
{:error, :ignore}
{:error, {:validate, {:error, %Ecto.Changeset{errors: errors}}}} = e ->
if errors[:object] == {"can't find object", []} do
# Check if we have a create activity for this
# (e.g. from a db prune without --prune-activities)
# We'd still like to process side effects so insert a fake tombstone and retry
# (real tombstones from Object.delete do not have an actor field)
with {:ok, object_id} <- ObjectValidators.ObjectID.cast(data["object"]),
{_, %Activity{data: %{"actor" => actor}}} <-
{:create, Activity.create_by_object_ap_id(object_id) |> Repo.one()},
{:ok, tombstone_data, _} <- Builder.tombstone(actor, object_id),
{:ok, _tombstone} <- Object.create(tombstone_data) do
handle_incoming(data)
else
{:create, _} -> {:error, :ignore}
_ -> e
end
else
_ -> e
e
end
{:error, _} = e ->
e
e ->
{:error, e}
end
end
@ -593,6 +627,20 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
when type in ["Like", "EmojiReact", "Announce", "Block"] do
with {:ok, activity, _} <- Pipeline.common_pipeline(data, local: false) do
{:ok, activity}
else
{:error, {:validate, {:error, %Ecto.Changeset{errors: errors}}}} = e ->
# If we never saw the activity being undone, no need to do anything.
# Inspecting the validation error content is a bit awkward, but looking up the Activity
# ahead of time here would be too costly since Activity queries are not cached
# and there's no way atm to pass the retrieved result along
if errors[:object] == {"can't find object", []} do
{:error, :ignore}
else
e
end
e ->
e
end
end
@ -1007,47 +1055,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
defp strip_internal_tags(object), do: object
def perform(:user_upgrade, user) do
# we pass a fake user so that the followers collection is stripped away
old_follower_address = User.ap_followers(%User{nickname: user.nickname})
from(
a in Activity,
where: ^old_follower_address in a.recipients,
update: [
set: [
recipients:
fragment(
"array_replace(?,?,?)",
a.recipients,
^old_follower_address,
^user.follower_address
)
]
]
)
|> Repo.update_all([])
end
def upgrade_user_from_ap_id(ap_id) do
with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),
{:ok, user} <- update_user(user, data) do
ActivityPub.enqueue_pin_fetches(user)
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
{:ok, user}
else
%User{} = user -> {:ok, user}
e -> e
end
end
defp update_user(user, data) do
user
|> User.remote_user_changeset(data)
|> User.update_and_set_cache()
end
def maybe_fix_user_url(%{"url" => url} = data) when is_map(url) do
Map.put(data, "url", url["href"])
end

View file

@ -6,7 +6,6 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Activity
alias Pleroma.Object.Containment
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
@ -92,8 +91,7 @@ defmodule Pleroma.Web.Federator do
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
nil <- Activity.normalize(params["id"]),
with nil <- Activity.normalize(params["id"]),
{_, :ok} <-
{:correct_origin?, Containment.contain_origin_from_id(actor, params)},
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
@ -119,17 +117,11 @@ defmodule Pleroma.Web.Federator do
e ->
# Just drop those for now
Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
{:error, e}
end
end
def ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
case e do
{:error, _} -> e
_ -> {:error, e}
end
end
end
end

View file

@ -57,6 +57,10 @@ defmodule Pleroma.Web.RichMedia.Backfill do
Logger.debug("Rich media error for #{url}: :content_type is #{type}")
negative_cache(url_hash, :timer.minutes(30))
{:error, {:url, reason}} ->
Logger.debug("Rich media error for #{url}: refusing URL #{inspect(reason)}")
negative_cache(url_hash, :timer.minutes(180))
e ->
Logger.debug("Rich media error for #{url}: #{inspect(e)}")
{:error, e}
@ -82,7 +86,7 @@ defmodule Pleroma.Web.RichMedia.Backfill do
end
defp stream_update(%{"activity_id" => activity_id}) do
Logger.info("Rich media backfill: streaming update for activity #{activity_id}")
Logger.debug("Rich media backfill: streaming update for activity #{activity_id}")
Pleroma.Activity.get_by_id(activity_id)
|> Pleroma.Activity.normalize()

View file

@ -16,12 +16,13 @@ defmodule Pleroma.Web.RichMedia.Parser do
@spec parse(String.t()) :: {:ok, map()} | {:error, any()}
def parse(url) do
with {_, true} <- {:config, @config_impl.get([:rich_media, :enabled])},
:ok <- validate_page_url(url),
{_, :ok} <- {:url, validate_page_url(url)},
{:ok, data} <- parse_url(url) do
data = Map.put(data, "url", url)
{:ok, data}
else
{:config, _} -> {:error, :rich_media_disabled}
{:url, {:error, reason}} -> {:error, {:url, reason}}
e -> e
end
end
@ -62,7 +63,7 @@ defmodule Pleroma.Web.RichMedia.Parser do
|> Map.new()
end
@spec validate_page_url(URI.t() | binary()) :: :ok | :error
@spec validate_page_url(URI.t() | binary()) :: :ok | {:error, term()}
defp validate_page_url(page_url) when is_binary(page_url) do
validate_tld = @config_impl.get([Pleroma.Formatter, :validate_tld])
@ -74,20 +75,20 @@ defmodule Pleroma.Web.RichMedia.Parser do
defp validate_page_url(%URI{host: host, scheme: "https"}) do
cond do
Linkify.Parser.ip?(host) ->
:error
{:error, :ip}
host in @config_impl.get([:rich_media, :ignore_hosts], []) ->
:error
{:error, :ignore_hosts}
get_tld(host) in @config_impl.get([:rich_media, :ignore_tld], []) ->
:error
{:error, :ignore_tld}
true ->
:ok
end
end
defp validate_page_url(_), do: :error
defp validate_page_url(_), do: {:error, "scheme mismatch"}
defp parse_uri(true, url) do
url
@ -95,7 +96,7 @@ defmodule Pleroma.Web.RichMedia.Parser do
|> validate_page_url
end
defp parse_uri(_, _), do: :error
defp parse_uri(_, _), do: {:error, "not an URL"}
defp get_tld(host) do
host

View file

@ -208,8 +208,10 @@ defmodule Pleroma.Web.Telemetry do
dist_metrics ++ vm_metrics
end
defp common_metrics do
defp common_metrics(byte_unit \\ :byte) do
[
last_value("vm.portio.in.total", unit: {:byte, byte_unit}),
last_value("vm.portio.out.total", unit: {:byte, byte_unit}),
last_value("pleroma.local_users.total"),
last_value("pleroma.domains.total"),
last_value("pleroma.local_statuses.total"),
@ -220,14 +222,22 @@ defmodule Pleroma.Web.Telemetry do
def prometheus_metrics,
do: common_metrics() ++ distribution_metrics() ++ summary_fallback_metrics()
def live_dashboard_metrics, do: common_metrics() ++ summary_metrics(:megabyte)
def live_dashboard_metrics, do: common_metrics(:megabyte) ++ summary_metrics(:megabyte)
defp periodic_measurements do
[
{__MODULE__, :io_stats, []},
{__MODULE__, :instance_stats, []}
]
end
def io_stats do
# All IO done via erlang ports, i.e. mostly network but also e.g. fasthtml_workers. NOT disk IO!
{{:input, input}, {:output, output}} = :erlang.statistics(:io)
:telemetry.execute([:vm, :portio, :in], %{total: input}, %{})
:telemetry.execute([:vm, :portio, :out], %{total: output}, %{})
end
def instance_stats do
stats = Stats.get_stats()
:telemetry.execute([:pleroma, :local_users], %{total: stats.user_count}, %{})

View file

@ -11,7 +11,4 @@
<%= if User.banner_url(@user) do %>
<link rel="header" href="<%= User.banner_url(@user) %>"/>
<% end %>
<%= if @user.local do %>
<ap_enabled>true</ap_enabled>
<% end %>
</author>

View file

@ -11,7 +11,4 @@
<%= if User.banner_url(@user) do %>
<link rel="header"><%= User.banner_url(@user) %></link>
<% end %>
<%= if @user.local do %>
<ap_enabled>true</ap_enabled>
<% end %>
</managingEditor>

View file

@ -8,9 +8,6 @@
<%= if User.banner_url(@actor) do %>
<link rel="header" href="<%= User.banner_url(@actor) %>"/>
<% end %>
<%= if @actor.local do %>
<ap_enabled>true</ap_enabled>
<% end %>
<poco:preferredUsername><%= @actor.nickname %></poco:preferredUsername>
<poco:displayName><%= @actor.name %></poco:displayName>

View file

@ -1,4 +1,4 @@
<a class="attachment" href="<%= @url %>" alt=<%= @name %>" title="<%= @name %>">
<a class="attachment" href="<%= @url %>" alt="<%= @name %>" title="<%= @name %>">
<%= if @nsfw do %>
<div class="nsfw-banner">
<div><%= gettext("Hover to show content") %></div>

View file

@ -1,9 +1,30 @@
defmodule Pleroma.Workers.NodeInfoFetcherWorker do
use Pleroma.Workers.WorkerHelper, queue: "nodeinfo_fetcher"
use Pleroma.Workers.WorkerHelper,
queue: "nodeinfo_fetcher",
unique: [
keys: [:op, :source_url],
# old jobs still get pruned after a short while
period: :infinity,
states: Oban.Job.states()
]
alias Oban.Job
alias Pleroma.Instances.Instance
def enqueue(op, %{"source_url" => ap_id} = params, worker_args) do
# reduce to base url to avoid enqueueing unnecessary duplicates
domain =
ap_id
|> URI.parse()
|> URI.merge("/")
if Instance.needs_update(domain) do
do_enqueue(op, %{params | "source_url" => URI.to_string(domain)}, worker_args)
else
:ok
end
end
@impl Oban.Worker
def perform(%Job{
args: %{"op" => "process", "source_url" => domain}

View file

@ -3,6 +3,8 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do
require Logger
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
@ -12,10 +14,49 @@ defmodule Pleroma.Workers.ReceiverWorker do
with {:ok, res} <- Federator.perform(:incoming_ap_doc, params) do
{:ok, res}
else
{:error, :origin_containment_failed} -> {:discard, :origin_containment_failed}
{:error, {:reject, reason}} -> {:discard, reason}
{:error, _} = e -> e
e -> {:error, e}
{:error, :origin_containment_failed} ->
{:discard, :origin_containment_failed}
{:error, {:reject, reason}} ->
{:discard, reason}
{:error, :already_present} ->
{:discard, :already_present}
{:error, :ignore} ->
{:discard, :ignore}
# invalid data or e.g. deleting an object we don't know about anyway
{:error, {:validate, issue}} ->
Logger.info("Received invalid AP document: #{inspect(issue)}")
{:discard, :invalid}
# rarer, but sometimes there's an additional :error in front
{:error, {:error, {:validate, issue}}} ->
Logger.info("Received invalid AP document: (2e) #{inspect(issue)}")
{:discard, :invalid}
# failed to resolve a necessary referenced remote AP object;
# might be temporary server/network trouble, thus reattempt
{:error, :link_resolve_failed} = e ->
Logger.info("Failed to resolve AP link; may retry: #{inspect(params)}")
e
{:error, _} = e ->
Logger.error("Unexpected AP doc error: #{inspect(e)} from #{inspect(params)}")
e
e ->
Logger.error("Unexpected AP doc error: (raw) #{inspect(e)} from #{inspect(params)}")
{:error, e}
end
rescue
err ->
Logger.error(
"Receiver worker CRASH on #{inspect(params)} with: #{Exception.format(:error, err, __STACKTRACE__)}"
)
# reraise to let Oban handle transaction conflicts without deducting an attempt
reraise err, __STACKTRACE__
end
end

View file

@ -1,23 +1,38 @@
defmodule Pleroma.Workers.SearchIndexingWorker do
use Pleroma.Workers.WorkerHelper, queue: "search_indexing"
@impl Oban.Worker
defp search_module(), do: Pleroma.Config.get!([Pleroma.Search, :module])
def enqueue("add_to_index", params, worker_args) do
if Kernel.function_exported?(search_module(), :add_to_index, 1) do
do_enqueue("add_to_index", params, worker_args)
else
# XXX: or {:ok, nil} to more closely match Oban.insert()'s {:ok, job}?
# or similar to unique conflict: %Oban.Job{conflict?: true} (but omitting all other fields...)
:ok
end
end
def enqueue("remove_from_index", params, worker_args) do
if Kernel.function_exported?(search_module(), :remove_from_index, 1) do
do_enqueue("remove_from_index", params, worker_args)
else
:ok
end
end
@impl Oban.Worker
def perform(%Job{args: %{"op" => "add_to_index", "activity" => activity_id}}) do
activity = Pleroma.Activity.get_by_id_with_object(activity_id)
search_module = Pleroma.Config.get([Pleroma.Search, :module])
search_module.add_to_index(activity)
search_module().add_to_index(activity)
:ok
end
def perform(%Job{args: %{"op" => "remove_from_index", "object" => object_id}}) do
search_module = Pleroma.Config.get([Pleroma.Search, :module])
# Fake the object so we can remove it from the index without having to keep it in the DB
search_module.remove_from_index(%Pleroma.Object{id: object_id})
search_module().remove_from_index(%Pleroma.Object{id: object_id})
:ok
end

View file

@ -1,15 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.TransmogrifierWorker do
alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
@impl Oban.Worker
def perform(%Job{args: %{"op" => "user_upgrade", "user_id" => user_id}}) do
user = User.get_cached_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end
end

View file

@ -38,7 +38,7 @@ defmodule Pleroma.Workers.WorkerHelper do
alias Oban.Job
def enqueue(op, params, worker_args \\ []) do
defp do_enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
@ -48,11 +48,16 @@ defmodule Pleroma.Workers.WorkerHelper do
|> Oban.insert()
end
def enqueue(op, params, worker_args \\ []),
do: do_enqueue(op, params, worker_args)
@impl Oban.Worker
def timeout(_job) do
queue_atom = String.to_atom(unquote(queue))
Config.get([:workers, :timeout, queue_atom], :timer.minutes(1))
end
defoverridable enqueue: 3
end
end
end

View file

@ -0,0 +1,38 @@
# Akkoma: Magically expressive social media
# Copyright © 2024 Akkoma Authors <https://akkoma.dev/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Repo.Migrations.RemoteUserCountEstimateFunction do
use Ecto.Migration
@function_name "estimate_remote_user_count"
def up() do
# yep, this EXPLAIN (ab)use is blessed by the PostgreSQL wiki:
# https://wiki.postgresql.org/wiki/Count_estimate
"""
CREATE OR REPLACE FUNCTION #{@function_name}()
RETURNS integer
LANGUAGE plpgsql AS $$
DECLARE plan jsonb;
BEGIN
EXECUTE '
EXPLAIN (FORMAT JSON)
SELECT *
FROM public.users
WHERE local = false AND
is_active = true AND
invisible = false AND
nickname IS NOT NULL;
' INTO plan;
RETURN plan->0->'Plan'->'Plan Rows';
END;
$$;
"""
|> execute()
end
def down() do
execute("DROP FUNCTION IF EXISTS #{@function_name}()")
end
end
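
Usage note: once migrated, the estimate is a plain SQL function call; the Stats diff above wraps it like this (the result is the planner's row estimate, not an exact count):

%{rows: [[remote_user_count]]} =
  Pleroma.Repo.query!("SELECT estimate_remote_user_count();")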

View file

@ -0,0 +1,13 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2023 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Repo.Migrations.RemoveUserApEnabled do
use Ecto.Migration
def change do
alter table(:users) do
remove(:ap_enabled, :boolean, default: false, null: false)
end
end
end

View file

@ -1694,7 +1694,6 @@ defmodule Pleroma.UserTest do
confirmation_token: "qqqq",
domain_blocks: ["lain.com"],
is_active: false,
ap_enabled: true,
is_moderator: true,
is_admin: true,
mastofe_settings: %{"a" => "b"},
@ -1734,7 +1733,6 @@ defmodule Pleroma.UserTest do
confirmation_token: nil,
domain_blocks: [],
is_active: false,
ap_enabled: false,
is_moderator: false,
is_admin: false,
mastofe_settings: nil,
@ -2217,8 +2215,7 @@ defmodule Pleroma.UserTest do
insert(:user,
local: false,
follower_address: "http://remote.org/users/masto_closed/followers",
following_address: "http://remote.org/users/masto_closed/following",
ap_enabled: true
following_address: "http://remote.org/users/masto_closed/following"
)
assert other_user.following_count == 0
@ -2239,8 +2236,7 @@ defmodule Pleroma.UserTest do
insert(:user,
local: false,
follower_address: "http://remote.org/users/masto_closed/followers",
following_address: "http://remote.org/users/masto_closed/following",
ap_enabled: true
following_address: "http://remote.org/users/masto_closed/following"
)
assert other_user.following_count == 0
@ -2261,8 +2257,7 @@ defmodule Pleroma.UserTest do
insert(:user,
local: false,
follower_address: "http://remote.org/users/masto_closed/followers",
following_address: "http://remote.org/users/masto_closed/following",
ap_enabled: true
following_address: "http://remote.org/users/masto_closed/following"
)
assert other_user.following_count == 0

View file

@ -579,7 +579,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
user =
insert(:user,
ap_id: "https://mastodon.example.org/users/raymoo",
ap_enabled: true,
local: false,
last_refreshed_at: nil
)

View file

@ -178,7 +178,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
{:ok, user} = ActivityPub.make_user_from_ap_id(user_id)
assert user.ap_id == user_id
assert user.nickname == "admin@mastodon.example.org"
assert user.ap_enabled
assert user.follower_address == "http://mastodon.example.org/users/admin/followers"
end

View file

@ -241,11 +241,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.AntiLinkSpamPolicyTest do
assert capture_log(fn ->
{:reject, _} = AntiLinkSpamPolicy.filter(message)
end) =~ "[error] Could not decode user at fetch http://invalid.actor"
end) =~ "[error] Could not fetch user http://invalid.actor,"
assert capture_log(fn ->
{:reject, _} = AntiLinkSpamPolicy.filter(update_message)
end) =~ "[error] Could not decode user at fetch http://invalid.actor"
end) =~ "[error] Could not fetch user http://invalid.actor,"
end
test "it rejects posts with links" do
@ -259,11 +259,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.AntiLinkSpamPolicyTest do
assert capture_log(fn ->
{:reject, _} = AntiLinkSpamPolicy.filter(message)
end) =~ "[error] Could not decode user at fetch http://invalid.actor"
end) =~ "[error] Could not fetch user http://invalid.actor,"
assert capture_log(fn ->
{:reject, _} = AntiLinkSpamPolicy.filter(update_message)
end) =~ "[error] Could not decode user at fetch http://invalid.actor"
end) =~ "[error] Could not fetch user http://invalid.actor,"
end
end

View file

@ -306,15 +306,13 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
follower =
insert(:user, %{
local: false,
inbox: "https://domain.com/users/nick1/inbox",
ap_enabled: true
inbox: "https://domain.com/users/nick1/inbox"
})
another_follower =
insert(:user, %{
local: false,
inbox: "https://rejected.com/users/nick2/inbox",
ap_enabled: true
inbox: "https://rejected.com/users/nick2/inbox"
})
actor =
@ -386,8 +384,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
follower =
insert(:user, %{
local: false,
inbox: "https://domain.com/users/nick1/inbox",
ap_enabled: true
inbox: "https://domain.com/users/nick1/inbox"
})
actor =
@ -425,8 +422,7 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
follower =
insert(:user, %{
local: false,
inbox: "https://domain.com/users/nick1/inbox",
ap_enabled: true
inbox: "https://domain.com/users/nick1/inbox"
})
actor = insert(:user, follower_address: follower.ap_id)
@ -461,15 +457,13 @@ defmodule Pleroma.Web.ActivityPub.PublisherTest do
fetcher =
insert(:user,
local: false,
inbox: "https://domain.com/users/nick1/inbox",
ap_enabled: true
inbox: "https://domain.com/users/nick1/inbox"
)
another_fetcher =
insert(:user,
local: false,
inbox: "https://domain2.com/users/nick1/inbox",
ap_enabled: true
inbox: "https://domain2.com/users/nick1/inbox"
)
actor = insert(:user)

View file

@ -29,7 +29,7 @@ defmodule Pleroma.Web.ActivityPub.RelayTest do
test "returns errors when user not found" do
assert capture_log(fn ->
{:error, _} = Relay.follow("test-ap-id")
end) =~ "Could not decode user at fetch"
end) =~ "Could not fetch user test-ap-id,"
end
test "returns activity" do
@ -48,7 +48,7 @@ defmodule Pleroma.Web.ActivityPub.RelayTest do
test "returns errors when user not found" do
assert capture_log(fn ->
{:error, _} = Relay.unfollow("test-ap-id")
end) =~ "Could not decode user at fetch"
end) =~ "Could not fetch user test-ap-id,"
end
test "returns activity" do

View file

@ -46,7 +46,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
assert_enqueued(
worker: Pleroma.Workers.NodeInfoFetcherWorker,
args: %{"op" => "process", "source_url" => "https://wowee.example.com/users/1"}
args: %{"op" => "process", "source_url" => "https://wowee.example.com/"}
)
end
end

View file

@ -102,6 +102,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier.AddRemoveHandlingTest do
user =
user
|> Ecto.Changeset.change(featured_address: nil)
|> Ecto.Changeset.change(last_refreshed_at: ~N[2013-03-14 11:50:00.000000])
|> Repo.update!()
%{host: host} = URI.parse(user.ap_id)

View file

@ -8,7 +8,6 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
@moduletag :mocked
alias Pleroma.Activity
alias Pleroma.Object
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
@ -53,6 +52,25 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
refute User.following?(User.get_cached_by_ap_id(data["actor"]), user)
end
test "it ignores Undo activities for unknown objects" do
undo_data = %{
"id" => "https://remote.com/undo",
"type" => "Undo",
"actor" => "https:://remote.com/users/unknown",
"object" => %{
"id" => "https://remote.com/undone_activity/unknown",
"type" => "Like"
}
}
assert {:error, :ignore} == Transmogrifier.handle_incoming(undo_data)
user = insert(:user, local: false, ap_id: "https://remote.com/users/known")
undo_data = %{undo_data | "actor" => user.ap_id}
assert {:error, :ignore} == Transmogrifier.handle_incoming(undo_data)
end
test "it accepts Flag activities" do
user = insert(:user)
other_user = insert(:user)
@ -348,69 +366,6 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
end
end
describe "user upgrade" do
test "it upgrades a user to activitypub" do
user =
insert(:user, %{
nickname: "rye@niu.moe",
local: false,
ap_id: "https://niu.moe/users/rye",
follower_address: User.ap_followers(%User{nickname: "rye@niu.moe"})
})
user_two = insert(:user)
Pleroma.FollowingRelationship.follow(user_two, user, :follow_accept)
{:ok, activity} = CommonAPI.post(user, %{status: "test"})
{:ok, unrelated_activity} = CommonAPI.post(user_two, %{status: "test"})
assert "http://localhost:4001/users/rye@niu.moe/followers" in activity.recipients
user = User.get_cached_by_id(user.id)
assert user.note_count == 1
{:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye")
ObanHelpers.perform_all()
assert user.ap_enabled
assert user.note_count == 1
assert user.follower_address == "https://niu.moe/users/rye/followers"
assert user.following_address == "https://niu.moe/users/rye/following"
user = User.get_cached_by_id(user.id)
assert user.note_count == 1
activity = Activity.get_by_id(activity.id)
assert user.follower_address in activity.recipients
assert %{
"url" => [
%{
"href" =>
"https://cdn.niu.moe/accounts/avatars/000/033/323/original/fd7f8ae0b3ffedc9.jpeg"
}
]
} = user.avatar
assert %{
"url" => [
%{
"href" =>
"https://cdn.niu.moe/accounts/headers/000/033/323/original/850b3448fa5fd477.png"
}
]
} = user.banner
refute "..." in activity.recipients
unrelated_activity = Activity.get_by_id(unrelated_activity.id)
refute user.follower_address in unrelated_activity.recipients
user_two = User.get_cached_by_id(user_two.id)
assert User.following?(user_two, user)
refute "..." in User.following(user_two)
end
end
describe "actor rewriting" do
test "it fixes the actor URL property to be a proper URI" do
data = %{

View file

@ -1129,7 +1129,7 @@ defmodule Pleroma.Web.CommonAPITest do
test "cancels a pending follow for a remote user" do
follower = insert(:user)
followed = insert(:user, is_locked: true, local: false, ap_enabled: true)
followed = insert(:user, is_locked: true, local: false)
assert {:ok, follower, followed, %{id: _activity_id, data: %{"state" => "pending"}}} =
CommonAPI.follow(follower, followed)

View file

@ -79,16 +79,14 @@ defmodule Pleroma.Web.FederatorTest do
local: false,
nickname: "nick1@domain.com",
ap_id: "https://domain.com/users/nick1",
inbox: inbox1,
ap_enabled: true
inbox: inbox1
})
insert(:user, %{
local: false,
nickname: "nick2@domain2.com",
ap_id: "https://domain2.com/users/nick2",
inbox: inbox2,
ap_enabled: true
inbox: inbox2
})
dt = NaiveDateTime.utc_now()
@ -134,7 +132,7 @@ defmodule Pleroma.Web.FederatorTest do
assert {:ok, _activity} = ObanHelpers.perform(job)
assert {:ok, job} = Federator.incoming_ap_doc(params)
assert {:error, :already_present} = ObanHelpers.perform(job)
assert {:discard, :already_present} = ObanHelpers.perform(job)
end
test "successfully normalises public scope descriptors" do

View file

@ -61,7 +61,9 @@ defmodule Pleroma.Web.MastodonAPI.InstanceControllerTest do
{:ok, _user2} = User.set_activation(user2, false)
insert(:user, %{local: false, nickname: "u@peer1.com"})
insert(:instance, %{domain: "peer1.com"})
insert(:user, %{local: false, nickname: "u@peer2.com"})
insert(:instance, %{domain: "peer2.com"})
{:ok, _} = Pleroma.Web.CommonAPI.post(user, %{status: "cofe"})
@ -81,7 +83,9 @@ defmodule Pleroma.Web.MastodonAPI.InstanceControllerTest do
test "get peers", %{conn: conn} do
insert(:user, %{local: false, nickname: "u@peer1.com"})
insert(:instance, %{domain: "peer1.com"})
insert(:user, %{local: false, nickname: "u@peer2.com"})
insert(:instance, %{domain: "peer2.com"})
Pleroma.Stats.force_update()

View file

@ -109,25 +109,40 @@ defmodule Pleroma.Web.RichMedia.ParserTest do
test "refuses to crawl incomplete URLs" do
url = "example.com/ogp"
assert :error == Parser.parse(url)
assert {:error, {:url, "scheme mismatch"}} == Parser.parse(url)
end
test "refuses to crawl plain HTTP and other scheme URL" do
[
"http://example.com/ogp",
"ftp://example.org/dist/"
]
|> Enum.each(fn url ->
res = Parser.parse(url)
assert {:error, {:url, "scheme mismatch"}} == res or
{:error, {:url, "not an URL"}} == res
end)
end
test "refuses to crawl malformed URLs" do
url = "example.com[]/ogp"
assert :error == Parser.parse(url)
assert {:error, {:url, "not an URL"}} == Parser.parse(url)
end
test "refuses to crawl URLs of private network from posts" do
[
"http://127.0.0.1:4000/notice/9kCP7VNyPJXFOXDrgO",
"https://127.0.0.1:4000/notice/9kCP7VNyPJXFOXDrgO",
"https://10.111.10.1/notice/9kCP7V",
"https://172.16.32.40/notice/9kCP7V",
"https://192.168.10.40/notice/9kCP7V",
"https://pleroma.local/notice/9kCP7V"
"https://192.168.10.40/notice/9kCP7V"
]
|> Enum.each(fn url ->
assert :error == Parser.parse(url)
assert {:error, {:url, :ip}} == Parser.parse(url)
end)
url = "https://pleroma.local/notice/9kCP7V"
assert {:error, {:url, :ignore_tld}} == Parser.parse(url)
end
test "returns error when disabled" do

View file

@ -132,7 +132,7 @@ defmodule Pleroma.Web.TwitterAPI.RemoteFollowControllerTest do
|> html_response(200)
assert response =~ "Error fetching user"
end) =~ ":not_found"
end) =~ "User doesn't exist (anymore): https://mastodon.social/users/not_found"
end
end

View file

@ -62,8 +62,7 @@ defmodule Pleroma.Factory do
last_digest_emailed_at: NaiveDateTime.utc_now(),
last_refreshed_at: NaiveDateTime.utc_now(),
notification_settings: %Pleroma.User.NotificationSetting{},
multi_factor_authentication_settings: %Pleroma.MFA.Settings{},
ap_enabled: true
multi_factor_authentication_settings: %Pleroma.MFA.Settings{}
}
urls =