forked from AkkomaGang/akkoma
Merge branch 'issue/2089' into 'develop'
[#2089] fix notifications See merge request pleroma/pleroma!3000
This commit is contained in:
commit
6b088ed76a
13 changed files with 125 additions and 56 deletions
|
@ -99,7 +99,7 @@ def run(["fix_likes_collections"]) do
|
||||||
where: fragment("(?)->>'likes' is not null", object.data),
|
where: fragment("(?)->>'likes' is not null", object.data),
|
||||||
select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
|
select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
|
||||||
)
|
)
|
||||||
|> Pleroma.RepoStreamer.chunk_stream(100)
|
|> Pleroma.Repo.chunk_stream(100, :batches)
|
||||||
|> Stream.each(fn objects ->
|
|> Stream.each(fn objects ->
|
||||||
ids =
|
ids =
|
||||||
objects
|
objects
|
||||||
|
@ -145,7 +145,7 @@ def run(["ensure_expiration"]) do
|
||||||
|> where(local: true)
|
|> where(local: true)
|
||||||
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|
||||||
|> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
|
|> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
|
||||||
|> Pleroma.RepoStreamer.chunk_stream(100)
|
|> Pleroma.Repo.chunk_stream(100, :batches)
|
||||||
|> Stream.each(fn activities ->
|
|> Stream.each(fn activities ->
|
||||||
Enum.each(activities, fn activity ->
|
Enum.each(activities, fn activity ->
|
||||||
expires_at =
|
expires_at =
|
||||||
|
|
|
@ -179,7 +179,7 @@ def run(["deactivate_all_from_instance", instance]) do
|
||||||
start_pleroma()
|
start_pleroma()
|
||||||
|
|
||||||
Pleroma.User.Query.build(%{nickname: "@#{instance}"})
|
Pleroma.User.Query.build(%{nickname: "@#{instance}"})
|
||||||
|> Pleroma.RepoStreamer.chunk_stream(500)
|
|> Pleroma.Repo.chunk_stream(500, :batches)
|
||||||
|> Stream.each(fn users ->
|
|> Stream.each(fn users ->
|
||||||
users
|
users
|
||||||
|> Enum.each(fn user ->
|
|> Enum.each(fn user ->
|
||||||
|
@ -370,7 +370,7 @@ def run(["list"]) do
|
||||||
start_pleroma()
|
start_pleroma()
|
||||||
|
|
||||||
Pleroma.User.Query.build(%{local: true})
|
Pleroma.User.Query.build(%{local: true})
|
||||||
|> Pleroma.RepoStreamer.chunk_stream(500)
|
|> Pleroma.Repo.chunk_stream(500, :batches)
|
||||||
|> Stream.each(fn users ->
|
|> Stream.each(fn users ->
|
||||||
users
|
users
|
||||||
|> Enum.each(fn user ->
|
|> Enum.each(fn user ->
|
||||||
|
|
|
@ -19,13 +19,13 @@ def fill_in_notification_types do
|
||||||
query
|
query
|
||||||
|> Repo.chunk_stream(100)
|
|> Repo.chunk_stream(100)
|
||||||
|> Enum.each(fn notification ->
|
|> Enum.each(fn notification ->
|
||||||
type =
|
if notification.activity do
|
||||||
notification.activity
|
type = type_from_activity(notification.activity)
|
||||||
|> type_from_activity()
|
|
||||||
|
|
||||||
notification
|
notification
|
||||||
|> Ecto.Changeset.change(%{type: type})
|
|> Ecto.Changeset.change(%{type: type})
|
||||||
|> Repo.update()
|
|> Repo.update()
|
||||||
|
end
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -72,8 +72,7 @@ defp type_from_activity(%{data: %{"type" => type}} = activity) do
|
||||||
"pleroma:emoji_reaction"
|
"pleroma:emoji_reaction"
|
||||||
|
|
||||||
"Create" ->
|
"Create" ->
|
||||||
activity
|
type_from_activity_object(activity)
|
||||||
|> type_from_activity_object()
|
|
||||||
|
|
||||||
t ->
|
t ->
|
||||||
raise "No notification type for activity type #{t}"
|
raise "No notification type for activity type #{t}"
|
||||||
|
|
|
@ -49,7 +49,21 @@ def get_assoc(resource, association) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def chunk_stream(query, chunk_size) do
|
@doc """
|
||||||
|
Returns a lazy enumerable that emits all entries from the data store matching the given query.
|
||||||
|
|
||||||
|
`returns_as` use to group records. use the `batches` option to fetch records in bulk.
|
||||||
|
|
||||||
|
## Examples
|
||||||
|
|
||||||
|
# fetch records one-by-one
|
||||||
|
iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
|
||||||
|
|
||||||
|
# fetch records in bulk
|
||||||
|
iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
|
||||||
|
"""
|
||||||
|
@spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
|
||||||
|
def chunk_stream(query, chunk_size, returns_as \\ :one) do
|
||||||
# We don't actually need start and end funcitons of resource streaming,
|
# We don't actually need start and end funcitons of resource streaming,
|
||||||
# but it seems to be the only way to not fetch records one-by-one and
|
# but it seems to be the only way to not fetch records one-by-one and
|
||||||
# have individual records be the elements of the stream, instead of
|
# have individual records be the elements of the stream, instead of
|
||||||
|
@ -69,7 +83,12 @@ def chunk_stream(query, chunk_size) do
|
||||||
|
|
||||||
records ->
|
records ->
|
||||||
last_id = List.last(records).id
|
last_id = List.last(records).id
|
||||||
{records, last_id}
|
|
||||||
|
if returns_as == :one do
|
||||||
|
{records, last_id}
|
||||||
|
else
|
||||||
|
{[records], last_id}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
fn _ -> :ok end
|
fn _ -> :ok end
|
||||||
|
|
|
@ -1,34 +0,0 @@
|
||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.RepoStreamer do
|
|
||||||
alias Pleroma.Repo
|
|
||||||
import Ecto.Query
|
|
||||||
|
|
||||||
def chunk_stream(query, chunk_size) do
|
|
||||||
Stream.unfold(0, fn
|
|
||||||
:halt ->
|
|
||||||
{[], :halt}
|
|
||||||
|
|
||||||
last_id ->
|
|
||||||
query
|
|
||||||
|> order_by(asc: :id)
|
|
||||||
|> where([r], r.id > ^last_id)
|
|
||||||
|> limit(^chunk_size)
|
|
||||||
|> Repo.all()
|
|
||||||
|> case do
|
|
||||||
[] ->
|
|
||||||
{[], :halt}
|
|
||||||
|
|
||||||
records ->
|
|
||||||
last_id = List.last(records).id
|
|
||||||
{records, last_id}
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
|> Stream.take_while(fn
|
|
||||||
[] -> false
|
|
||||||
_ -> true
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -25,7 +25,6 @@ defmodule Pleroma.User do
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
alias Pleroma.Registration
|
alias Pleroma.Registration
|
||||||
alias Pleroma.Repo
|
alias Pleroma.Repo
|
||||||
alias Pleroma.RepoStreamer
|
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.UserRelationship
|
alias Pleroma.UserRelationship
|
||||||
alias Pleroma.Web
|
alias Pleroma.Web
|
||||||
|
@ -1775,7 +1774,7 @@ def delete_notifications_from_user_activities(%User{ap_id: ap_id}) do
|
||||||
def delete_user_activities(%User{ap_id: ap_id} = user) do
|
def delete_user_activities(%User{ap_id: ap_id} = user) do
|
||||||
ap_id
|
ap_id
|
||||||
|> Activity.Queries.by_actor()
|
|> Activity.Queries.by_actor()
|
||||||
|> RepoStreamer.chunk_stream(50)
|
|> Repo.chunk_stream(50, :batches)
|
||||||
|> Stream.each(fn activities ->
|
|> Stream.each(fn activities ->
|
||||||
Enum.each(activities, fn activity -> delete_activity(activity, user) end)
|
Enum.each(activities, fn activity -> delete_activity(activity, user) end)
|
||||||
end)
|
end)
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.DeleteNotificationWithoutActivity do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
import Ecto.Query
|
||||||
|
alias Pleroma.Repo
|
||||||
|
|
||||||
|
def up do
|
||||||
|
from(
|
||||||
|
q in Pleroma.Notification,
|
||||||
|
left_join: c in assoc(q, :activity),
|
||||||
|
select: %{id: type(q.id, :integer)},
|
||||||
|
where: is_nil(c.id)
|
||||||
|
)
|
||||||
|
|> Repo.chunk_stream(1_000, :batches)
|
||||||
|
|> Stream.each(fn records ->
|
||||||
|
notification_ids = Enum.map(records, fn %{id: id} -> id end)
|
||||||
|
|
||||||
|
Repo.delete_all(
|
||||||
|
from(n in "notifications",
|
||||||
|
where: n.id in ^notification_ids
|
||||||
|
)
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
|> Stream.run()
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
end
|
|
@ -0,0 +1,23 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.AddNotificationConstraints do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
drop(constraint(:notifications, "notifications_activity_id_fkey"))
|
||||||
|
|
||||||
|
alter table(:notifications) do
|
||||||
|
modify(:activity_id, references(:activities, type: :uuid, on_delete: :delete_all),
|
||||||
|
null: false
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
drop(constraint(:notifications, "notifications_activity_id_fkey"))
|
||||||
|
|
||||||
|
alter table(:notifications) do
|
||||||
|
modify(:activity_id, references(:activities, type: :uuid, on_delete: :delete_all),
|
||||||
|
null: true
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -33,8 +33,8 @@ test "return empty multi" do
|
||||||
test "returns user markers" do
|
test "returns user markers" do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
marker = insert(:marker, user: user)
|
marker = insert(:marker, user: user)
|
||||||
insert(:notification, user: user)
|
insert(:notification, user: user, activity: insert(:note_activity))
|
||||||
insert(:notification, user: user)
|
insert(:notification, user: user, activity: insert(:note_activity))
|
||||||
insert(:marker, timeline: "home", user: user)
|
insert(:marker, timeline: "home", user: user)
|
||||||
|
|
||||||
assert Marker.get_markers(
|
assert Marker.get_markers(
|
||||||
|
|
|
@ -37,7 +37,9 @@ test "get one-to-one assoc from repo" do
|
||||||
|
|
||||||
test "get one-to-many assoc from repo" do
|
test "get one-to-many assoc from repo" do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
notification = refresh_record(insert(:notification, user: user))
|
|
||||||
|
notification =
|
||||||
|
refresh_record(insert(:notification, user: user, activity: insert(:note_activity)))
|
||||||
|
|
||||||
assert Repo.get_assoc(user, :notifications) == {:ok, [notification]}
|
assert Repo.get_assoc(user, :notifications) == {:ok, [notification]}
|
||||||
end
|
end
|
||||||
|
@ -47,4 +49,32 @@ test "return error if has not assoc " do
|
||||||
assert Repo.get_assoc(token, :user) == {:error, :not_found}
|
assert Repo.get_assoc(token, :user) == {:error, :not_found}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "chunk_stream/3" do
|
||||||
|
test "fetch records one-by-one" do
|
||||||
|
users = insert_list(50, :user)
|
||||||
|
|
||||||
|
{fetch_users, 50} =
|
||||||
|
from(t in User)
|
||||||
|
|> Repo.chunk_stream(5)
|
||||||
|
|> Enum.reduce({[], 0}, fn %User{} = user, {acc, count} ->
|
||||||
|
{acc ++ [user], count + 1}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert users == fetch_users
|
||||||
|
end
|
||||||
|
|
||||||
|
test "fetch records in bulk" do
|
||||||
|
users = insert_list(50, :user)
|
||||||
|
|
||||||
|
{fetch_users, 10} =
|
||||||
|
from(t in User)
|
||||||
|
|> Repo.chunk_stream(5, :batches)
|
||||||
|
|> Enum.reduce({[], 0}, fn users, {acc, count} ->
|
||||||
|
{acc ++ users, count + 1}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert users == fetch_users
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1442,7 +1442,10 @@ test "returns lists to which the account belongs" do
|
||||||
describe "verify_credentials" do
|
describe "verify_credentials" do
|
||||||
test "verify_credentials" do
|
test "verify_credentials" do
|
||||||
%{user: user, conn: conn} = oauth_access(["read:accounts"])
|
%{user: user, conn: conn} = oauth_access(["read:accounts"])
|
||||||
[notification | _] = insert_list(7, :notification, user: user)
|
|
||||||
|
[notification | _] =
|
||||||
|
insert_list(7, :notification, user: user, activity: insert(:note_activity))
|
||||||
|
|
||||||
Pleroma.Notification.set_read_up_to(user, notification.id)
|
Pleroma.Notification.set_read_up_to(user, notification.id)
|
||||||
conn = get(conn, "/api/v1/accounts/verify_credentials")
|
conn = get(conn, "/api/v1/accounts/verify_credentials")
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ defmodule Pleroma.Web.MastodonAPI.MarkerControllerTest do
|
||||||
test "gets markers with correct scopes", %{conn: conn} do
|
test "gets markers with correct scopes", %{conn: conn} do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
token = insert(:oauth_token, user: user, scopes: ["read:statuses"])
|
token = insert(:oauth_token, user: user, scopes: ["read:statuses"])
|
||||||
insert_list(7, :notification, user: user)
|
insert_list(7, :notification, user: user, activity: insert(:note_activity))
|
||||||
|
|
||||||
{:ok, %{"notifications" => marker}} =
|
{:ok, %{"notifications" => marker}} =
|
||||||
Pleroma.Marker.upsert(
|
Pleroma.Marker.upsert(
|
||||||
|
|
|
@ -448,7 +448,7 @@ test "shows unread_conversation_count only to the account owner" do
|
||||||
|
|
||||||
test "shows unread_count only to the account owner" do
|
test "shows unread_count only to the account owner" do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
insert_list(7, :notification, user: user)
|
insert_list(7, :notification, user: user, activity: insert(:note_activity))
|
||||||
other_user = insert(:user)
|
other_user = insert(:user)
|
||||||
|
|
||||||
user = User.get_cached_by_ap_id(user.ap_id)
|
user = User.get_cached_by_ap_id(user.ap_id)
|
||||||
|
|
Loading…
Reference in a new issue