From 1ecdb19de5fa54119c6a744c766bbd6c77d1b746 Mon Sep 17 00:00:00 2001 From: FloatingGhost Date: Thu, 30 Jun 2022 16:28:31 +0100 Subject: [PATCH] Refactor ES on top of search behaviour --- config/config.exs | 18 +- lib/mix/tasks/pleroma/search.ex | 64 ----- lib/mix/tasks/pleroma/search/meilisearch.ex | 144 ---------- lib/pleroma/application.ex | 11 + .../document_mappings/activity.ex | 19 -- .../document_mappings/hashtag.ex | 21 -- .../elasticsearch/document_mappings/user.ex | 17 -- lib/pleroma/elasticsearch/store.ex | 256 ------------------ lib/pleroma/hashtag.ex | 1 - lib/pleroma/search/builtin.ex | 138 ---------- lib/pleroma/search/elasticsearch.ex | 112 ++------ lib/pleroma/search/elasticsearch/cluster.ex | 4 + .../document_mappings/activity.ex | 55 ++++ .../search/elasticsearch/hashtag_parser.ex | 34 --- lib/pleroma/search/elasticsearch/store.ex | 52 ++++ .../search/elasticsearch/user_paser.ex | 57 ---- lib/pleroma/user.ex | 1 - lib/pleroma/web/activity_pub/pipeline.ex | 1 - lib/pleroma/web/activity_pub/side_effects.ex | 21 +- .../web/activity_pub/side_effects/handling.ex | 2 +- lib/pleroma/web/common_api.ex | 8 +- .../controllers/search_controller.ex | 37 ++- mix.exs | 1 + priv/es-mappings/activity.json | 34 +-- .../web/activity_pub/pipeline_test.exs | 1 - 25 files changed, 212 insertions(+), 897 deletions(-) delete mode 100644 lib/mix/tasks/pleroma/search.ex delete mode 100644 lib/mix/tasks/pleroma/search/meilisearch.ex delete mode 100644 lib/pleroma/elasticsearch/document_mappings/activity.ex delete mode 100644 lib/pleroma/elasticsearch/document_mappings/hashtag.ex delete mode 100644 lib/pleroma/elasticsearch/document_mappings/user.ex delete mode 100644 lib/pleroma/elasticsearch/store.ex delete mode 100644 lib/pleroma/search/builtin.ex create mode 100644 lib/pleroma/search/elasticsearch/cluster.ex create mode 100644 lib/pleroma/search/elasticsearch/document_mappings/activity.ex delete mode 100644 lib/pleroma/search/elasticsearch/hashtag_parser.ex create mode 100644 lib/pleroma/search/elasticsearch/store.ex delete mode 100644 lib/pleroma/search/elasticsearch/user_paser.ex diff --git a/config/config.exs b/config/config.exs index cf5f9cf27..727a2b0cb 100644 --- a/config/config.exs +++ b/config/config.exs @@ -856,8 +856,6 @@ {Pleroma.Search, [max_running: 30, max_waiting: 50]} ] -config :pleroma, :search, provider: Pleroma.Search.Builtin - config :pleroma, Pleroma.Search, module: Pleroma.Search.DatabaseSearch config :pleroma, Pleroma.Search.Meilisearch, @@ -865,6 +863,22 @@ private_key: nil, initial_indexing_chunk_size: 100_000 +config :pleroma, Pleroma.Search.Elasticsearch.Cluster, + url: "http://localhost:9200", + username: "elastic", + password: "changeme", + api: Elasticsearch.API.HTTP, + json_library: Jason, + indexes: %{ + activities: %{ + settings: "priv/es-mappings/activity.json", + store: Pleroma.Search.Elasticsearch.Store, + sources: [Pleroma.Activity], + bulk_page_size: 5000, + bulk_wait_interval: 15_000 + } + } + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --git a/lib/mix/tasks/pleroma/search.ex b/lib/mix/tasks/pleroma/search.ex deleted file mode 100644 index 1fd880eab..000000000 --- a/lib/mix/tasks/pleroma/search.ex +++ /dev/null @@ -1,64 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Mix.Tasks.Pleroma.Search do - use Mix.Task - import Mix.Pleroma - import Ecto.Query - alias Pleroma.Activity - alias Pleroma.Pagination - alias Pleroma.User - alias Pleroma.Hashtag - - @shortdoc "Manages elasticsearch" - - def run(["import", "activities" | _rest]) do - start_pleroma() - - from(a in Activity, where: not ilike(a.actor, "%/relay")) - |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data)) - |> Activity.with_preloaded_object() - |> Activity.with_preloaded_user_actor() - |> get_all(:activities) - end - - def run(["import", "users" | _rest]) do - start_pleroma() - - from(u in User, where: u.nickname not in ["internal.fetch", "relay"]) - |> get_all(:users) - end - - def run(["import", "hashtags" | _rest]) do - start_pleroma() - - from(h in Hashtag) - |> Pleroma.Repo.all() - |> Pleroma.Elasticsearch.bulk_post(:hashtags) - end - - defp get_all(query, index, max_id \\ nil) do - params = %{limit: 1000} - - params = - if max_id == nil do - params - else - Map.put(params, :max_id, max_id) - end - - res = - query - |> Pagination.fetch_paginated(params) - - if res == [] do - :ok - else - res - |> Pleroma.Elasticsearch.bulk_post(index) - - get_all(query, index, List.last(res).id) - end - end -end diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex deleted file mode 100644 index d4a83c3cd..000000000 --- a/lib/mix/tasks/pleroma/search/meilisearch.ex +++ /dev/null @@ -1,144 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Mix.Tasks.Pleroma.Search.Meilisearch do - require Pleroma.Constants - - import Mix.Pleroma - import Ecto.Query - - import Pleroma.Search.Meilisearch, - only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1] - - def run(["index"]) do - start_pleroma() - - meili_version = - ( - {:ok, result} = meili_get("/version") - - result["pkgVersion"] - ) - - # The ranking rule syntax was changed but nothing about that is mentioned in the changelog - if not Version.match?(meili_version, ">= 0.25.0") do - raise "Meilisearch <0.24.0 not supported" - end - - {:ok, _} = - meili_post( - "/indexes/objects/settings/ranking-rules", - [ - "published:desc", - "words", - "exactness", - "proximity", - "typo", - "attribute", - "sort" - ] - ) - - {:ok, _} = - meili_post( - "/indexes/objects/settings/searchable-attributes", - [ - "content" - ] - ) - - IO.puts("Created indices. Starting to insert posts.") - - chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size]) - - Pleroma.Repo.transaction( - fn -> - query = - from(Pleroma.Object, - # Only index public and unlisted posts which are notes and have some text - where: - fragment("data->>'type' = 'Note'") and - (fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or - fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())), - order_by: [desc: fragment("data->'published'")] - ) - - count = query |> Pleroma.Repo.aggregate(:count, :data) - IO.puts("Entries to index: #{count}") - - Pleroma.Repo.stream( - query, - timeout: :infinity - ) - |> Stream.map(&Pleroma.Search.Meilisearch.object_to_search_data/1) - |> Stream.filter(fn o -> not is_nil(o) end) - |> Stream.chunk_every(chunk_size) - |> Stream.transform(0, fn objects, acc -> - new_acc = acc + Enum.count(objects) - - # Reset to the beginning of the line and rewrite it - IO.write("\r") - IO.write("Indexed #{new_acc} entries") - - {[objects], new_acc} - end) - |> Stream.each(fn objects -> - result = - meili_put( - "/indexes/objects/documents", - objects - ) - - with {:ok, res} <- result do - if not Map.has_key?(res, "uid") do - IO.puts("\nFailed to index: #{inspect(result)}") - end - else - e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}") - end - end) - |> Stream.run() - end, - timeout: :infinity - ) - - IO.write("\n") - end - - def run(["clear"]) do - start_pleroma() - - meili_delete!("/indexes/objects/documents") - end - - def run(["show-keys", master_key]) do - start_pleroma() - - endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url]) - - {:ok, result} = - Pleroma.HTTP.get( - Path.join(endpoint, "/keys"), - [{"Authorization", "Bearer #{master_key}"}] - ) - - decoded = Jason.decode!(result.body) - - if decoded["results"] do - Enum.each(decoded["results"], fn %{"description" => desc, "key" => key} -> - IO.puts("#{desc}: #{key}") - end) - else - IO.puts("Error fetching the keys, check the master key is correct: #{inspect(decoded)}") - end - end - - def run(["stats"]) do - start_pleroma() - - {:ok, result} = meili_get("/indexes/objects/stats") - IO.puts("Number of entries: #{result["numberOfDocuments"]}") - IO.puts("Indexing? #{result["isIndexing"]}") - end -end diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index be03cdffb..b709e737b 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -105,6 +105,7 @@ def start(_type, _args) do {Oban, Config.get(Oban)}, Pleroma.Web.Endpoint ] ++ + elasticsearch_children() ++ task_children(@mix_env) ++ dont_run_in_test(@mix_env) ++ shout_child(shout_enabled?()) @@ -303,6 +304,16 @@ defp http_children(Tesla.Adapter.Gun, _) do defp http_children(_, _), do: [] + def elasticsearch_children do + config = Config.get([Pleroma.Search, :module]) + + if config == Pleroma.Search.Elasticsearch do + [Pleroma.Search.Elasticsearch.Cluster] + else + [] + end + end + @spec limiters_setup() :: :ok def limiters_setup do config = Config.get(ConcurrentLimiter, []) diff --git a/lib/pleroma/elasticsearch/document_mappings/activity.ex b/lib/pleroma/elasticsearch/document_mappings/activity.ex deleted file mode 100644 index a028c6fad..000000000 --- a/lib/pleroma/elasticsearch/document_mappings/activity.ex +++ /dev/null @@ -1,19 +0,0 @@ -# Akkoma: A lightweight social networking server -# Copyright © 2022-2022 Akkoma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Elasticsearch.DocumentMappings.Activity do - alias Pleroma.Object - - def id(obj), do: obj.id - - def encode(%{object: %{data: %{"type" => "Note"}}} = activity) do - %{ - _timestamp: activity.inserted_at, - user: activity.user_actor.nickname, - content: activity.object.data["content"], - instance: URI.parse(activity.user_actor.ap_id).host, - hashtags: Object.hashtags(activity.object) - } - end -end diff --git a/lib/pleroma/elasticsearch/document_mappings/hashtag.ex b/lib/pleroma/elasticsearch/document_mappings/hashtag.ex deleted file mode 100644 index 7391983f6..000000000 --- a/lib/pleroma/elasticsearch/document_mappings/hashtag.ex +++ /dev/null @@ -1,21 +0,0 @@ -# Akkoma: A lightweight social networking server -# Copyright © 2022-2022 Akkoma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do - def id(obj), do: obj.id - - def encode(%{timestamp: _} = hashtag) do - %{ - hashtag: hashtag.name, - timestamp: hashtag.timestamp - } - end - - def encode(hashtag) do - %{ - hashtag: hashtag.name, - timestamp: hashtag.inserted_at - } - end -end diff --git a/lib/pleroma/elasticsearch/document_mappings/user.ex b/lib/pleroma/elasticsearch/document_mappings/user.ex deleted file mode 100644 index d5cfca656..000000000 --- a/lib/pleroma/elasticsearch/document_mappings/user.ex +++ /dev/null @@ -1,17 +0,0 @@ -# Akkoma: A lightweight social networking server -# Copyright © 2022-2022 Akkoma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Elasticsearch.DocumentMappings.User do - def id(obj), do: obj.id - - def encode(%{actor_type: "Person"} = user) do - %{ - timestamp: user.inserted_at, - instance: URI.parse(user.ap_id).host, - nickname: user.nickname, - bio: user.bio, - display_name: user.name - } - end -end diff --git a/lib/pleroma/elasticsearch/store.ex b/lib/pleroma/elasticsearch/store.ex deleted file mode 100644 index 98c88a7c7..000000000 --- a/lib/pleroma/elasticsearch/store.ex +++ /dev/null @@ -1,256 +0,0 @@ -# Akkoma: A lightweight social networking server -# Copyright © 2022-2022 Akkoma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Elasticsearch do - alias Pleroma.Activity - alias Pleroma.User - alias Pleroma.Object - alias Pleroma.Elasticsearch.DocumentMappings - alias Pleroma.Config - require Logger - - defp url do - Config.get([:elasticsearch, :url]) - end - - defp enabled? do - Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch - end - - def delete_by_id(:activity, id) do - if enabled?() do - Elastix.Document.delete(url(), "activities", "activity", id) - end - end - - def put_by_id(:activity, id) do - id - |> Activity.get_by_id_with_object() - |> maybe_put_into_elasticsearch() - end - - def maybe_put_into_elasticsearch({:ok, item}) do - maybe_put_into_elasticsearch(item) - end - - def maybe_put_into_elasticsearch( - %{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity - ) do - if enabled?() do - actor = Pleroma.Activity.user_actor(activity) - - activity - |> Map.put(:user_actor, actor) - |> put() - end - end - - def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do - if enabled?() do - put(user) - end - end - - def maybe_put_into_elasticsearch(_) do - {:ok, :skipped} - end - - def maybe_bulk_post(data, type) do - if enabled?() do - bulk_post(data, type) - end - end - - def put(%Activity{} = activity) do - with {:ok, _} <- - Elastix.Document.index( - url(), - "activities", - "activity", - DocumentMappings.Activity.id(activity), - DocumentMappings.Activity.encode(activity) - ) do - activity - |> Map.get(:object) - |> Object.hashtags() - |> Enum.map(fn x -> - %{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())} - end) - |> bulk_post(:hashtags) - else - {:error, %{reason: err}} -> - Logger.error("Could not put activity: #{err}") - :skipped - end - end - - def put(%User{} = user) do - with {:ok, _} <- - Elastix.Document.index( - url(), - "users", - "user", - DocumentMappings.User.id(user), - DocumentMappings.User.encode(user) - ) do - :ok - else - {:error, %{reason: err}} -> - Logger.error("Could not put user: #{err}") - :skipped - end - end - - def bulk_post(data, :activities) do - d = - data - |> Enum.filter(fn x -> - t = - x.object - |> Map.get(:data, %{}) - |> Map.get("type", "") - - t == "Note" - end) - |> Enum.map(fn d -> - [ - %{index: %{_id: DocumentMappings.Activity.id(d)}}, - DocumentMappings.Activity.encode(d) - ] - end) - |> List.flatten() - - with {:ok, %{body: %{"errors" => false}}} <- - Elastix.Bulk.post( - url(), - d, - index: "activities", - type: "activity" - ) do - :ok - else - {:error, %{reason: err}} -> - Logger.error("Could not bulk put activity: #{err}") - :skipped - - {:ok, %{body: _}} -> - :skipped - end - end - - def bulk_post(data, :users) do - d = - data - |> Enum.filter(fn x -> x.actor_type == "Person" end) - |> Enum.map(fn d -> - [ - %{index: %{_id: DocumentMappings.User.id(d)}}, - DocumentMappings.User.encode(d) - ] - end) - |> List.flatten() - - with {:ok, %{body: %{"errors" => false}}} <- - Elastix.Bulk.post( - url(), - d, - index: "users", - type: "user" - ) do - :ok - else - {:error, %{reason: err}} -> - Logger.error("Could not bulk put users: #{err}") - :skipped - - {:ok, %{body: _}} -> - :skipped - end - end - - def bulk_post(data, :hashtags) when is_list(data) do - d = - data - |> Enum.map(fn d -> - [ - %{index: %{_id: DocumentMappings.Hashtag.id(d)}}, - DocumentMappings.Hashtag.encode(d) - ] - end) - |> List.flatten() - - with {:ok, %{body: %{"errors" => false}}} <- - Elastix.Bulk.post( - url(), - d, - index: "hashtags", - type: "hashtag" - ) do - :ok - else - {:error, %{reason: err}} -> - Logger.error("Could not bulk put hashtags: #{err}") - :skipped - - {:ok, %{body: _}} -> - :skipped - end - end - - def bulk_post(_, :hashtags), do: {:ok, nil} - - def search(_, _, _, :skip), do: [] - - def search(:raw, index, type, q) do - with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do - results = - raw_results - |> Map.get(:body, %{}) - |> Map.get("hits", %{}) - |> Map.get("hits", []) - - {:ok, results} - else - {:error, e} -> - Logger.error(e) - {:error, e} - end - end - - def search(:activities, q) do - with {:ok, results} <- search(:raw, "activities", "activity", q) do - results - |> Enum.map(fn result -> result["_id"] end) - |> Pleroma.Activity.all_by_ids_with_object() - |> Enum.sort(&(&1.inserted_at >= &2.inserted_at)) - else - e -> - Logger.error(e) - [] - end - end - - def search(:users, q) do - with {:ok, results} <- search(:raw, "users", "user", q) do - results - |> Enum.map(fn result -> result["_id"] end) - |> Pleroma.User.get_all_by_ids() - else - e -> - Logger.error(e) - [] - end - end - - def search(:hashtags, q) do - with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do - results - |> Enum.map(fn result -> result["_source"]["hashtag"] end) - else - e -> - Logger.error(e) - [] - end - end -end diff --git a/lib/pleroma/hashtag.ex b/lib/pleroma/hashtag.ex index cdbfeab02..53e2e9c89 100644 --- a/lib/pleroma/hashtag.ex +++ b/lib/pleroma/hashtag.ex @@ -61,7 +61,6 @@ def get_or_create_by_names(names) when is_list(names) do {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} end) |> Repo.transaction() do - Pleroma.Elasticsearch.maybe_bulk_post(hashtags, :hashtags) {:ok, hashtags} else {:error, _name, value, _changes_so_far} -> {:error, value} diff --git a/lib/pleroma/search/builtin.ex b/lib/pleroma/search/builtin.ex deleted file mode 100644 index 3cbe2207a..000000000 --- a/lib/pleroma/search/builtin.ex +++ /dev/null @@ -1,138 +0,0 @@ -defmodule Pleroma.Search.Builtin do - @behaviour Pleroma.Search - - alias Pleroma.Repo - alias Pleroma.User - alias Pleroma.Activity - alias Pleroma.Web.MastodonAPI.AccountView - alias Pleroma.Web.MastodonAPI.StatusView - alias Pleroma.Web.Endpoint - - require Logger - - @impl Pleroma.Search - def search(_conn, %{q: query} = params, options) do - version = Keyword.get(options, :version) - timeout = Keyword.get(Repo.config(), :timeout, 15_000) - query = String.trim(query) - default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []} - - default_values - |> Enum.map(fn {resource, default_value} -> - if params[:type] in [nil, resource] do - {resource, fn -> resource_search(version, resource, query, options) end} - else - {resource, fn -> default_value end} - end - end) - |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end, - timeout: timeout, - on_timeout: :kill_task - ) - |> Enum.reduce(default_values, fn - {:ok, {resource, result}}, acc -> - Map.put(acc, resource, result) - - _error, acc -> - acc - end) - end - - defp resource_search(_, "accounts", query, options) do - accounts = with_fallback(fn -> User.search(query, options) end) - - AccountView.render("index.json", - users: accounts, - for: options[:for_user], - embed_relationships: options[:embed_relationships] - ) - end - - defp resource_search(_, "statuses", query, options) do - statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end) - - StatusView.render("index.json", - activities: statuses, - for: options[:for_user], - as: :activity - ) - end - - defp resource_search(:v2, "hashtags", query, options) do - tags_path = Endpoint.url() <> "/tag/" - - query - |> prepare_tags(options) - |> Enum.map(fn tag -> - %{name: tag, url: tags_path <> tag} - end) - end - - defp resource_search(:v1, "hashtags", query, options) do - prepare_tags(query, options) - end - - defp prepare_tags(query, options) do - tags = - query - |> preprocess_uri_query() - |> String.split(~r/[^#\w]+/u, trim: true) - |> Enum.uniq_by(&String.downcase/1) - - explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end) - - tags = - if Enum.any?(explicit_tags) do - explicit_tags - else - tags - end - - tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end) - - tags = - if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do - add_joined_tag(tags) - else - tags - end - - Pleroma.Pagination.paginate(tags, options) - end - - # If `query` is a URI, returns last component of its path, otherwise returns `query` - defp preprocess_uri_query(query) do - if query =~ ~r/https?:\/\// do - query - |> String.trim_trailing("/") - |> URI.parse() - |> Map.get(:path) - |> String.split("/") - |> Enum.at(-1) - else - query - end - end - - defp add_joined_tag(tags) do - tags - |> Kernel.++([joined_tag(tags)]) - |> Enum.uniq_by(&String.downcase/1) - end - - defp joined_tag(tags) do - tags - |> Enum.map(fn tag -> String.capitalize(tag) end) - |> Enum.join() - end - - defp with_fallback(f, fallback \\ []) do - try do - f.() - rescue - error -> - Logger.error("#{__MODULE__} search error: #{inspect(error)}") - fallback - end - end -end diff --git a/lib/pleroma/search/elasticsearch.ex b/lib/pleroma/search/elasticsearch.ex index 76d2c3277..7c7ca82c8 100644 --- a/lib/pleroma/search/elasticsearch.ex +++ b/lib/pleroma/search/elasticsearch.ex @@ -3,24 +3,22 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Search.Elasticsearch do - @behaviour Pleroma.Search + @behaviour Pleroma.Search.SearchBackend alias Pleroma.Activity alias Pleroma.Object.Fetcher - alias Pleroma.Web.MastodonAPI.StatusView - alias Pleroma.Web.MastodonAPI.AccountView alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Search.Elasticsearch.Parsers - alias Pleroma.Web.Endpoint - def es_query(:activity, query) do + def es_query(:activity, query, offset, limit) do must = Parsers.Activity.parse(query) if must == [] do :skip else %{ - size: 50, + size: limit, + from: offset, terminate_after: 50, timeout: "5s", sort: [ @@ -36,50 +34,6 @@ def es_query(:activity, query) do end end - def es_query(:user, query) do - must = Parsers.User.parse(query) - - if must == [] do - :skip - else - %{ - size: 50, - terminate_after: 50, - timeout: "5s", - sort: [ - "_score" - ], - query: %{ - bool: %{ - must: must - } - } - } - end - end - - def es_query(:hashtag, query) do - must = Parsers.Hashtag.parse(query) - - if must == [] do - :skip - else - %{ - size: 50, - terminate_after: 50, - timeout: "5s", - sort: [ - "_score" - ], - query: %{ - bool: %{ - must: Parsers.Hashtag.parse(query) - } - } - } - end - end - defp maybe_fetch(:activity, search_query) do with true <- Regex.match?(~r/https?:/, search_query), {:ok, object} <- Fetcher.fetch_object_from_id(search_query), @@ -90,8 +44,10 @@ defp maybe_fetch(:activity, search_query) do end end - @impl Pleroma.Search - def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do + def search(user, query, options) do + limit = Enum.min([Keyword.get(options, :limit), 40]) + offset = Keyword.get(options, :offset, 0) + parsed_query = query |> String.trim() @@ -104,30 +60,13 @@ def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) d activity_task = Task.async(fn -> - q = es_query(:activity, parsed_query) + q = es_query(:activity, parsed_query, offset, limit) - Pleroma.Elasticsearch.search(:activities, q) + Pleroma.Search.Elasticsearch.Store.search(:activities, q) |> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end) end) - user_task = - Task.async(fn -> - q = es_query(:user, parsed_query) - - Pleroma.Elasticsearch.search(:users, q) - |> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end) - end) - - hashtag_task = - Task.async(fn -> - q = es_query(:hashtag, parsed_query) - - Pleroma.Elasticsearch.search(:hashtags, q) - end) - activity_results = Task.await(activity_task) - user_results = Task.await(user_task) - hashtag_results = Task.await(hashtag_task) direct_activity = Task.await(activity_fetch_task) activity_results = @@ -137,25 +76,16 @@ def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) d [direct_activity | activity_results] end - %{ - "accounts" => - AccountView.render("index.json", - users: user_results, - for: user - ), - "hashtags" => - Enum.map(hashtag_results, fn x -> - %{ - url: Endpoint.url() <> "/tag/" <> x, - name: x - } - end), - "statuses" => - StatusView.render("index.json", - activities: activity_results, - for: user, - as: :activity - ) - } + activity_results + end + + @impl true + def add_to_index(activity) do + Elasticsearch.put_document(Pleroma.Search.Elasticsearch.Cluster, activity, "activities") + end + + @impl true + def remove_from_index(object) do + Elasticsearch.delete_document(Pleroma.Search.Elasticsearch.Cluster, object, "activities") end end diff --git a/lib/pleroma/search/elasticsearch/cluster.ex b/lib/pleroma/search/elasticsearch/cluster.ex new file mode 100644 index 000000000..4f76c4ebc --- /dev/null +++ b/lib/pleroma/search/elasticsearch/cluster.ex @@ -0,0 +1,4 @@ +defmodule Pleroma.Search.Elasticsearch.Cluster do + @moduledoc false + use Elasticsearch.Cluster, otp_app: :pleroma +end diff --git a/lib/pleroma/search/elasticsearch/document_mappings/activity.ex b/lib/pleroma/search/elasticsearch/document_mappings/activity.ex new file mode 100644 index 000000000..edd8e03c1 --- /dev/null +++ b/lib/pleroma/search/elasticsearch/document_mappings/activity.ex @@ -0,0 +1,55 @@ +# Akkoma: A lightweight social networking server +# Copyright © 2022-2022 Akkoma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defimpl Elasticsearch.Document, for: Pleroma.Activity do + alias Pleroma.Object + require Pleroma.Constants + + def id(obj), do: obj.id + def routing(_), do: false + + def object_to_search_data(object) do + # Only index public or unlisted Notes + if not is_nil(object) and object.data["type"] == "Note" and + not is_nil(object.data["content"]) and + (Pleroma.Constants.as_public() in object.data["to"] or + Pleroma.Constants.as_public() in object.data["cc"]) and + String.length(object.data["content"]) > 1 do + data = object.data + + content_str = + case data["content"] do + [nil | rest] -> to_string(rest) + str -> str + end + + content = + with {:ok, scrubbed} <- FastSanitize.strip_tags(content_str), + trimmed <- String.trim(scrubbed) do + trimmed + end + + if String.length(content) > 1 do + {:ok, published, _} = DateTime.from_iso8601(data["published"]) + + %{ + _timestamp: published, + content: content, + instance: URI.parse(object.data["actor"]).host, + hashtags: Object.hashtags(object), + user: Pleroma.User.get_cached_by_ap_id(object.data["actor"]).nickname + } + else + %{} + end + else + %{} + end + end + + def encode(activity) do + object = Pleroma.Object.normalize(activity) + object_to_search_data(object) + end +end diff --git a/lib/pleroma/search/elasticsearch/hashtag_parser.ex b/lib/pleroma/search/elasticsearch/hashtag_parser.ex deleted file mode 100644 index 911dc651c..000000000 --- a/lib/pleroma/search/elasticsearch/hashtag_parser.ex +++ /dev/null @@ -1,34 +0,0 @@ -# Akkoma: A lightweight social networking server -# Copyright © 2022-2022 Akkoma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do - defp to_es(term) when is_binary(term) do - %{ - term: %{ - hashtag: %{ - value: String.downcase(term) - } - } - } - end - - defp to_es({:quoted, term}), do: to_es(term) - - defp to_es({:filter, ["hashtag", query]}) do - %{ - term: %{ - hashtag: %{ - value: String.downcase(query) - } - } - } - end - - defp to_es({:filter, _}), do: nil - - def parse(q) do - Enum.map(q, &to_es/1) - |> Enum.filter(fn x -> x != nil end) - end -end diff --git a/lib/pleroma/search/elasticsearch/store.ex b/lib/pleroma/search/elasticsearch/store.ex new file mode 100644 index 000000000..895b76d7f --- /dev/null +++ b/lib/pleroma/search/elasticsearch/store.ex @@ -0,0 +1,52 @@ +# Akkoma: A lightweight social networking server +# Copyright © 2022-2022 Akkoma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Search.Elasticsearch.Store do + @behaviour Elasticsearch.Store + alias Pleroma.Search.Elasticsearch.Cluster + require Logger + + alias Pleroma.Repo + + @impl true + def stream(schema) do + Repo.stream(schema) + end + + @impl true + def transaction(fun) do + {:ok, result} = Repo.transaction(fun, timeout: :infinity) + result + end + + def search(_, _, _, :skip), do: [] + + def search(:raw, index, q) do + with {:ok, raw_results} <- Elasticsearch.post(Cluster, "/#{index}/_search", q) do + results = + raw_results + |> Map.get("hits", %{}) + |> Map.get("hits", []) + + {:ok, results} + else + {:error, e} -> + Logger.error(e) + {:error, e} + end + end + + def search(:activities, q) do + with {:ok, results} <- search(:raw, "activities", q) do + results + |> Enum.map(fn result -> result["_id"] end) + |> Pleroma.Activity.all_by_ids_with_object() + |> Enum.sort(&(&1.inserted_at >= &2.inserted_at)) + else + e -> + Logger.error(e) + [] + end + end +end diff --git a/lib/pleroma/search/elasticsearch/user_paser.ex b/lib/pleroma/search/elasticsearch/user_paser.ex deleted file mode 100644 index 4176c6141..000000000 --- a/lib/pleroma/search/elasticsearch/user_paser.ex +++ /dev/null @@ -1,57 +0,0 @@ -# Akkoma: A lightweight social networking server -# Copyright © 2022-2022 Akkoma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Search.Elasticsearch.Parsers.User do - defp to_es(term) when is_binary(term) do - %{ - bool: %{ - minimum_should_match: 1, - should: [ - %{ - match: %{ - bio: %{ - query: term, - operator: "AND" - } - } - }, - %{ - term: %{ - nickname: %{ - value: term - } - } - }, - %{ - match: %{ - display_name: %{ - query: term, - operator: "AND" - } - } - } - ] - } - } - end - - defp to_es({:quoted, term}), do: to_es(term) - - defp to_es({:filter, ["user", query]}) do - %{ - term: %{ - nickname: %{ - value: query - } - } - } - end - - defp to_es({:filter, _}), do: nil - - def parse(q) do - Enum.map(q, &to_es/1) - |> Enum.filter(fn x -> x != nil end) - end -end diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index 9a50ee3ec..dc6c661ea 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -1095,7 +1095,6 @@ def update_and_set_cache(%{data: %Pleroma.User{} = user} = changeset) do was_superuser_before_update = User.superuser?(user) with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do - Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user) set_cache(user) end |> maybe_remove_report_notifications(was_superuser_before_update) diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex index 214647dbf..d4e507287 100644 --- a/lib/pleroma/web/activity_pub/pipeline.ex +++ b/lib/pleroma/web/activity_pub/pipeline.ex @@ -28,7 +28,6 @@ def common_pipeline(object, meta) do case Repo.transaction(fn -> do_common_pipeline(object, meta) end, Utils.query_timeout()) do {:ok, {:ok, activity, meta}} -> side_effects().handle_after_transaction(meta) - side_effects().handle_after_transaction(activity) {:ok, activity, meta} {:ok, value} -> diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 517dd0a4f..e2371b693 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors +# Copyright © 2017-2022 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.SideEffects do @@ -272,6 +272,7 @@ def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, met def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do reacted_object = Object.get_by_ap_id(object.data["object"]) Utils.add_emoji_reaction_to_object(object, reacted_object) + Notification.create_notifications(object) {:ok, object, meta} @@ -547,24 +548,6 @@ defp add_notifications(meta, notifications) do end @impl true - def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do - Pleroma.Elasticsearch.put_by_id(:activity, activity.id) - end - - def handle_after_transaction(%Pleroma.Activity{ - data: %{"type" => "Delete", "deleted_activity_id" => id} - }) do - Pleroma.Elasticsearch.delete_by_id(:activity, id) - end - - def handle_after_transaction(%Pleroma.Activity{}) do - :ok - end - - def handle_after_transaction(%Pleroma.Object{}) do - :ok - end - def handle_after_transaction(meta) do meta |> send_notifications() diff --git a/lib/pleroma/web/activity_pub/side_effects/handling.ex b/lib/pleroma/web/activity_pub/side_effects/handling.ex index a82305155..eb012f576 100644 --- a/lib/pleroma/web/activity_pub/side_effects/handling.ex +++ b/lib/pleroma/web/activity_pub/side_effects/handling.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors +# Copyright © 2017-2022 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do diff --git a/lib/pleroma/web/common_api.ex b/lib/pleroma/web/common_api.ex index 92afd5cb6..856fa95b9 100644 --- a/lib/pleroma/web/common_api.ex +++ b/lib/pleroma/web/common_api.ex @@ -396,13 +396,7 @@ def listen(user, data) do def post(user, %{status: _} = data) do with {:ok, draft} <- ActivityDraft.create(user, data) do - activity = ActivityPub.create(draft.changes, draft.preview?) - - unless draft.preview? do - Pleroma.Elasticsearch.maybe_put_into_elasticsearch(activity) - end - - activity + ActivityPub.create(draft.changes, draft.preview?) end end diff --git a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex index 751d46cdf..e4acba226 100644 --- a/lib/pleroma/web/mastodon_api/controllers/search_controller.ex +++ b/lib/pleroma/web/mastodon_api/controllers/search_controller.ex @@ -1,5 +1,5 @@ # Pleroma: A lightweight social networking server -# Copyright © 2017-2021 Pleroma Authors +# Copyright © 2017-2022 Pleroma Authors # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.MastodonAPI.SearchController do @@ -8,7 +8,9 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.ControllerHelper + alias Pleroma.Web.Endpoint alias Pleroma.Web.MastodonAPI.AccountView + alias Pleroma.Web.MastodonAPI.StatusView alias Pleroma.Web.Plugs.OAuthScopesPlug alias Pleroma.Web.Plugs.RateLimiter @@ -42,13 +44,34 @@ def account_search(%{assigns: %{user: user}} = conn, %{q: query} = params) do def search2(conn, params), do: do_search(:v2, conn, params) def search(conn, params), do: do_search(:v1, conn, params) - defp do_search(version, %{assigns: %{user: user}} = conn, params) do - options = - search_options(params, user) - |> Keyword.put(:version, version) + defp do_search(version, %{assigns: %{user: user}} = conn, %{q: query} = params) do + query = String.trim(query) + options = search_options(params, user) + timeout = Keyword.get(Repo.config(), :timeout, 15_000) + default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []} - search_provider = Pleroma.Config.get([:search, :provider]) - json(conn, search_provider.search(conn, params, options)) + result = + default_values + |> Enum.map(fn {resource, default_value} -> + if params[:type] in [nil, resource] do + {resource, fn -> resource_search(version, resource, query, options) end} + else + {resource, fn -> default_value end} + end + end) + |> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end, + timeout: timeout, + on_timeout: :kill_task + ) + |> Enum.reduce(default_values, fn + {:ok, {resource, result}}, acc -> + Map.put(acc, resource, result) + + _error, acc -> + acc + end) + + json(conn, result) end defp search_options(params, user) do diff --git a/mix.exs b/mix.exs index 564db2d75..558e71262 100644 --- a/mix.exs +++ b/mix.exs @@ -203,6 +203,7 @@ defp deps do {:nimble_parsec, "~> 1.0", override: true}, {:phoenix_live_dashboard, "~> 0.6.2"}, {:ecto_psql_extras, "~> 0.6"}, + {:elasticsearch, "~> 1.0.0"}, # indirect dependency version override {:plug, "~> 1.10.4", override: true}, diff --git a/priv/es-mappings/activity.json b/priv/es-mappings/activity.json index e476fd59f..052633496 100644 --- a/priv/es-mappings/activity.json +++ b/priv/es-mappings/activity.json @@ -1,20 +1,22 @@ { - "properties": { - "_timestamp": { - "type": "date", - "index": true - }, - "instance": { - "type": "keyword" - }, - "content": { - "type": "text" - }, - "hashtags": { - "type": "keyword" - }, - "user": { - "type": "text" + "mappings": { + "properties": { + "_timestamp": { + "type": "date", + "index": true + }, + "instance": { + "type": "keyword" + }, + "content": { + "type": "text" + }, + "hashtags": { + "type": "keyword" + }, + "user": { + "type": "text" + } } } } diff --git a/test/pleroma/web/activity_pub/pipeline_test.exs b/test/pleroma/web/activity_pub/pipeline_test.exs index 30fd5651b..e606fa3d1 100644 --- a/test/pleroma/web/activity_pub/pipeline_test.exs +++ b/test/pleroma/web/activity_pub/pipeline_test.exs @@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do SideEffectsMock |> expect(:handle, fn o, m -> {:ok, o, m} end) |> expect(:handle_after_transaction, fn m -> m end) - |> expect(:handle_after_transaction, fn m -> m end) :ok end