attempt rabbitmq

This commit is contained in:
FloatingGhost 2022-12-12 18:55:28 +00:00
parent 07a48b9293
commit a78f76a326
7 changed files with 173 additions and 20 deletions

View file

@ -888,6 +888,8 @@
url: "http://127.0.0.1:5000",
api_key: nil
config :pleroma, :queue, module: Pleroma.Broadway
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"

View file

@ -0,0 +1,18 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Queue do
use Mix.Task
import Mix.Pleroma
def run(["queues"]) do
start_pleroma()
Pleroma.Config.get([Oban, :queues])
|> Keyword.keys()
|> Enum.join("\n")
|> shell_info()
end
end

View file

@ -68,11 +68,11 @@ def start(_type, _args) do
] ++
cachex_children() ++
http_children() ++
queue_children() ++
[
Pleroma.Stats,
Pleroma.JobQueueMonitor,
{Majic.Pool, [name: Pleroma.MajicPool, pool_size: Config.get([:majic_pool, :size], 2)]},
{Oban, Config.get(Oban)},
Pleroma.Web.Endpoint
] ++
elasticsearch_children() ++
@ -267,4 +267,16 @@ defp http_children do
[{Finch, config}]
end
defp queue_children do
queue_module = Config.get([:queue, :module])
case queue_module do
Oban ->
[{Oban, Config.get(Oban)}]
Pleroma.Broadway ->
Pleroma.Broadway.children()
end
end
end

93
lib/pleroma/broadway.ex Normal file
View file

@ -0,0 +1,93 @@
defmodule Pleroma.Broadway do
use Broadway
alias Broadway.Message
require Logger
@queue "akkoma"
def start_link(_args) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module:
{BroadwayRabbitMQ.Producer,
queue: @queue,
declare: [
durable: true,
auto_delete: false
],
on_failure: :reject_and_requeue
}
],
processors: [
default: [
concurrency: 10
]
],
batchers: [
default: [
batch_size: 10,
batch_timeout: 100,
concurrency: 10
]
]
)
end
@impl true
def handle_message(_, %Message{data: data} = message, _) do
with {:ok, data} <- Jason.decode(data),
{module, data} <- Map.pop(data, "__module__"),
module <- String.to_existing_atom(module),
:ok <- perform_message(module, data) do
Logger.debug("Received message: #{inspect(data)}")
message
else
err ->
IO.inspect(err)
Message.failed(message, err)
end
end
defp perform_message(module, args) do
IO.inspect(args)
case module.perform(%Oban.Job{args: args}) do
:ok ->
:ok
{:ok, _} ->
:ok
err ->
err
end
end
@impl true
def handle_batch(_, batch, _, _) do
batch
end
@impl true
def handle_failed(messages, _) do
Logger.error("Failed messages: #{inspect(messages)}")
messages
end
def topics do
Pleroma.Config.get([Oban, :queues])
|> Keyword.keys()
end
def children do
[Pleroma.Broadway]
end
def produce(topic, args) do
IO.puts("Producing message on #{topic}: #{inspect(args)}")
{:ok, connection} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(connection)
AMQP.Basic.publish(channel, "", @queue, args)
AMQP.Connection.close(connection)
end
end

View file

@ -15,6 +15,7 @@ def backoff(%Job{attempt: attempt}) when is_integer(attempt) do
@impl Oban.Worker
def perform(%Job{args: %{"op" => "publish", "activity_id" => activity_id, "object_data" => nil}}) do
activity = Activity.get_by_id(activity_id)
IO.inspect(activity)
Federator.perform(:publish, activity)
end

View file

@ -25,30 +25,55 @@ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
defmacro __using__(opts) do
caller_module = __CALLER__.module
queue = Keyword.fetch!(opts, :queue)
queue_system = Config.get([:queue, :module])
quote do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
case queue_system do
Oban ->
quote do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
alias Oban.Job
alias Oban.Job
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Oban.insert()
end
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Oban.insert()
end
@impl Oban.Worker
def timeout(_job) do
queue_atom = String.to_atom(unquote(queue))
Config.get([:workers, :timeout, queue_atom], :timer.minutes(1))
end
@impl Oban.Worker
def timeout(_job) do
queue_atom = String.to_atom(unquote(queue))
Config.get([:workers, :timeout, queue_atom], :timer.minutes(1))
end
end
Pleroma.Broadway ->
quote do
@topic unquote(queue)
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
alias Oban.Job
def enqueue(op, params, worker_args \\ []) do
worker = to_string(__MODULE__)
params =
params
|> Map.put("__module__", worker)
|> Map.put("op", op)
Pleroma.Broadway.produce(unquote(queue), Jason.encode!(params))
end
end
end
end
end

View file

@ -190,6 +190,8 @@ defp deps do
git: "https://akkoma.dev/AkkomaGang/mfm-parser.git",
ref: "912fba81152d4d572e457fd5427f9875b2bc3dbe"},
{:poison, ">= 0.0.0"},
{:broadway, "~> 1.0"},
{:broadway_rabbitmq, "~> 0.7"},
## dev & test
{:ex_doc, "~> 0.22", only: :dev, runtime: false},