forked from YokaiRick/akkoma
Merge branch 'feature/list-streaming' into 'develop'
MastoAPI: Add streaming for lists. See merge request pleroma/pleroma!183
This commit is contained in:
commit
564c73ab24
6 changed files with 56 additions and 3 deletions
|
@ -1,7 +1,7 @@
|
||||||
defmodule Pleroma.List do
|
defmodule Pleroma.List do
|
||||||
use Ecto.Schema
|
use Ecto.Schema
|
||||||
import Ecto.{Changeset, Query}
|
import Ecto.{Changeset, Query}
|
||||||
alias Pleroma.{User, Repo}
|
alias Pleroma.{User, Repo, Activity}
|
||||||
|
|
||||||
schema "lists" do
|
schema "lists" do
|
||||||
belongs_to(:user, Pleroma.User)
|
belongs_to(:user, Pleroma.User)
|
||||||
|
@ -56,6 +56,19 @@ def get_following(%Pleroma.List{following: following} = list) do
|
||||||
{:ok, Repo.all(q)}
|
{:ok, Repo.all(q)}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Get lists the activity should be streamed to.
|
||||||
|
def get_lists_from_activity(%Activity{actor: ap_id}) do
|
||||||
|
actor = User.get_cached_by_ap_id(ap_id)
|
||||||
|
|
||||||
|
query =
|
||||||
|
from(
|
||||||
|
l in Pleroma.List,
|
||||||
|
where: fragment("? && ?", l.following, ^[actor.follower_address])
|
||||||
|
)
|
||||||
|
|
||||||
|
Repo.all(query)
|
||||||
|
end
|
||||||
|
|
||||||
def rename(%Pleroma.List{} = list, title) do
|
def rename(%Pleroma.List{} = list, title) do
|
||||||
list
|
list
|
||||||
|> title_changeset(%{title: title})
|
|> title_changeset(%{title: title})
|
||||||
|
|
|
@ -57,6 +57,7 @@ def stream_out(activity) do
|
||||||
|
|
||||||
if activity.data["type"] in ["Create", "Announce"] do
|
if activity.data["type"] in ["Create", "Announce"] do
|
||||||
Pleroma.Web.Streamer.stream("user", activity)
|
Pleroma.Web.Streamer.stream("user", activity)
|
||||||
|
Pleroma.Web.Streamer.stream("list", activity)
|
||||||
|
|
||||||
if Enum.member?(activity.data["to"], public) do
|
if Enum.member?(activity.data["to"], public) do
|
||||||
Pleroma.Web.Streamer.stream("public", activity)
|
Pleroma.Web.Streamer.stream("public", activity)
|
||||||
|
|
|
@ -15,10 +15,13 @@ def connect(params, socket) do
|
||||||
with token when not is_nil(token) <- params["access_token"],
|
with token when not is_nil(token) <- params["access_token"],
|
||||||
%Token{user_id: user_id} <- Repo.get_by(Token, token: token),
|
%Token{user_id: user_id} <- Repo.get_by(Token, token: token),
|
||||||
%User{} = user <- Repo.get(User, user_id),
|
%User{} = user <- Repo.get(User, user_id),
|
||||||
stream when stream in ["public", "public:local", "user", "direct"] <- params["stream"] do
|
stream when stream in ["public", "public:local", "user", "direct", "list"] <-
|
||||||
|
params["stream"] do
|
||||||
|
topic = if stream == "list", do: "list:#{params["list"]}", else: stream
|
||||||
|
|
||||||
socket =
|
socket =
|
||||||
socket
|
socket
|
||||||
|> assign(:topic, params["stream"])
|
|> assign(:topic, topic)
|
||||||
|> assign(:user, user)
|
|> assign(:user, user)
|
||||||
|
|
||||||
Pleroma.Web.Streamer.add_socket(params["stream"], socket)
|
Pleroma.Web.Streamer.add_socket(params["stream"], socket)
|
||||||
|
|
|
@ -59,6 +59,19 @@ def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
|
||||||
{:noreply, topics}
|
{:noreply, topics}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
|
||||||
|
recipient_topics =
|
||||||
|
Pleroma.List.get_lists_from_activity(item)
|
||||||
|
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
|
||||||
|
|
||||||
|
Enum.each(recipient_topics || [], fn list_topic ->
|
||||||
|
Logger.debug("Trying to push message to #{list_topic}\n\n")
|
||||||
|
push_to_socket(topics, list_topic, item)
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:noreply, topics}
|
||||||
|
end
|
||||||
|
|
||||||
def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
|
def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
|
||||||
topic = "user:#{item.user_id}"
|
topic = "user:#{item.user_id}"
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,7 @@
|
||||||
|
defmodule Pleroma.Repo.Migrations.AddListFollowIndex do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def change do
|
||||||
|
create index(:lists, [:following])
|
||||||
|
end
|
||||||
|
end
|
|
@ -74,4 +74,20 @@ test "getting all lists by an user" do
|
||||||
assert list_two in lists
|
assert list_two in lists
|
||||||
refute list_three in lists
|
refute list_three in lists
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "getting all lists the user is a member of" do
|
||||||
|
user = insert(:user)
|
||||||
|
other_user = insert(:user)
|
||||||
|
{:ok, list_one} = Pleroma.List.create("title", user)
|
||||||
|
{:ok, list_two} = Pleroma.List.create("other title", user)
|
||||||
|
{:ok, list_three} = Pleroma.List.create("third title", other_user)
|
||||||
|
{:ok, list_one} = Pleroma.List.follow(list_one, other_user)
|
||||||
|
{:ok, list_two} = Pleroma.List.follow(list_two, other_user)
|
||||||
|
{:ok, list_three} = Pleroma.List.follow(list_three, user)
|
||||||
|
|
||||||
|
lists = Pleroma.List.get_lists_from_activity(%Pleroma.Activity{actor: other_user.ap_id})
|
||||||
|
assert list_one in lists
|
||||||
|
assert list_two in lists
|
||||||
|
refute list_three in lists
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue