Merge branch 'misc-otp-issues' into 'develop'

Misc OTP issues

See merge request pleroma/pleroma!1567
This commit is contained in:
rinpatch 2019-08-14 19:22:15 +00:00
commit 31d576de0c
14 changed files with 138 additions and 189 deletions

View file

@ -3,11 +3,14 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Application do defmodule Pleroma.Application do
import Cachex.Spec
use Application use Application
@name Mix.Project.config()[:name] @name Mix.Project.config()[:name]
@version Mix.Project.config()[:version] @version Mix.Project.config()[:version]
@repository Mix.Project.config()[:source_url] @repository Mix.Project.config()[:source_url]
@env Mix.env()
def name, do: @name def name, do: @name
def version, do: @version def version, do: @version
def named_version, do: @name <> " " <> @version def named_version, do: @name <> " " <> @version
@ -21,116 +24,25 @@ def user_agent do
# See http://elixir-lang.org/docs/stable/elixir/Application.html # See http://elixir-lang.org/docs/stable/elixir/Application.html
# for more information on OTP Applications # for more information on OTP Applications
def start(_type, _args) do def start(_type, _args) do
import Cachex.Spec
Pleroma.Config.DeprecationWarnings.warn() Pleroma.Config.DeprecationWarnings.warn()
setup_instrumenters() setup_instrumenters()
# Define workers and child supervisors to be supervised # Define workers and child supervisors to be supervised
children = children =
[ [
# Start the Ecto repository Pleroma.Repo,
%{id: Pleroma.Repo, start: {Pleroma.Repo, :start_link, []}, type: :supervisor}, Pleroma.Config.TransferTask,
%{id: Pleroma.Config.TransferTask, start: {Pleroma.Config.TransferTask, :start_link, []}}, Pleroma.Emoji,
%{id: Pleroma.Emoji, start: {Pleroma.Emoji, :start_link, []}}, Pleroma.Captcha,
%{id: Pleroma.Captcha, start: {Pleroma.Captcha, :start_link, []}}, Pleroma.FlakeId,
%{ Pleroma.ScheduledActivityWorker
id: :cachex_used_captcha_cache,
start:
{Cachex, :start_link,
[
:used_captcha_cache,
[
ttl_interval:
:timer.seconds(Pleroma.Config.get!([Pleroma.Captcha, :seconds_valid]))
]
]}
},
%{
id: :cachex_user,
start:
{Cachex, :start_link,
[
:user_cache,
[
default_ttl: 25_000,
ttl_interval: 1000,
limit: 2500
]
]}
},
%{
id: :cachex_object,
start:
{Cachex, :start_link,
[
:object_cache,
[
default_ttl: 25_000,
ttl_interval: 1000,
limit: 2500
]
]}
},
%{
id: :cachex_rich_media,
start:
{Cachex, :start_link,
[
:rich_media_cache,
[
default_ttl: :timer.minutes(120),
limit: 5000
]
]}
},
%{
id: :cachex_scrubber,
start:
{Cachex, :start_link,
[
:scrubber_cache,
[
limit: 2500
]
]}
},
%{
id: :cachex_idem,
start:
{Cachex, :start_link,
[
:idempotency_cache,
[
expiration:
expiration(
default: :timer.seconds(6 * 60 * 60),
interval: :timer.seconds(60)
),
limit: 2500
]
]}
},
%{id: Pleroma.FlakeId, start: {Pleroma.FlakeId, :start_link, []}},
%{
id: Pleroma.ScheduledActivityWorker,
start: {Pleroma.ScheduledActivityWorker, :start_link, []}
}
] ++ ] ++
cachex_children() ++
hackney_pool_children() ++ hackney_pool_children() ++
[ [
%{ Pleroma.Web.Federator.RetryQueue,
id: Pleroma.Web.Federator.RetryQueue, Pleroma.Web.OAuth.Token.CleanWorker,
start: {Pleroma.Web.Federator.RetryQueue, :start_link, []} Pleroma.Stats,
},
%{
id: Pleroma.Web.OAuth.Token.CleanWorker,
start: {Pleroma.Web.OAuth.Token.CleanWorker, :start_link, []}
},
%{
id: Pleroma.Stats,
start: {Pleroma.Stats, :start_link, []}
},
%{ %{
id: :web_push_init, id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
@ -147,16 +59,12 @@ def start(_type, _args) do
restart: :temporary restart: :temporary
} }
] ++ ] ++
streamer_child() ++ oauth_cleanup_child(oauth_cleanup_enabled?()) ++
chat_child() ++ streamer_child(@env) ++
chat_child(@env, chat_enabled?()) ++
[ [
# Start the endpoint when the application starts Pleroma.Web.Endpoint,
%{ Pleroma.Gopher.Server
id: Pleroma.Web.Endpoint,
start: {Pleroma.Web.Endpoint, :start_link, []},
type: :supervisor
},
%{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}}
] ]
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
@ -201,28 +109,54 @@ def enabled_hackney_pools do
end end
end end
if Pleroma.Config.get(:env) == :test do defp cachex_children do
defp streamer_child, do: [] [
defp chat_child, do: [] build_cachex("used_captcha", ttl_interval: seconds_valid_interval()),
else build_cachex("user", default_ttl: 25_000, ttl_interval: 1000, limit: 2500),
defp streamer_child do build_cachex("object", default_ttl: 25_000, ttl_interval: 1000, limit: 2500),
[%{id: Pleroma.Web.Streamer, start: {Pleroma.Web.Streamer, :start_link, []}}] build_cachex("rich_media", default_ttl: :timer.minutes(120), limit: 5000),
build_cachex("scrubber", limit: 2500),
build_cachex("idempotency", expiration: idempotency_expiration(), limit: 2500)
]
end end
defp chat_child do defp idempotency_expiration,
if Pleroma.Config.get([:chat, :enabled]) do do: expiration(default: :timer.seconds(6 * 60 * 60), interval: :timer.seconds(60))
[
%{ defp seconds_valid_interval,
id: Pleroma.Web.ChatChannel.ChatChannelState, do: :timer.seconds(Pleroma.Config.get!([Pleroma.Captcha, :seconds_valid]))
start: {Pleroma.Web.ChatChannel.ChatChannelState, :start_link, []}
defp build_cachex(type, opts),
do: %{
id: String.to_atom("cachex_" <> type),
start: {Cachex, :start_link, [String.to_atom(type <> "_cache"), opts]},
type: :worker
} }
]
else defp chat_enabled?, do: Pleroma.Config.get([:chat, :enabled])
[]
end defp oauth_cleanup_enabled?,
do: Pleroma.Config.get([:oauth2, :clean_expired_tokens], false)
defp streamer_child(:test), do: []
defp streamer_child(_) do
[Pleroma.Web.Streamer]
end end
defp oauth_cleanup_child(true),
do: [Pleroma.Web.OAuth.Token.CleanWorker]
defp oauth_cleanup_child(_), do: []
defp chat_child(:test, _), do: []
defp chat_child(_env, true) do
[Pleroma.Web.ChatChannel.ChatChannelState]
end end
defp chat_child(_, _), do: []
defp hackney_pool_children do defp hackney_pool_children do
for pool <- enabled_hackney_pools() do for pool <- enabled_hackney_pools() do
options = Pleroma.Config.get([:hackney_pools, pool]) options = Pleroma.Config.get([:hackney_pools, pool])

View file

@ -12,7 +12,7 @@ defmodule Pleroma.Captcha do
use GenServer use GenServer
@doc false @doc false
def start_link do def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__) GenServer.start_link(__MODULE__, [], name: __MODULE__)
end end

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Config.TransferTask do
use Task use Task
alias Pleroma.Web.AdminAPI.Config alias Pleroma.Web.AdminAPI.Config
def start_link do def start_link(_) do
load_and_update_env() load_and_update_env()
if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Pleroma.Repo) if Pleroma.Config.get(:env) == :test, do: Ecto.Adapters.SQL.Sandbox.checkin(Pleroma.Repo)
:ignore :ignore

View file

@ -24,7 +24,7 @@ defmodule Pleroma.Emoji do
@ets_options [:ordered_set, :protected, :named_table, {:read_concurrency, true}] @ets_options [:ordered_set, :protected, :named_table, {:read_concurrency, true}]
@doc false @doc false
def start_link do def start_link(_) do
GenServer.start_link(__MODULE__, [], name: __MODULE__) GenServer.start_link(__MODULE__, [], name: __MODULE__)
end end

View file

@ -98,7 +98,7 @@ def dump(value) do
def autogenerate, do: get() def autogenerate, do: get()
# -- GenServer API # -- GenServer API
def start_link do def start_link(_) do
:gen_server.start_link({:local, :flake}, __MODULE__, [], []) :gen_server.start_link({:local, :flake}, __MODULE__, [], [])
end end

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Gopher.Server do
use GenServer use GenServer
require Logger require Logger
def start_link do def start_link(_) do
config = Pleroma.Config.get(:gopher, []) config = Pleroma.Config.get(:gopher, [])
ip = Keyword.get(config, :ip, {0, 0, 0, 0}) ip = Keyword.get(config, :ip, {0, 0, 0, 0})
port = Keyword.get(config, :port, 1234) port = Keyword.get(config, :port, 1234)

View file

@ -16,7 +16,7 @@ defmodule Pleroma.ScheduledActivityWorker do
@schedule_interval :timer.minutes(1) @schedule_interval :timer.minutes(1)
def start_link do def start_link(_) do
GenServer.start_link(__MODULE__, nil) GenServer.start_link(__MODULE__, nil)
end end

View file

@ -7,31 +7,56 @@ defmodule Pleroma.Stats do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
def start_link do use GenServer
agent = Agent.start_link(fn -> {[], %{}} end, name: __MODULE__)
spawn(fn -> schedule_update() end) @interval 1000 * 60 * 60
agent
def start_link(_) do
GenServer.start_link(__MODULE__, initial_data(), name: __MODULE__)
end
def force_update do
GenServer.call(__MODULE__, :force_update)
end end
def get_stats do def get_stats do
Agent.get(__MODULE__, fn {_, stats} -> stats end) %{stats: stats} = GenServer.call(__MODULE__, :get_state)
stats
end end
def get_peers do def get_peers do
Agent.get(__MODULE__, fn {peers, _} -> peers end) %{peers: peers} = GenServer.call(__MODULE__, :get_state)
peers
end end
def schedule_update do def init(args) do
spawn(fn -> Process.send_after(self(), :run_update, @interval)
# 1 hour {:ok, args}
Process.sleep(1000 * 60 * 60)
schedule_update()
end)
update_stats()
end end
def update_stats do def handle_call(:force_update, _from, _state) do
new_stats = get_stat_data()
{:reply, new_stats, new_stats}
end
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
def handle_info(:run_update, _state) do
new_stats = get_stat_data()
Process.send_after(self(), :run_update, @interval)
{:noreply, new_stats}
end
defp initial_data do
%{peers: [], stats: %{}}
end
defp get_stat_data do
peers = peers =
from( from(
u in User, u in User,
@ -52,8 +77,9 @@ def update_stats do
user_count = Repo.aggregate(User.Query.build(%{local: true, active: true}), :count, :id) user_count = Repo.aggregate(User.Query.build(%{local: true, active: true}), :count, :id)
Agent.update(__MODULE__, fn _ -> %{
{peers, %{domain_count: domain_count, status_count: status_count, user_count: user_count}} peers: peers,
end) stats: %{domain_count: domain_count, status_count: status_count, user_count: user_count}
}
end end
end end

View file

@ -33,9 +33,11 @@ def handle_in("new_msg", %{"text" => text}, %{assigns: %{user_name: user_name}}
end end
defmodule Pleroma.Web.ChatChannel.ChatChannelState do defmodule Pleroma.Web.ChatChannel.ChatChannelState do
use Agent
@max_messages 20 @max_messages 20
def start_link do def start_link(_) do
Agent.start_link(fn -> %{max_id: 1, messages: []} end, name: __MODULE__) Agent.start_link(fn -> %{max_id: 1, messages: []} end, name: __MODULE__)
end end

View file

@ -13,7 +13,7 @@ def init(args) do
{:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}} {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
end end
def start_link do def start_link(_) do
enabled = enabled =
if Pleroma.Config.get(:env) == :test, if Pleroma.Config.get(:env) == :test,
do: true, do: true,

View file

@ -6,36 +6,30 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@moduledoc """ @moduledoc """
The module represents functions to clean an expired oauth tokens. The module represents functions to clean an expired oauth tokens.
""" """
use GenServer
@ten_seconds 10_000
@one_day 86_400_000
# 10 seconds
@start_interval 10_000
@interval Pleroma.Config.get( @interval Pleroma.Config.get(
# 24 hours
[:oauth2, :clean_expired_tokens_interval], [:oauth2, :clean_expired_tokens_interval],
86_400_000 @one_day
) )
@queue :background
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
def start_link, do: GenServer.start_link(__MODULE__, nil) def start_link(_), do: GenServer.start_link(__MODULE__, %{})
def init(_) do def init(_) do
if Pleroma.Config.get([:oauth2, :clean_expired_tokens], false) do Process.send_after(self(), :perform, @ten_seconds)
Process.send_after(self(), :perform, @start_interval)
{:ok, nil} {:ok, nil}
else
:ignore
end
end end
@doc false @doc false
def handle_info(:perform, state) do def handle_info(:perform, state) do
Token.delete_expired_tokens()
Process.send_after(self(), :perform, @interval) Process.send_after(self(), :perform, @interval)
PleromaJobQueue.enqueue(@queue, __MODULE__, [:clean])
{:noreply, state} {:noreply, state}
end end
# Job Worker Callbacks
def perform(:clean), do: Token.delete_expired_tokens()
end end

View file

@ -18,7 +18,7 @@ defmodule Pleroma.Web.Streamer do
@keepalive_interval :timer.seconds(30) @keepalive_interval :timer.seconds(30)
def start_link do def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__) GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end end
@ -35,28 +35,21 @@ def stream(topic, item) do
end end
def init(args) do def init(args) do
spawn(fn -> Process.send_after(self(), %{action: :ping}, @keepalive_interval)
# 30 seconds
Process.sleep(@keepalive_interval)
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:ok, args} {:ok, args}
end end
def handle_cast(%{action: :ping}, topics) do def handle_info(%{action: :ping}, topics) do
Map.values(topics) topics
|> Map.values()
|> List.flatten() |> List.flatten()
|> Enum.each(fn socket -> |> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping") Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""}) send(socket.transport_pid, {:text, ""})
end) end)
spawn(fn -> Process.send_after(self(), %{action: :ping}, @keepalive_interval)
# 30 seconds
Process.sleep(@keepalive_interval)
GenServer.cast(__MODULE__, %{action: :ping})
end)
{:noreply, topics} {:noreply, topics}
end end

View file

@ -31,7 +31,7 @@ test "transfer config values from db to env" do
value: [live: 15, com: 35] value: [live: 15, com: 35]
}) })
Pleroma.Config.TransferTask.start_link() Pleroma.Config.TransferTask.start_link([])
assert Application.get_env(:pleroma, :test_key) == [live: 2, com: 3] assert Application.get_env(:pleroma, :test_key) == [live: 2, com: 3]
assert Application.get_env(:idna, :test_key) == [live: 15, com: 35] assert Application.get_env(:idna, :test_key) == [live: 15, com: 35]
@ -50,7 +50,7 @@ test "non existing atom" do
}) })
assert ExUnit.CaptureLog.capture_log(fn -> assert ExUnit.CaptureLog.capture_log(fn ->
Pleroma.Config.TransferTask.start_link() Pleroma.Config.TransferTask.start_link([])
end) =~ end) =~
"updating env causes error, key: \"undefined_atom_key\", error: %ArgumentError{message: \"argument error\"}" "updating env causes error, key: \"undefined_atom_key\", error: %ArgumentError{message: \"argument error\"}"
end end

View file

@ -2624,7 +2624,7 @@ test "get instance stats", %{conn: conn} do
|> Changeset.put_embed(:info, info_change) |> Changeset.put_embed(:info, info_change)
|> User.update_and_set_cache() |> User.update_and_set_cache()
Pleroma.Stats.update_stats() Pleroma.Stats.force_update()
conn = get(conn, "/api/v1/instance") conn = get(conn, "/api/v1/instance")
@ -2642,7 +2642,7 @@ test "get peers", %{conn: conn} do
insert(:user, %{local: false, nickname: "u@peer1.com"}) insert(:user, %{local: false, nickname: "u@peer1.com"})
insert(:user, %{local: false, nickname: "u@peer2.com"}) insert(:user, %{local: false, nickname: "u@peer2.com"})
Pleroma.Stats.update_stats() Pleroma.Stats.force_update()
conn = get(conn, "/api/v1/instance/peers") conn = get(conn, "/api/v1/instance/peers")