diff --git a/CHANGELOG.md b/CHANGELOG.md index 75fa50e00..2d59639bd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ Configuration: `federation_incoming_replies_max_depth` option - Admin API: Return users' tags when querying reports - Admin API: Return avatar and display name when querying users - Mastodon API, streaming: Add support for passing the token in the `Sec-WebSocket-Protocol` header +- Added synchronization of following/followers counters for external users ### Fixed - Not being able to pin unlisted posts diff --git a/config/config.exs b/config/config.exs index 675fbb551..09681f122 100644 --- a/config/config.exs +++ b/config/config.exs @@ -249,7 +249,14 @@ remote_post_retention_days: 90, skip_thread_containment: true, limit_to_local_content: :unauthenticated, - dynamic_configuration: false + dynamic_configuration: false, + external_user_synchronization: [ + enabled: false, + # every 2 hours + interval: 60 * 60 * 2, + max_retries: 3, + limit: 500 + ] config :pleroma, :markup, # XXX - unfortunately, inline images must be enabled by default right now, because diff --git a/docs/config.md b/docs/config.md index 822c34c51..931155fe9 100644 --- a/docs/config.md +++ b/docs/config.md @@ -125,6 +125,12 @@ config :pleroma, Pleroma.Emails.Mailer, * `skip_thread_containment`: Skip filter out broken threads. The default is `false`. * `limit_to_local_content`: Limit unauthenticated users to search for local statutes and users only. Possible values: `:unauthenticated`, `:all` and `false`. The default is `:unauthenticated`. * `dynamic_configuration`: Allow transferring configuration to DB with the subsequent customization from Admin api. +* `external_user_synchronization`: Following/followers counters synchronization settings. + * `enabled`: Enables synchronization + * `interval`: Interval between synchronization. + * `max_retries`: Max rettries for host. After exceeding the limit, the check will not be carried out for users from this host. + * `limit`: Users batch size for processing in one time. + ## :logger diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index ba4cf8486..86c348a0d 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -151,7 +151,11 @@ def start(_type, _args) do start: {Pleroma.Web.Endpoint, :start_link, []}, type: :supervisor }, - %{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}} + %{id: Pleroma.Gopher.Server, start: {Pleroma.Gopher.Server, :start_link, []}}, + %{ + id: Pleroma.User.SynchronizationWorker, + start: {Pleroma.User.SynchronizationWorker, :start_link, []} + } ] # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 09f86aaa2..d03810d1a 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -107,15 +107,25 @@ def ap_id(%User{nickname: nickname}) do def ap_followers(%User{follower_address: fa}) when is_binary(fa), do: fa def ap_followers(%User{} = user), do: "#{ap_id(user)}/followers" - def user_info(%User{} = user) do + def user_info(%User{} = user, args \\ %{}) do + following_count = + if args[:following_count], do: args[:following_count], else: following_count(user) + + follower_count = + if args[:follower_count], do: args[:follower_count], else: user.info.follower_count + %{ - following_count: following_count(user), note_count: user.info.note_count, - follower_count: user.info.follower_count, locked: user.info.locked, confirmation_pending: user.info.confirmation_pending, default_scope: user.info.default_scope } + |> Map.put(:following_count, following_count) + |> Map.put(:follower_count, follower_count) + end + + def set_info_cache(user, args) do + Cachex.put(:user_cache, "user_info:#{user.id}", user_info(user, args)) end def restrict_deactivated(query) do @@ -1000,6 +1010,56 @@ def perform(:follow_import, %User{} = follower, followed_identifiers) ) end + @spec sync_follow_counter() :: :ok + def sync_follow_counter, + do: PleromaJobQueue.enqueue(:background, __MODULE__, [:sync_follow_counters]) + + @spec perform(:sync_follow_counters) :: :ok + def perform(:sync_follow_counters) do + {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) + config = Pleroma.Config.get([:instance, :external_user_synchronization]) + + :ok = sync_follow_counters(config) + Agent.stop(:domain_errors) + end + + @spec sync_follow_counters(keyword()) :: :ok + def sync_follow_counters(opts \\ []) do + users = external_users(opts) + + if length(users) > 0 do + errors = Agent.get(:domain_errors, fn state -> state end) + {last, updated_errors} = User.Synchronization.call(users, errors, opts) + Agent.update(:domain_errors, fn _state -> updated_errors end) + sync_follow_counters(max_id: last.id, limit: opts[:limit]) + else + :ok + end + end + + @spec external_users(keyword()) :: [User.t()] + def external_users(opts \\ []) do + query = + User.Query.build(%{ + external: true, + active: true, + order_by: :id, + select: [:id, :ap_id, :info] + }) + + query = + if opts[:max_id], + do: where(query, [u], u.id > ^opts[:max_id]), + else: query + + query = + if opts[:limit], + do: limit(query, ^opts[:limit]), + else: query + + Repo.all(query) + end + def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), do: PleromaJobQueue.enqueue(:background, __MODULE__, [ diff --git a/lib/pleroma/user/query.ex b/lib/pleroma/user/query.ex index ace9c05f2..f9bcc9e19 100644 --- a/lib/pleroma/user/query.ex +++ b/lib/pleroma/user/query.ex @@ -7,7 +7,7 @@ defmodule Pleroma.User.Query do User query builder module. Builds query from new query or another user query. ## Example: - query = Pleroma.User.Query(%{nickname: "nickname"}) + query = Pleroma.User.Query.build(%{nickname: "nickname"}) another_query = Pleroma.User.Query.build(query, %{email: "email@example.com"}) Pleroma.Repo.all(query) Pleroma.Repo.all(another_query) @@ -47,7 +47,10 @@ defmodule Pleroma.User.Query do friends: User.t(), recipients_from_activity: [String.t()], nickname: [String.t()], - ap_id: [String.t()] + ap_id: [String.t()], + order_by: term(), + select: term(), + limit: pos_integer() } | %{} @@ -141,6 +144,18 @@ defp compose_query({:recipients_from_activity, to}, query) do where(query, [u], u.ap_id in ^to or fragment("? && ?", u.following, ^to)) end + defp compose_query({:order_by, key}, query) do + order_by(query, [u], field(u, ^key)) + end + + defp compose_query({:select, keys}, query) do + select(query, [u], ^keys) + end + + defp compose_query({:limit, limit}, query) do + limit(query, ^limit) + end + defp compose_query(_unsupported_param, query), do: query defp prepare_tag_criteria(tag, query) do diff --git a/lib/pleroma/user/synchronization.ex b/lib/pleroma/user/synchronization.ex new file mode 100644 index 000000000..93660e08c --- /dev/null +++ b/lib/pleroma/user/synchronization.ex @@ -0,0 +1,60 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.User.Synchronization do + alias Pleroma.HTTP + alias Pleroma.User + + @spec call([User.t()], map(), keyword()) :: {User.t(), map()} + def call(users, errors, opts \\ []) do + do_call(users, errors, opts) + end + + defp do_call([user | []], errors, opts) do + updated = fetch_counters(user, errors, opts) + {user, updated} + end + + defp do_call([user | others], errors, opts) do + updated = fetch_counters(user, errors, opts) + do_call(others, updated, opts) + end + + defp fetch_counters(user, errors, opts) do + %{host: host} = URI.parse(user.ap_id) + + info = %{} + {following, errors} = fetch_counter(user.ap_id <> "/following", host, errors, opts) + info = if following, do: Map.put(info, :following_count, following), else: info + + {followers, errors} = fetch_counter(user.ap_id <> "/followers", host, errors, opts) + info = if followers, do: Map.put(info, :follower_count, followers), else: info + + User.set_info_cache(user, info) + errors + end + + defp available_domain?(domain, errors, opts) do + max_retries = Keyword.get(opts, :max_retries, 3) + not (Map.has_key?(errors, domain) && errors[domain] >= max_retries) + end + + defp fetch_counter(url, host, errors, opts) do + with true <- available_domain?(host, errors, opts), + {:ok, %{body: body, status: code}} when code in 200..299 <- + HTTP.get( + url, + [{:Accept, "application/activity+json"}] + ), + {:ok, data} <- Jason.decode(body) do + {data["totalItems"], errors} + else + false -> + {nil, errors} + + _ -> + {nil, Map.update(errors, host, 1, &(&1 + 1))} + end + end +end diff --git a/lib/pleroma/user/synchronization_worker.ex b/lib/pleroma/user/synchronization_worker.ex new file mode 100644 index 000000000..ba9cc3556 --- /dev/null +++ b/lib/pleroma/user/synchronization_worker.ex @@ -0,0 +1,32 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-onl + +defmodule Pleroma.User.SynchronizationWorker do + use GenServer + + def start_link do + config = Pleroma.Config.get([:instance, :external_user_synchronization]) + + if config[:enabled] do + GenServer.start_link(__MODULE__, interval: config[:interval]) + else + :ignore + end + end + + def init(opts) do + schedule_next(opts) + {:ok, opts} + end + + def handle_info(:sync_follow_counters, opts) do + Pleroma.User.sync_follow_counter() + schedule_next(opts) + {:noreply, opts} + end + + defp schedule_next(opts) do + Process.send_after(self(), :sync_follow_counters, opts[:interval]) + end +end diff --git a/test/fixtures/users_mock/masto_closed_followers.json b/test/fixtures/users_mock/masto_closed_followers.json new file mode 100644 index 000000000..da296892d --- /dev/null +++ b/test/fixtures/users_mock/masto_closed_followers.json @@ -0,0 +1,7 @@ +{ + "@context": "https://www.w3.org/ns/activitystreams", + "id": "http://localhost:4001/users/masto_closed/followers", + "type": "OrderedCollection", + "totalItems": 437, + "first": "http://localhost:4001/users/masto_closed/followers?page=1" +} diff --git a/test/fixtures/users_mock/masto_closed_following.json b/test/fixtures/users_mock/masto_closed_following.json new file mode 100644 index 000000000..146d49f9c --- /dev/null +++ b/test/fixtures/users_mock/masto_closed_following.json @@ -0,0 +1,7 @@ +{ + "@context": "https://www.w3.org/ns/activitystreams", + "id": "http://localhost:4001/users/masto_closed/following", + "type": "OrderedCollection", + "totalItems": 152, + "first": "http://localhost:4001/users/masto_closed/following?page=1" +} diff --git a/test/fixtures/users_mock/pleroma_followers.json b/test/fixtures/users_mock/pleroma_followers.json new file mode 100644 index 000000000..db71d084b --- /dev/null +++ b/test/fixtures/users_mock/pleroma_followers.json @@ -0,0 +1,20 @@ +{ + "type": "OrderedCollection", + "totalItems": 527, + "id": "http://localhost:4001/users/fuser2/followers", + "first": { + "type": "OrderedCollectionPage", + "totalItems": 527, + "partOf": "http://localhost:4001/users/fuser2/followers", + "orderedItems": [], + "next": "http://localhost:4001/users/fuser2/followers?page=2", + "id": "http://localhost:4001/users/fuser2/followers?page=1" + }, + "@context": [ + "https://www.w3.org/ns/activitystreams", + "http://localhost:4001/schemas/litepub-0.1.jsonld", + { + "@language": "und" + } + ] +} diff --git a/test/fixtures/users_mock/pleroma_following.json b/test/fixtures/users_mock/pleroma_following.json new file mode 100644 index 000000000..33d087703 --- /dev/null +++ b/test/fixtures/users_mock/pleroma_following.json @@ -0,0 +1,20 @@ +{ + "type": "OrderedCollection", + "totalItems": 267, + "id": "http://localhost:4001/users/fuser2/following", + "first": { + "type": "OrderedCollectionPage", + "totalItems": 267, + "partOf": "http://localhost:4001/users/fuser2/following", + "orderedItems": [], + "next": "http://localhost:4001/users/fuser2/following?page=2", + "id": "http://localhost:4001/users/fuser2/following?page=1" + }, + "@context": [ + "https://www.w3.org/ns/activitystreams", + "http://localhost:4001/schemas/litepub-0.1.jsonld", + { + "@language": "und" + } + ] +} diff --git a/test/support/http_request_mock.ex b/test/support/http_request_mock.ex index e6f357412..c593a5e4a 100644 --- a/test/support/http_request_mock.ex +++ b/test/support/http_request_mock.ex @@ -759,6 +759,54 @@ def get("https://pleroma.local/notice/9kCP7V", _, _, _) do {:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/rich_media/ogp.html")}} end + def get("http://localhost:4001/users/masto_closed/followers", _, _, _) do + {:ok, + %Tesla.Env{ + status: 200, + body: File.read!("test/fixtures/users_mock/masto_closed_followers.json") + }} + end + + def get("http://localhost:4001/users/masto_closed/following", _, _, _) do + {:ok, + %Tesla.Env{ + status: 200, + body: File.read!("test/fixtures/users_mock/masto_closed_following.json") + }} + end + + def get("http://localhost:4001/users/fuser2/followers", _, _, _) do + {:ok, + %Tesla.Env{ + status: 200, + body: File.read!("test/fixtures/users_mock/pleroma_followers.json") + }} + end + + def get("http://localhost:4001/users/fuser2/following", _, _, _) do + {:ok, + %Tesla.Env{ + status: 200, + body: File.read!("test/fixtures/users_mock/pleroma_following.json") + }} + end + + def get("http://domain-with-errors:4001/users/fuser1/followers", _, _, _) do + {:ok, + %Tesla.Env{ + status: 504, + body: "" + }} + end + + def get("http://domain-with-errors:4001/users/fuser1/following", _, _, _) do + {:ok, + %Tesla.Env{ + status: 504, + body: "" + }} + end + def get("http://example.com/ogp-missing-data", _, _, _) do {:ok, %Tesla.Env{ diff --git a/test/user/synchronization_test.exs b/test/user/synchronization_test.exs new file mode 100644 index 000000000..67b669431 --- /dev/null +++ b/test/user/synchronization_test.exs @@ -0,0 +1,104 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.User.SynchronizationTest do + use Pleroma.DataCase + import Pleroma.Factory + alias Pleroma.User + alias Pleroma.User.Synchronization + + setup do + Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) + :ok + end + + test "update following/followers counters" do + user1 = + insert(:user, + local: false, + ap_id: "http://localhost:4001/users/masto_closed" + ) + + user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") + + users = User.external_users() + assert length(users) == 2 + {user, %{}} = Synchronization.call(users, %{}) + assert user == List.last(users) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 437 + assert following == 152 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 527 + assert following == 267 + end + + test "don't check host if errors exist" do + user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1") + + user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2") + + users = User.external_users() + assert length(users) == 2 + + {user, %{"domain-with-errors" => 2}} = + Synchronization.call(users, %{"domain-with-errors" => 2}, max_retries: 2) + + assert user == List.last(users) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 0 + assert following == 0 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 0 + assert following == 0 + end + + test "don't check host if errors appeared" do + user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1") + + user2 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser2") + + users = User.external_users() + assert length(users) == 2 + + {user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2) + + assert user == List.last(users) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 0 + assert following == 0 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 0 + assert following == 0 + end + + test "other users after error appeared" do + user1 = insert(:user, local: false, ap_id: "http://domain-with-errors:4001/users/fuser1") + user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") + + users = User.external_users() + assert length(users) == 2 + + {user, %{"domain-with-errors" => 2}} = Synchronization.call(users, %{}, max_retries: 2) + assert user == List.last(users) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 0 + assert following == 0 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 527 + assert following == 267 + end +end diff --git a/test/user/synchronization_worker_test.exs b/test/user/synchronization_worker_test.exs new file mode 100644 index 000000000..835c5327f --- /dev/null +++ b/test/user/synchronization_worker_test.exs @@ -0,0 +1,49 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.User.SynchronizationWorkerTest do + use Pleroma.DataCase + import Pleroma.Factory + + setup do + Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) + + config = Pleroma.Config.get([:instance, :external_user_synchronization]) + + for_update = [enabled: true, interval: 1000] + + Pleroma.Config.put([:instance, :external_user_synchronization], for_update) + + on_exit(fn -> + Pleroma.Config.put([:instance, :external_user_synchronization], config) + end) + + :ok + end + + test "sync follow counters" do + user1 = + insert(:user, + local: false, + ap_id: "http://localhost:4001/users/masto_closed" + ) + + user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") + + {:ok, _} = Pleroma.User.SynchronizationWorker.start_link() + :timer.sleep(1500) + + %{follower_count: followers, following_count: following} = + Pleroma.User.get_cached_user_info(user1) + + assert followers == 437 + assert following == 152 + + %{follower_count: followers, following_count: following} = + Pleroma.User.get_cached_user_info(user2) + + assert followers == 527 + assert following == 267 + end +end diff --git a/test/user_test.exs b/test/user_test.exs index fb497843c..0f27d73f7 100644 --- a/test/user_test.exs +++ b/test/user_test.exs @@ -1183,4 +1183,121 @@ test "it returns a list of AP ids for a given set of nicknames" do assert user_two.ap_id in ap_ids end end + + describe "sync followers count" do + setup do + user1 = insert(:user, local: false, ap_id: "http://localhost:4001/users/masto_closed") + user2 = insert(:user, local: false, ap_id: "http://localhost:4001/users/fuser2") + insert(:user, local: true) + insert(:user, local: false, info: %{deactivated: true}) + {:ok, user1: user1, user2: user2} + end + + test "external_users/1 external active users with limit", %{user1: user1, user2: user2} do + [fdb_user1] = User.external_users(limit: 1) + + assert fdb_user1.ap_id + assert fdb_user1.ap_id == user1.ap_id + assert fdb_user1.id == user1.id + + [fdb_user2] = User.external_users(max_id: fdb_user1.id, limit: 1) + + assert fdb_user2.ap_id + assert fdb_user2.ap_id == user2.ap_id + assert fdb_user2.id == user2.id + + assert User.external_users(max_id: fdb_user2.id, limit: 1) == [] + end + + test "sync_follow_counters/1", %{user1: user1, user2: user2} do + {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) + + :ok = User.sync_follow_counters() + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 437 + assert following == 152 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 527 + assert following == 267 + + Agent.stop(:domain_errors) + end + + test "sync_follow_counters/1 in separate batches", %{user1: user1, user2: user2} do + {:ok, _pid} = Agent.start_link(fn -> %{} end, name: :domain_errors) + + :ok = User.sync_follow_counters(limit: 1) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 437 + assert following == 152 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 527 + assert following == 267 + + Agent.stop(:domain_errors) + end + + test "perform/1 with :sync_follow_counters", %{user1: user1, user2: user2} do + :ok = User.perform(:sync_follow_counters) + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user1) + assert followers == 437 + assert following == 152 + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user2) + + assert followers == 527 + assert following == 267 + end + end + + describe "set_info_cache/2" do + setup do + user = insert(:user) + {:ok, user: user} + end + + test "update from args", %{user: user} do + User.set_info_cache(user, %{following_count: 15, follower_count: 18}) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user) + assert followers == 18 + assert following == 15 + end + + test "without args", %{user: user} do + User.set_info_cache(user, %{}) + + %{follower_count: followers, following_count: following} = User.get_cached_user_info(user) + assert followers == 0 + assert following == 0 + end + end + + describe "user_info/2" do + setup do + user = insert(:user) + {:ok, user: user} + end + + test "update from args", %{user: user} do + %{follower_count: followers, following_count: following} = + User.user_info(user, %{following_count: 15, follower_count: 18}) + + assert followers == 18 + assert following == 15 + end + + test "without args", %{user: user} do + %{follower_count: followers, following_count: following} = User.user_info(user) + + assert followers == 0 + assert following == 0 + end + end end