Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions lib/graphql/subscriptions/anycable_subscriptions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,24 @@ def execute_all(event, object)
def execute_grouped(fingerprint, subscription_ids, event, object)
return if subscription_ids.empty?

subscription_id = with_redis { |redis| subscription_ids.find { |sid| redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid) } }
return unless subscription_id # All subscriptions has expired but haven't cleaned up yet
result = nil

# Iterate through all subscriptions to find the subscription which:
# 1. still exists in Redis
# 2. got a result when updated with the event
# This protects in cases where a subscription could expire between checking a subscription existing and
# update execution
# We need only one working subscription, because the result will be shared with all subscribers
with_redis do |redis|
subscription_ids.each do |sid|
next unless redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid)

result = execute_update(sid, event, object)

break if result
end
end

result = execute_update(subscription_id, event, object)
return unless result

# Having calculated the result _once_, send the same payload to all subscribers
Expand Down Expand Up @@ -165,12 +179,17 @@ def read_subscription(subscription_id)
redis.mapped_hmget(
"#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}",
:query_string, :variables, :context, :operation_name
).tap do |subscription|
next if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key
).then do |subscription|
# Redis returns hash with all nils for missing key
return nil if subscription.values.all?(&:nil?)
# query_string is a required field for executing a subscription, so we should be sure that it exists
return nil if subscription[:query_string].nil?

subscription[:context] = @serializer.load(subscription[:context])
subscription[:variables] = JSON.parse(subscription[:variables])
subscription[:operation_name] = nil if subscription[:operation_name].strip == ""
subscription[:operation_name] = nil if subscription[:operation_name].to_s.strip == ""

subscription
end
end
end
Expand Down
66 changes: 37 additions & 29 deletions spec/graphql/anycable_spec.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# frozen_string_literal: true

RSpec.describe GraphQL::AnyCable do
let(:operation_name) { "SomeSubscription" }

subject do
AnycableSchema.execute(
query: query,
context: {channel: channel, subscription_id: subscription_id},
variables: {},
operation_name: "SomeSubscription"
operation_name: operation_name
).tap do |result|
expect(result.to_h.fetch("errors", [])).to be_empty
end
Expand Down Expand Up @@ -55,49 +57,55 @@
expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result)
end

context "with multiple subscriptions in one query" do
let(:query) do
<<~GRAPHQL
subscription SomeSubscription {
productCreated { id title }
productUpdated { id }
}
GRAPHQL
end
context "with multiple subscriptions in two queries" do
context "product created subscription" do
let(:operation_name) { "ProductCreatedSubscription" }
let(:query) do
<<~GRAPHQL
subscription ProductCreatedSubscription {
productCreated { id title }
}
GRAPHQL
end
let(:expected_result) do
<<~JSON.strip
{"result":{"data":{"productCreated":{"id":"1","title":"Gravizapa"}}},"more":true}
JSON
end

context "triggering update event" do
it "broadcasts message only for update event" do
it "broadcasts message only for create event" do
subject
AnycableSchema.subscriptions.trigger(:product_updated, {}, {id: 1, title: "foo"})
AnycableSchema.subscriptions.trigger(:product_created, {}, {id: 1, title: "Gravizapa"})
expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result)
end
end

context "triggering create event" do
context "product updated subscription" do
let(:operation_name) { "ProductUpdatedSubscription" }
let(:query) do
<<~GRAPHQL
subscription ProductUpdatedSubscription {
productUpdated { id }
}
GRAPHQL
end

let(:expected_result) do
<<~JSON.strip
{"result":{"data":{"productCreated":{"id":"1","title":"Gravizapa"}}},"more":true}
{"result":{"data":{"productUpdated":{"id":"1"}}},"more":true}
JSON
end

it "broadcasts message only for create event" do
it "broadcasts message for update event" do
subject
AnycableSchema.subscriptions.trigger(:product_created, {}, {id: 1, title: "Gravizapa"})

AnycableSchema.subscriptions.trigger(:product_updated, {}, {id: 1, title: "foo"})
expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result)
end
end
end

context "with empty operation name" do
subject do
AnycableSchema.execute(
query: query,
context: {channel: channel, subscription_id: subscription_id},
variables: {},
operation_name: nil
)
end
let(:operation_name) { nil }

let(:query) do
<<~GRAPHQL
Expand All @@ -118,7 +126,7 @@
query: query,
context: {channel: channel, subscription_id: subscription_id},
variables: {},
operation_name: "SomeSubscription"
operation_name: operation_name
)
end

Expand Down Expand Up @@ -154,7 +162,7 @@
query: query,
context: {channel: channel, subscription_id: subscription_id},
variables: {},
operation_name: "SomeSubscription"
operation_name: operation_name
)
end

Expand Down Expand Up @@ -182,7 +190,7 @@
query: query,
context: {}, # Intentionally left blank
variables: {},
operation_name: "SomeSubscription"
operation_name: operation_name
)
end

Expand Down
75 changes: 75 additions & 0 deletions spec/graphql/broadcast_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,79 @@ def subscribe(query)
expect(AnyCable).to have_received(:broadcast).once
end
end

context "when handling race conditions with subscription deleted between checks" do
let(:query) do
<<~GRAPHQL.strip
subscription SomeSubscription { postCreated{ id } }
GRAPHQL
end

let(:redis) { $redis }
let(:object) { double("Post", id: 1, title: "Racing") }
let(:fingerprint) { ":postCreated:/SomeSubscription/race-condition-test/0/signature456=" }
let(:subscriptions) { BroadcastSchema.subscriptions }

before do
allow_any_instance_of(GraphQL::Subscriptions::Event).to receive(:fingerprint).and_return(fingerprint)

3.times { subscribe(query) }

@subscription_ids = redis.smembers("graphql-subscriptions:#{fingerprint}")
expect(@subscription_ids.size).to eq(3)

# Emulate removing a subscription like race condition
allow(subscriptions).to receive(:read_subscription).and_wrap_original do |original, sid|
# Remove first subscription after `checking existing`, but before the read_subscription
if sid == @subscription_ids.first
redis.del("graphql-subscription:#{sid}")

nil
else
original.call(sid)
end
end

allow(AnyCable).to receive(:broadcast)
end

it "handles subscription deleted between exists? check and read_subscription" do
subscriptions.execute_grouped(
fingerprint,
@subscription_ids,
GraphQL::Subscriptions::Event.new(
name: "postCreated",
arguments: {},
field: BroadcastSchema.subscription.fields["postCreated"],
scope: nil,
context: {}
),
object
)

# We must get broadcast here, because if the first subscription expired, we should process the rest of subscriptions
expect(AnyCable).to have_received(:broadcast).once
expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", anything)
end

it "returns without broadcasting when all subscriptions were deleted between checks" do
# read_subscription always returns nil
allow(subscriptions).to receive(:read_subscription).and_return(nil)

subscriptions.execute_grouped(
fingerprint,
@subscription_ids,
GraphQL::Subscriptions::Event.new(
name: "postCreated",
arguments: {},
field: BroadcastSchema.subscription.fields["postCreated"],
scope: nil,
context: {}
),
object
)

expect(AnyCable).not_to have_received(:broadcast)
end
end
end
28 changes: 21 additions & 7 deletions spec/graphql/stats_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

RSpec.describe GraphQL::AnyCable::Stats do
describe "#collect" do
let(:query) do
let(:created_query) do
<<~GRAPHQL
subscription SomeSubscription {
subscription ProductCreatedSubscription {
productCreated { id title }
}
GRAPHQL
end

let(:updated_query) do
<<~GRAPHQL
subscription ProductUpdatedSubscription {
productUpdated { id }
}
GRAPHQL
Expand All @@ -23,16 +30,23 @@

before do
AnycableSchema.execute(
query: query,
context: {channel: channel, subscription_id: subscription_id},
query: created_query,
context: {channel: channel, subscription_id: "#{subscription_id}-created"},
variables: {},
operation_name: "ProductCreatedSubscription"
)

AnycableSchema.execute(
query: updated_query,
context: {channel: channel, subscription_id: "#{subscription_id}-updated"},
variables: {},
operation_name: "SomeSubscription"
operation_name: "ProductUpdatedSubscription"
)
end

context "when include_subscriptions is false" do
let(:expected_result) do
{total: {subscription: 1, fingerprints: 2, subscriptions: 2, channel: 1}}
{total: {subscription: 2, fingerprints: 2, subscriptions: 2, channel: 2}}
end

it "returns total stat" do
Expand All @@ -45,7 +59,7 @@

let(:expected_result) do
{
total: {subscription: 1, fingerprints: 2, subscriptions: 2, channel: 1},
total: {subscription: 2, fingerprints: 2, subscriptions: 2, channel: 2},
subscriptions: {
"productCreated" => 1,
"productUpdated" => 1
Expand Down