Refactor ES on top of search behaviour

This commit is contained in:
FloatingGhost 2022-06-30 16:28:31 +01:00
parent 69d5d1a01b
commit 1ecdb19de5
25 changed files with 212 additions and 897 deletions

View file

@ -856,8 +856,6 @@ config :pleroma, ConcurrentLimiter, [
{Pleroma.Search, [max_running: 30, max_waiting: 50]} {Pleroma.Search, [max_running: 30, max_waiting: 50]}
] ]
config :pleroma, :search, provider: Pleroma.Search.Builtin
config :pleroma, Pleroma.Search, module: Pleroma.Search.DatabaseSearch config :pleroma, Pleroma.Search, module: Pleroma.Search.DatabaseSearch
config :pleroma, Pleroma.Search.Meilisearch, config :pleroma, Pleroma.Search.Meilisearch,
@ -865,6 +863,22 @@ config :pleroma, Pleroma.Search.Meilisearch,
private_key: nil, private_key: nil,
initial_indexing_chunk_size: 100_000 initial_indexing_chunk_size: 100_000
config :pleroma, Pleroma.Search.Elasticsearch.Cluster,
url: "http://localhost:9200",
username: "elastic",
password: "changeme",
api: Elasticsearch.API.HTTP,
json_library: Jason,
indexes: %{
activities: %{
settings: "priv/es-mappings/activity.json",
store: Pleroma.Search.Elasticsearch.Store,
sources: [Pleroma.Activity],
bulk_page_size: 5000,
bulk_wait_interval: 15_000
}
}
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs" import_config "#{Mix.env()}.exs"

View file

@ -1,64 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Search do
use Mix.Task
import Mix.Pleroma
import Ecto.Query
alias Pleroma.Activity
alias Pleroma.Pagination
alias Pleroma.User
alias Pleroma.Hashtag
@shortdoc "Manages elasticsearch"
def run(["import", "activities" | _rest]) do
start_pleroma()
from(a in Activity, where: not ilike(a.actor, "%/relay"))
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|> Activity.with_preloaded_object()
|> Activity.with_preloaded_user_actor()
|> get_all(:activities)
end
def run(["import", "users" | _rest]) do
start_pleroma()
from(u in User, where: u.nickname not in ["internal.fetch", "relay"])
|> get_all(:users)
end
def run(["import", "hashtags" | _rest]) do
start_pleroma()
from(h in Hashtag)
|> Pleroma.Repo.all()
|> Pleroma.Elasticsearch.bulk_post(:hashtags)
end
defp get_all(query, index, max_id \\ nil) do
params = %{limit: 1000}
params =
if max_id == nil do
params
else
Map.put(params, :max_id, max_id)
end
res =
query
|> Pagination.fetch_paginated(params)
if res == [] do
:ok
else
res
|> Pleroma.Elasticsearch.bulk_post(index)
get_all(query, index, List.last(res).id)
end
end
end

View file

@ -1,144 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
require Pleroma.Constants
import Mix.Pleroma
import Ecto.Query
import Pleroma.Search.Meilisearch,
only: [meili_post: 2, meili_put: 2, meili_get: 1, meili_delete!: 1]
def run(["index"]) do
start_pleroma()
meili_version =
(
{:ok, result} = meili_get("/version")
result["pkgVersion"]
)
# The ranking rule syntax was changed but nothing about that is mentioned in the changelog
if not Version.match?(meili_version, ">= 0.25.0") do
raise "Meilisearch <0.24.0 not supported"
end
{:ok, _} =
meili_post(
"/indexes/objects/settings/ranking-rules",
[
"published:desc",
"words",
"exactness",
"proximity",
"typo",
"attribute",
"sort"
]
)
{:ok, _} =
meili_post(
"/indexes/objects/settings/searchable-attributes",
[
"content"
]
)
IO.puts("Created indices. Starting to insert posts.")
chunk_size = Pleroma.Config.get([Pleroma.Search.Meilisearch, :initial_indexing_chunk_size])
Pleroma.Repo.transaction(
fn ->
query =
from(Pleroma.Object,
# Only index public and unlisted posts which are notes and have some text
where:
fragment("data->>'type' = 'Note'") and
(fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()) or
fragment("data->'cc' \\? ?", ^Pleroma.Constants.as_public())),
order_by: [desc: fragment("data->'published'")]
)
count = query |> Pleroma.Repo.aggregate(:count, :data)
IO.puts("Entries to index: #{count}")
Pleroma.Repo.stream(
query,
timeout: :infinity
)
|> Stream.map(&Pleroma.Search.Meilisearch.object_to_search_data/1)
|> Stream.filter(fn o -> not is_nil(o) end)
|> Stream.chunk_every(chunk_size)
|> Stream.transform(0, fn objects, acc ->
new_acc = acc + Enum.count(objects)
# Reset to the beginning of the line and rewrite it
IO.write("\r")
IO.write("Indexed #{new_acc} entries")
{[objects], new_acc}
end)
|> Stream.each(fn objects ->
result =
meili_put(
"/indexes/objects/documents",
objects
)
with {:ok, res} <- result do
if not Map.has_key?(res, "uid") do
IO.puts("\nFailed to index: #{inspect(result)}")
end
else
e -> IO.puts("\nFailed to index due to network error: #{inspect(e)}")
end
end)
|> Stream.run()
end,
timeout: :infinity
)
IO.write("\n")
end
def run(["clear"]) do
start_pleroma()
meili_delete!("/indexes/objects/documents")
end
def run(["show-keys", master_key]) do
start_pleroma()
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} =
Pleroma.HTTP.get(
Path.join(endpoint, "/keys"),
[{"Authorization", "Bearer #{master_key}"}]
)
decoded = Jason.decode!(result.body)
if decoded["results"] do
Enum.each(decoded["results"], fn %{"description" => desc, "key" => key} ->
IO.puts("#{desc}: #{key}")
end)
else
IO.puts("Error fetching the keys, check the master key is correct: #{inspect(decoded)}")
end
end
def run(["stats"]) do
start_pleroma()
{:ok, result} = meili_get("/indexes/objects/stats")
IO.puts("Number of entries: #{result["numberOfDocuments"]}")
IO.puts("Indexing? #{result["isIndexing"]}")
end
end

View file

@ -105,6 +105,7 @@ defmodule Pleroma.Application do
{Oban, Config.get(Oban)}, {Oban, Config.get(Oban)},
Pleroma.Web.Endpoint Pleroma.Web.Endpoint
] ++ ] ++
elasticsearch_children() ++
task_children(@mix_env) ++ task_children(@mix_env) ++
dont_run_in_test(@mix_env) ++ dont_run_in_test(@mix_env) ++
shout_child(shout_enabled?()) shout_child(shout_enabled?())
@ -303,6 +304,16 @@ defmodule Pleroma.Application do
defp http_children(_, _), do: [] defp http_children(_, _), do: []
def elasticsearch_children do
config = Config.get([Pleroma.Search, :module])
if config == Pleroma.Search.Elasticsearch do
[Pleroma.Search.Elasticsearch.Cluster]
else
[]
end
end
@spec limiters_setup() :: :ok @spec limiters_setup() :: :ok
def limiters_setup do def limiters_setup do
config = Config.get(ConcurrentLimiter, []) config = Config.get(ConcurrentLimiter, [])

View file

@ -1,19 +0,0 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Elasticsearch.DocumentMappings.Activity do
alias Pleroma.Object
def id(obj), do: obj.id
def encode(%{object: %{data: %{"type" => "Note"}}} = activity) do
%{
_timestamp: activity.inserted_at,
user: activity.user_actor.nickname,
content: activity.object.data["content"],
instance: URI.parse(activity.user_actor.ap_id).host,
hashtags: Object.hashtags(activity.object)
}
end
end

View file

@ -1,21 +0,0 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Elasticsearch.DocumentMappings.Hashtag do
def id(obj), do: obj.id
def encode(%{timestamp: _} = hashtag) do
%{
hashtag: hashtag.name,
timestamp: hashtag.timestamp
}
end
def encode(hashtag) do
%{
hashtag: hashtag.name,
timestamp: hashtag.inserted_at
}
end
end

View file

@ -1,17 +0,0 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Elasticsearch.DocumentMappings.User do
def id(obj), do: obj.id
def encode(%{actor_type: "Person"} = user) do
%{
timestamp: user.inserted_at,
instance: URI.parse(user.ap_id).host,
nickname: user.nickname,
bio: user.bio,
display_name: user.name
}
end
end

View file

@ -1,256 +0,0 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Elasticsearch do
alias Pleroma.Activity
alias Pleroma.User
alias Pleroma.Object
alias Pleroma.Elasticsearch.DocumentMappings
alias Pleroma.Config
require Logger
defp url do
Config.get([:elasticsearch, :url])
end
defp enabled? do
Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
end
def delete_by_id(:activity, id) do
if enabled?() do
Elastix.Document.delete(url(), "activities", "activity", id)
end
end
def put_by_id(:activity, id) do
id
|> Activity.get_by_id_with_object()
|> maybe_put_into_elasticsearch()
end
def maybe_put_into_elasticsearch({:ok, item}) do
maybe_put_into_elasticsearch(item)
end
def maybe_put_into_elasticsearch(
%{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
) do
if enabled?() do
actor = Pleroma.Activity.user_actor(activity)
activity
|> Map.put(:user_actor, actor)
|> put()
end
end
def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
if enabled?() do
put(user)
end
end
def maybe_put_into_elasticsearch(_) do
{:ok, :skipped}
end
def maybe_bulk_post(data, type) do
if enabled?() do
bulk_post(data, type)
end
end
def put(%Activity{} = activity) do
with {:ok, _} <-
Elastix.Document.index(
url(),
"activities",
"activity",
DocumentMappings.Activity.id(activity),
DocumentMappings.Activity.encode(activity)
) do
activity
|> Map.get(:object)
|> Object.hashtags()
|> Enum.map(fn x ->
%{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
end)
|> bulk_post(:hashtags)
else
{:error, %{reason: err}} ->
Logger.error("Could not put activity: #{err}")
:skipped
end
end
def put(%User{} = user) do
with {:ok, _} <-
Elastix.Document.index(
url(),
"users",
"user",
DocumentMappings.User.id(user),
DocumentMappings.User.encode(user)
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not put user: #{err}")
:skipped
end
end
def bulk_post(data, :activities) do
d =
data
|> Enum.filter(fn x ->
t =
x.object
|> Map.get(:data, %{})
|> Map.get("type", "")
t == "Note"
end)
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.Activity.id(d)}},
DocumentMappings.Activity.encode(d)
]
end)
|> List.flatten()
with {:ok, %{body: %{"errors" => false}}} <-
Elastix.Bulk.post(
url(),
d,
index: "activities",
type: "activity"
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not bulk put activity: #{err}")
:skipped
{:ok, %{body: _}} ->
:skipped
end
end
def bulk_post(data, :users) do
d =
data
|> Enum.filter(fn x -> x.actor_type == "Person" end)
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.User.id(d)}},
DocumentMappings.User.encode(d)
]
end)
|> List.flatten()
with {:ok, %{body: %{"errors" => false}}} <-
Elastix.Bulk.post(
url(),
d,
index: "users",
type: "user"
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not bulk put users: #{err}")
:skipped
{:ok, %{body: _}} ->
:skipped
end
end
def bulk_post(data, :hashtags) when is_list(data) do
d =
data
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.Hashtag.id(d)}},
DocumentMappings.Hashtag.encode(d)
]
end)
|> List.flatten()
with {:ok, %{body: %{"errors" => false}}} <-
Elastix.Bulk.post(
url(),
d,
index: "hashtags",
type: "hashtag"
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not bulk put hashtags: #{err}")
:skipped
{:ok, %{body: _}} ->
:skipped
end
end
def bulk_post(_, :hashtags), do: {:ok, nil}
def search(_, _, _, :skip), do: []
def search(:raw, index, type, q) do
with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
results =
raw_results
|> Map.get(:body, %{})
|> Map.get("hits", %{})
|> Map.get("hits", [])
{:ok, results}
else
{:error, e} ->
Logger.error(e)
{:error, e}
end
end
def search(:activities, q) do
with {:ok, results} <- search(:raw, "activities", "activity", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.Activity.all_by_ids_with_object()
|> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
else
e ->
Logger.error(e)
[]
end
end
def search(:users, q) do
with {:ok, results} <- search(:raw, "users", "user", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.User.get_all_by_ids()
else
e ->
Logger.error(e)
[]
end
end
def search(:hashtags, q) do
with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
results
|> Enum.map(fn result -> result["_source"]["hashtag"] end)
else
e ->
Logger.error(e)
[]
end
end
end

View file

@ -61,7 +61,6 @@ defmodule Pleroma.Hashtag do
{:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))} {:ok, Repo.all(from(ht in Hashtag, where: ht.name in ^names))}
end) end)
|> Repo.transaction() do |> Repo.transaction() do
Pleroma.Elasticsearch.maybe_bulk_post(hashtags, :hashtags)
{:ok, hashtags} {:ok, hashtags}
else else
{:error, _name, value, _changes_so_far} -> {:error, value} {:error, _name, value, _changes_so_far} -> {:error, value}

View file

@ -1,138 +0,0 @@
defmodule Pleroma.Search.Builtin do
@behaviour Pleroma.Search
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Activity
alias Pleroma.Web.MastodonAPI.AccountView
alias Pleroma.Web.MastodonAPI.StatusView
alias Pleroma.Web.Endpoint
require Logger
@impl Pleroma.Search
def search(_conn, %{q: query} = params, options) do
version = Keyword.get(options, :version)
timeout = Keyword.get(Repo.config(), :timeout, 15_000)
query = String.trim(query)
default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
default_values
|> Enum.map(fn {resource, default_value} ->
if params[:type] in [nil, resource] do
{resource, fn -> resource_search(version, resource, query, options) end}
else
{resource, fn -> default_value end}
end
end)
|> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
timeout: timeout,
on_timeout: :kill_task
)
|> Enum.reduce(default_values, fn
{:ok, {resource, result}}, acc ->
Map.put(acc, resource, result)
_error, acc ->
acc
end)
end
defp resource_search(_, "accounts", query, options) do
accounts = with_fallback(fn -> User.search(query, options) end)
AccountView.render("index.json",
users: accounts,
for: options[:for_user],
embed_relationships: options[:embed_relationships]
)
end
defp resource_search(_, "statuses", query, options) do
statuses = with_fallback(fn -> Activity.search(options[:for_user], query, options) end)
StatusView.render("index.json",
activities: statuses,
for: options[:for_user],
as: :activity
)
end
defp resource_search(:v2, "hashtags", query, options) do
tags_path = Endpoint.url() <> "/tag/"
query
|> prepare_tags(options)
|> Enum.map(fn tag ->
%{name: tag, url: tags_path <> tag}
end)
end
defp resource_search(:v1, "hashtags", query, options) do
prepare_tags(query, options)
end
defp prepare_tags(query, options) do
tags =
query
|> preprocess_uri_query()
|> String.split(~r/[^#\w]+/u, trim: true)
|> Enum.uniq_by(&String.downcase/1)
explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end)
tags =
if Enum.any?(explicit_tags) do
explicit_tags
else
tags
end
tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end)
tags =
if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do
add_joined_tag(tags)
else
tags
end
Pleroma.Pagination.paginate(tags, options)
end
# If `query` is a URI, returns last component of its path, otherwise returns `query`
defp preprocess_uri_query(query) do
if query =~ ~r/https?:\/\// do
query
|> String.trim_trailing("/")
|> URI.parse()
|> Map.get(:path)
|> String.split("/")
|> Enum.at(-1)
else
query
end
end
defp add_joined_tag(tags) do
tags
|> Kernel.++([joined_tag(tags)])
|> Enum.uniq_by(&String.downcase/1)
end
defp joined_tag(tags) do
tags
|> Enum.map(fn tag -> String.capitalize(tag) end)
|> Enum.join()
end
defp with_fallback(f, fallback \\ []) do
try do
f.()
rescue
error ->
Logger.error("#{__MODULE__} search error: #{inspect(error)}")
fallback
end
end
end

View file

@ -3,24 +3,22 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Elasticsearch do defmodule Pleroma.Search.Elasticsearch do
@behaviour Pleroma.Search @behaviour Pleroma.Search.SearchBackend
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Object.Fetcher alias Pleroma.Object.Fetcher
alias Pleroma.Web.MastodonAPI.StatusView
alias Pleroma.Web.MastodonAPI.AccountView
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Search.Elasticsearch.Parsers alias Pleroma.Search.Elasticsearch.Parsers
alias Pleroma.Web.Endpoint
def es_query(:activity, query) do def es_query(:activity, query, offset, limit) do
must = Parsers.Activity.parse(query) must = Parsers.Activity.parse(query)
if must == [] do if must == [] do
:skip :skip
else else
%{ %{
size: 50, size: limit,
from: offset,
terminate_after: 50, terminate_after: 50,
timeout: "5s", timeout: "5s",
sort: [ sort: [
@ -36,50 +34,6 @@ defmodule Pleroma.Search.Elasticsearch do
end end
end end
def es_query(:user, query) do
must = Parsers.User.parse(query)
if must == [] do
:skip
else
%{
size: 50,
terminate_after: 50,
timeout: "5s",
sort: [
"_score"
],
query: %{
bool: %{
must: must
}
}
}
end
end
def es_query(:hashtag, query) do
must = Parsers.Hashtag.parse(query)
if must == [] do
:skip
else
%{
size: 50,
terminate_after: 50,
timeout: "5s",
sort: [
"_score"
],
query: %{
bool: %{
must: Parsers.Hashtag.parse(query)
}
}
}
end
end
defp maybe_fetch(:activity, search_query) do defp maybe_fetch(:activity, search_query) do
with true <- Regex.match?(~r/https?:/, search_query), with true <- Regex.match?(~r/https?:/, search_query),
{:ok, object} <- Fetcher.fetch_object_from_id(search_query), {:ok, object} <- Fetcher.fetch_object_from_id(search_query),
@ -90,8 +44,10 @@ defmodule Pleroma.Search.Elasticsearch do
end end
end end
@impl Pleroma.Search def search(user, query, options) do
def search(%{assigns: %{user: user}} = _conn, %{q: query} = _params, _options) do limit = Enum.min([Keyword.get(options, :limit), 40])
offset = Keyword.get(options, :offset, 0)
parsed_query = parsed_query =
query query
|> String.trim() |> String.trim()
@ -104,30 +60,13 @@ defmodule Pleroma.Search.Elasticsearch do
activity_task = activity_task =
Task.async(fn -> Task.async(fn ->
q = es_query(:activity, parsed_query) q = es_query(:activity, parsed_query, offset, limit)
Pleroma.Elasticsearch.search(:activities, q) Pleroma.Search.Elasticsearch.Store.search(:activities, q)
|> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end) |> Enum.filter(fn x -> Visibility.visible_for_user?(x, user) end)
end) end)
user_task =
Task.async(fn ->
q = es_query(:user, parsed_query)
Pleroma.Elasticsearch.search(:users, q)
|> Enum.filter(fn x -> Pleroma.User.visible_for(x, user) == :visible end)
end)
hashtag_task =
Task.async(fn ->
q = es_query(:hashtag, parsed_query)
Pleroma.Elasticsearch.search(:hashtags, q)
end)
activity_results = Task.await(activity_task) activity_results = Task.await(activity_task)
user_results = Task.await(user_task)
hashtag_results = Task.await(hashtag_task)
direct_activity = Task.await(activity_fetch_task) direct_activity = Task.await(activity_fetch_task)
activity_results = activity_results =
@ -137,25 +76,16 @@ defmodule Pleroma.Search.Elasticsearch do
[direct_activity | activity_results] [direct_activity | activity_results]
end end
%{ activity_results
"accounts" => end
AccountView.render("index.json",
users: user_results, @impl true
for: user def add_to_index(activity) do
), Elasticsearch.put_document(Pleroma.Search.Elasticsearch.Cluster, activity, "activities")
"hashtags" => end
Enum.map(hashtag_results, fn x ->
%{ @impl true
url: Endpoint.url() <> "/tag/" <> x, def remove_from_index(object) do
name: x Elasticsearch.delete_document(Pleroma.Search.Elasticsearch.Cluster, object, "activities")
}
end),
"statuses" =>
StatusView.render("index.json",
activities: activity_results,
for: user,
as: :activity
)
}
end end
end end

View file

@ -0,0 +1,4 @@
defmodule Pleroma.Search.Elasticsearch.Cluster do
@moduledoc false
use Elasticsearch.Cluster, otp_app: :pleroma
end

View file

@ -0,0 +1,55 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defimpl Elasticsearch.Document, for: Pleroma.Activity do
alias Pleroma.Object
require Pleroma.Constants
def id(obj), do: obj.id
def routing(_), do: false
def object_to_search_data(object) do
# Only index public or unlisted Notes
if not is_nil(object) and object.data["type"] == "Note" and
not is_nil(object.data["content"]) and
(Pleroma.Constants.as_public() in object.data["to"] or
Pleroma.Constants.as_public() in object.data["cc"]) and
String.length(object.data["content"]) > 1 do
data = object.data
content_str =
case data["content"] do
[nil | rest] -> to_string(rest)
str -> str
end
content =
with {:ok, scrubbed} <- FastSanitize.strip_tags(content_str),
trimmed <- String.trim(scrubbed) do
trimmed
end
if String.length(content) > 1 do
{:ok, published, _} = DateTime.from_iso8601(data["published"])
%{
_timestamp: published,
content: content,
instance: URI.parse(object.data["actor"]).host,
hashtags: Object.hashtags(object),
user: Pleroma.User.get_cached_by_ap_id(object.data["actor"]).nickname
}
else
%{}
end
else
%{}
end
end
def encode(activity) do
object = Pleroma.Object.normalize(activity)
object_to_search_data(object)
end
end

View file

@ -1,34 +0,0 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Elasticsearch.Parsers.Hashtag do
defp to_es(term) when is_binary(term) do
%{
term: %{
hashtag: %{
value: String.downcase(term)
}
}
}
end
defp to_es({:quoted, term}), do: to_es(term)
defp to_es({:filter, ["hashtag", query]}) do
%{
term: %{
hashtag: %{
value: String.downcase(query)
}
}
}
end
defp to_es({:filter, _}), do: nil
def parse(q) do
Enum.map(q, &to_es/1)
|> Enum.filter(fn x -> x != nil end)
end
end

View file

@ -0,0 +1,52 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Elasticsearch.Store do
@behaviour Elasticsearch.Store
alias Pleroma.Search.Elasticsearch.Cluster
require Logger
alias Pleroma.Repo
@impl true
def stream(schema) do
Repo.stream(schema)
end
@impl true
def transaction(fun) do
{:ok, result} = Repo.transaction(fun, timeout: :infinity)
result
end
def search(_, _, _, :skip), do: []
def search(:raw, index, q) do
with {:ok, raw_results} <- Elasticsearch.post(Cluster, "/#{index}/_search", q) do
results =
raw_results
|> Map.get("hits", %{})
|> Map.get("hits", [])
{:ok, results}
else
{:error, e} ->
Logger.error(e)
{:error, e}
end
end
def search(:activities, q) do
with {:ok, results} <- search(:raw, "activities", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.Activity.all_by_ids_with_object()
|> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
else
e ->
Logger.error(e)
[]
end
end
end

View file

@ -1,57 +0,0 @@
# Akkoma: A lightweight social networking server
# Copyright © 2022-2022 Akkoma Authors <https://git.ihatebeinga.live/IHBAGang/akkoma/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Search.Elasticsearch.Parsers.User do
defp to_es(term) when is_binary(term) do
%{
bool: %{
minimum_should_match: 1,
should: [
%{
match: %{
bio: %{
query: term,
operator: "AND"
}
}
},
%{
term: %{
nickname: %{
value: term
}
}
},
%{
match: %{
display_name: %{
query: term,
operator: "AND"
}
}
}
]
}
}
end
defp to_es({:quoted, term}), do: to_es(term)
defp to_es({:filter, ["user", query]}) do
%{
term: %{
nickname: %{
value: query
}
}
}
end
defp to_es({:filter, _}), do: nil
def parse(q) do
Enum.map(q, &to_es/1)
|> Enum.filter(fn x -> x != nil end)
end
end

View file

@ -1095,7 +1095,6 @@ defmodule Pleroma.User do
was_superuser_before_update = User.superuser?(user) was_superuser_before_update = User.superuser?(user)
with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do with {:ok, user} <- Repo.update(changeset, stale_error_field: :id) do
Pleroma.Elasticsearch.maybe_put_into_elasticsearch(user)
set_cache(user) set_cache(user)
end end
|> maybe_remove_report_notifications(was_superuser_before_update) |> maybe_remove_report_notifications(was_superuser_before_update)

View file

@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
case Repo.transaction(fn -> do_common_pipeline(object, meta) end, Utils.query_timeout()) do case Repo.transaction(fn -> do_common_pipeline(object, meta) end, Utils.query_timeout()) do
{:ok, {:ok, activity, meta}} -> {:ok, {:ok, activity, meta}} ->
side_effects().handle_after_transaction(meta) side_effects().handle_after_transaction(meta)
side_effects().handle_after_transaction(activity)
{:ok, activity, meta} {:ok, activity, meta}
{:ok, value} -> {:ok, value} ->

View file

@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server # Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects do defmodule Pleroma.Web.ActivityPub.SideEffects do
@ -272,6 +272,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
reacted_object = Object.get_by_ap_id(object.data["object"]) reacted_object = Object.get_by_ap_id(object.data["object"])
Utils.add_emoji_reaction_to_object(object, reacted_object) Utils.add_emoji_reaction_to_object(object, reacted_object)
Notification.create_notifications(object) Notification.create_notifications(object)
{:ok, object, meta} {:ok, object, meta}
@ -547,24 +548,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
end end
@impl true @impl true
def handle_after_transaction(%Pleroma.Activity{data: %{"type" => "Create"}} = activity) do
Pleroma.Elasticsearch.put_by_id(:activity, activity.id)
end
def handle_after_transaction(%Pleroma.Activity{
data: %{"type" => "Delete", "deleted_activity_id" => id}
}) do
Pleroma.Elasticsearch.delete_by_id(:activity, id)
end
def handle_after_transaction(%Pleroma.Activity{}) do
:ok
end
def handle_after_transaction(%Pleroma.Object{}) do
:ok
end
def handle_after_transaction(meta) do def handle_after_transaction(meta) do
meta meta
|> send_notifications() |> send_notifications()

View file

@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server # Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do defmodule Pleroma.Web.ActivityPub.SideEffects.Handling do

View file

@ -396,13 +396,7 @@ defmodule Pleroma.Web.CommonAPI do
def post(user, %{status: _} = data) do def post(user, %{status: _} = data) do
with {:ok, draft} <- ActivityDraft.create(user, data) do with {:ok, draft} <- ActivityDraft.create(user, data) do
activity = ActivityPub.create(draft.changes, draft.preview?) ActivityPub.create(draft.changes, draft.preview?)
unless draft.preview? do
Pleroma.Elasticsearch.maybe_put_into_elasticsearch(activity)
end
activity
end end
end end

View file

@ -1,5 +1,5 @@
# Pleroma: A lightweight social networking server # Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.MastodonAPI.SearchController do defmodule Pleroma.Web.MastodonAPI.SearchController do
@ -8,7 +8,9 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ControllerHelper alias Pleroma.Web.ControllerHelper
alias Pleroma.Web.Endpoint
alias Pleroma.Web.MastodonAPI.AccountView alias Pleroma.Web.MastodonAPI.AccountView
alias Pleroma.Web.MastodonAPI.StatusView
alias Pleroma.Web.Plugs.OAuthScopesPlug alias Pleroma.Web.Plugs.OAuthScopesPlug
alias Pleroma.Web.Plugs.RateLimiter alias Pleroma.Web.Plugs.RateLimiter
@ -42,13 +44,34 @@ defmodule Pleroma.Web.MastodonAPI.SearchController do
def search2(conn, params), do: do_search(:v2, conn, params) def search2(conn, params), do: do_search(:v2, conn, params)
def search(conn, params), do: do_search(:v1, conn, params) def search(conn, params), do: do_search(:v1, conn, params)
defp do_search(version, %{assigns: %{user: user}} = conn, params) do defp do_search(version, %{assigns: %{user: user}} = conn, %{q: query} = params) do
options = query = String.trim(query)
search_options(params, user) options = search_options(params, user)
|> Keyword.put(:version, version) timeout = Keyword.get(Repo.config(), :timeout, 15_000)
default_values = %{"statuses" => [], "accounts" => [], "hashtags" => []}
search_provider = Pleroma.Config.get([:search, :provider]) result =
json(conn, search_provider.search(conn, params, options)) default_values
|> Enum.map(fn {resource, default_value} ->
if params[:type] in [nil, resource] do
{resource, fn -> resource_search(version, resource, query, options) end}
else
{resource, fn -> default_value end}
end
end)
|> Task.async_stream(fn {resource, f} -> {resource, with_fallback(f)} end,
timeout: timeout,
on_timeout: :kill_task
)
|> Enum.reduce(default_values, fn
{:ok, {resource, result}}, acc ->
Map.put(acc, resource, result)
_error, acc ->
acc
end)
json(conn, result)
end end
defp search_options(params, user) do defp search_options(params, user) do

View file

@ -203,6 +203,7 @@ defmodule Pleroma.Mixfile do
{:nimble_parsec, "~> 1.0", override: true}, {:nimble_parsec, "~> 1.0", override: true},
{:phoenix_live_dashboard, "~> 0.6.2"}, {:phoenix_live_dashboard, "~> 0.6.2"},
{:ecto_psql_extras, "~> 0.6"}, {:ecto_psql_extras, "~> 0.6"},
{:elasticsearch, "~> 1.0.0"},
# indirect dependency version override # indirect dependency version override
{:plug, "~> 1.10.4", override: true}, {:plug, "~> 1.10.4", override: true},

View file

@ -1,20 +1,22 @@
{ {
"properties": { "mappings": {
"_timestamp": { "properties": {
"type": "date", "_timestamp": {
"index": true "type": "date",
}, "index": true
"instance": { },
"type": "keyword" "instance": {
}, "type": "keyword"
"content": { },
"type": "text" "content": {
}, "type": "text"
"hashtags": { },
"type": "keyword" "hashtags": {
}, "type": "keyword"
"user": { },
"type": "text" "user": {
"type": "text"
}
} }
} }
} }

View file

@ -28,7 +28,6 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do
SideEffectsMock SideEffectsMock
|> expect(:handle, fn o, m -> {:ok, o, m} end) |> expect(:handle, fn o, m -> {:ok, o, m} end)
|> expect(:handle_after_transaction, fn m -> m end) |> expect(:handle_after_transaction, fn m -> m end)
|> expect(:handle_after_transaction, fn m -> m end)
:ok :ok
end end