forked from AkkomaGang/akkoma
Streamer: Stream boosts to the boosting user.
This commit is contained in:
parent
1cd38dfffa
commit
7a3a88a13e
4 changed files with 51 additions and 19 deletions
|
@ -1180,7 +1180,9 @@ def get_users_from_set(ap_ids, local_only \\ true) do
|
|||
end
|
||||
|
||||
@spec get_recipients_from_activity(Activity.t()) :: [User.t()]
|
||||
def get_recipients_from_activity(%Activity{recipients: to}) do
|
||||
def get_recipients_from_activity(%Activity{recipients: to, actor: actor}) do
|
||||
to = [actor | to]
|
||||
|
||||
User.Query.build(%{recipients_from_activity: to, local: true, deactivated: false})
|
||||
|> Repo.all()
|
||||
end
|
||||
|
|
|
@ -158,24 +158,6 @@ defp should_send?(%User{} = user, %Notification{activity: activity}) do
|
|||
should_send?(user, activity)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
|
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{
|
||||
transport_pid: transport_pid,
|
||||
user: socket_user
|
||||
} ->
|
||||
# Get the current user so we have up-to-date blocks etc.
|
||||
if socket_user do
|
||||
user = User.get_cached_by_ap_id(socket_user.ap_id)
|
||||
|
||||
if should_send?(user, item) do
|
||||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
|
||||
end
|
||||
else
|
||||
send(transport_pid, {:text, StreamerView.render("update.json", item)})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
def push_to_socket(topics, topic, %Participation{} = participation) do
|
||||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
|
||||
|
|
|
@ -987,6 +987,18 @@ test "it imports user blocks from list" do
|
|||
end
|
||||
|
||||
describe "get_recipients_from_activity" do
|
||||
test "works for announces" do
|
||||
actor = insert(:user)
|
||||
user = insert(:user, local: true)
|
||||
|
||||
{:ok, activity} = CommonAPI.post(actor, %{"status" => "hello"})
|
||||
{:ok, announce, _} = CommonAPI.repeat(activity.id, user)
|
||||
|
||||
recipients = User.get_recipients_from_activity(announce)
|
||||
|
||||
assert user in recipients
|
||||
end
|
||||
|
||||
test "get recipients" do
|
||||
actor = insert(:user)
|
||||
user = insert(:user, local: true)
|
||||
|
|
|
@ -28,6 +28,42 @@ defmodule Pleroma.Web.StreamerTest do
|
|||
{:ok, %{user: user, notify: notify}}
|
||||
end
|
||||
|
||||
test "it streams the user's post in the 'user' stream", %{user: user} do
|
||||
task =
|
||||
Task.async(fn ->
|
||||
assert_receive {:text, _}, @streamer_timeout
|
||||
end)
|
||||
|
||||
Streamer.add_socket(
|
||||
"user",
|
||||
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||
)
|
||||
|
||||
{:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
|
||||
|
||||
Streamer.stream("user", activity)
|
||||
Task.await(task)
|
||||
end
|
||||
|
||||
test "it streams boosts of the user in the 'user' stream", %{user: user} do
|
||||
task =
|
||||
Task.async(fn ->
|
||||
assert_receive {:text, _}, @streamer_timeout
|
||||
end)
|
||||
|
||||
Streamer.add_socket(
|
||||
"user",
|
||||
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||
)
|
||||
|
||||
other_user = insert(:user)
|
||||
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
|
||||
{:ok, announce, _} = CommonAPI.repeat(activity.id, user)
|
||||
|
||||
Streamer.stream("user", announce)
|
||||
Task.await(task)
|
||||
end
|
||||
|
||||
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
|
||||
task =
|
||||
Task.async(fn ->
|
||||
|
|
Loading…
Reference in a new issue