From abf82a63ec242885672e7add20ddfc9554d7f81d Mon Sep 17 00:00:00 2001
From: Ekaterina Vaartis <vaartis@kotobank.ch>
Date: Mon, 16 Aug 2021 22:30:56 +0300
Subject: [PATCH] Make the indexing batch differently and more, show number
 indexed

---
 lib/mix/tasks/pleroma/search/meilisearch.ex | 65 ++++++++++++---------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/lib/mix/tasks/pleroma/search/meilisearch.ex b/lib/mix/tasks/pleroma/search/meilisearch.ex
index 0b86fdece..2a6438528 100644
--- a/lib/mix/tasks/pleroma/search/meilisearch.ex
+++ b/lib/mix/tasks/pleroma/search/meilisearch.ex
@@ -28,33 +28,46 @@ def run(["index"]) do
         ])
       )
 
-    Pleroma.Repo.chunk_stream(
-      from(Pleroma.Object,
-        # Only index public posts which are notes and have some text
-        where:
-          fragment("data->>'type' = 'Note'") and
-            fragment("LENGTH(data->>'source') > 0") and
-            fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public())
-      ),
-      200,
-      :batches
-    )
-    |> Stream.map(fn objects ->
-      Enum.map(objects, fn object ->
-        data = object.data
-        %{id: object.id, source: data["source"], ap: data["id"]}
-      end)
-    end)
-    |> Stream.each(fn objects ->
-      {:ok, _} =
-        Pleroma.HTTP.post(
-          "#{endpoint}/indexes/objects/documents",
-          Jason.encode!(objects)
-        )
+    chunk_size = 100_000
 
-      IO.puts("Indexed #{Enum.count(objects)} entries")
-    end)
-    |> Stream.run()
+    Pleroma.Repo.transaction(
+      fn ->
+        Pleroma.Repo.stream(
+          from(Pleroma.Object,
+            # Only index public posts which are notes and have some text
+            where:
+              fragment("data->>'type' = 'Note'") and
+                fragment("LENGTH(data->>'source') > 0") and
+                fragment("data->'to' \\? ?", ^Pleroma.Constants.as_public()),
+            order_by: fragment("data->'published' DESC")
+          ),
+          timeout: :infinity
+        )
+        |> Stream.chunk_every(chunk_size)
+        |> Stream.transform(0, fn objects, acc ->
+          new_acc = acc + Enum.count(objects)
+
+          IO.puts("Indexed #{new_acc} entries")
+
+          {[objects], new_acc}
+        end)
+        |> Stream.map(fn objects ->
+          Enum.map(objects, fn object ->
+            data = object.data
+            %{id: object.id, source: data["source"], ap: data["id"]}
+          end)
+        end)
+        |> Stream.each(fn objects ->
+          {:ok, _} =
+            Pleroma.HTTP.post(
+              "#{endpoint}/indexes/objects/documents",
+              Jason.encode!(objects)
+            )
+        end)
+        |> Stream.run()
+      end,
+      timeout: :infinity
+    )
   end
 
   def run(["clear"]) do