akkoma/lib/pleroma/elasticsearch/store.ex

256 lines
5.7 KiB
Elixir
Raw Normal View History

2021-12-11 17:36:49 +00:00
defmodule Pleroma.Elasticsearch do
alias Pleroma.Activity
alias Pleroma.User
2021-12-16 15:58:56 +00:00
alias Pleroma.Object
2021-12-11 17:36:49 +00:00
alias Pleroma.Elasticsearch.DocumentMappings
2021-12-12 19:31:47 +00:00
alias Pleroma.Config
require Logger
2021-12-11 17:36:49 +00:00
defp url do
2021-12-12 19:31:47 +00:00
Config.get([:elasticsearch, :url])
end
defp enabled? do
Config.get([:search, :provider]) == Pleroma.Search.Elasticsearch
end
2021-12-30 17:47:41 +00:00
def delete_by_id(:activity, id) do
if enabled?() do
Elastix.Document.delete(url(), "activities", "activity", id)
end
end
2021-12-30 18:03:02 +00:00
def put_by_id(:activity, id) do
2021-12-12 19:31:47 +00:00
id
|> Activity.get_by_id_with_object()
|> maybe_put_into_elasticsearch()
end
def maybe_put_into_elasticsearch({:ok, item}) do
maybe_put_into_elasticsearch(item)
2021-12-12 19:31:47 +00:00
end
def maybe_put_into_elasticsearch(
%{data: %{"type" => "Create"}, object: %{data: %{"type" => "Note"}}} = activity
) do
if enabled?() do
2021-12-12 19:31:47 +00:00
actor = Pleroma.Activity.user_actor(activity)
activity
|> Map.put(:user_actor, actor)
|> put()
end
end
2021-12-15 11:05:30 +00:00
def maybe_put_into_elasticsearch(%User{actor_type: "Person"} = user) do
if enabled?() do
put(user)
end
end
2021-12-12 19:31:47 +00:00
def maybe_put_into_elasticsearch(_) do
{:ok, :skipped}
2021-12-11 17:36:49 +00:00
end
2021-12-16 15:58:56 +00:00
def maybe_bulk_post(data, type) do
if enabled?() do
bulk_post(data, type)
end
end
2021-12-11 17:36:49 +00:00
def put(%Activity{} = activity) do
2021-12-30 18:03:02 +00:00
with {:ok, _} <-
Elastix.Document.index(
url(),
"activities",
"activity",
DocumentMappings.Activity.id(activity),
DocumentMappings.Activity.encode(activity)
) do
activity
|> Map.get(:object)
|> Object.hashtags()
|> Enum.map(fn x ->
%{id: x, name: x, timestamp: DateTime.to_iso8601(DateTime.utc_now())}
end)
|> bulk_post(:hashtags)
else
{:error, %{reason: err}} ->
Logger.error("Could not put activity: #{err}")
:skipped
end
end
def put(%User{} = user) do
2021-12-30 18:03:02 +00:00
with {:ok, _} <-
Elastix.Document.index(
url(),
"users",
"user",
DocumentMappings.User.id(user),
DocumentMappings.User.encode(user)
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not put user: #{err}")
:skipped
end
2021-12-11 17:36:49 +00:00
end
def bulk_post(data, :activities) do
2021-12-12 17:23:44 +00:00
d =
data
2021-12-13 20:27:35 +00:00
|> Enum.filter(fn x ->
t =
x.object
|> Map.get(:data, %{})
|> Map.get("type", "")
2021-12-13 20:27:35 +00:00
t == "Note"
end)
2021-12-12 17:23:44 +00:00
|> Enum.map(fn d ->
2021-12-11 17:36:49 +00:00
[
2021-12-12 17:23:44 +00:00
%{index: %{_id: DocumentMappings.Activity.id(d)}},
DocumentMappings.Activity.encode(d)
2021-12-11 17:36:49 +00:00
]
2021-12-12 17:23:44 +00:00
end)
|> List.flatten()
2021-12-11 17:36:49 +00:00
2021-12-30 18:03:02 +00:00
with {:ok, %{body: %{"errors" => false}}} <-
Elastix.Bulk.post(
url(),
d,
index: "activities",
type: "activity"
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not bulk put activity: #{err}")
:skipped
2021-12-30 18:03:09 +00:00
2021-12-30 18:03:02 +00:00
{:ok, %{body: body}} ->
IO.inspect(body)
:skipped
end
2021-12-11 17:36:49 +00:00
end
2021-12-11 18:48:46 +00:00
2021-12-13 20:27:35 +00:00
def bulk_post(data, :users) do
d =
data
2021-12-14 14:16:21 +00:00
|> Enum.filter(fn x -> x.actor_type == "Person" end)
2021-12-13 20:27:35 +00:00
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.User.id(d)}},
DocumentMappings.User.encode(d)
]
end)
|> List.flatten()
2021-12-30 18:03:02 +00:00
with {:ok, %{body: %{"errors" => false}}} <-
Elastix.Bulk.post(
url(),
d,
index: "users",
type: "user"
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not bulk put users: #{err}")
:skipped
2021-12-30 18:03:09 +00:00
2021-12-30 18:03:02 +00:00
{:ok, %{body: body}} ->
IO.inspect(body)
:skipped
end
2021-12-13 20:27:35 +00:00
end
2021-12-16 15:58:56 +00:00
def bulk_post(data, :hashtags) when is_list(data) do
2021-12-14 12:37:10 +00:00
d =
data
|> Enum.map(fn d ->
[
%{index: %{_id: DocumentMappings.Hashtag.id(d)}},
DocumentMappings.Hashtag.encode(d)
]
end)
|> List.flatten()
2021-12-30 18:03:02 +00:00
with {:ok, %{body: %{"errors" => false}}} <-
Elastix.Bulk.post(
url(),
d,
index: "hashtags",
type: "hashtag"
) do
:ok
else
{:error, %{reason: err}} ->
Logger.error("Could not bulk put hashtags: #{err}")
:skipped
2021-12-30 18:03:09 +00:00
2021-12-30 18:03:02 +00:00
{:ok, %{body: body}} ->
IO.inspect(body)
:skipped
end
2021-12-14 12:37:10 +00:00
end
2021-12-14 14:48:24 +00:00
def bulk_post(_, :hashtags), do: {:ok, nil}
2021-12-14 15:02:11 +00:00
def search(_, _, _, :skip), do: []
def search(:raw, index, type, q) do
with {:ok, raw_results} <- Elastix.Search.search(url(), index, [type], q) do
results =
raw_results
|> Map.get(:body, %{})
|> Map.get("hits", %{})
|> Map.get("hits", [])
{:ok, results}
else
{:error, e} ->
Logger.error(e)
{:error, e}
end
end
def search(:activities, q) do
with {:ok, results} <- search(:raw, "activities", "activity", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.Activity.all_by_ids_with_object()
2021-12-15 10:57:47 +00:00
|> Enum.sort(&(&1.inserted_at >= &2.inserted_at))
else
e ->
Logger.error(e)
[]
end
end
def search(:users, q) do
with {:ok, results} <- search(:raw, "users", "user", q) do
results
|> Enum.map(fn result -> result["_id"] end)
|> Pleroma.User.get_all_by_ids()
else
e ->
Logger.error(e)
[]
end
end
def search(:hashtags, q) do
with {:ok, results} <- search(:raw, "hashtags", "hashtag", q) do
results
|> Enum.map(fn result -> result["_source"]["hashtag"] end)
else
e ->
Logger.error(e)
[]
end
2021-12-11 18:48:46 +00:00
end
2021-12-11 17:36:49 +00:00
end