Extend Pleroma.Pagination to support offset-based pagination, use async/await to execute status and account search in parallel

This commit is contained in:
Eugenij 2019-07-11 13:55:31 +00:00 committed by kaniini
parent 4d382d1347
commit 4198c3ac39
8 changed files with 193 additions and 62 deletions

View file

@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Metadata rendering errors resulting in the entire page being inaccessible - Metadata rendering errors resulting in the entire page being inaccessible
- Mastodon API: Handling of search timeouts (`/api/v1/search` and `/api/v2/search`) - Mastodon API: Handling of search timeouts (`/api/v1/search` and `/api/v2/search`)
- Mastodon API: Embedded relationships not being properly rendered in the Account entity of Status entity - Mastodon API: Embedded relationships not being properly rendered in the Account entity of Status entity
- Mastodon API: Add `account_id`, `type`, `offset`, and `limit` to search API (`/api/v1/search` and `/api/v2/search`)
### Added ### Added
- MRF: Support for priming the mediaproxy cache (`Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy`) - MRF: Support for priming the mediaproxy cache (`Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy`)

View file

@ -65,7 +65,9 @@
total_user_limit: 3, total_user_limit: 3,
enabled: false enabled: false
config :pleroma, :rate_limit, app_account_creation: {10_000, 5} config :pleroma, :rate_limit,
search: [{1000, 30}, {1000, 30}],
app_account_creation: {10_000, 5}
config :pleroma, :http_security, report_uri: "https://endpoint.com" config :pleroma, :http_security, report_uri: "https://endpoint.com"

View file

@ -344,5 +344,5 @@ def restrict_deactivated_users(query) do
) )
end end
defdelegate search(user, query), to: Pleroma.Activity.Search defdelegate search(user, query, options \\ []), to: Pleroma.Activity.Search
end end

View file

@ -5,14 +5,17 @@
defmodule Pleroma.Activity.Search do defmodule Pleroma.Activity.Search do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Object.Fetcher alias Pleroma.Object.Fetcher
alias Pleroma.Repo alias Pleroma.Pagination
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
import Ecto.Query import Ecto.Query
def search(user, search_query) do def search(user, search_query, options \\ []) do
index_type = if Pleroma.Config.get([:database, :rum_enabled]), do: :rum, else: :gin index_type = if Pleroma.Config.get([:database, :rum_enabled]), do: :rum, else: :gin
limit = Enum.min([Keyword.get(options, :limit), 40])
offset = Keyword.get(options, :offset, 0)
author = Keyword.get(options, :author)
Activity Activity
|> Activity.with_preloaded_object() |> Activity.with_preloaded_object()
@ -20,15 +23,23 @@ def search(user, search_query) do
|> restrict_public() |> restrict_public()
|> query_with(index_type, search_query) |> query_with(index_type, search_query)
|> maybe_restrict_local(user) |> maybe_restrict_local(user)
|> Repo.all() |> maybe_restrict_author(author)
|> Pagination.fetch_paginated(%{"offset" => offset, "limit" => limit}, :offset)
|> maybe_fetch(user, search_query) |> maybe_fetch(user, search_query)
end end
def maybe_restrict_author(query, %User{} = author) do
from([a, o] in query,
where: a.actor == ^author.ap_id
)
end
def maybe_restrict_author(query, _), do: query
defp restrict_public(q) do defp restrict_public(q) do
from([a, o] in q, from([a, o] in q,
where: fragment("?->>'type' = 'Create'", a.data), where: fragment("?->>'type' = 'Create'", a.data),
where: "https://www.w3.org/ns/activitystreams#Public" in a.recipients, where: "https://www.w3.org/ns/activitystreams#Public" in a.recipients
limit: 40
) )
end end

View file

@ -14,16 +14,28 @@ defmodule Pleroma.Pagination do
@default_limit 20 @default_limit 20
def fetch_paginated(query, params) do def fetch_paginated(query, params, type \\ :keyset)
def fetch_paginated(query, params, :keyset) do
options = cast_params(params) options = cast_params(params)
query query
|> paginate(options) |> paginate(options, :keyset)
|> Repo.all() |> Repo.all()
|> enforce_order(options) |> enforce_order(options)
end end
def paginate(query, options) do def fetch_paginated(query, params, :offset) do
options = cast_params(params)
query
|> paginate(options, :offset)
|> Repo.all()
end
def paginate(query, options, method \\ :keyset)
def paginate(query, options, :keyset) do
query query
|> restrict(:min_id, options) |> restrict(:min_id, options)
|> restrict(:since_id, options) |> restrict(:since_id, options)
@ -32,11 +44,18 @@ def paginate(query, options) do
|> restrict(:limit, options) |> restrict(:limit, options)
end end
def paginate(query, options, :offset) do
query
|> restrict(:offset, options)
|> restrict(:limit, options)
end
defp cast_params(params) do defp cast_params(params) do
param_types = %{ param_types = %{
min_id: :string, min_id: :string,
since_id: :string, since_id: :string,
max_id: :string, max_id: :string,
offset: :integer,
limit: :integer limit: :integer
} }
@ -70,6 +89,10 @@ defp restrict(query, :order, _options) do
order_by(query, [u], fragment("? desc nulls last", u.id)) order_by(query, [u], fragment("? desc nulls last", u.id))
end end
defp restrict(query, :offset, %{offset: offset}) do
offset(query, ^offset)
end
defp restrict(query, :limit, options) do defp restrict(query, :limit, options) do
limit = Map.get(options, :limit, @default_limit) limit = Map.get(options, :limit, @default_limit)

View file

@ -3,6 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.User.Search do defmodule Pleroma.User.Search do
alias Pleroma.Pagination
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
import Ecto.Query import Ecto.Query
@ -32,8 +33,7 @@ def search(query_string, opts \\ []) do
query_string query_string
|> search_query(for_user, following) |> search_query(for_user, following)
|> paginate(result_limit, offset) |> Pagination.fetch_paginated(%{"offset" => offset, "limit" => result_limit}, :offset)
|> Repo.all()
end) end)
results results
@ -87,10 +87,6 @@ defp filter_blocked_domains(query, %User{info: %{domain_blocks: domain_blocks}})
defp filter_blocked_domains(query, _), do: query defp filter_blocked_domains(query, _), do: query
defp paginate(query, limit, offset) do
from(q in query, limit: ^limit, offset: ^offset)
end
defp union_subqueries({fts_subquery, trigram_subquery}) do defp union_subqueries({fts_subquery, trigram_subquery}) do
from(s in trigram_subquery, union_all: ^fts_subquery) from(s in trigram_subquery, union_all: ^fts_subquery)
end end

View file

@ -7,6 +7,7 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Plugs.RateLimiter alias Pleroma.Plugs.RateLimiter
alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web alias Pleroma.Web
alias Pleroma.Web.ControllerHelper alias Pleroma.Web.ControllerHelper
@ -16,43 +17,6 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
require Logger require Logger
plug(RateLimiter, :search when action in [:search, :search2, :account_search]) plug(RateLimiter, :search when action in [:search, :search2, :account_search])
def search2(%{assigns: %{user: user}} = conn, %{"q" => query} = params) do
accounts = with_fallback(fn -> User.search(query, search_options(params, user)) end, [])
statuses = with_fallback(fn -> Activity.search(user, query) end, [])
tags_path = Web.base_url() <> "/tag/"
tags =
query
|> prepare_tags
|> Enum.map(fn tag -> %{name: tag, url: tags_path <> tag} end)
res = %{
"accounts" => AccountView.render("accounts.json", users: accounts, for: user, as: :user),
"statuses" =>
StatusView.render("index.json", activities: statuses, for: user, as: :activity),
"hashtags" => tags
}
json(conn, res)
end
def search(%{assigns: %{user: user}} = conn, %{"q" => query} = params) do
accounts = with_fallback(fn -> User.search(query, search_options(params, user)) end)
statuses = with_fallback(fn -> Activity.search(user, query) end)
tags = prepare_tags(query)
res = %{
"accounts" => AccountView.render("accounts.json", users: accounts, for: user, as: :user),
"statuses" =>
StatusView.render("index.json", activities: statuses, for: user, as: :activity),
"hashtags" => tags
}
json(conn, res)
end
def account_search(%{assigns: %{user: user}} = conn, %{"q" => query} = params) do def account_search(%{assigns: %{user: user}} = conn, %{"q" => query} = params) do
accounts = User.search(query, search_options(params, user)) accounts = User.search(query, search_options(params, user))
res = AccountView.render("accounts.json", users: accounts, for: user, as: :user) res = AccountView.render("accounts.json", users: accounts, for: user, as: :user)
@ -60,12 +24,36 @@ def account_search(%{assigns: %{user: user}} = conn, %{"q" => query} = params) d
json(conn, res) json(conn, res)
end end
defp prepare_tags(query) do def search2(conn, params), do: do_search(:v2, conn, params)
query def search(conn, params), do: do_search(:v1, conn, params)
|> String.split()
|> Enum.uniq() defp do_search(version, %{assigns: %{user: user}} = conn, %{"q" => query} = params) do
|> Enum.filter(fn tag -> String.starts_with?(tag, "#") end) options = search_options(params, user)
|> Enum.map(fn tag -> String.slice(tag, 1..-1) end) timeout = Keyword.get(Repo.config(), :timeout, 15_000)
default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
result =
default_values
|> Enum.map(fn {resource, default_value} ->
if params["type"] == nil or params["type"] == resource do
{resource, fn -> resource_search(version, resource, query, options) end}
else
{resource, fn -> default_value end}
end
end)
|> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
timeout: timeout,
on_timeout: :kill_task
)
|> Enum.reduce(default_values, fn
{:ok, {resource, result}}, acc ->
Map.put(acc, resource, result)
_error, acc ->
acc
end)
json(conn, result)
end end
defp search_options(params, user) do defp search_options(params, user) do
@ -74,8 +62,45 @@ defp search_options(params, user) do
following: params["following"] == "true", following: params["following"] == "true",
limit: ControllerHelper.fetch_integer_param(params, "limit"), limit: ControllerHelper.fetch_integer_param(params, "limit"),
offset: ControllerHelper.fetch_integer_param(params, "offset"), offset: ControllerHelper.fetch_integer_param(params, "offset"),
type: params["type"],
author: get_author(params),
for_user: user for_user: user
] ]
|> Enum.filter(&elem(&1, 1))
end
defp resource_search(_, "accounts", query, options) do
accounts = with_fallback(fn -> User.search(query, options) end)
AccountView.render("accounts.json", users: accounts, for: options[:for_user], as: :user)
end
defp resource_search(_, "statuses", query, options) do
statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end)
StatusView.render("index.json", activities: statuses, for: options[:for_user], as: :activity)
end
defp resource_search(:v2, "hashtags", query, _options) do
tags_path = Web.base_url() <> "/tag/"
query
|> prepare_tags()
|> Enum.map(fn tag ->
tag = String.trim_leading(tag, "#")
%{name: tag, url: tags_path <> tag}
end)
end
defp resource_search(:v1, "hashtags", query, _options) do
query
|> prepare_tags()
|> Enum.map(fn tag -> String.trim_leading(tag, "#") end)
end
defp prepare_tags(query) do
query
|> String.split()
|> Enum.uniq()
|> Enum.filter(fn tag -> String.starts_with?(tag, "#") end)
end end
defp with_fallback(f, fallback \\ []) do defp with_fallback(f, fallback \\ []) do
@ -87,4 +112,9 @@ defp with_fallback(f, fallback \\ []) do
fallback fallback
end end
end end
defp get_author(%{"account_id" => account_id}) when is_binary(account_id),
do: User.get_cached_by_id(account_id)
defp get_author(_params), do: nil
end end

View file

@ -22,7 +22,7 @@ defmodule Pleroma.Web.MastodonAPI.SearchControllerTest do
test "it returns empty result if user or status search return undefined error", %{conn: conn} do test "it returns empty result if user or status search return undefined error", %{conn: conn} do
with_mocks [ with_mocks [
{Pleroma.User, [], [search: fn _q, _o -> raise "Oops" end]}, {Pleroma.User, [], [search: fn _q, _o -> raise "Oops" end]},
{Pleroma.Activity, [], [search: fn _u, _q -> raise "Oops" end]} {Pleroma.Activity, [], [search: fn _u, _q, _o -> raise "Oops" end]}
] do ] do
conn = get(conn, "/api/v2/search", %{"q" => "2hu"}) conn = get(conn, "/api/v2/search", %{"q" => "2hu"})
@ -51,7 +51,6 @@ test "search", %{conn: conn} do
conn = get(conn, "/api/v2/search", %{"q" => "2hu #private"}) conn = get(conn, "/api/v2/search", %{"q" => "2hu #private"})
assert results = json_response(conn, 200) assert results = json_response(conn, 200)
# IO.inspect results
[account | _] = results["accounts"] [account | _] = results["accounts"]
assert account["id"] == to_string(user_three.id) assert account["id"] == to_string(user_three.id)
@ -98,7 +97,7 @@ test "account search", %{conn: conn} do
test "it returns empty result if user or status search return undefined error", %{conn: conn} do test "it returns empty result if user or status search return undefined error", %{conn: conn} do
with_mocks [ with_mocks [
{Pleroma.User, [], [search: fn _q, _o -> raise "Oops" end]}, {Pleroma.User, [], [search: fn _q, _o -> raise "Oops" end]},
{Pleroma.Activity, [], [search: fn _u, _q -> raise "Oops" end]} {Pleroma.Activity, [], [search: fn _u, _q, _o -> raise "Oops" end]}
] do ] do
conn = conn =
conn conn
@ -195,5 +194,74 @@ test "search doesn't fetch remote accounts if resolve is false", %{conn: conn} d
assert results = json_response(conn, 200) assert results = json_response(conn, 200)
assert [] == results["accounts"] assert [] == results["accounts"]
end end
test "search with limit and offset", %{conn: conn} do
user = insert(:user)
_user_two = insert(:user, %{nickname: "shp@shitposter.club"})
_user_three = insert(:user, %{nickname: "shp@heldscal.la", name: "I love 2hu"})
{:ok, _activity1} = CommonAPI.post(user, %{"status" => "This is about 2hu"})
{:ok, _activity2} = CommonAPI.post(user, %{"status" => "This is also about 2hu"})
result =
conn
|> get("/api/v1/search", %{"q" => "2hu", "limit" => 1})
assert results = json_response(result, 200)
assert [%{"id" => activity_id1}] = results["statuses"]
assert [_] = results["accounts"]
results =
conn
|> get("/api/v1/search", %{"q" => "2hu", "limit" => 1, "offset" => 1})
|> json_response(200)
assert [%{"id" => activity_id2}] = results["statuses"]
assert [] = results["accounts"]
assert activity_id1 != activity_id2
end
test "search returns results only for the given type", %{conn: conn} do
user = insert(:user)
_user_two = insert(:user, %{nickname: "shp@heldscal.la", name: "I love 2hu"})
{:ok, _activity} = CommonAPI.post(user, %{"status" => "This is about 2hu"})
assert %{"statuses" => [_activity], "accounts" => [], "hashtags" => []} =
conn
|> get("/api/v1/search", %{"q" => "2hu", "type" => "statuses"})
|> json_response(200)
assert %{"statuses" => [], "accounts" => [_user_two], "hashtags" => []} =
conn
|> get("/api/v1/search", %{"q" => "2hu", "type" => "accounts"})
|> json_response(200)
end
test "search uses account_id to filter statuses by the author", %{conn: conn} do
user = insert(:user, %{nickname: "shp@shitposter.club"})
user_two = insert(:user, %{nickname: "shp@heldscal.la", name: "I love 2hu"})
{:ok, activity1} = CommonAPI.post(user, %{"status" => "This is about 2hu"})
{:ok, activity2} = CommonAPI.post(user_two, %{"status" => "This is also about 2hu"})
results =
conn
|> get("/api/v1/search", %{"q" => "2hu", "account_id" => user.id})
|> json_response(200)
assert [%{"id" => activity_id1}] = results["statuses"]
assert activity_id1 == activity1.id
assert [_] = results["accounts"]
results =
conn
|> get("/api/v1/search", %{"q" => "2hu", "account_id" => user_two.id})
|> json_response(200)
assert [%{"id" => activity_id2}] = results["statuses"]
assert activity_id2 == activity2.id
end
end end
end end