Initial meilisearch implementation, doesn't delete posts yet

This commit is contained in:
Ekaterina Vaartis 2021-08-15 21:53:04 +03:00 committed by FloatingGhost
parent 60eec9d0c5
commit e961cf2689
9 changed files with 220 additions and 11 deletions
config
lib
mix/tasks/pleroma/search
pleroma

View file

@ -850,17 +850,14 @@
config :pleroma, ConcurrentLimiter, [
{Pleroma.Web.RichMedia.Helpers, [max_running: 5, max_waiting: 5]},
{Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, [max_running: 5, max_waiting: 5]}
{Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy, [max_running: 5, max_waiting: 5]},
{Pleroma.Search, [max_running: 20, max_waiting: 50]}
]
config :pleroma, :search, provider: Pleroma.Search.Builtin
config :pleroma, :telemetry,
slow_queries_logging: [
enabled: false,
min_duration: 500_000,
exclude_sources: [nil, "oban_jobs"]
]
config :pleroma, Pleroma.Search, module: Pleroma.Activity.Search
config :pleroma, Pleroma.Search.Meilisearch, url: "http://127.0.0.1:7700/"
# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.

View file

@ -134,6 +134,8 @@
ap_streamer: Pleroma.Web.ActivityPub.ActivityPubMock,
logger: Pleroma.LoggerMock
config :pleroma, Pleroma.Search, module: Pleroma.Activity.Search
# Reduce recompilation time
# https://dashbit.co/blog/speeding-up-re-compilation-of-elixir-projects
config :phoenix, :plug_init_mode, :runtime

View file

@ -0,0 +1,38 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Mix.Tasks.Pleroma.Search.Meilisearch do
import Mix.Pleroma
import Ecto.Query
def run(["index"]) do
start_pleroma()
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
Pleroma.Repo.chunk_stream(
from(Pleroma.Object,
limit: 200,
where: fragment("data->>'type' = 'Note'") and fragment("LENGTH(data->>'source') > 0")
),
100,
:batches
)
|> Stream.map(fn objects ->
Enum.map(objects, fn object ->
data = object.data
%{id: object.id, source: data["source"], ap: data["id"]}
end)
end)
|> Stream.each(fn activities ->
{:ok, _} =
Pleroma.HTTP.post(
"#{endpoint}/indexes/objects/documents",
Jason.encode!(activities)
)
end)
|> Stream.run()
end
end

View file

@ -368,6 +368,7 @@ def restrict_deactivated_users(query) do
end
defdelegate search(user, query, options \\ []), to: Pleroma.Activity.Search
def add_to_index(_activity), do: nil
def direct_conversation_id(activity, for_user) do
alias Pleroma.Conversation.Participation

View file

@ -57,7 +57,7 @@ def maybe_restrict_blocked(query, %User{} = user) do
def maybe_restrict_blocked(query, _), do: query
defp restrict_public(q) do
def restrict_public(q) do
from([a, o] in q,
where: fragment("?->>'type' = 'Create'", a.data),
where: ^Pleroma.Constants.as_public() in a.recipients
@ -124,7 +124,7 @@ defp query_with(q, :rum, search_query, :websearch) do
)
end
defp maybe_restrict_local(q, user) do
def maybe_restrict_local(q, user) do
limit = Pleroma.Config.get([:instance, :limit_to_local_content], :unauthenticated)
case {limit, user} do
@ -137,7 +137,7 @@ defp maybe_restrict_local(q, user) do
defp restrict_local(q), do: where(q, local: true)
defp maybe_fetch(activities, user, search_query) do
def maybe_fetch(activities, user, search_query) do
with true <- Regex.match?(~r/https?:/, search_query),
{:ok, object} <- Fetcher.fetch_object_from_id(search_query),
%Activity{} = activity <- Activity.get_create_by_object_ap_id(object.data["id"]),

View file

@ -307,7 +307,11 @@ defp http_children(_, _), do: []
def limiters_setup do
config = Config.get(ConcurrentLimiter, [])
[Pleroma.Web.RichMedia.Helpers, Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy]
[
Pleroma.Web.RichMedia.Helpers,
Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy,
Pleroma.Search
]
|> Enum.each(fn module ->
mod_config = Keyword.get(config, module, [])

View file

@ -0,0 +1,60 @@
defmodule Pleroma.Search.Meilisearch do
require Logger
alias Pleroma.Activity
import Pleroma.Activity.Search
import Ecto.Query
def search(user, query, options \\ []) do
limit = Enum.min([Keyword.get(options, :limit), 40])
offset = Keyword.get(options, :offset, 0)
author = Keyword.get(options, :author)
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} =
Pleroma.HTTP.post(
"#{endpoint}/indexes/objects/search",
Jason.encode!(%{q: query, offset: offset, limit: limit})
)
hits = Jason.decode!(result.body)["hits"] |> Enum.map(& &1["ap"])
try do
hits
|> Activity.create_by_object_ap_id()
|> Activity.with_preloaded_object()
|> Activity.with_preloaded_object()
|> Activity.restrict_deactivated_users()
|> maybe_restrict_local(user)
|> maybe_restrict_author(author)
|> maybe_restrict_blocked(user)
|> maybe_fetch(user, query)
|> order_by([activity], desc: activity.id)
|> Pleroma.Repo.all()
rescue
_ -> maybe_fetch([], user, query)
end
end
def add_to_index(activity) do
object = activity.object
if activity.data["type"] == "Create" and not is_nil(object) and object.data["type"] == "Note" do
data = object.data
endpoint = Pleroma.Config.get([Pleroma.Search.Meilisearch, :url])
{:ok, result} =
Pleroma.HTTP.post(
"#{endpoint}/indexes/objects/documents",
Jason.encode!([%{id: object.id, source: data["source"], ap: data["id"]}])
)
if not Map.has_key?(Jason.decode!(result.body), "updateId") do
Logger.error("Failed to add activity #{activity.id} to index: #{result.body}")
end
end
end
end

View file

@ -140,6 +140,12 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
end)
search_module = Pleroma.Config.get([Pleroma.Search, :module])
ConcurrentLimiter.limit(Pleroma.Search, fn ->
Task.start(fn -> search_module.add_to_index(activity) end)
end)
{:ok, activity}
else
%Activity{} = activity ->

View file

@ -5,6 +5,7 @@
defmodule Pleroma.Web.MastodonAPI.SearchController do
use Pleroma.Web, :controller
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.ControllerHelper
alias Pleroma.Web.MastodonAPI.AccountView
@ -64,6 +65,106 @@ defp search_options(params, user) do
|> Enum.filter(&elem(&1, 1))
end
defp resource_search(_, "accounts", query, options) do
accounts = with_fallback(fn -> User.search(query, options) end)
AccountView.render("index.json",
users: accounts,
for: options[:for_user],
embed_relationships: options[:embed_relationships]
)
end
defp resource_search(_, "statuses", query, options) do
search_module = Pleroma.Config.get([Pleroma.Search, :module], Pleroma.Activity)
statuses = with_fallback(fn -> search_module.search(options[:for_user], query, options) end)
StatusView.render("index.json",
activities: statuses,
for: options[:for_user],
as: :activity
)
end
defp resource_search(:v2, "hashtags", query, options) do
tags_path = Endpoint.url() <> "/tag/"
query
|> prepare_tags(options)
|> Enum.map(fn tag ->
%{name: tag, url: tags_path <> tag}
end)
end
defp resource_search(:v1, "hashtags", query, options) do
prepare_tags(query, options)
end
defp prepare_tags(query, options) do
tags =
query
|> preprocess_uri_query()
|> String.split(~r/[^#\w]+/u, trim: true)
|> Enum.uniq_by(&String.downcase/1)
explicit_tags = Enum.filter(tags, fn tag -> String.starts_with?(tag, "#") end)
tags =
if Enum.any?(explicit_tags) do
explicit_tags
else
tags
end
tags = Enum.map(tags, fn tag -> String.trim_leading(tag, "#") end)
tags =
if Enum.empty?(explicit_tags) && !options[:skip_joined_tag] do
add_joined_tag(tags)
else
tags
end
Pleroma.Pagination.paginate(tags, options)
end
defp add_joined_tag(tags) do
tags
|> Kernel.++([joined_tag(tags)])
|> Enum.uniq_by(&String.downcase/1)
end
# If `query` is a URI, returns last component of its path, otherwise returns `query`
defp preprocess_uri_query(query) do
if query =~ ~r/https?:\/\// do
query
|> String.trim_trailing("/")
|> URI.parse()
|> Map.get(:path)
|> String.split("/")
|> Enum.at(-1)
else
query
end
end
defp joined_tag(tags) do
tags
|> Enum.map(fn tag -> String.capitalize(tag) end)
|> Enum.join()
end
defp with_fallback(f, fallback \\ []) do
try do
f.()
rescue
error ->
Logger.error("#{__MODULE__} search error: #{inspect(error)}")
fallback
end
end
defp get_author(%{account_id: account_id}) when is_binary(account_id),
do: User.get_cached_by_id(account_id)