@@ -5166,7 +5166,9 @@ def _chat_completion(self, messages, temperature=1.0, tools=None, stream=True, r
51665166 "display_fn" : self ._display_generating if self .verbose else None ,
51675167 "reasoning_steps" : reasoning_steps ,
51685168 "verbose" : self .verbose ,
5169- "max_iterations" : 10
5169+ "max_iterations" : 10 ,
5170+ "stream_callback" : self .stream_emitter .emit ,
5171+ "emit_events" : True ,
51705172 }
51715173 if response_format :
51725174 chat_kwargs ["response_format" ] = response_format
@@ -7419,11 +7421,31 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]:
74197421 if formatted_tools :
74207422 completion_args ["tools" ] = formatted_tools
74217423
7424+ # Import StreamEvent types for event emission
7425+ from ..streaming .events import StreamEvent , StreamEventType
7426+ import time as time_module
7427+
7428+ # Emit REQUEST_START event
7429+ request_start_perf = time_module .perf_counter ()
7430+ self .stream_emitter .emit (StreamEvent (
7431+ type = StreamEventType .REQUEST_START ,
7432+ timestamp = request_start_perf ,
7433+ metadata = {"model" : self .llm , "message_count" : len (messages )}
7434+ ))
7435+
74227436 completion = self ._openai_client .sync_client .chat .completions .create (** completion_args )
74237437
7438+ # Emit HEADERS_RECEIVED event
7439+ self .stream_emitter .emit (StreamEvent (
7440+ type = StreamEventType .HEADERS_RECEIVED ,
7441+ timestamp = time_module .perf_counter ()
7442+ ))
7443+
74247444 # Stream the response chunks without display
74257445 response_text = ""
74267446 tool_calls_data = []
7447+ first_token_emitted = False
7448+ last_content_time = None
74277449
74287450 for chunk in completion :
74297451 delta = chunk .choices [0 ].delta
@@ -7432,6 +7454,24 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]:
74327454 if delta .content is not None :
74337455 chunk_content = delta .content
74347456 response_text += chunk_content
7457+ last_content_time = time_module .perf_counter ()
7458+
7459+ # Emit FIRST_TOKEN on first content
7460+ if not first_token_emitted :
7461+ self .stream_emitter .emit (StreamEvent (
7462+ type = StreamEventType .FIRST_TOKEN ,
7463+ timestamp = last_content_time ,
7464+ content = chunk_content
7465+ ))
7466+ first_token_emitted = True
7467+ else :
7468+ # Emit DELTA_TEXT for subsequent tokens
7469+ self .stream_emitter .emit (StreamEvent (
7470+ type = StreamEventType .DELTA_TEXT ,
7471+ timestamp = last_content_time ,
7472+ content = chunk_content
7473+ ))
7474+
74357475 yield chunk_content
74367476
74377477 # Handle tool calls (accumulate but don't yield as chunks)
@@ -7449,6 +7489,18 @@ def _start_stream(self, prompt: str, **kwargs) -> Generator[str, None, None]:
74497489 if tool_call_delta .function .arguments :
74507490 tool_calls_data [tool_call_delta .index ]['function' ]['arguments' ] += tool_call_delta .function .arguments
74517491
7492+ # Emit LAST_TOKEN and STREAM_END events after streaming loop
7493+ if last_content_time :
7494+ self .stream_emitter .emit (StreamEvent (
7495+ type = StreamEventType .LAST_TOKEN ,
7496+ timestamp = last_content_time
7497+ ))
7498+ self .stream_emitter .emit (StreamEvent (
7499+ type = StreamEventType .STREAM_END ,
7500+ timestamp = time_module .perf_counter (),
7501+ metadata = {"response_length" : len (response_text )}
7502+ ))
7503+
74527504 # Handle any tool calls that were accumulated
74537505 if tool_calls_data :
74547506 # Add assistant message with tool calls to chat history
0 commit comments