Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@ codegen.log
Brewfile.lock.json

.ipynb_checkpoints
.DS_Store
.DS_Store

.worktrees/
88 changes: 88 additions & 0 deletions src/openlayer/lib/integrations/_openai_embedding_common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Shared parsing helpers for OpenAI-shaped embedding tracers (OpenAI, AsyncOpenAI, LiteLLM)."""

from typing import Any, Dict, List, Optional, Tuple, Union


def parse_embedding_response(
response: Any,
) -> Tuple[Union[List[float], List[List[float]]], int, int]:
"""Extract (embeddings, dimensions, count) from an OpenAI-shaped EmbeddingResponse.

For a single input, returns the vector directly.
For a batch, returns a list of vectors.
"""
try:
data = getattr(response, "data", None)
if data is None and isinstance(response, dict):
data = response.get("data", [])
if not data:
return [], 0, 0
embeddings = [
item["embedding"] if isinstance(item, dict) else item.embedding
for item in data
]
if not embeddings:
return [], 0, 0
if len(embeddings) == 1:
return embeddings[0], len(embeddings[0]), 1
return embeddings, len(embeddings[0]), len(embeddings)
except Exception:
return [], 0, 0


def get_embedding_model_parameters(kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""Extract embedding-relevant model parameters from create() kwargs."""
return {
"dimensions": kwargs.get("dimensions"),
"encoding_format": kwargs.get("encoding_format"),
"user": kwargs.get("user"),
}


def build_embedding_step_kwargs(
response: Any,
call_kwargs: Dict[str, Any],
start_time: float,
end_time: float,
*,
name: str,
provider: str,
inference_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Build the kwargs to pass to ``tracer.add_embedding_step_to_trace``.

Common boilerplate for OpenAI-shaped responses (OpenAI sync/async, LiteLLM).
Callers may layer extra fields (cost, extra_metadata, model_parameters) on
top of the returned dict before invoking the tracer helper.
"""
model_name = getattr(response, "model", call_kwargs.get("model", "unknown"))
embeddings, dim, count = parse_embedding_response(response)
usage = getattr(response, "usage", None)
prompt_tokens = getattr(usage, "prompt_tokens", 0) if usage else 0
total_tokens = getattr(usage, "total_tokens", prompt_tokens) if usage else prompt_tokens

step_kwargs: Dict[str, Any] = {
"name": name,
"end_time": end_time,
"inputs": {"input": call_kwargs.get("input")},
"output": embeddings,
"latency": (end_time - start_time) * 1000,
"tokens": total_tokens,
"prompt_tokens": prompt_tokens,
"model": model_name,
"model_parameters": get_embedding_model_parameters(call_kwargs),
"embedding_dimensions": dim,
"embedding_count": count,
"raw_output": (
response.model_dump()
if hasattr(response, "model_dump")
else str(response)
),
"provider": provider,
"metadata": {"provider": provider},
}
# Only include id when truthy: passing id=None would overwrite the step's
# auto-generated UUID via step.log() → setattr().
if inference_id:
step_kwargs["id"] = inference_id
return step_kwargs
57 changes: 57 additions & 0 deletions src/openlayer/lib/integrations/async_openai_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
if TYPE_CHECKING:
import openai

from ..tracing import tracer
from ._openai_embedding_common import build_embedding_step_kwargs
from .openai_tracer import (
get_model_parameters,
create_trace_args,
Expand Down Expand Up @@ -155,6 +157,23 @@ async def traced_responses_create_func(*args, **kwargs):
else:
logger.debug("Responses API not available in this AsyncOpenAI client version")

# Patch Embeddings API
if hasattr(client, "embeddings"):
embeddings_create_func = client.embeddings.create

@wraps(embeddings_create_func)
async def traced_embeddings_create_func(*args, **kwargs):
inference_id = kwargs.pop("inference_id", None)
return await handle_embedding_async(
*args,
**kwargs,
original_func=embeddings_create_func,
inference_id=inference_id,
is_azure_openai=is_azure_openai,
)

client.embeddings.create = traced_embeddings_create_func

return client


Expand Down Expand Up @@ -698,3 +717,41 @@ async def handle_async_non_streaming_parse(
)

return response


# ----------------------------- Async Embeddings ----------------------------- #
async def handle_embedding_async(
original_func: callable,
*args,
inference_id: Optional[str] = None,
is_azure_openai: bool = False,
**kwargs,
) -> Any:
"""Trace an async AsyncOpenAI client.embeddings.create() call."""
start_time = time.time()
response = await original_func(*args, **kwargs)
end_time = time.time()

try:
if is_azure_openai:
name, provider = "Azure OpenAI Embedding", "Azure"
else:
name, provider = "OpenAI Embedding", "OpenAI"

tracer.add_embedding_step_to_trace(
**build_embedding_step_kwargs(
response,
kwargs,
start_time,
end_time,
name=name,
provider=provider,
inference_id=inference_id,
)
)
except Exception as e:
logger.error(
"Failed to trace the OpenAI embedding request with Openlayer. %s", e
)

return response
121 changes: 120 additions & 1 deletion src/openlayer/lib/integrations/bedrock_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import time
from functools import wraps
from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple, Union

from botocore.response import StreamingBody

Expand All @@ -25,6 +25,11 @@
logger = logging.getLogger(__name__)


def _is_embedding_model(model_id: str) -> bool:
"""Return True when modelId refers to a Bedrock embedding model."""
return "embed" in (model_id or "").lower()


def trace_bedrock(client: "boto3.client") -> "boto3.client":
"""Patch the Bedrock client to trace model invocations.

Expand Down Expand Up @@ -63,6 +68,14 @@ def trace_bedrock(client: "boto3.client") -> "boto3.client":
@wraps(invoke_model_func)
def traced_invoke_model(*args, **kwargs):
inference_id = kwargs.pop("inference_id", None)
model_id = kwargs.get("modelId", "")
if _is_embedding_model(model_id):
return handle_embedding_invoke(
*args,
**kwargs,
invoke_func=invoke_model_func,
inference_id=inference_id,
)
return handle_non_streaming_invoke(
*args,
**kwargs,
Expand Down Expand Up @@ -153,6 +166,112 @@ def handle_non_streaming_invoke(
return response


def handle_embedding_invoke(
invoke_func: callable,
*args,
inference_id: Optional[str] = None,
**kwargs,
) -> Dict[str, Any]:
"""Handle invoke_model for embedding models (Titan, Cohere)."""
start_time = time.time()
response = invoke_func(*args, **kwargs)
end_time = time.time()

try:
# Parse the request body
body_str = kwargs.get("body", "{}")
if isinstance(body_str, bytes):
body_str = body_str.decode("utf-8")
body_data = json.loads(body_str) if isinstance(body_str, str) else body_str

# Read and replace the response body so callers can still consume it
original_body = response["body"]
response_body_bytes = original_body.read()
response_data = json.loads(
response_body_bytes.decode("utf-8")
if isinstance(response_body_bytes, bytes)
else response_body_bytes
)
new_stream = io.BytesIO(response_body_bytes)
response["body"] = StreamingBody(new_stream, len(response_body_bytes))

model_id = kwargs.get("modelId", "")
inputs = _parse_embedding_input(body_data, model_id)
embeddings, dim, count = _parse_embedding_output(response_data, model_id)
prompt_tokens = _parse_embedding_tokens(response_data, model_id)
model_parameters = _get_embedding_model_parameters(body_data, model_id)

tracer.add_embedding_step_to_trace(
name="AWS Bedrock Embedding",
end_time=end_time,
inputs=inputs,
output=embeddings,
latency=(end_time - start_time) * 1000,
tokens=prompt_tokens,
prompt_tokens=prompt_tokens,
model=model_id,
model_parameters=model_parameters,
embedding_dimensions=dim,
embedding_count=count,
raw_output=response_data,
provider="Bedrock",
id=inference_id,
metadata={},
)

except Exception as e:
logger.error(
"Failed to trace the Bedrock embedding invocation with Openlayer. %s", e
)

return response


def _parse_embedding_input(body_data: Dict[str, Any], model_id: str) -> Dict[str, Any]:
if model_id.startswith("amazon.titan-embed"):
return {"input": body_data.get("inputText", "")}
if model_id.startswith("cohere.embed"):
return {"input": body_data.get("texts", [])}
return {"input": body_data}


def _parse_embedding_output(
response_data: Dict[str, Any], model_id: str
) -> Tuple[Union[List[float], List[List[float]]], int, int]:
"""Returns (embeddings, dimensions, count)."""
if model_id.startswith("amazon.titan-embed"):
emb = response_data.get("embedding", [])
return emb, len(emb), 1
if model_id.startswith("cohere.embed"):
embs = response_data.get("embeddings", [])
dim = len(embs[0]) if embs else 0
return embs, dim, len(embs)
return [], 0, 0


def _parse_embedding_tokens(response_data: Dict[str, Any], model_id: str) -> int:
if model_id.startswith("amazon.titan-embed"):
return response_data.get("inputTextTokenCount", 0)
return 0


def _get_embedding_model_parameters(
body_data: Dict[str, Any], model_id: str
) -> Dict[str, Any]:
if model_id.startswith("amazon.titan-embed"):
return {
"dimensions": body_data.get("dimensions"),
"normalize": body_data.get("normalize"),
}
if model_id.startswith("cohere.embed"):
return {
"input_type": body_data.get("input_type"),
"truncate": body_data.get("truncate"),
"embedding_types": body_data.get("embedding_types"),
}
return {}


def handle_streaming_invoke(
invoke_func: callable,
*args,
Expand Down
Loading