Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 3 additions & 53 deletions aperag/service/chat_completion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@
import json
import logging
import time
import uuid
from dataclasses import dataclass
from typing import Any, AsyncGenerator, Dict, List

from sqlalchemy.ext.asyncio import AsyncSession

from aperag.db.ops import AsyncDatabaseOps, async_db_ops
from aperag.flow.engine import FlowEngine
from aperag.flow.parser import FlowParser

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,58 +114,11 @@ async def stream_openai_sse_response(self, generator: AsyncGenerator[str, None],
yield f"data: {json.dumps(formatter.format_stream_end(msg_id))}\n\n"

async def openai_chat_completions(self, user, body_data, query_params):
"""Handle OpenAI-compatible chat completions"""
bot_id = query_params.get("bot_id") or query_params.get("app_id")
if not bot_id:
return None, OpenAIFormatter.format_error("bot_id is required")

api_request = APIRequest(
user=user,
bot_id=bot_id,
msg_id=str(uuid.uuid4()),
stream=body_data.get("stream", False),
messages=body_data.get("messages", []),
"""Handle OpenAI-compatible chat completions - Not implemented"""
return None, OpenAIFormatter.format_error(
"The /v1/chat/completions endpoint is not implemented. Please use WebSocket API for agent-type bots."
)

bot = await self.db_ops.query_bot(api_request.user, api_request.bot_id)
if not bot:
return None, OpenAIFormatter.format_error("Bot not found")

formatter = OpenAIFormatter()

# Get bot's flow configuration
bot_config = json.loads(bot.config or "{}")
flow_config = bot_config.get("flow")
if not flow_config:
return None, OpenAIFormatter.format_error("Bot flow config not found")

flow = FlowParser.parse(flow_config)
engine = FlowEngine()
initial_data = {
"query": api_request.messages[-1]["content"],
"user": api_request.user,
"message_id": api_request.msg_id,
}

try:
_, system_outputs = await engine.execute_flow(flow, initial_data)
logger.info("Flow executed successfully!")
except Exception as e:
logger.exception(e)
return None, OpenAIFormatter.format_error(str(e))

async_generator = None
nodes = engine.find_end_nodes(flow)
for node in nodes:
async_generator = system_outputs[node].get("async_generator")
if async_generator:
break

if not async_generator:
return None, OpenAIFormatter.format_error("No output node found")

return (api_request, formatter, async_generator), None


# Create a global service instance for easy access
# This uses the global db_ops instance and doesn't require session management in views
Expand Down
31 changes: 5 additions & 26 deletions aperag/views/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,39 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse

from aperag.db.models import User
from aperag.service.chat_completion_service import OpenAIFormatter, chat_completion_service
from aperag.service.chat_completion_service import OpenAIFormatter
from aperag.views.auth import required_user

logger = logging.getLogger(__name__)

router = APIRouter(tags=["openai"])


@router.post("/chat/completions")
async def openai_chat_completions_view(request: Request, user: User = Depends(required_user)):
try:
body_data = await request.json()
query_params = dict(request.query_params)
result, error = await chat_completion_service.openai_chat_completions(str(user.id), body_data, query_params)
if error:
return error
api_request, formatter, async_generator = result
if api_request.stream:
return StreamingResponse(
chat_completion_service.stream_openai_sse_response(async_generator(), formatter, api_request.msg_id),
media_type="text/event-stream",
)
else:
full_content = ""
async for chunk in async_generator():
full_content += chunk
return formatter.format_complete_response(api_request.msg_id, full_content)
except Exception as e:
logger.exception(e)
return OpenAIFormatter.format_error(str(e))
"""OpenAI-compatible chat completions endpoint - Not implemented for agent-type bots"""
return OpenAIFormatter.format_error(
"The /v1/chat/completions endpoint is not implemented. Please use WebSocket API for agent-type bots."
)
Loading