Skip to content

fix: unify mesh subscriptions and enable proper filtering (Issue #500)#31

Merged
takaokouji merged 6 commits intomainfrom
fix/issue-500-unify-mesh-subscriptions
Jan 3, 2026
Merged

fix: unify mesh subscriptions and enable proper filtering (Issue #500)#31
takaokouji merged 6 commits intomainfrom
fix/issue-500-unify-mesh-subscriptions

Conversation

@takaokouji
Copy link
Contributor

This PR implements the unified subscription model for Mesh v2 and fixes subscription filtering issues.

Summary of Changes

  1. Unified Subscription Type: Implemented a single type that wraps , , and fields. This ensures message ordering and simplifies client-side implementation.
  2. Subscription Filtering: Added top-level and fields to the type and updated all relevant resolvers (JS and Lambda) to populate these fields. This is required for AppSync's @aws_subscribe filtering to work correctly.
  3. Test Improvements:
    • Updated all integration tests (58 cases) to match the new response structure.
    • Added a suppress_errors option to the execute_graphql helper to clean up test output by silencing expected errors in negative test cases.
    • Fixed and verified WebSocket E2E tests.

Related Issues

Verification

  • All 58 integration tests passed successfully.
  • WebSocket E2E tests confirmed proper message delivery and filtering.

🤖 Generated with Gemini Code
Co-Authored-By: Gemini noreply@google.com

takaokouji and others added 3 commits January 2, 2026 23:18
Implements GitHub Issue #500 proposal to consolidate all group messages
into a single subscription (onMessageInGroup) using a wrapper type approach.

Changes:
- GraphQL Schema: Changed from Union to single MeshMessage type with
  nodeStatus, batchEvent, and groupDissolve fields
- Resolvers: Updated to return MeshMessage wrapper with only relevant
  field populated (others set to null)
- Mutations: reportDataByNode, fireEventsByNode, and dissolveGroup now
  return MeshMessage instead of individual types
- Integration Tests: Updated 56 test cases to handle new response structure

Technical Details:
- AppSync does not support Union types in Subscriptions, so we use a
  single type with optional fields as a workaround
- Client-side routing based on which field is non-null instead of __typename
- Maintains order guarantee: single WebSocket stream ensures mutations
  arrive in execution order

Test Results:
- 56/58 integration tests passing
- 2 WebSocket E2E tests require further investigation

Related: #500

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
… filtering

AppSync @aws_subscribe filtering requires fields at the top level of
the mutation return value. Added groupId and domain to MeshMessage
type to enable proper subscription filtering.

Changes:
- Schema: Added groupId and domain fields to MeshMessage
- Resolvers: Updated all three resolvers to include top-level fields
- Tests: Updated test expectations to validate new fields
- E2E: Fixed WebSocket subscription tests

Fixes: #500

🤖 Generated with [Gemini Code](https://gemini.google.com/code)

Co-Authored-By: Gemini <noreply@google.com>
- Updated execute_graphql helper to support suppress_errors option
- Enabled error suppression in negative test cases to clean up test output
- Ran standardrb --fix to ensure code style compliance

🤖 Generated with [Gemini Code](https://gemini.google.com/code)

Co-Authored-By: Gemini <noreply@google.com>
@takaokouji
Copy link
Contributor Author

JavaScript Prototypeへの変更反映手順

PR#31の統合サブスクリプション変更を infra/mesh-v2/examples/javascript-client/ に反映させる作業手順です。

📋 変更が必要なファイル

  1. mesh-client.js - GraphQLクライアントライブラリ
  2. app.js - アプリケーションロジック

🔍 現在の実装(PR#31適用前)

現在のプロトタイプは、以下の3つの個別サブスクリプションを使用しています:

// mesh-client.js (lines 398-549)
subscribeToDataUpdates(groupId, domain, callback)      // onDataUpdateInGroup
subscribeToBatchEvents(groupId, domain, callback)      // onBatchEventInGroup
subscribeToGroupDissolve(groupId, domain, callback)    // onGroupDissolve

app.jsでの使用箇所:

  • Line 279-283: データ更新サブスクリプション(グループ作成時)
  • Line 418-429: データ更新 + バッチイベントサブスクリプション(グループ参加時)
  • Line 432-436: グループ解散サブスクリプション(グループ参加時)

✅ 実装手順

Step 1: mesh-client.js の変更

1.1 GraphQL Mutationクエリの更新

3つのmutationクエリをMeshMessage形式に変更:

reportDataByNode (lines 336-359):

async reportDataByNode(nodeId, groupId, domain, data) {
  const query = `
    mutation ReportDataByNode($nodeId: ID!, $groupId: ID!, $domain: String!, $data: [SensorDataInput!]!) {
      reportDataByNode(nodeId: $nodeId, groupId: $groupId, domain: $domain, data: $data) {
        groupId
        domain
        nodeStatus {
          nodeId
          groupId
          domain
          data {
            key
            value
          }
          timestamp
        }
      }
    }
  `;

  const result = await this.execute(query, {
    nodeId,
    groupId,
    domain: domain || this.domain,
    data
  });

  return result.reportDataByNode;
}

fireEventsByNode (lines 365-393):

async fireEventsByNode(nodeId, groupId, domain, events) {
  const query = `
    mutation FireEventsByNode($nodeId: ID!, $groupId: ID!, $domain: String!, $events: [EventInput!]!) {
      fireEventsByNode(nodeId: $nodeId, groupId: $groupId, domain: $domain, events: $events) {
        groupId
        domain
        batchEvent {
          events {
            name
            firedByNodeId
            groupId
            domain
            payload
            timestamp
          }
          firedByNodeId
          groupId
          domain
          timestamp
        }
      }
    }
  `;

  const result = await this.execute(query, {
    nodeId,
    groupId,
    domain: domain || this.domain,
    events
  });

  return result.fireEventsByNode;
}

dissolveGroup (lines 312-330):

async dissolveGroup(groupId, hostId, domain) {
  const query = `
    mutation DissolveGroup($groupId: ID!, $hostId: ID!, $domain: String!) {
      dissolveGroup(groupId: $groupId, hostId: $hostId, domain: $domain) {
        groupId
        domain
        groupDissolve {
          groupId
          domain
          message
        }
      }
    }
  `;

  const data = await this.execute(query, {
    groupId,
    hostId,
    domain: domain || this.domain
  });

  return data.dissolveGroup;
}
1.2 統合サブスクリプションメソッドの追加

3つの個別メソッド(lines 398-549)を削除し、以下に置き換え:

/**
 * Subscribe to all group messages via unified subscription
 * @param {string} groupId - Group ID
 * @param {string} domain - Domain
 * @param {Object} callbacks - Callback functions for each message type
 * @param {Function} callbacks.onDataUpdate - Called when nodeStatus is received
 * @param {Function} callbacks.onBatchEvent - Called when batchEvent is received
 * @param {Function} callbacks.onGroupDissolve - Called when groupDissolve is received
 * @returns {string} Subscription ID
 */
subscribeToMessageInGroup(groupId, domain, callbacks) {
  console.log('Subscription: onMessageInGroup', { groupId, domain });

  const subscriptionId = `message-${groupId}`;

  // GraphQL subscription query - unified
  const subscription = `
    subscription OnMessageInGroup($groupId: ID!, $domain: String!) {
      onMessageInGroup(groupId: $groupId, domain: $domain) {
        groupId
        domain
        nodeStatus {
          nodeId
          groupId
          domain
          data {
            key
            value
          }
          timestamp
        }
        batchEvent {
          events {
            name
            firedByNodeId
            groupId
            domain
            payload
            timestamp
          }
          firedByNodeId
          groupId
          domain
          timestamp
        }
        groupDissolve {
          groupId
          domain
          message
        }
      }
    }
  `;

  // Subscribe using Amplify
  const sub = this.graphqlClient.graphql({
    query: subscription,
    variables: { groupId, domain: domain || this.domain }
  }).subscribe({
    next: ({ data }) => {
      console.log('Unified subscription data received:', data);
      
      if (!data || !data.onMessageInGroup) return;
      
      const message = data.onMessageInGroup;

      // Route to appropriate callback based on which field is non-null
      if (message.nodeStatus && callbacks.onDataUpdate) {
        // When nodeStatus is received, fetch all group statuses
        this.listGroupStatuses(groupId, domain || this.domain)
          .then(statuses => callbacks.onDataUpdate(statuses))
          .catch(error => console.error('Error fetching group statuses:', error));
      } else if (message.batchEvent && callbacks.onBatchEvent) {
        callbacks.onBatchEvent(message.batchEvent);
      } else if (message.groupDissolve && callbacks.onGroupDissolve) {
        callbacks.onGroupDissolve(message.groupDissolve);
      }
    },
    error: (error) => {
      console.error('Unified subscription error:', error);
      if (error.errors && error.errors.length > 0) {
        console.error('GraphQL errors:', error.errors);
        error.errors.forEach(err => {
          console.error('- Error:', err.message);
          if (err.path) console.error('  Path:', err.path);
          if (err.locations) console.error('  Locations:', err.locations);
        });
      }
    }
  });

  this.subscriptions.set(subscriptionId, sub);

  return subscriptionId;
}

Step 2: app.js の変更

2.1 State定義の変更 (lines 18-20)

3つの個別サブスクリプションIDを1つに統合:

// Before (削除)
dataSubscriptionId: null,
batchEventSubscriptionId: null,
dissolveSubscriptionId: null,

// After (追加)
messageSubscriptionId: null,
2.2 グループ作成時のサブスクリプション (lines 279-283)
// Before (削除)
state.dataSubscriptionId = state.client.subscribeToDataUpdates(
  state.currentGroup.id,
  state.currentGroup.domain,
  displayOtherNodesData
);

// After (追加)
state.messageSubscriptionId = state.client.subscribeToMessageInGroup(
  state.currentGroup.id,
  state.currentGroup.domain,
  {
    onDataUpdate: displayOtherNodesData,
    onBatchEvent: handleBatchEventReceived,
    onGroupDissolve: handleGroupDissolved
  }
);
2.3 グループ参加時のサブスクリプション (lines 418-436)
// Before (削除)
state.dataSubscriptionId = state.client.subscribeToDataUpdates(
  state.currentGroup.id,
  state.currentGroup.domain,
  displayOtherNodesData
);

state.batchEventSubscriptionId = state.client.subscribeToBatchEvents(
  state.currentGroup.id,
  state.currentGroup.domain,
  handleBatchEventReceived
);

state.dissolveSubscriptionId = state.client.subscribeToGroupDissolve(
  state.currentGroup.id,
  state.currentGroup.domain,
  handleGroupDissolved
);

// After (追加)
state.messageSubscriptionId = state.client.subscribeToMessageInGroup(
  state.currentGroup.id,
  state.currentGroup.domain,
  {
    onDataUpdate: displayOtherNodesData,
    onBatchEvent: handleBatchEventReceived,
    onGroupDissolve: handleGroupDissolved
  }
);
2.4 サブスクリプション解除箇所の変更

以下の箇所で3つの個別解除を1つに統合:

handleLeaveGroup (lines 468-483):

// Before (削除)
if (state.dataSubscriptionId) {
  state.client.unsubscribe(state.dataSubscriptionId);
  state.dataSubscriptionId = null;
}
if (state.batchEventSubscriptionId) {
  state.client.unsubscribe(state.batchEventSubscriptionId);
  state.batchEventSubscriptionId = null;
}
if (state.dissolveSubscriptionId) {
  state.client.unsubscribe(state.dissolveSubscriptionId);
  state.dissolveSubscriptionId = null;
}

// After (追加)
if (state.messageSubscriptionId) {
  state.client.unsubscribe(state.messageSubscriptionId);
  state.messageSubscriptionId = null;
}

同様の変更を以下の関数にも適用:

  • handleDissolveGroup (lines 534-543)
  • handleDisconnect (lines 600-609)
  • handleGroupDissolved (lines 819-832)

Step 3: ビルドとテスト

# ビルド
cd infra/mesh-v2/examples/javascript-client
npm run build

# サーバー起動
npm start

# ブラウザで動作確認
open http://localhost:3000

Step 4: テストシナリオ

  1. データ更新の確認:

    • 2つのブラウザウィンドウで同じグループに参加
    • センサー値を変更し、他のウィンドウで更新が表示されることを確認
  2. バッチイベントの確認:

    • "Send 3 Events Batch" ボタンをクリック
    • イベント履歴に3つのイベントが表示されることを確認
    • 他のウィンドウでもイベントを受信することを確認
  3. グループ解散の確認:

    • ホストウィンドウで "Dissolve Group" をクリック
    • メンバーウィンドウで自動的に切断されることを確認

🎯 期待される結果

  • ✅ 単一のWebSocket接続で3種類のメッセージを受信
  • ✅ メッセージの順序保証(単一ストリーム)
  • ✅ 既存の機能がすべて動作
  • ✅ コンソールログに "Unified subscription data received" が表示される

📝 注意事項

  1. 後方互換性なし: この変更により、古いバックエンドとは互換性がなくなります
  2. GraphQLスキーマ依存: stg2環境のスキーマが更新されていることが前提です
  3. エラーハンドリング: 既存のエラーハンドリングロジック(shouldDisconnectOnError等)はそのまま動作します

Related PR: #31
Related Issue: smalruby/smalruby3-gui#500

takaokouji and others added 3 commits January 3, 2026 09:44
- Updated mesh-client.js mutations to return new MeshMessage structure
- Implemented subscribeToMessageInGroup unified subscription method
- Removed deprecated individual subscription methods
- Updated app.js to use the unified subscription and handle message routing
- Verified with stg2 environment and integration tests

Fixes: #500

🤖 Generated with [Gemini Code](https://gemini.google.com/code)

Co-Authored-By: Gemini <noreply@google.com>
… joining

- Refactored event sending from a fixed batch of 3 events to a single event driven by form inputs
- Removed redundant getGroup query by utilizing information available from joinGroup and selection state
- Updated documentation and UI to reflect the new event sending behavior and unified subscriptions
- Added debug logging for unified subscription message types in mesh-client.js

🤖 Generated with [Gemini Code](https://gemini.google.com/code)

Co-Authored-By: Gemini <noreply@google.com>
- Removed getGroup query from schema.graphql
- Removed getGroup resolver from CDK stack
- Deleted js/resolvers/Query.getGroup.js implementation
- Updated request specs to use listGroupsByDomain instead of getGroup
- Deleted get_group.graphql test fixture
- Updated documentation (API reference, Architecture, Operations)
- Verified with standardrb and cdk synth

🤖 Generated with [Gemini Code](https://gemini.google.com/code)

Co-Authored-By: Gemini <noreply@google.com>
@takaokouji takaokouji merged commit a749611 into main Jan 3, 2026
3 checks passed
@takaokouji takaokouji deleted the fix/issue-500-unify-mesh-subscriptions branch January 3, 2026 02:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant