RepoStreamer.chunk_stream -> Repo.chunk_stream

This commit is contained in:
Maksim Pechnikov 2020-09-16 09:47:18 +03:00
parent c74fad9e06
commit 599f8bb152
6 changed files with 47 additions and 40 deletions

View file

@ -99,7 +99,7 @@ def run(["fix_likes_collections"]) do
where: fragment("(?)->>'likes' is not null", object.data), where: fragment("(?)->>'likes' is not null", object.data),
select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)} select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
) )
|> Pleroma.RepoStreamer.chunk_stream(100) |> Pleroma.Repo.chunk_stream(100, :batches)
|> Stream.each(fn objects -> |> Stream.each(fn objects ->
ids = ids =
objects objects
@ -145,7 +145,7 @@ def run(["ensure_expiration"]) do
|> where(local: true) |> where(local: true)
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data)) |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|> where([_a, o], fragment("?->>'type' = 'Note'", o.data)) |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
|> Pleroma.RepoStreamer.chunk_stream(100) |> Pleroma.Repo.chunk_stream(100, :batches)
|> Stream.each(fn activities -> |> Stream.each(fn activities ->
Enum.each(activities, fn activity -> Enum.each(activities, fn activity ->
expires_at = expires_at =

View file

@ -179,7 +179,7 @@ def run(["deactivate_all_from_instance", instance]) do
start_pleroma() start_pleroma()
Pleroma.User.Query.build(%{nickname: "@#{instance}"}) Pleroma.User.Query.build(%{nickname: "@#{instance}"})
|> Pleroma.RepoStreamer.chunk_stream(500) |> Pleroma.Repo.chunk_stream(500, :batches)
|> Stream.each(fn users -> |> Stream.each(fn users ->
users users
|> Enum.each(fn user -> |> Enum.each(fn user ->
@ -370,7 +370,7 @@ def run(["list"]) do
start_pleroma() start_pleroma()
Pleroma.User.Query.build(%{local: true}) Pleroma.User.Query.build(%{local: true})
|> Pleroma.RepoStreamer.chunk_stream(500) |> Pleroma.Repo.chunk_stream(500, :batches)
|> Stream.each(fn users -> |> Stream.each(fn users ->
users users
|> Enum.each(fn user -> |> Enum.each(fn user ->

View file

@ -49,6 +49,20 @@ def get_assoc(resource, association) do
end end
end end
@doc """
Returns a lazy enumerable that emits all entries from the data store matching the given query.
`returns_as` use to group records. use the `batches` option to fetch records in bulk.
## Examples
# fetch records one-by-one
iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500)
# fetch records in bulk
iex> Pleroma.Repo.chunk_stream(Pleroma.Activity.Queries.by_actor(ap_id), 500, :batches)
"""
@spec chunk_stream(Ecto.Query.t(), integer(), atom()) :: Enumerable.t()
def chunk_stream(query, chunk_size, returns_as \\ :one) do def chunk_stream(query, chunk_size, returns_as \\ :one) do
# We don't actually need start and end funcitons of resource streaming, # We don't actually need start and end funcitons of resource streaming,
# but it seems to be the only way to not fetch records one-by-one and # but it seems to be the only way to not fetch records one-by-one and

View file

@ -1,34 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.RepoStreamer do
alias Pleroma.Repo
import Ecto.Query
def chunk_stream(query, chunk_size) do
Stream.unfold(0, fn
:halt ->
{[], :halt}
last_id ->
query
|> order_by(asc: :id)
|> where([r], r.id > ^last_id)
|> limit(^chunk_size)
|> Repo.all()
|> case do
[] ->
{[], :halt}
records ->
last_id = List.last(records).id
{records, last_id}
end
end)
|> Stream.take_while(fn
[] -> false
_ -> true
end)
end
end

View file

@ -25,7 +25,6 @@ defmodule Pleroma.User do
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Registration alias Pleroma.Registration
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.RepoStreamer
alias Pleroma.User alias Pleroma.User
alias Pleroma.UserRelationship alias Pleroma.UserRelationship
alias Pleroma.Web alias Pleroma.Web
@ -1775,7 +1774,7 @@ def delete_notifications_from_user_activities(%User{ap_id: ap_id}) do
def delete_user_activities(%User{ap_id: ap_id} = user) do def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id ap_id
|> Activity.Queries.by_actor() |> Activity.Queries.by_actor()
|> RepoStreamer.chunk_stream(50) |> Repo.chunk_stream(50, :batches)
|> Stream.each(fn activities -> |> Stream.each(fn activities ->
Enum.each(activities, fn activity -> delete_activity(activity, user) end) Enum.each(activities, fn activity -> delete_activity(activity, user) end)
end) end)

View file

@ -49,4 +49,32 @@ test "return error if has not assoc " do
assert Repo.get_assoc(token, :user) == {:error, :not_found} assert Repo.get_assoc(token, :user) == {:error, :not_found}
end end
end end
describe "chunk_stream/3" do
test "fetch records one-by-one" do
users = insert_list(50, :user)
{fetch_users, 50} =
from(t in User)
|> Repo.chunk_stream(5)
|> Enum.reduce({[], 0}, fn %User{} = user, {acc, count} ->
{acc ++ [user], count + 1}
end)
assert users == fetch_users
end
test "fetch records in bulk" do
users = insert_list(50, :user)
{fetch_users, 10} =
from(t in User)
|> Repo.chunk_stream(5, :batches)
|> Enum.reduce({[], 0}, fn users, {acc, count} ->
{acc ++ users, count + 1}
end)
assert users == fetch_users
end
end
end end