telemetry: expose count of currently pending jobs per queue

Split into scheduled jobs (intentionally delayed until a later trigger
date) and available jobs (eligible for immediate processing but not yet
started).
This will help diagnose overloaded instances or too-low queue limits,
and also expose configuration mishaps like
#924.
(The latter by loudly crashing the telemetry poller process when it
attempts a put_in for a non-configured queue, producing well-visible logs.)
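
A minimal iex sketch of that failure mode (queue names and counts here are
invented for illustration): put_in/3 with a list path raises once an
intermediate key is missing, and String.to_existing_atom/1 can likewise
raise for unknown queue names, so such jobs crash the poller instead of
being silently skipped.

    # the accumulator only knows queues from the Oban config
    acc = %{federator_outgoing: %{available: 0, scheduled: 0}}

    # a job from an unconfigured queue blows up loudly:
    put_in(acc, [:mystery_queue, :available], 3)
    # ** (ArgumentError) could not put/update key :available on a nil value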
Oneric 2025-09-21 00:00:00 +00:00
commit 43d4716b5a
4 changed files with 548 additions and 314 deletions

[Binary image file updated: 152 KiB before, 165 KiB after]


@@ -119,7 +119,8 @@ That's it, you've got a fancy dashboard with long-term, 24/7 metrics now!
 Updating the dashboard can be done by just repeating the import process.
 Here's an example taken from a healthy, small instance where
-nobody was logged in for about the first half of the displayed time span:
+nobody was logged in except for a few minutes
+resulting in an (expected) spike in incoming requests:
 
 ![Full view of the reference dashboard as it looked at the time of writing](img/grafana_dashboard.webp)
 
 !!! note
@@ -259,7 +260,6 @@ This too requires administrator rights to access and can be found under `/akkoma
 The exposed aggregate info is mostly redundant with job statistics already tracked in Prometheus,
 but it additionally also:
-- shows not-yet executed jobs in the backlog of queues
 - shows full argument and meta details for each job
 - allows interactively deleting or manually retrying jobs
   *(keep this in mind when granting people administrator rights!)*
 

File diff suppressed because it is too large.


@@ -1,5 +1,6 @@
 defmodule Pleroma.Web.Telemetry do
   use Supervisor
+  import Ecto.Query
   import Telemetry.Metrics
   alias Pleroma.Stats
   alias Pleroma.Config
@@ -303,7 +304,9 @@ defmodule Pleroma.Web.Telemetry do
       last_value("pleroma.domains.total"),
       last_value("pleroma.local_statuses.total"),
       last_value("pleroma.remote_users.total"),
-      counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason])
+      counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason]),
+      last_value("akkoma.job.queue.scheduled", tags: [:queue_name]),
+      last_value("akkoma.job.queue.available", tags: [:queue_name])
     ]
   end
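
Each emitted event maps onto the two new gauges; a rough sketch of the
correspondence (the concrete queue name, counts, and the Prometheus
reporter wiring are assumptions, not taken from this diff):

    # Emitting this event ...
    :telemetry.execute(
      [:akkoma, :job, :queue],
      %{scheduled: 2, available: 5},
      %{queue_name: :federator_outgoing}
    )

    # ... matches the last_value/2 definitions above and, with a
    # Prometheus reporter attached, surfaces roughly as:
    #   akkoma_job_queue_scheduled{queue_name="federator_outgoing"} 2
    #   akkoma_job_queue_available{queue_name="federator_outgoing"} 5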
@@ -315,7 +318,8 @@ defmodule Pleroma.Web.Telemetry do
   defp periodic_measurements do
     [
       {__MODULE__, :io_stats, []},
-      {__MODULE__, :instance_stats, []}
+      {__MODULE__, :instance_stats, []},
+      {__MODULE__, :oban_pending, []}
     ]
   end
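
For context, a sketch of how such an entry is consumed (the standalone
start_link call and the 10-second period are assumptions for
illustration): :telemetry_poller invokes each {module, function, args}
tuple once per polling period.

    :telemetry_poller.start_link(
      measurements: [{Pleroma.Web.Telemetry, :oban_pending, []}],
      period: :timer.seconds(10)
    )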
@@ -333,4 +337,26 @@ defmodule Pleroma.Web.Telemetry do
     :telemetry.execute([:pleroma, :local_statuses], %{total: stats.status_count}, %{})
     :telemetry.execute([:pleroma, :remote_users], %{total: stats.remote_user_count}, %{})
   end
+
+  def oban_pending() do
+    query =
+      from(j in Oban.Job,
+        select: %{queue: j.queue, state: j.state, count: count()},
+        where: j.state in ["scheduled", "available"],
+        group_by: [j.queue, j.state]
+      )
+
+    conf = Oban.Config.new(Config.get!(Oban))
+    qres = Oban.Repo.all(conf, query)
+
+    acc = Enum.into(conf.queues, %{}, fn {x, _} -> {x, %{available: 0, scheduled: 0}} end)
+    acc =
+      Enum.reduce(qres, acc, fn %{queue: q, state: state, count: count}, acc ->
+        put_in(acc, [String.to_existing_atom(q), String.to_existing_atom(state)], count)
+      end)
+
+    for {queue, info} <- acc do
+      :telemetry.execute([:akkoma, :job, :queue], info, %{queue_name: queue})
+    end
+  end
 end
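
A quick way to observe the new events from iex (the handler id and the
inspect output shown are illustrative only):

    :telemetry.attach(
      "debug-oban-pending",
      [:akkoma, :job, :queue],
      fn _name, measurements, metadata, _config ->
        IO.inspect({metadata.queue_name, measurements})
      end,
      nil
    )

    Pleroma.Web.Telemetry.oban_pending()
    # e.g. {:federator_outgoing, %{available: 0, scheduled: 0}}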