Skip to content

Commit 080a72b

Browse files
takaokoujiGemini
andcommitted
feat: implement last data send tracking for order guarantee
- Added lastDataSendPromise to track the most recent data transmission completion - Updated fireEventsBatch to await lastDataSendPromise before sending events - Updated handleDataUpdate and fetchAllNodesData to use server timestamps - Added unit test for data and event ordering (mesh_service_v2_order.js) - Updated mesh_service_v2_timestamp.js to match new server timestamp behavior Related to smalruby/smalruby3-gui#500 🤖 Generated with [Gemini Code](https://gemini.google.com/code) Co-Authored-By: Gemini <noreply@google.com>
1 parent a5bfd1a commit 080a72b

File tree

3 files changed

+179
-30
lines changed

3 files changed

+179
-30
lines changed

src/extensions/scratch3_mesh_v2/mesh-service.js

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ class MeshV2Service {
4848
this.connectionTimer = null;
4949
this.heartbeatTimer = null;
5050
this.dataSyncTimer = null;
51+
52+
// Last data send promise to track completion of the most recent data transmission
53+
this.lastDataSendPromise = Promise.resolve();
54+
5155
this.memberHeartbeatInterval = 120; // Default 2 min
5256

5357
// Data from other nodes: { nodeId: { key: { value: string, timestamp: number } } }
@@ -439,10 +443,13 @@ class MeshV2Service {
439443
this.remoteData[nodeId] = {};
440444
}
441445

446+
// Use server timestamp with fallback to current time
447+
const serverTimestamp = nodeStatus.timestamp ? new Date(nodeStatus.timestamp).getTime() : Date.now();
448+
442449
nodeStatus.data.forEach(item => {
443450
this.remoteData[nodeId][item.key] = {
444451
value: item.value,
445-
timestamp: Date.now() // Add timestamp
452+
timestamp: serverTimestamp
446453
};
447454
});
448455
}
@@ -617,8 +624,8 @@ class MeshV2Service {
617624
if (!this.groupId || !this.client || events.length === 0) return;
618625

619626
try {
620-
// データ送信完了を待つ (Removed to prevent blocking, relying on unified subscription for order)
621-
// await this.dataRateLimiter.waitForCompletion();
627+
// Wait for last data send to complete
628+
await this.lastDataSendPromise;
622629

623630
this.costTracking.mutationCount++;
624631
this.costTracking.fireEventsCount++;
@@ -778,7 +785,9 @@ class MeshV2Service {
778785
}
779786

780787
try {
781-
await this.dataRateLimiter.send(dataArray, this._reportDataBound);
788+
// Save Promise to track completion (including queue time)
789+
this.lastDataSendPromise = this.dataRateLimiter.send(dataArray, this._reportDataBound);
790+
await this.lastDataSendPromise;
782791
} catch (error) {
783792
log.error(`Mesh V2: Failed to send data: ${error}`);
784793
const reason = this.shouldDisconnectOnError(error);
@@ -792,25 +801,43 @@ class MeshV2Service {
792801
* Internal method to send data to the server.
793802
* Used as sendFunction in dataRateLimiter.
794803
* @param {Array} payload - Array of {key, value} objects.
804+
* @returns {Promise} - Resolves with the mutation result.
795805
* @private
796806
*/
797807
async _reportData (payload) {
798-
this.costTracking.mutationCount++;
799-
this.costTracking.reportDataCount++;
800-
await this.client.mutate({
801-
mutation: REPORT_DATA,
802-
variables: {
803-
groupId: this.groupId,
804-
domain: this.domain,
805-
nodeId: this.meshId,
806-
data: payload
807-
}
808-
});
808+
if (!this.groupId || !this.client) return;
809809

810-
// Update last sent data on success
811-
payload.forEach(item => {
812-
this.lastSentData[item.key] = item.value;
813-
});
810+
try {
811+
this.costTracking.mutationCount++;
812+
this.costTracking.reportDataCount++;
813+
814+
// Save Promise to track completion
815+
this.lastDataSendPromise = this.client.mutate({
816+
mutation: REPORT_DATA,
817+
variables: {
818+
groupId: this.groupId,
819+
domain: this.domain,
820+
nodeId: this.meshId,
821+
data: payload
822+
}
823+
});
824+
825+
const result = await this.lastDataSendPromise;
826+
827+
// Update last sent data on success
828+
payload.forEach(item => {
829+
this.lastSentData[item.key] = item.value;
830+
});
831+
832+
return result;
833+
} catch (error) {
834+
log.error(`Mesh V2: Failed to report data: ${error}`);
835+
const reason = this.shouldDisconnectOnError(error);
836+
if (reason) {
837+
this.cleanupAndDisconnect(reason);
838+
}
839+
throw error;
840+
}
814841
}
815842

816843
fireEvent (eventName, payload = '') {
@@ -902,10 +929,11 @@ class MeshV2Service {
902929
if (!this.remoteData[status.nodeId]) {
903930
this.remoteData[status.nodeId] = {};
904931
}
932+
const serverTimestamp = status.timestamp ? new Date(status.timestamp).getTime() : Date.now();
905933
status.data.forEach(item => {
906934
this.remoteData[status.nodeId][item.key] = {
907935
value: item.value,
908-
timestamp: Date.now()
936+
timestamp: serverTimestamp
909937
};
910938
});
911939
});

test/unit/mesh_service_v2_order.js

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
const test = require('tap').test;
2+
const MeshV2Service = require('../../src/extensions/scratch3_mesh_v2/mesh-service');
3+
const {REPORT_DATA, FIRE_EVENTS} = require('../../src/extensions/scratch3_mesh_v2/gql-operations');
4+
5+
const createMockBlocks = () => ({
6+
runtime: {
7+
sequencer: {},
8+
emit: () => {},
9+
on: () => {},
10+
off: () => {},
11+
getTargetForStage: () => ({
12+
variables: {}
13+
})
14+
},
15+
opcodeFunctions: {
16+
event_broadcast: () => {}
17+
}
18+
});
19+
20+
test('MeshV2Service Data and Event Order', t => {
21+
t.test('fireEventsBatch awaits lastDataSendPromise', async st => {
22+
const blocks = createMockBlocks();
23+
const service = new MeshV2Service(blocks, 'node1', 'domain1');
24+
service.stopEventBatchTimer();
25+
service.groupId = 'group1';
26+
27+
let dataMutationStarted = false;
28+
let dataMutationFinished = false;
29+
let eventMutationStarted = false;
30+
31+
service.client = {
32+
mutate: options => {
33+
if (options.mutation === REPORT_DATA) {
34+
dataMutationStarted = true;
35+
return new Promise(resolve => {
36+
setTimeout(() => {
37+
dataMutationFinished = true;
38+
resolve({data: {reportDataByNode: {
39+
nodeId: 'node1',
40+
timestamp: new Date().toISOString(),
41+
data: []
42+
}}});
43+
}, 50); // Delay data mutation
44+
});
45+
}
46+
if (options.mutation === FIRE_EVENTS) {
47+
eventMutationStarted = true;
48+
st.ok(dataMutationStarted, 'Data mutation should have started');
49+
st.ok(dataMutationFinished, 'Data mutation should have finished before event mutation starts');
50+
return Promise.resolve({data: {fireEventsByNode: {}}});
51+
}
52+
return Promise.resolve({});
53+
}
54+
};
55+
56+
// 1. Send data
57+
const dataPromise = service.sendData([{key: 'var1', value: '10'}]);
58+
st.ok(service.lastDataSendPromise, 'lastDataSendPromise should be set');
59+
60+
// 2. Fire event batch immediately (should wait for dataPromise)
61+
const eventPromise = service.fireEventsBatch([{eventName: 'msg1', payload: '', firedAt: 't1'}]);
62+
63+
await Promise.all([dataPromise, eventPromise]);
64+
65+
st.ok(dataMutationFinished, 'Data mutation finished');
66+
st.ok(eventMutationStarted, 'Event mutation started');
67+
68+
st.end();
69+
});
70+
71+
t.test('handleDataUpdate uses server timestamp', st => {
72+
const blocks = createMockBlocks();
73+
const service = new MeshV2Service(blocks, 'node1', 'domain1');
74+
75+
const serverTimestamp = '2025-12-30T12:34:56.789Z';
76+
const expectedTime = new Date(serverTimestamp).getTime();
77+
78+
const nodeStatus = {
79+
nodeId: 'node2',
80+
timestamp: serverTimestamp,
81+
data: [
82+
{key: 'var1', value: '100'}
83+
]
84+
};
85+
86+
service.handleDataUpdate(nodeStatus);
87+
88+
st.equal(service.remoteData.node2.var1.value, '100');
89+
st.equal(service.remoteData.node2.var1.timestamp, expectedTime, 'Should use server timestamp');
90+
91+
st.end();
92+
});
93+
94+
t.test('fireEventsBatch works without preceding data send', async st => {
95+
const blocks = createMockBlocks();
96+
const service = new MeshV2Service(blocks, 'node1', 'domain1');
97+
service.stopEventBatchTimer();
98+
service.groupId = 'group1';
99+
100+
let eventMutationStarted = false;
101+
102+
service.client = {
103+
mutate: options => {
104+
if (options.mutation === FIRE_EVENTS) {
105+
eventMutationStarted = true;
106+
return Promise.resolve({data: {fireEventsByNode: {}}});
107+
}
108+
return Promise.resolve({});
109+
}
110+
};
111+
112+
// fireEventsBatch should work even if lastDataSendPromise is just Promise.resolve()
113+
await service.fireEventsBatch([{eventName: 'msg1', payload: '', firedAt: 't1'}]);
114+
115+
st.ok(eventMutationStarted, 'Event mutation started without data send');
116+
117+
st.end();
118+
});
119+
120+
t.end();
121+
});

test/unit/mesh_service_v2_timestamp.js

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -35,48 +35,48 @@ test('MeshV2Service Timestamp-based getRemoteVariable', t => {
3535
st.end();
3636
});
3737

38-
t.test('handleDataUpdate should add timestamp', st => {
38+
t.test('handleDataUpdate should add timestamp from nodeStatus', st => {
39+
const serverTimestamp = new Date().toISOString();
40+
const expectedTimestamp = new Date(serverTimestamp).getTime();
3941
const nodeStatus = {
4042
nodeId: 'node4',
43+
timestamp: serverTimestamp,
4144
data: [
4245
{key: 'var1', value: '100'}
4346
]
4447
};
4548

46-
const beforeUpdate = Date.now();
4749
service.handleDataUpdate(nodeStatus);
48-
const afterUpdate = Date.now();
4950

5051
st.ok(service.remoteData.node4, 'Node 4 should be added');
5152
st.ok(service.remoteData.node4.var1, 'var1 should be added');
5253
st.equal(service.remoteData.node4.var1.value, '100');
53-
st.ok(service.remoteData.node4.var1.timestamp >= beforeUpdate);
54-
st.ok(service.remoteData.node4.var1.timestamp <= afterUpdate);
54+
st.equal(service.remoteData.node4.var1.timestamp, expectedTimestamp, 'Should use server timestamp');
5555
st.end();
5656
});
5757

58-
t.test('fetchAllNodesData should add timestamp', async st => {
58+
t.test('fetchAllNodesData should add timestamp from status', async st => {
59+
const serverTimestamp = new Date().toISOString();
60+
const expectedTimestamp = new Date(serverTimestamp).getTime();
5961
service.client = {
6062
query: () => Promise.resolve({
6163
data: {
6264
listGroupStatuses: [
6365
{
6466
nodeId: 'node5',
67+
timestamp: serverTimestamp,
6568
data: [{key: 'var2', value: '200'}]
6669
}
6770
]
6871
}
6972
})
7073
};
7174

72-
const beforeFetch = Date.now();
7375
await service.fetchAllNodesData();
74-
const afterFetch = Date.now();
7576

7677
st.ok(service.remoteData.node5, 'Node 5 should be added');
7778
st.equal(service.remoteData.node5.var2.value, '200');
78-
st.ok(service.remoteData.node5.var2.timestamp >= beforeFetch);
79-
st.ok(service.remoteData.node5.var2.timestamp <= afterFetch);
79+
st.equal(service.remoteData.node5.var2.timestamp, expectedTimestamp, 'Should use server timestamp');
8080
st.end();
8181
});
8282

0 commit comments

Comments
 (0)