Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions packages/service/core/workflow/dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ import { observeWorkflowRun, observeWorkflowStep } from '../metrics';
import { withActiveSpan } from '../../../common/tracing';
import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus';
import { runWithContext } from '../utils/context';
import { createClientAbortTracker } from './utils/clientAbort';
import type { IncomingMessage } from 'node:http';

const logger = getLogger(LogCategories.MODULE.WORKFLOW.DISPATCH);

Expand All @@ -73,6 +75,7 @@ type Props = Omit<
variables: Record<string, any>;
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
req?: IncomingMessage;
defaultSkipNodeQueue?: WorkflowDebugResponse['skipNodeQueue'];
};
type NodeResponseType = DispatchNodeResultType<{
Expand Down Expand Up @@ -219,6 +222,8 @@ export async function dispatchWorkFlow({
]);

let streamCheckTimer: NodeJS.Timeout | null = null;
const clientAbortTracker =
apiVersion === 'v1' ? createClientAbortTracker({ req: data.req, res }) : undefined;

// set sse response headers
if (res) {
Expand Down Expand Up @@ -266,8 +271,7 @@ export async function dispatchWorkFlow({
return stopping;
}
if (apiVersion === 'v1') {
if (!res) return false;
return res.closed || !!res.errored;
return clientAbortTracker?.isClientAborted() ?? false;
}
return false;
};
Expand Down Expand Up @@ -315,6 +319,7 @@ export async function dispatchWorkFlow({
if (checkStoppingTimer) {
clearInterval(checkStoppingTimer);
}
clientAbortTracker?.cleanup();

// Close mcpClient connections
Object.values(ctx.mcpClientMemory).forEach((client) => {
Expand Down Expand Up @@ -1645,7 +1650,7 @@ const mergeAssistantResponseAnswerText = (response: AIChatItemValueItemType[]) =
for (let i = 0; i < response.length; i++) {
const item = response[i];
if (item.text) {
let text = item.text?.content || '';
const text = item.text?.content || '';
const lastItem = result[result.length - 1];
if (lastItem && lastItem.text?.content && item.stepId === lastItem.stepId) {
lastItem.text.content += text;
Expand Down
52 changes: 52 additions & 0 deletions packages/service/core/workflow/dispatch/utils/clientAbort.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { NextApiResponse } from 'next';
import type { IncomingMessage } from 'node:http';

type ResponseWithWritableAborted = NextApiResponse & {
writableAborted?: boolean;
};

export const createClientAbortTracker = ({
req,
res
}: {
req?: IncomingMessage;
res?: NextApiResponse;
}) => {
let clientAborted = false;

const responseFinished = () => !!(res?.writableEnded || res?.writableFinished);
const responseWritableAborted = () =>
!!(res as ResponseWithWritableAborted | undefined)?.writableAborted;
const isAbortedSnapshot = () => {
if (responseFinished()) return false;

return !!(
req?.aborted ||
req?.socket?.destroyed ||
res?.closed ||
res?.destroyed ||
responseWritableAborted() ||
res?.errored
);
};
const markClientAborted = () => {
if (!responseFinished()) {
clientAborted = true;
}
};

req?.on('aborted', markClientAborted);
req?.socket?.on('close', markClientAborted);
res?.on('close', markClientAborted);
res?.on('error', markClientAborted);

return {
isClientAborted: () => clientAborted || isAbortedSnapshot(),
cleanup: () => {
req?.off('aborted', markClientAborted);
req?.socket?.off('close', markClientAborted);
res?.off('close', markClientAborted);
res?.off('error', markClientAborted);
}
};
};
69 changes: 69 additions & 0 deletions packages/service/test/core/workflow/dispatch/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,77 @@
import { describe, expect, it } from 'vitest';
import { EventEmitter } from 'node:events';
import { FlowNodeTypeEnum } from '@fastgpt/global/core/workflow/node/constant';
import { WorkflowQueue } from '@fastgpt/service/core/workflow/dispatch/index';
import { createClientAbortTracker } from '@fastgpt/service/core/workflow/dispatch/utils/clientAbort';
import { createNode, createEdge } from '../utils';

describe('createClientAbortTracker', () => {
const mockRes = (overrides: Record<string, any> = {}) => {
const res = new EventEmitter() as any;
Object.assign(res, {
closed: false,
destroyed: false,
errored: null,
writableAborted: false,
writableEnded: false,
writableFinished: false,
...overrides
});
return res;
};

const mockReq = () => {
const req = new EventEmitter() as any;
req.aborted = false;
req.socket = new EventEmitter() as any;
req.socket.destroyed = false;
return req;
};

it('响应正常结束后 close,不应判定为客户端 abort', () => {
const req = mockReq();
const res = mockRes({ writableEnded: true, writableFinished: true });
const tracker = createClientAbortTracker({ req, res });

res.closed = true;
res.emit('close');

expect(tracker.isClientAborted()).toBe(false);
tracker.cleanup();
});

it('响应未结束时 close,应判定为客户端 abort', () => {
const req = mockReq();
const res = mockRes();
const tracker = createClientAbortTracker({ req, res });

res.emit('close');

expect(tracker.isClientAborted()).toBe(true);
tracker.cleanup();
});

it('socket 在响应结束前关闭,应判定为客户端 abort', () => {
const req = mockReq();
const res = mockRes();
const tracker = createClientAbortTracker({ req, res });

req.socket.emit('close');

expect(tracker.isClientAborted()).toBe(true);
tracker.cleanup();
});

it('创建 tracker 前响应已经异常关闭,应通过快照判定为客户端 abort', () => {
const req = mockReq();
const res = mockRes({ closed: true });
const tracker = createClientAbortTracker({ req, res });

expect(tracker.isClientAborted()).toBe(true);
tracker.cleanup();
});
});

describe('WorkflowQueue', () => {
describe('WorkflowQueue utils', () => {
// buildNodeEdgeGroupsMap 已经单独写了
Expand Down
10 changes: 5 additions & 5 deletions projects/app/src/pages/api/v1/chat/completions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ import { pushTrack } from '@fastgpt/service/common/middle/tracks/utils';
const logger = getLogger(LogCategories.MODULE.CHAT.ITEM);

async function handler(req: NextApiRequest, res: NextApiResponse) {
let {
const completionProps = CompletionsPropsSchema.parse(req.body);
const {
chatId,
appId,
customUid,
Expand All @@ -80,14 +81,12 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
teamToken,

stream = false,
detail = false,
retainDatasetCite = false,
showSkillReferences,
messages = [],
variables = {},
responseChatItemId = getNanoid(),
metadata
} = CompletionsPropsSchema.parse(req.body);
} = completionProps;
let { detail = false, retainDatasetCite = false, variables = {} } = completionProps;

const startTime = Date.now();

Expand Down Expand Up @@ -276,6 +275,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
if (app.version === 'v2') {
return dispatchWorkFlow({
apiVersion: 'v1',
req,
res,
lang: getLocale(req),
requestOrigin: req.headers.origin,
Expand Down
Loading