forked from AkkomaGang/akkoma
removing synchronization worker
This commit is contained in:
parent
ade213cb35
commit
beba7bbc85
12 changed files with 72 additions and 335 deletions
|
@ -250,13 +250,7 @@
|
|||
skip_thread_containment: true,
|
||||
limit_to_local_content: :unauthenticated,
|
||||
dynamic_configuration: false,
|
||||
external_user_synchronization: [
|
||||
enabled: false,
|
||||
# every 2 hours
|
||||
interval: 60 * 60 * 2,
|
||||
max_retries: 3,
|
||||
limit: 500
|
||||
]
|
||||
external_user_synchronization: false
|
||||
|
||||
config :pleroma, :markup,
|
||||
# XXX - unfortunately, inline images must be enabled by default right now, because
|
||||
|
|
|
@ -126,11 +126,7 @@ config :pleroma, Pleroma.Emails.Mailer,
|
|||
* `skip_thread_containment`: Skip filter out broken threads. The default is `false`.
|
||||
* `limit_to_local_content`: Limit unauthenticated users to search for local statutes and users only. Possible values: `:unauthenticated`, `:all` and `false`. The default is `:unauthenticated`.
|
||||
* `dynamic_configuration`: Allow transferring configuration to DB with the subsequent customization from Admin api.
|
||||
* `external_user_synchronization`: Following/followers counters synchronization settings.
|
||||
* `enabled`: Enables synchronization
|
||||
* `interval`: Interval between synchronization.
|
||||
* `max_retries`: Max rettries for host. After exceeding the limit, the check will not be carried out for users from this host.
|
||||
* `limit`: Users batch size for processing in one time.
|
||||
* `external_user_synchronization`: Enabling following/followers counters synchronization for external users.
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -151,11 +151,7 @@ def start(_type, _args) do
|
|||
start: {Pleroma.Web.Endpoint, :start_link, []},
|
||||
type: :supervisor
|
||||
},
|
||||
%{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}},
|
||||
%{
|
||||
id: Pleroma.User.SynchronizationWorker,
|
||||
start: {Pleroma.User.SynchronizationWorker, :start_link, []}
|
||||
}
|
||||
%{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}}
|
||||
]
|
||||
|
||||
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
|
||||
|
|
|
@ -108,6 +108,10 @@ def ap_id(%User{nickname: nickname}) do
|
|||
def ap_followers(%User{follower_address: fa}) when is_binary(fa), do: fa
|
||||
def ap_followers(%User{} = user), do: "#{ap_id(user)}/followers"
|
||||
|
||||
@spec ap_following(User.t()) :: Sring.t()
|
||||
def ap_following(%User{following_address: fa}) when is_binary(fa), do: fa
|
||||
def ap_following(%User{} = user), do: "#{ap_id(user)}/following"
|
||||
|
||||
def user_info(%User{} = user, args \\ %{}) do
|
||||
following_count =
|
||||
if args[:following_count], do: args[:following_count], else: following_count(user)
|
||||
|
@ -129,6 +133,7 @@ def set_info_cache(user, args) do
|
|||
Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user, args))
|
||||
end
|
||||
|
||||
@spec restrict_deactivated(Ecto.Query.t()) :: Ecto.Query.t()
|
||||
def restrict_deactivated(query) do
|
||||
from(u in query,
|
||||
where: not fragment("? \\? 'deactivated' AND ?->'deactivated' @> 'true'", u.info, u.info)
|
||||
|
@ -1021,33 +1026,6 @@ def perform(:follow_import, %User{} = follower, followed_identifiers)
|
|||
)
|
||||
end
|
||||
|
||||
@spec sync_follow_counter() :: :ok
|
||||
def sync_follow_counter,
|
||||
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:sync_follow_counters])
|
||||
|
||||
@spec perform(:sync_follow_counters) :: :ok
|
||||
def perform(:sync_follow_counters) do
|
||||
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors)
|
||||
config = Pleroma.Config.get([:instance, :external_user_synchronization])
|
||||
|
||||
:ok = sync_follow_counters(config)
|
||||
Agent.stop(:domain_errors)
|
||||
end
|
||||
|
||||
@spec sync_follow_counters(keyword()) :: :ok
|
||||
def sync_follow_counters(opts \\ []) do
|
||||
users = external_users(opts)
|
||||
|
||||
if length(users) > 0 do
|
||||
errors = Agent.get(:domain_errors, fn state -> state end)
|
||||
{last, updated_errors} = User.Synchronization.call(users, errors, opts)
|
||||
Agent.update(:domain_errors, fn _state -> updated_errors end)
|
||||
sync_follow_counters(max_id: last.id, limit: opts[:limit])
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
@spec external_users_query() :: Ecto.Query.t()
|
||||
def external_users_query do
|
||||
User.Query.build(%{
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.User.Synchronization do
|
||||
alias Pleroma.HTTP
|
||||
alias Pleroma.User
|
||||
|
||||
@spec call([User.t()], map(), keyword()) :: {User.t(), map()}
|
||||
def call(users, errors, opts \\ []) do
|
||||
do_call(users, errors, opts)
|
||||
end
|
||||
|
||||
defp do_call([user | []], errors, opts) do
|
||||
updated = fetch_counters(user, errors, opts)
|
||||
{user, updated}
|
||||
end
|
||||
|
||||
defp do_call([user | others], errors, opts) do
|
||||
updated = fetch_counters(user, errors, opts)
|
||||
do_call(others, updated, opts)
|
||||
end
|
||||
|
||||
defp fetch_counters(user, errors, opts) do
|
||||
%{host: host} = URI.parse(user.ap_id)
|
||||
|
||||
info = %{}
|
||||
{following, errors} = fetch_counter(user.ap_id <> "/following", host, errors, opts)
|
||||
info = if following, do: Map.put(info, :following_count, following), else: info
|
||||
|
||||
{followers, errors} = fetch_counter(user.ap_id <> "/followers", host, errors, opts)
|
||||
info = if followers, do: Map.put(info, :follower_count, followers), else: info
|
||||
|
||||
User.set_info_cache(user, info)
|
||||
errors
|
||||
end
|
||||
|
||||
defp available_domain?(domain, errors, opts) do
|
||||
max_retries = Keyword.get(opts, :max_retries, 3)
|
||||
not (Map.has_key?(errors, domain) && errors[domain] >= max_retries)
|
||||
end
|
||||
|
||||
defp fetch_counter(url, host, errors, opts) do
|
||||
with true <- available_domain?(host, errors, opts),
|
||||
{:ok, %{body: body, status: code}} when code in 200..299 <-
|
||||
HTTP.get(
|
||||
url,
|
||||
[{:Accept, "application/activity+json"}]
|
||||
),
|
||||
{:ok, data} <- Jason.decode(body) do
|
||||
{data["totalItems"], errors}
|
||||
else
|
||||
false ->
|
||||
{nil, errors}
|
||||
|
||||
_ ->
|
||||
{nil, Map.update(errors, host, 1, &(&1 + 1))}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,32 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-onl
|
||||
|
||||
defmodule Pleroma.User.SynchronizationWorker do
|
||||
use GenServer
|
||||
|
||||
def start_link do
|
||||
config = Pleroma.Config.get([:instance, :external_user_synchronization])
|
||||
|
||||
if config[:enabled] do
|
||||
GenServer.start_link(__MODULE__, interval: config[:interval])
|
||||
else
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
def init(opts) do
|
||||
schedule_next(opts)
|
||||
{:ok, opts}
|
||||
end
|
||||
|
||||
def handle_info(:sync_follow_counters, opts) do
|
||||
Pleroma.User.sync_follow_counter()
|
||||
schedule_next(opts)
|
||||
{:noreply, opts}
|
||||
end
|
||||
|
||||
defp schedule_next(opts) do
|
||||
Process.send_after(self(), :sync_follow_counters, opts[:interval])
|
||||
end
|
||||
end
|
|
@ -1087,6 +1087,10 @@ def upgrade_user_from_ap_id(ap_id) do
|
|||
PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user])
|
||||
end
|
||||
|
||||
if Pleroma.Config.get([:instance, :external_user_synchronization]) do
|
||||
update_following_followers_counters(user)
|
||||
end
|
||||
|
||||
{:ok, user}
|
||||
else
|
||||
%User{} = user -> {:ok, user}
|
||||
|
@ -1119,4 +1123,27 @@ def maybe_fix_user_object(data) do
|
|||
data
|
||||
|> maybe_fix_user_url
|
||||
end
|
||||
|
||||
def update_following_followers_counters(user) do
|
||||
info = %{}
|
||||
|
||||
following = fetch_counter(user.following_address)
|
||||
info = if following, do: Map.put(info, :following_count, following), else: info
|
||||
|
||||
followers = fetch_counter(user.follower_address)
|
||||
info = if followers, do: Map.put(info, :follower_count, followers), else: info
|
||||
|
||||
User.set_info_cache(user, info)
|
||||
end
|
||||
|
||||
defp fetch_counter(url) do
|
||||
with {:ok, %{body: body, status: code}} when code in 200..299 <-
|
||||
Pleroma.HTTP.get(
|
||||
url,
|
||||
[{:Accept, "application/activity+json"}]
|
||||
),
|
||||
{:ok, data} <- Jason.decode(body) do
|
||||
data["totalItems"]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -38,6 +38,7 @@ def user_factory do
|
|||
user
|
||||
| ap_id: User.ap_id(user),
|
||||
follower_address: User.ap_followers(user),
|
||||
following_address: User.ap_following(user),
|
||||
following: [User.ap_id(user)]
|
||||
}
|
||||
end
|
||||
|
|
|
@ -1,104 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.User.SynchronizationTest do
|
||||
use Pleroma.DataCase
|
||||
import Pleroma.Factory
|
||||
alias Pleroma.User
|
||||
alias Pleroma.User.Synchronization
|
||||
|
||||
setup do
|
||||
Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end)
|
||||
:ok
|
||||
end
|
||||
|
||||
test "update following/followers counters" do
|
||||
user1 =
|
||||
insert(:user,
|
||||
local: false,
|
||||
ap_id: "http://localhost:4001/users/masto_closed"
|
||||
)
|
||||
|
||||
user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
|
||||
|
||||
users = User.external_users()
|
||||
assert length(users) == 2
|
||||
{user, %{}} = Synchronization.call(users, %{})
|
||||
assert user == List.last(users)
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 437
|
||||
assert following == 152
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
end
|
||||
|
||||
test "don't check host if errors exist" do
|
||||
user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1")
|
||||
|
||||
user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2")
|
||||
|
||||
users = User.external_users()
|
||||
assert length(users) == 2
|
||||
|
||||
{user, %{"domain-with-errors" => 2}} =
|
||||
Synchronization.call(users, %{"domain-with-errors" => 2}, max_retries: 2)
|
||||
|
||||
assert user == List.last(users)
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 0
|
||||
assert following == 0
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 0
|
||||
assert following == 0
|
||||
end
|
||||
|
||||
test "don't check host if errors appeared" do
|
||||
user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1")
|
||||
|
||||
user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2")
|
||||
|
||||
users = User.external_users()
|
||||
assert length(users) == 2
|
||||
|
||||
{user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2)
|
||||
|
||||
assert user == List.last(users)
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 0
|
||||
assert following == 0
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 0
|
||||
assert following == 0
|
||||
end
|
||||
|
||||
test "other users after error appeared" do
|
||||
user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1")
|
||||
user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
|
||||
|
||||
users = User.external_users()
|
||||
assert length(users) == 2
|
||||
|
||||
{user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2)
|
||||
assert user == List.last(users)
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 0
|
||||
assert following == 0
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
end
|
||||
end
|
|
@ -1,49 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.User.SynchronizationWorkerTest do
|
||||
use Pleroma.DataCase
|
||||
import Pleroma.Factory
|
||||
|
||||
setup do
|
||||
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
|
||||
|
||||
config = Pleroma.Config.get([:instance, :external_user_synchronization])
|
||||
|
||||
for_update = [enabled: true, interval: 1000]
|
||||
|
||||
Pleroma.Config.put([:instance, :external_user_synchronization], for_update)
|
||||
|
||||
on_exit(fn ->
|
||||
Pleroma.Config.put([:instance, :external_user_synchronization], config)
|
||||
end)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "sync follow counters" do
|
||||
user1 =
|
||||
insert(:user,
|
||||
local: false,
|
||||
ap_id: "http://localhost:4001/users/masto_closed"
|
||||
)
|
||||
|
||||
user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2")
|
||||
|
||||
{:ok, _} = Pleroma.User.SynchronizationWorker.start_link()
|
||||
:timer.sleep(1500)
|
||||
|
||||
%{follower_count: followers, following_count: following} =
|
||||
Pleroma.User.get_cached_user_info(user1)
|
||||
|
||||
assert followers == 437
|
||||
assert following == 152
|
||||
|
||||
%{follower_count: followers, following_count: following} =
|
||||
Pleroma.User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
end
|
||||
end
|
|
@ -54,6 +54,14 @@ test "ap_followers returns the followers collection for the user" do
|
|||
assert expected_followers_collection == User.ap_followers(user)
|
||||
end
|
||||
|
||||
test "ap_following returns the following collection for the user" do
|
||||
user = UserBuilder.build()
|
||||
|
||||
expected_followers_collection = "#{User.ap_id(user)}/following"
|
||||
|
||||
assert expected_followers_collection == User.ap_following(user)
|
||||
end
|
||||
|
||||
test "returns all pending follow requests" do
|
||||
unlocked = insert(:user)
|
||||
locked = insert(:user, %{info: %{locked: true}})
|
||||
|
@ -1240,52 +1248,6 @@ test "external_users/1 external active users with limit", %{user1: user1, user2:
|
|||
|
||||
assert User.external_users(max_id: fdb_user2.id, limit: 1) == []
|
||||
end
|
||||
|
||||
test "sync_follow_counters/1", %{user1: user1, user2: user2} do
|
||||
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors)
|
||||
|
||||
:ok = User.sync_follow_counters()
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 437
|
||||
assert following == 152
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
|
||||
Agent.stop(:domain_errors)
|
||||
end
|
||||
|
||||
test "sync_follow_counters/1 in separate batches", %{user1: user1, user2: user2} do
|
||||
{:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors)
|
||||
|
||||
:ok = User.sync_follow_counters(limit: 1)
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 437
|
||||
assert following == 152
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
|
||||
Agent.stop(:domain_errors)
|
||||
end
|
||||
|
||||
test "perform/1 with :sync_follow_counters", %{user1: user1, user2: user2} do
|
||||
:ok = User.perform(:sync_follow_counters)
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 437
|
||||
assert following == 152
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
end
|
||||
end
|
||||
|
||||
describe "set_info_cache/2" do
|
||||
|
|
|
@ -1359,4 +1359,32 @@ test "removes recipient's follower collection from cc", %{user: user} do
|
|||
refute recipient.follower_address in fixed_object["to"]
|
||||
end
|
||||
end
|
||||
|
||||
test "update_following_followers_counters/1" do
|
||||
user1 =
|
||||
insert(:user,
|
||||
local: false,
|
||||
follower_address: "http://localhost:4001/users/masto_closed/followers",
|
||||
following_address: "http://localhost:4001/users/masto_closed/following"
|
||||
)
|
||||
|
||||
user2 =
|
||||
insert(:user,
|
||||
local: false,
|
||||
follower_address: "http://localhost:4001/users/fuser2/followers",
|
||||
following_address: "http://localhost:4001/users/fuser2/following"
|
||||
)
|
||||
|
||||
Transmogrifier.update_following_followers_counters(user1)
|
||||
Transmogrifier.update_following_followers_counters(user2)
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user1)
|
||||
assert followers == 437
|
||||
assert following == 152
|
||||
|
||||
%{follower_count: followers, following_count: following} = User.get_cached_user_info(user2)
|
||||
|
||||
assert followers == 527
|
||||
assert following == 267
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue