diff --git a/lib/graphql/subscriptions/anycable_subscriptions.rb b/lib/graphql/subscriptions/anycable_subscriptions.rb index 34c3669..c514b66 100644 --- a/lib/graphql/subscriptions/anycable_subscriptions.rb +++ b/lib/graphql/subscriptions/anycable_subscriptions.rb @@ -97,10 +97,24 @@ def execute_all(event, object) def execute_grouped(fingerprint, subscription_ids, event, object) return if subscription_ids.empty? - subscription_id = with_redis { |redis| subscription_ids.find { |sid| redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid) } } - return unless subscription_id # All subscriptions has expired but haven't cleaned up yet + result = nil + + # Iterate through all subscriptions to find the subscription which: + # 1. still exists in Redis + # 2. got a result when updated with the event + # This protects in cases where a subscription could expire between checking a subscription existing and + # update execution + # We need only one working subscription, because the result will be shared with all subscribers + with_redis do |redis| + subscription_ids.each do |sid| + next unless redis.exists?(redis_key(SUBSCRIPTION_PREFIX) + sid) + + result = execute_update(sid, event, object) + + break if result + end + end - result = execute_update(subscription_id, event, object) return unless result # Having calculated the result _once_, send the same payload to all subscribers @@ -165,12 +179,17 @@ def read_subscription(subscription_id) redis.mapped_hmget( "#{redis_key(SUBSCRIPTION_PREFIX)}#{subscription_id}", :query_string, :variables, :context, :operation_name - ).tap do |subscription| - next if subscription.values.all?(&:nil?) # Redis returns hash with all nils for missing key + ).then do |subscription| + # Redis returns hash with all nils for missing key + return nil if subscription.values.all?(&:nil?) + # query_string is a required field for executing a subscription, so we should be sure that it exists + return nil if subscription[:query_string].nil? subscription[:context] = @serializer.load(subscription[:context]) subscription[:variables] = JSON.parse(subscription[:variables]) - subscription[:operation_name] = nil if subscription[:operation_name].strip == "" + subscription[:operation_name] = nil if subscription[:operation_name].to_s.strip == "" + + subscription end end end diff --git a/spec/graphql/anycable_spec.rb b/spec/graphql/anycable_spec.rb index e6c7862..b224d19 100644 --- a/spec/graphql/anycable_spec.rb +++ b/spec/graphql/anycable_spec.rb @@ -1,12 +1,14 @@ # frozen_string_literal: true RSpec.describe GraphQL::AnyCable do + let(:operation_name) { "SomeSubscription" } + subject do AnycableSchema.execute( query: query, context: {channel: channel, subscription_id: subscription_id}, variables: {}, - operation_name: "SomeSubscription" + operation_name: operation_name ).tap do |result| expect(result.to_h.fetch("errors", [])).to be_empty end @@ -55,49 +57,55 @@ expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) end - context "with multiple subscriptions in one query" do - let(:query) do - <<~GRAPHQL - subscription SomeSubscription { - productCreated { id title } - productUpdated { id } - } - GRAPHQL - end + context "with multiple subscriptions in two queries" do + context "product created subscription" do + let(:operation_name) { "ProductCreatedSubscription" } + let(:query) do + <<~GRAPHQL + subscription ProductCreatedSubscription { + productCreated { id title } + } + GRAPHQL + end + let(:expected_result) do + <<~JSON.strip + {"result":{"data":{"productCreated":{"id":"1","title":"Gravizapa"}}},"more":true} + JSON + end - context "triggering update event" do - it "broadcasts message only for update event" do + it "broadcasts message only for create event" do subject - AnycableSchema.subscriptions.trigger(:product_updated, {}, {id: 1, title: "foo"}) + AnycableSchema.subscriptions.trigger(:product_created, {}, {id: 1, title: "Gravizapa"}) expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) end end - context "triggering create event" do + context "product updated subscription" do + let(:operation_name) { "ProductUpdatedSubscription" } + let(:query) do + <<~GRAPHQL + subscription ProductUpdatedSubscription { + productUpdated { id } + } + GRAPHQL + end + let(:expected_result) do <<~JSON.strip - {"result":{"data":{"productCreated":{"id":"1","title":"Gravizapa"}}},"more":true} + {"result":{"data":{"productUpdated":{"id":"1"}}},"more":true} JSON end - it "broadcasts message only for create event" do + it "broadcasts message for update event" do subject - AnycableSchema.subscriptions.trigger(:product_created, {}, {id: 1, title: "Gravizapa"}) - + AnycableSchema.subscriptions.trigger(:product_updated, {}, {id: 1, title: "foo"}) expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", expected_result) end end end context "with empty operation name" do - subject do - AnycableSchema.execute( - query: query, - context: {channel: channel, subscription_id: subscription_id}, - variables: {}, - operation_name: nil - ) - end + let(:operation_name) { nil } let(:query) do <<~GRAPHQL @@ -118,7 +126,7 @@ query: query, context: {channel: channel, subscription_id: subscription_id}, variables: {}, - operation_name: "SomeSubscription" + operation_name: operation_name ) end @@ -154,7 +162,7 @@ query: query, context: {channel: channel, subscription_id: subscription_id}, variables: {}, - operation_name: "SomeSubscription" + operation_name: operation_name ) end @@ -182,7 +190,7 @@ query: query, context: {}, # Intentionally left blank variables: {}, - operation_name: "SomeSubscription" + operation_name: operation_name ) end diff --git a/spec/graphql/broadcast_spec.rb b/spec/graphql/broadcast_spec.rb index bfbfd5e..5d38988 100644 --- a/spec/graphql/broadcast_spec.rb +++ b/spec/graphql/broadcast_spec.rb @@ -79,4 +79,79 @@ def subscribe(query) expect(AnyCable).to have_received(:broadcast).once end end + + context "when handling race conditions with subscription deleted between checks" do + let(:query) do + <<~GRAPHQL.strip + subscription SomeSubscription { postCreated{ id } } + GRAPHQL + end + + let(:redis) { $redis } + let(:object) { double("Post", id: 1, title: "Racing") } + let(:fingerprint) { ":postCreated:/SomeSubscription/race-condition-test/0/signature456=" } + let(:subscriptions) { BroadcastSchema.subscriptions } + + before do + allow_any_instance_of(GraphQL::Subscriptions::Event).to receive(:fingerprint).and_return(fingerprint) + + 3.times { subscribe(query) } + + @subscription_ids = redis.smembers("graphql-subscriptions:#{fingerprint}") + expect(@subscription_ids.size).to eq(3) + + # Emulate removing a subscription like race condition + allow(subscriptions).to receive(:read_subscription).and_wrap_original do |original, sid| + # Remove first subscription after `checking existing`, but before the read_subscription + if sid == @subscription_ids.first + redis.del("graphql-subscription:#{sid}") + + nil + else + original.call(sid) + end + end + + allow(AnyCable).to receive(:broadcast) + end + + it "handles subscription deleted between exists? check and read_subscription" do + subscriptions.execute_grouped( + fingerprint, + @subscription_ids, + GraphQL::Subscriptions::Event.new( + name: "postCreated", + arguments: {}, + field: BroadcastSchema.subscription.fields["postCreated"], + scope: nil, + context: {} + ), + object + ) + + # We must get broadcast here, because if the first subscription expired, we should process the rest of subscriptions + expect(AnyCable).to have_received(:broadcast).once + expect(AnyCable).to have_received(:broadcast).with("graphql-subscriptions:#{fingerprint}", anything) + end + + it "returns without broadcasting when all subscriptions were deleted between checks" do + # read_subscription always returns nil + allow(subscriptions).to receive(:read_subscription).and_return(nil) + + subscriptions.execute_grouped( + fingerprint, + @subscription_ids, + GraphQL::Subscriptions::Event.new( + name: "postCreated", + arguments: {}, + field: BroadcastSchema.subscription.fields["postCreated"], + scope: nil, + context: {} + ), + object + ) + + expect(AnyCable).not_to have_received(:broadcast) + end + end end diff --git a/spec/graphql/stats_spec.rb b/spec/graphql/stats_spec.rb index 557bb9f..db6b0fa 100644 --- a/spec/graphql/stats_spec.rb +++ b/spec/graphql/stats_spec.rb @@ -2,10 +2,17 @@ RSpec.describe GraphQL::AnyCable::Stats do describe "#collect" do - let(:query) do + let(:created_query) do <<~GRAPHQL - subscription SomeSubscription { + subscription ProductCreatedSubscription { productCreated { id title } + } + GRAPHQL + end + + let(:updated_query) do + <<~GRAPHQL + subscription ProductUpdatedSubscription { productUpdated { id } } GRAPHQL @@ -23,16 +30,23 @@ before do AnycableSchema.execute( - query: query, - context: {channel: channel, subscription_id: subscription_id}, + query: created_query, + context: {channel: channel, subscription_id: "#{subscription_id}-created"}, + variables: {}, + operation_name: "ProductCreatedSubscription" + ) + + AnycableSchema.execute( + query: updated_query, + context: {channel: channel, subscription_id: "#{subscription_id}-updated"}, variables: {}, - operation_name: "SomeSubscription" + operation_name: "ProductUpdatedSubscription" ) end context "when include_subscriptions is false" do let(:expected_result) do - {total: {subscription: 1, fingerprints: 2, subscriptions: 2, channel: 1}} + {total: {subscription: 2, fingerprints: 2, subscriptions: 2, channel: 2}} end it "returns total stat" do @@ -45,7 +59,7 @@ let(:expected_result) do { - total: {subscription: 1, fingerprints: 2, subscriptions: 2, channel: 1}, + total: {subscription: 2, fingerprints: 2, subscriptions: 2, channel: 2}, subscriptions: { "productCreated" => 1, "productUpdated" => 1