Merge branch 'bugfix/1543-streaming-boosts' into 'develop'
Streamer: Stream boosts to the boosting user. Closes #1543 See merge request pleroma/pleroma!2415
This commit is contained in:
commit
238058ecae
4 changed files with 51 additions and 19 deletions
|
@ -1180,7 +1180,9 @@ def get_users_from_set(ap_ids, local_only \\ true) do
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec get_recipients_from_activity(Activity.t()) :: [User.t()]
|
@spec get_recipients_from_activity(Activity.t()) :: [User.t()]
|
||||||
def get_recipients_from_activity(%Activity{recipients: to}) do
|
def get_recipients_from_activity(%Activity{recipients: to, actor: actor}) do
|
||||||
|
to = [actor | to]
|
||||||
|
|
||||||
User.Query.build(%{recipients_from_activity: to, local: true, deactivated: false})
|
User.Query.build(%{recipients_from_activity: to, local: true, deactivated: false})
|
||||||
|> Repo.all()
|
|> Repo.all()
|
||||||
end
|
end
|
||||||
|
|
|
@ -158,24 +158,6 @@ defp should_send?(%User{} = user, %Notification{activity: activity}) do
|
||||||
should_send?(user, activity)
|
should_send?(user, activity)
|
||||||
end
|
end
|
||||||
|
|
||||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
|
|
||||||
Enum.each(topics[topic] || [], fn %StreamerSocket{
|
|
||||||
transport_pid: transport_pid,
|
|
||||||
user: socket_user
|
|
||||||
} ->
|
|
||||||
# Get the current user so we have up-to-date blocks etc.
|
|
||||||
if socket_user do
|
|
||||||
user = User.get_cached_by_ap_id(socket_user.ap_id)
|
|
||||||
|
|
||||||
if should_send?(user, item) do
|
|
||||||
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
|
|
||||||
end
|
|
||||||
else
|
|
||||||
send(transport_pid, {:text, StreamerView.render("update.json", item)})
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def push_to_socket(topics, topic, %Participation{} = participation) do
|
def push_to_socket(topics, topic, %Participation{} = participation) do
|
||||||
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||||
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
|
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
|
||||||
|
|
|
@ -987,6 +987,18 @@ test "it imports user blocks from list" do
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "get_recipients_from_activity" do
|
describe "get_recipients_from_activity" do
|
||||||
|
test "works for announces" do
|
||||||
|
actor = insert(:user)
|
||||||
|
user = insert(:user, local: true)
|
||||||
|
|
||||||
|
{:ok, activity} = CommonAPI.post(actor, %{"status" => "hello"})
|
||||||
|
{:ok, announce, _} = CommonAPI.repeat(activity.id, user)
|
||||||
|
|
||||||
|
recipients = User.get_recipients_from_activity(announce)
|
||||||
|
|
||||||
|
assert user in recipients
|
||||||
|
end
|
||||||
|
|
||||||
test "get recipients" do
|
test "get recipients" do
|
||||||
actor = insert(:user)
|
actor = insert(:user)
|
||||||
user = insert(:user, local: true)
|
user = insert(:user, local: true)
|
||||||
|
|
|
@ -28,6 +28,42 @@ defmodule Pleroma.Web.StreamerTest do
|
||||||
{:ok, %{user: user, notify: notify}}
|
{:ok, %{user: user, notify: notify}}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "it streams the user's post in the 'user' stream", %{user: user} do
|
||||||
|
task =
|
||||||
|
Task.async(fn ->
|
||||||
|
assert_receive {:text, _}, @streamer_timeout
|
||||||
|
end)
|
||||||
|
|
||||||
|
Streamer.add_socket(
|
||||||
|
"user",
|
||||||
|
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||||
|
)
|
||||||
|
|
||||||
|
{:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
|
||||||
|
|
||||||
|
Streamer.stream("user", activity)
|
||||||
|
Task.await(task)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "it streams boosts of the user in the 'user' stream", %{user: user} do
|
||||||
|
task =
|
||||||
|
Task.async(fn ->
|
||||||
|
assert_receive {:text, _}, @streamer_timeout
|
||||||
|
end)
|
||||||
|
|
||||||
|
Streamer.add_socket(
|
||||||
|
"user",
|
||||||
|
%{transport_pid: task.pid, assigns: %{user: user}}
|
||||||
|
)
|
||||||
|
|
||||||
|
other_user = insert(:user)
|
||||||
|
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
|
||||||
|
{:ok, announce, _} = CommonAPI.repeat(activity.id, user)
|
||||||
|
|
||||||
|
Streamer.stream("user", announce)
|
||||||
|
Task.await(task)
|
||||||
|
end
|
||||||
|
|
||||||
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
|
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
|
||||||
task =
|
task =
|
||||||
Task.async(fn ->
|
Task.async(fn ->
|
||||||
|
|
Loading…
Reference in a new issue