Skip to content

feat: WebSocket 接続失敗時のポーリングによるイベント通知 #14

@takaokouji

Description

@takaokouji

概要

ネットワークフィルタにより AppSync の WebSocket プロトコル (wss://) が使用できない環境に対応するため、ポーリングによるイベント通知機能を実装します。

グループ作成時に接続方法(WebSocket or ポーリング)を選択し、メンバーはグループの設定に従って適切なプロトコルを使用します。

背景

現在の Mesh v2 実装では、リアルタイムイベント通知に AppSync の WebSocket Subscription を使用しています。しかし、企業や教育機関などのネットワーク環境では、セキュリティポリシーにより WebSocket 接続がブロックされる場合があります。

このような環境でも Mesh 拡張機能を使用できるよう、グループ作成時に接続方法を選択し、WebSocket が使えない場合はポーリングベースのプロトコルに切り替える仕組みを実装します。

要件

機能要件

1. グループ作成時の接続方法選択

  • ホストは createGroup 実行時に useWebSocket フラグを指定
  • フラグはグループに保存され、全メンバーに適用
  • コスト削減のため、WebSocket が使える環境では WebSocket を優先

2. 2つのプロトコル

プロトコル A: WebSocket 使用(既存)
  • イベント送信: fireEventsByNode mutation(None DataSource)
  • イベント受信: onMessageInGroup subscription(WebSocket)
  • 特徴: リアルタイム、低コスト
プロトコル B: ポーリング使用(新規)
  • イベント送信: recordEventsByNode mutation(DynamoDB に保存)
  • イベント受信: getEventsSince query(ポーリング、2秒間隔)
  • 特徴: WebSocket 不要、ネットワークフィルタ対応

3. メンバーの接続方法判定

  • メンバーは joinGroup のレスポンスで useWebSocket フラグを取得
  • フラグに応じてイベント発火・受信方法を切り替え

4. イベントの永続化(ポーリング時のみ)

  • recordEventsByNode mutation 実行時に DynamoDB にイベントを保存
  • TTL を 10秒に設定し、自動削除

5. ポーリング間隔の設定

  • ポーリング間隔は環境変数で設定(デフォルト: 2秒)
  • サーバーからのレスポンスに間隔を含める

現在の実装

fireEventsByNode mutation

ファイル: infra/mesh-v2/js/resolvers/Mutation.fireEventsByNode.js

None DataSource を使用し、DynamoDB にイベントを保存せず、WebSocket Subscription にのみ配信します。

export function response(ctx) {
    // MeshMessage型を返す(batchEventフィールドのみ設定)
    return {
        groupId: groupId,
        domain: domain,
        nodeStatus: null,
        batchEvent: { /* ... */ },
        groupDissolve: null
    };
}

この実装は変更しません。 WebSocket 使用時は従来通りこの mutation を使用します。

DynamoDB テーブル設計

Event エンティティの設計は既に定義されています:

  • PK: GROUP#{groupId}@{domain}
  • SK: EVENT#{timestamp}#{eventId}
  • TTL: 設定可能(現在は未使用)

参照: infra/mesh-v2/docs/architecture.md (line 364-386)

変更が必要な箇所

サーバーサイド (infra/mesh-v2)

1. 環境変数の追加

ファイル: infra/mesh-v2/.env.example

# Event Polling Settings
# Event TTL in seconds (events are auto-deleted after this time)
MESH_EVENT_TTL_SECONDS=10

# Polling interval for clients that cannot use WebSocket (in seconds)
MESH_POLLING_INTERVAL_SECONDS=2

2. GraphQL スキーマの変更

ファイル: infra/mesh-v2/graphql/schema.graphql

# Group 型にフラグを追加
type Group {
  id: ID!
  domain: String!
  fullId: String!
  name: String!
  hostId: ID!
  createdAt: AWSDateTime!
  expiresAt: AWSDateTime!
  heartbeatIntervalSeconds: Int
  useWebSocket: Boolean!       # NEW: WebSocket 使用フラグ
  pollingIntervalSeconds: Int  # NEW: ポーリング間隔(useWebSocket=false の場合のみ)
}

# Node 型にフラグを追加
type Node {
  id: ID!
  name: String!
  groupId: ID
  domain: String
  expiresAt: AWSDateTime
  heartbeatIntervalSeconds: Int
  useWebSocket: Boolean        # NEW: グループの設定を継承
  pollingIntervalSeconds: Int  # NEW: ポーリング間隔(useWebSocket=false の場合のみ)
}

# Mutation にパラメーターとエンドポイントを追加
type Mutation {
  # グループ管理
  createGroup(
    name: String!
    hostId: ID!
    domain: String!
    maxConnectionTimeSeconds: Int
    useWebSocket: Boolean!     # NEW: WebSocket 使用フラグ
  ): Group!

  # 既存の mutation...

  # NEW: ポーリング用のイベント記録 mutation
  recordEventsByNode(
    groupId: ID!
    domain: String!
    nodeId: ID!
    events: [EventInput!]!
  ): RecordEventsPayload!
}

# NEW: recordEventsByNode のレスポンス型
type RecordEventsPayload {
  groupId: ID!
  domain: String!
  recordedCount: Int!       # 記録したイベント数
  timestamp: AWSDateTime!   # サーバー側のリクエスト受信日時(getEventsSince の since 基準)
}

# Query にポーリング用エンドポイントを追加
type Query {
  # 既存の Query...

  # NEW: 前回取得日時以降のイベントを取得(ポーリング用)
  # since: サーバー側のタイムスタンプ(RecordEventsPayload.timestamp)を指定
  getEventsSince(
    groupId: ID!
    domain: String!
    since: AWSDateTime!
  ): [Event!]!
}

3. createGroup mutation の実装変更

ファイル: infra/mesh-v2/lambda/handlers/appsync_handler.rb (または該当 Lambda)

変更内容:

  • useWebSocket パラメーターを受け取る
  • DynamoDB の Group アイテムに useWebSocketpollingIntervalSeconds を保存
  • レスポンスに両フィールドを含める

実装例:

def handle_create_group(arguments)
  use_websocket = arguments['useWebSocket']
  polling_interval = use_websocket ? nil : ENV['MESH_POLLING_INTERVAL_SECONDS'].to_i

  group = {
    # 既存のフィールド...
    'useWebSocket' => use_websocket,
    'pollingIntervalSeconds' => polling_interval
  }

  # DynamoDB に保存...
  group
end

4. joinGroup mutation の実装変更

ファイル: infra/mesh-v2/js/resolvers/Mutation.joinGroup.js (または該当ファイル)

変更内容:

  • グループ情報から useWebSocketpollingIntervalSeconds を取得
  • レスポンスに両フィールドを含める

5. recordEventsByNode mutation の実装(新規)

ファイル: infra/mesh-v2/js/resolvers/Mutation.recordEventsByNode.js (新規作成)

実装内容:

  1. グループ存在確認(Pipeline Function で checkGroupExists を再利用)
  2. イベントを DynamoDB に保存(BatchWriteItem)
  3. サーバー側で timestamp を生成util.time.nowISO8601()
  4. TTL を 現在時刻 + MESH_EVENT_TTL_SECONDS に設定
  5. RecordEventsPayload を返す(Subscription は発火しない)

重要: クライアント側の event.firedAt使用しません。サーバー側でリクエストを受信した日時を timestamp として記録します。これは、デバイス上でのイベント発火から recordEventsByNode 呼び出しまでに既に 1秒程度の遅延があるためです。

実装方針:

  • Pipeline Resolver
    • Function 1: グループ存在確認
    • Function 2: イベント保存(新規作成)
    • Response Resolver: RecordEventsPayload を返す

DynamoDB アイテム構造:

{
  "pk": "GROUP#[email protected]",
  "sk": "EVENT#2026-01-11T12:34:56.789Z#evt-uuid",
  "eventName": "button_clicked",
  "firedByNodeId": "node-001",
  "groupId": "abc123",
  "domain": "192.168.1.1",
  "payload": "{\"button\":\"A\"}",
  "timestamp": "2026-01-11T12:34:56.789Z",  // サーバー側のリクエスト受信日時
  "ttl": 1736599506
}

実装例:

// Function 2: イベント保存
export function request(ctx) {
  const { groupId, domain, nodeId, events } = ctx.arguments;
  const serverTimestamp = util.time.nowISO8601();  // サーバー側のタイムスタンプ
  const ttl = util.time.nowEpochSeconds() + parseInt(ctx.env.MESH_EVENT_TTL_SECONDS);

  const items = events.map(event => ({
    pk: `GROUP#${groupId}@${domain}`,
    sk: `EVENT#${serverTimestamp}#${util.autoId()}`,
    eventName: event.eventName,
    firedByNodeId: nodeId,
    groupId: groupId,
    domain: domain,
    payload: event.payload || null,
    timestamp: serverTimestamp,  // サーバー側のタイムスタンプを使用
    ttl: ttl
  }));

  return {
    operation: 'BatchWriteItem',
    tables: {
      [ctx.env.DYNAMODB_TABLE_NAME]: items
    }
  };
}

// Response Resolver
export function response(ctx) {
  const serverTimestamp = util.time.nowISO8601();

  return {
    groupId: ctx.arguments.groupId,
    domain: ctx.arguments.domain,
    recordedCount: ctx.arguments.events.length,
    timestamp: serverTimestamp  // クライアントが次回の since として使用
  };
}

6. getEventsSince Query の実装(新規)

ファイル: infra/mesh-v2/js/resolvers/Query.getEventsSince.js (新規作成)

実装内容:

重要: since パラメーターは、サーバー側の timestamprecordEventsByNode のリクエスト受信日時)と比較します。クライアント側のイベント発火日時(event.firedAt)ではありません。

export function request(ctx) {
  const { groupId, domain, since } = ctx.arguments;

  return {
    operation: 'Query',
    query: {
      expression: 'pk = :pk AND sk > :sk',
      expressionValues: {
        ':pk': util.dynamodb.toDynamoDB(`GROUP#${groupId}@${domain}`),
        // since はサーバー側のタイムスタンプ(RecordEventsPayload.timestamp)
        ':sk': util.dynamodb.toDynamoDB(`EVENT#${since}`)
      }
    },
    limit: 100,
    scanIndexForward: true  // timestamp でソート
  };
}

export function response(ctx) {
  if (ctx.error) {
    util.error(ctx.error.message, ctx.error.type);
  }

  // Event[] を返す
  return ctx.result.items.map(item => ({
    name: item.eventName,
    firedByNodeId: item.firedByNodeId,
    groupId: item.groupId,
    domain: item.domain,
    payload: item.payload,
    timestamp: item.timestamp  // サーバー側のタイムスタンプ
  }));
}

7. CDK Stack の変更

ファイル: infra/mesh-v2/lib/mesh-v2-stack.ts

変更内容:

  • 環境変数 MESH_EVENT_TTL_SECONDS, MESH_POLLING_INTERVAL_SECONDS を読み込み
  • AppSync Resolver の環境変数に追加
  • recordEventsByNode Resolver と getEventsSince Resolver を追加

クライアントサイド (gui/smalruby3-gui)

対象ファイル: src/extensions/scratch3_mesh/index.js (推定)

1. グループ作成時の WebSocket 可否判定

実装内容:

  1. WebSocket 接続を試行(テスト接続)
  2. 接続成功: useWebSocket = true
  3. 接続失敗(4xx 以外のエラー): useWebSocket = false
  4. createGroupuseWebSocket パラメーターを渡す

2. メンバーの接続方法判定

実装内容:

  1. joinGroup のレスポンスから useWebSocketpollingIntervalSeconds を取得
  2. フラグに応じてイベント発火・受信方法を切り替え

3. イベント発火方法の切り替え

実装内容:

if (useWebSocket) {
  // プロトコル A: 従来通り
  await client.fireEventsByNode(nodeId, groupId, domain, events);
} else {
  // プロトコル B: ポーリング用
  const result = await client.recordEventsByNode(nodeId, groupId, domain, events);
  // result.timestamp を保存(次回の getEventsSince の since として使用可能)
}

4. イベント受信方法の切り替え

実装内容:

if (useWebSocket) {
  // プロトコル A: WebSocket Subscription
  client.subscribeToMessageInGroup(groupId, domain, {
    onBatchEvent: (batchEvent) => { /* handle events */ }
  });
} else {
  // プロトコル B: ポーリング
  // 重要: lastFetchTime はサーバー側のタイムスタンプを使用
  let lastFetchTime = new Date().toISOString();

  setInterval(async () => {
    const events = await client.getEventsSince(groupId, domain, lastFetchTime);

    if (events.length > 0) {
      events.forEach(event => { /* handle event */ });
      // サーバー側のタイムスタンプを次回の since として使用
      lastFetchTime = events[events.length - 1].timestamp;
    }
  }, pollingIntervalSeconds * 1000);
}

実装案の詳細

プロトコル比較

項目 プロトコル A (WebSocket) プロトコル B (ポーリング)
イベント送信 fireEventsByNode (None DataSource) recordEventsByNode (DynamoDB 保存)
イベント受信 onMessageInGroup subscription getEventsSince query (2秒ごと)
レイテンシ 低(リアルタイム) 中(最大2秒の遅延)
ネットワーク要件 WebSocket 接続必須 HTTPS のみ
コスト 低(Subscription 課金) 中(Query 課金 + DynamoDB ストレージ)
DynamoDB 使用 なし あり(TTL 10秒で自動削除)
タイムスタンプ クライアント側(event.firedAt) サーバー側(リクエスト受信日時)

データフロー

プロトコル A: WebSocket 使用時(既存、変更なし)

Client --> fireEventsByNode mutation --> AppSync (None DataSource)
                                           |
                                           v
                                    Subscription (onMessageInGroup)
                                           |
                                           v
                                    Other Clients (リアルタイム配信)

プロトコル B: ポーリング使用時(新規)

[イベント送信]
Client --> recordEventsByNode mutation --> AppSync --> DynamoDB (イベント保存、TTL=10秒)
   (event.firedAt は使用しない)                         (サーバー側で timestamp を生成)

[イベント受信]
Client --> getEventsSince query (2秒ごと) --> AppSync --> DynamoDB (since 以降のイベント取得)
   (since = サーバー側のタイムスタンプ)                      (timestamp で比較)
                                                              |
                                                              v
                                                        Client (イベント処理)

グループ作成フロー

1. Host: WebSocket 接続テスト
   ↓
2. Host: createGroup(name, hostId, domain, useWebSocket=<結果>)
   ↓
3. Server: Group に useWebSocket フラグを保存
   ↓
4. Server: Response に useWebSocket と pollingIntervalSeconds を含める
   ↓
5. Host: フラグに応じてプロトコルを選択

メンバー参加フロー

1. Member: joinGroup(groupId, nodeId, domain)
   ↓
2. Server: Group から useWebSocket と pollingIntervalSeconds を取得
   ↓
3. Server: Response に useWebSocket と pollingIntervalSeconds を含める
   ↓
4. Member: フラグに応じてプロトコルを選択

技術的な考慮事項

1. プロトコル選択の責任分担

  • Host: WebSocket 接続テストを実行し、グループ作成時に useWebSocket を決定
  • Server: グループに設定を保存し、全メンバーに通知
  • Member: グループの設定に従い、適切なプロトコルを使用

2. コスト最適化

  • WebSocket 優先: 接続可能な環境では WebSocket を使用(既存プロトコル)
  • 短い TTL: ポーリング時のイベントは 10秒で自動削除し、ストレージコストを最小化
  • 適切なポーリング間隔: 2秒間隔により、リアルタイム性とコストのバランスを取る

3. TTL (Time To Live) - ポーリング時のみ

  • 設定値: 10秒
  • 理由:
    • ポーリング間隔が 2秒のため、最大 5 回のポーリングでイベントが取得される
    • 短い TTL により DynamoDB のストレージコストを最小化
    • イベントは「通知」が目的であり、長期保存は不要

4. イベントの重複防止

  • クライアント側で前回取得日時 (lastFetchTime) を保持
  • since パラメーターで前回取得日時以降のイベントのみを取得
  • timestamp でソートし、順序を保証

5. スケーラビリティ

  • 最大イベント数: getEventsSince で最大 100 件取得
  • バッチサイズ: recordEventsByNode は最大 1,000 件のイベントをサポート(fireEventsByNode と同様)
  • 同時接続: ポーリングは WebSocket よりも接続数の制限が緩い

6. タイムスタンプの扱い(重要)

背景

デバイス上でのイベント発火から recordEventsByNode 呼び出しまでに、既に 1秒程度の遅延が発生します。このため、クライアント側の event.firedAt をそのまま使用すると、以下の問題が発生します:

  • クライアント側の時計のズレによる順序の不整合
  • ネットワーク遅延を考慮できない
  • getEventsSincesince パラメーターとの整合性が取れない

解決策

サーバー側でタイムスタンプを生成することで、以下のメリットがあります:

  1. 時刻の一貫性: サーバー側の時計を基準とするため、クライアント間の時刻のズレがない
  2. 順序の保証: リクエスト受信順にイベントが記録される
  3. since パラメーターとの整合性: getEventsSincesince はサーバー側のタイムスタンプと直接比較できる

実装上の注意点

  • recordEventsByNode mutation は、クライアント側の event.firedAt無視し、サーバー側で util.time.nowISO8601() を使用してタイムスタンプを生成
  • RecordEventsPayload.timestamp は、サーバー側のリクエスト受信日時を返す
  • クライアントは RecordEventsPayload.timestamp または getEventsSince で取得した最後のイベントの timestamp を、次回の since として使用
  • getEventsSincesince パラメーターは、サーバー側のタイムスタンプ(Event.timestamp)と比較される

タイムスタンプフロー

1. Client: イベント発火(デバイス上の時刻: T0)
   ↓
2. Client: recordEventsByNode 呼び出し(T0 + 1秒の遅延)
   event.firedAt = T0 (サーバーでは使用されない)
   ↓
3. Server: リクエスト受信(サーバー時刻: T1)
   timestamp = util.time.nowISO8601() = T1
   DynamoDB に timestamp = T1 として保存
   ↓
4. Server: RecordEventsPayload.timestamp = T1 を返す
   ↓
5. Client: 次回のポーリングで since = T1 を指定
   ↓
6. Server: getEventsSince(since = T1)
   DynamoDB Query: timestamp > T1 のイベントを取得

テスト計画

単体テスト

  1. createGroup mutation

    • useWebSocket フラグが DynamoDB に保存されること
    • pollingIntervalSeconds が正しく設定されること(useWebSocket=false の場合)
  2. joinGroup mutation

    • グループの useWebSocketpollingIntervalSeconds がレスポンスに含まれること
  3. recordEventsByNode mutation

    • DynamoDB にイベントが保存されること
    • timestamp がサーバー側で生成されること(util.time.nowISO8601()
    • クライアント側の event.firedAt が無視されること
    • TTL が正しく設定されること(現在時刻 + 10秒)
    • グループが存在しない場合、エラーが返ること
  4. getEventsSince query

    • 指定日時(サーバー側のタイムスタンプ)以降のイベントが取得されること
    • 最大 100 件まで取得されること
    • イベントが timestamp でソートされていること

結合テスト

  1. プロトコル A (WebSocket)

    • ホストが useWebSocket=true でグループ作成
    • メンバーが joinGroup で useWebSocket=true を取得
    • fireEventsByNode でイベント送信
    • onMessageInGroup subscription でリアルタイム受信
  2. プロトコル B (ポーリング)

    • ホストが useWebSocket=false でグループ作成
    • メンバーが joinGroup で useWebSocket=false を取得
    • recordEventsByNode でイベント送信
    • getEventsSince でポーリング受信(2秒間隔)
    • since パラメーターがサーバー側のタイムスタンプで正しく機能すること
  3. 混在環境

    • 1つのグループ内で全メンバーが同じプロトコルを使用すること
    • プロトコル A のグループとプロトコル B のグループが並立できること
  4. TTL 動作確認

    • 10秒後にイベントが自動削除されること(ポーリング時のみ)
    • 削除されたイベントは getEventsSince で取得されないこと
  5. タイムスタンプの整合性確認

    • 複数のクライアントから送信されたイベントが、サーバー側のタイムスタンプで正しく順序付けられること
    • クライアント側の時計がズレていても、正しくイベントが取得されること

完了条件

  • サーバーサイド実装完了
    • 環境変数の追加
    • GraphQL スキーマの変更
    • createGroup mutation の実装変更(useWebSocket パラメーター対応)
    • joinGroup mutation の実装変更(useWebSocket をレスポンスに含める)
    • recordEventsByNode mutation の実装(新規、サーバー側タイムスタンプ生成)
    • getEventsSince query の実装(新規、サーバー側タイムスタンプで比較)
    • CDK Stack の変更
  • クライアントサイド実装完了
    • WebSocket 接続テスト機能
    • グループ作成時の useWebSocket 判定
    • メンバー参加時の useWebSocket 取得
    • イベント発火方法の切り替え(fireEventsByNode / recordEventsByNode)
    • イベント受信方法の切り替え(subscription / polling)
    • サーバー側タイムスタンプを使用した lastFetchTime の管理
  • テスト完了
    • 単体テスト(タイムスタンプ生成の確認を含む)
    • 結合テスト(プロトコル A, B, 混在環境、タイムスタンプ整合性)
    • ネットワークフィルタ環境での動作確認
  • ドキュメント更新
    • README.md にポーリング機能の説明を追加
    • API Reference の更新(recordEventsByNode, getEventsSince)
    • architecture.md にプロトコル選択フローを追加
    • タイムスタンプの扱いに関する技術ドキュメント追加

参考資料

  • 現在の実装:
    • infra/mesh-v2/js/resolvers/Mutation.fireEventsByNode.js(変更なし)
    • infra/mesh-v2/graphql/schema.graphql
    • infra/mesh-v2/docs/architecture.md (line 162-190, 364-386)
  • DynamoDB テーブル設計: infra/mesh-v2/docs/architecture.md (line 257-399)
  • クライアント実装例: infra/mesh-v2/examples/javascript-client/mesh-client.js

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions