diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts index a99caec699fa..42b59b72de31 100644 --- a/packages/service/core/workflow/dispatch/index.ts +++ b/packages/service/core/workflow/dispatch/index.ts @@ -63,6 +63,8 @@ import { observeWorkflowRun, observeWorkflowStep } from '../metrics'; import { withActiveSpan } from '../../../common/tracing'; import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus'; import { runWithContext } from '../utils/context'; +import { createClientAbortTracker } from './utils/clientAbort'; +import type { IncomingMessage } from 'node:http'; const logger = getLogger(LogCategories.MODULE.WORKFLOW.DISPATCH); @@ -73,6 +75,7 @@ type Props = Omit< variables: Record; runtimeNodes: RuntimeNodeItemType[]; runtimeEdges: RuntimeEdgeItemType[]; + req?: IncomingMessage; defaultSkipNodeQueue?: WorkflowDebugResponse['skipNodeQueue']; }; type NodeResponseType = DispatchNodeResultType<{ @@ -219,6 +222,8 @@ export async function dispatchWorkFlow({ ]); let streamCheckTimer: NodeJS.Timeout | null = null; + const clientAbortTracker = + apiVersion === 'v1' ? createClientAbortTracker({ req: data.req, res }) : undefined; // set sse response headers if (res) { @@ -266,8 +271,7 @@ export async function dispatchWorkFlow({ return stopping; } if (apiVersion === 'v1') { - if (!res) return false; - return res.closed || !!res.errored; + return clientAbortTracker?.isClientAborted() ?? false; } return false; }; @@ -315,6 +319,7 @@ export async function dispatchWorkFlow({ if (checkStoppingTimer) { clearInterval(checkStoppingTimer); } + clientAbortTracker?.cleanup(); // Close mcpClient connections Object.values(ctx.mcpClientMemory).forEach((client) => { @@ -1645,7 +1650,7 @@ const mergeAssistantResponseAnswerText = (response: AIChatItemValueItemType[]) = for (let i = 0; i < response.length; i++) { const item = response[i]; if (item.text) { - let text = item.text?.content || ''; + const text = item.text?.content || ''; const lastItem = result[result.length - 1]; if (lastItem && lastItem.text?.content && item.stepId === lastItem.stepId) { lastItem.text.content += text; diff --git a/packages/service/core/workflow/dispatch/utils/clientAbort.ts b/packages/service/core/workflow/dispatch/utils/clientAbort.ts new file mode 100644 index 000000000000..7ab3c7681ba0 --- /dev/null +++ b/packages/service/core/workflow/dispatch/utils/clientAbort.ts @@ -0,0 +1,52 @@ +import type { NextApiResponse } from 'next'; +import type { IncomingMessage } from 'node:http'; + +type ResponseWithWritableAborted = NextApiResponse & { + writableAborted?: boolean; +}; + +export const createClientAbortTracker = ({ + req, + res +}: { + req?: IncomingMessage; + res?: NextApiResponse; +}) => { + let clientAborted = false; + + const responseFinished = () => !!(res?.writableEnded || res?.writableFinished); + const responseWritableAborted = () => + !!(res as ResponseWithWritableAborted | undefined)?.writableAborted; + const isAbortedSnapshot = () => { + if (responseFinished()) return false; + + return !!( + req?.aborted || + req?.socket?.destroyed || + res?.closed || + res?.destroyed || + responseWritableAborted() || + res?.errored + ); + }; + const markClientAborted = () => { + if (!responseFinished()) { + clientAborted = true; + } + }; + + req?.on('aborted', markClientAborted); + req?.socket?.on('close', markClientAborted); + res?.on('close', markClientAborted); + res?.on('error', markClientAborted); + + return { + isClientAborted: () => clientAborted || isAbortedSnapshot(), + cleanup: () => { + req?.off('aborted', markClientAborted); + req?.socket?.off('close', markClientAborted); + res?.off('close', markClientAborted); + res?.off('error', markClientAborted); + } + }; +}; diff --git a/packages/service/test/core/workflow/dispatch/index.test.ts b/packages/service/test/core/workflow/dispatch/index.test.ts index 50a85ce99aa0..fe88bfb1015f 100644 --- a/packages/service/test/core/workflow/dispatch/index.test.ts +++ b/packages/service/test/core/workflow/dispatch/index.test.ts @@ -1,8 +1,77 @@ import { describe, expect, it } from 'vitest'; +import { EventEmitter } from 'node:events'; import { FlowNodeTypeEnum } from '@fastgpt/global/core/workflow/node/constant'; import { WorkflowQueue } from '@fastgpt/service/core/workflow/dispatch/index'; +import { createClientAbortTracker } from '@fastgpt/service/core/workflow/dispatch/utils/clientAbort'; import { createNode, createEdge } from '../utils'; +describe('createClientAbortTracker', () => { + const mockRes = (overrides: Record = {}) => { + const res = new EventEmitter() as any; + Object.assign(res, { + closed: false, + destroyed: false, + errored: null, + writableAborted: false, + writableEnded: false, + writableFinished: false, + ...overrides + }); + return res; + }; + + const mockReq = () => { + const req = new EventEmitter() as any; + req.aborted = false; + req.socket = new EventEmitter() as any; + req.socket.destroyed = false; + return req; + }; + + it('响应正常结束后 close,不应判定为客户端 abort', () => { + const req = mockReq(); + const res = mockRes({ writableEnded: true, writableFinished: true }); + const tracker = createClientAbortTracker({ req, res }); + + res.closed = true; + res.emit('close'); + + expect(tracker.isClientAborted()).toBe(false); + tracker.cleanup(); + }); + + it('响应未结束时 close,应判定为客户端 abort', () => { + const req = mockReq(); + const res = mockRes(); + const tracker = createClientAbortTracker({ req, res }); + + res.emit('close'); + + expect(tracker.isClientAborted()).toBe(true); + tracker.cleanup(); + }); + + it('socket 在响应结束前关闭,应判定为客户端 abort', () => { + const req = mockReq(); + const res = mockRes(); + const tracker = createClientAbortTracker({ req, res }); + + req.socket.emit('close'); + + expect(tracker.isClientAborted()).toBe(true); + tracker.cleanup(); + }); + + it('创建 tracker 前响应已经异常关闭,应通过快照判定为客户端 abort', () => { + const req = mockReq(); + const res = mockRes({ closed: true }); + const tracker = createClientAbortTracker({ req, res }); + + expect(tracker.isClientAborted()).toBe(true); + tracker.cleanup(); + }); +}); + describe('WorkflowQueue', () => { describe('WorkflowQueue utils', () => { // buildNodeEdgeGroupsMap 已经单独写了 diff --git a/projects/app/src/pages/api/v1/chat/completions.ts b/projects/app/src/pages/api/v1/chat/completions.ts index b668b6f25063..a8055fc88737 100644 --- a/projects/app/src/pages/api/v1/chat/completions.ts +++ b/projects/app/src/pages/api/v1/chat/completions.ts @@ -68,7 +68,8 @@ import { pushTrack } from '@fastgpt/service/common/middle/tracks/utils'; const logger = getLogger(LogCategories.MODULE.CHAT.ITEM); async function handler(req: NextApiRequest, res: NextApiResponse) { - let { + const completionProps = CompletionsPropsSchema.parse(req.body); + const { chatId, appId, customUid, @@ -80,14 +81,12 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { teamToken, stream = false, - detail = false, - retainDatasetCite = false, showSkillReferences, messages = [], - variables = {}, responseChatItemId = getNanoid(), metadata - } = CompletionsPropsSchema.parse(req.body); + } = completionProps; + let { detail = false, retainDatasetCite = false, variables = {} } = completionProps; const startTime = Date.now(); @@ -276,6 +275,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { if (app.version === 'v2') { return dispatchWorkFlow({ apiVersion: 'v1', + req, res, lang: getLocale(req), requestOrigin: req.headers.origin,