"""
title: Openlayer Filter Pipeline
author: Openlayer
date: 2025-01-17
version: 1.0.0
license: MIT
description: A filter pipeline that uses Openlayer for LLM observability and monitoring.
requirements: openlayer>=0.12.1
"""
import logging
import os
import time
import uuid
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("openlayer-filter")
def get_last_assistant_message(messages: List[dict]) -> str:
"""Retrieve the last assistant message content from the message list."""
for message in reversed(messages):
if message["role"] == "assistant":
content = message.get("content", "")
if isinstance(content, str):
return content
elif isinstance(content, list):
text_parts = [
part.get("text", "")
for part in content
if isinstance(part, dict) and part.get("type") == "text"
]
return "".join(text_parts)
return ""
def get_last_assistant_message_obj(messages: List[dict]) -> dict:
"""Retrieve the last assistant message object from the message list."""
for message in reversed(messages):
if message["role"] == "assistant":
return message
return {}
class Pipeline:
"""Openlayer Filter Pipeline for LLM observability."""
class Valves(BaseModel):
"""Configuration parameters for the Openlayer filter pipeline."""
pipelines: List[str] = []
priority: int = 0
api_key: str = ""
base_url: str = "https://api.openlayer.com/v1"
inference_pipeline_id: str = ""
insert_tags: bool = True
skip_internal_tasks: bool = False
debug: bool = False
def __init__(self):
"""Initialize the Openlayer filter pipeline."""
self.type = "filter"
self.name = "Openlayer Filter"
self.valves = self.Valves(
**{
"pipelines": ["*"],
"api_key": os.getenv("OPENLAYER_API_KEY", ""),
"base_url": os.getenv("OPENLAYER_BASE_URL", "https://api.openlayer.com/v1"),
"inference_pipeline_id": os.getenv("OPENLAYER_INFERENCE_PIPELINE_ID", ""),
"skip_internal_tasks": os.getenv("SKIP_INTERNAL_TASKS", "false").lower() == "true",
"debug": os.getenv("DEBUG_MODE", "false").lower() == "true",
}
)
self.tracer_configured = False
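        # In-flight requests, keyed by message id: inlet stores, outlet pops.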
self.chat_traces: Dict[str, Dict[str, Any]] = {}
logger.info("Openlayer Filter Pipeline initialized")
    def log_debug(self, message: str, *args):
        """Log debug messages when the debug valve is enabled."""
        # Emitted at INFO level so the output stays visible even when the
        # root handler filters out DEBUG records.
        if self.valves.debug:
            logger.info(message, *args)
async def on_startup(self):
"""Lifecycle hook called when the pipeline starts."""
self._configure_tracer()
async def on_shutdown(self):
"""Lifecycle hook called when the pipeline shuts down."""
self.chat_traces.clear()
async def on_valves_updated(self):
"""Lifecycle hook called when configuration is updated."""
self._configure_tracer()
def _configure_tracer(self):
"""Configure the Openlayer tracer with current settings."""
try:
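            # Imported lazily so the pipeline still loads if the SDK is missing.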
from openlayer.lib.tracing import tracer
if not self.valves.api_key:
logger.error("OPENLAYER_API_KEY not configured - tracing disabled")
self.tracer_configured = False
return
if not self.valves.inference_pipeline_id:
logger.error("OPENLAYER_INFERENCE_PIPELINE_ID not configured - tracing disabled")
self.tracer_configured = False
return
tracer.configure(
api_key=self.valves.api_key,
inference_pipeline_id=self.valves.inference_pipeline_id,
base_url=self.valves.base_url if self.valves.base_url else None,
)
self.tracer_configured = True
self.log_debug("Openlayer tracer configured successfully")
except ImportError as e:
logger.error("Failed to import Openlayer SDK: %s", e)
self.tracer_configured = False
except Exception as e:
logger.error("Failed to configure Openlayer tracer: %s", e)
self.tracer_configured = False
def _build_tags(self, task_name: str) -> List[str]:
"""Build a list of tags based on valve settings."""
tags = []
if self.valves.insert_tags:
tags.append("open-webui")
if task_name and task_name not in ["user_response", "llm_response"]:
tags.append(task_name)
return tags
def _extract_messages(self, messages: List[dict]) -> List[dict]:
"""Extract and clean messages for logging to Openlayer."""
cleaned = []
for msg in messages:
content = msg.get("content", "")
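            # Flatten multimodal content lists into plain text, replacing
            # images with a placeholder.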
if isinstance(content, list):
text_parts = []
for part in content:
if isinstance(part, dict):
if part.get("type") == "text":
text_parts.append(part.get("text", ""))
elif part.get("type") == "image_url":
text_parts.append("[Image]")
content = " ".join(text_parts) if text_parts else ""
cleaned.append({"role": msg.get("role", "user"), "content": content})
return cleaned
async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""Handle incoming requests (user messages)."""
        if not self.tracer_configured:
            return body
        # Short id for correlating this request across logs and trace metadata.
        request_id = str(uuid.uuid4())[:8]
metadata = body.get("metadata", {})
task = metadata.get("task", "")
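        # Open WebUI issues auxiliary generation requests (titles, tags,
        # search queries, follow-ups) alongside the main chat completion.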
internal_tasks = ["title_generation", "tags_generation", "query_generation", "follow_up_generation"]
is_internal_task = task in internal_tasks
if is_internal_task and self.valves.skip_internal_tasks:
return body
message_id = metadata.get("message_id", str(uuid.uuid4()))
chat_id = metadata.get("chat_id", str(uuid.uuid4()))
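        # Temporary chats report chat_id "local"; derive a stable id from the session.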
if chat_id == "local":
session_id = metadata.get("session_id", str(uuid.uuid4()))
chat_id = f"temporary-{session_id}"
metadata["chat_id"] = chat_id
body["metadata"] = metadata
trace_key = f"{message_id}::{task}" if is_internal_task else message_id
user_id = "anonymous"
user_email = None
user_name = None
if user:
user_email = user.get("email")
user_name = user.get("name")
user_id = user_email or user.get("id") or user_name or "anonymous"
if not isinstance(user_id, str):
user_id = str(user_id)
model_id = body.get("model", "unknown")
model_info = metadata.get("model", {})
model_name = model_info.get("name", model_id) if isinstance(model_info, dict) else model_id
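        # Prefer the provider reported in metadata; otherwise guess from
        # well-known model-id prefixes.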
provider = None
if isinstance(model_info, dict):
provider = model_info.get("owned_by")
if not provider:
if model_id.startswith("gpt-") or model_id.startswith("o1"):
provider = "openai"
elif model_id.startswith("claude-"):
provider = "anthropic"
elif model_id.startswith("gemini-"):
provider = "google"
messages = body.get("messages", [])
cleaned_messages = self._extract_messages(messages)
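        # Record lightweight file metadata (name, type, size) rather than contents.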
files_metadata = []
for file_entry in body.get("files", []):
if isinstance(file_entry, dict):
file_info = file_entry.get("file", file_entry)
if isinstance(file_info, dict):
files_metadata.append({
"filename": file_info.get("filename") or file_info.get("meta", {}).get("name"),
"content_type": file_info.get("meta", {}).get("content_type"),
"size": file_info.get("meta", {}).get("size"),
})
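        # Stash everything the outlet needs to assemble the trace once the
        # assistant response arrives.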
self.chat_traces[trace_key] = {
"request_id": request_id,
"message_id": message_id,
"start_time": time.time(),
"user_id": user_id,
"user_email": user_email,
"user_name": user_name,
"session_id": chat_id,
"model_id": model_id,
"model_name": model_name,
"provider": provider,
"messages": cleaned_messages,
"message_count": len(messages),
"tags": self._build_tags(task or "user_response"),
"task": task or "user_response",
"is_internal_task": is_internal_task,
"files": files_metadata,
}
return body
async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
"""Handle outgoing responses (assistant messages)."""
if not self.tracer_configured:
return body
message_id = body.get("id")
if not message_id:
return body
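        # Find the pending trace: exact message id for normal responses, or a
        # "<message_id>::<task>" key for internal tasks.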
        trace_data = None
        if message_id in self.chat_traces:
            trace_data = self.chat_traces.pop(message_id)
        else:
            for key in list(self.chat_traces.keys()):
                if key.startswith(f"{message_id}::"):
                    trace_data = self.chat_traces.pop(key)
                    break
if not trace_data:
return body
messages = body.get("messages", [])
assistant_message = get_last_assistant_message(messages)
assistant_message_obj = get_last_assistant_message_obj(messages)
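        # Normalize token usage across backends: OpenAI (prompt_tokens /
        # completion_tokens), Ollama (prompt_eval_count / eval_count), and
        # Anthropic (input_tokens / output_tokens).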
prompt_tokens = None
completion_tokens = None
total_tokens = None
if assistant_message_obj:
usage_info = assistant_message_obj.get("usage", {})
if isinstance(usage_info, dict):
prompt_tokens = (
usage_info.get("prompt_tokens")
or usage_info.get("prompt_eval_count")
or usage_info.get("input_tokens")
)
completion_tokens = (
usage_info.get("completion_tokens")
or usage_info.get("eval_count")
or usage_info.get("output_tokens")
)
                # Prefer an explicit total when the backend reports one;
                # otherwise derive it from the two individual counts.
                total_tokens = usage_info.get("total_tokens")
                if total_tokens is None and prompt_tokens is not None and completion_tokens is not None:
                    total_tokens = int(prompt_tokens) + int(completion_tokens)
latency_ms = (time.time() - trace_data["start_time"]) * 1000
try:
self._create_trace_with_steps(
trace_data=trace_data,
output=assistant_message,
latency_ms=latency_ms,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)
except Exception as e:
logger.error("Failed to create trace: %s", e)
return body
def _create_trace_with_steps(
self,
trace_data: Dict[str, Any],
output: str,
latency_ms: float,
prompt_tokens: Optional[int] = None,
completion_tokens: Optional[int] = None,
total_tokens: Optional[int] = None,
):
"""Create a trace with nested steps using openlayer.lib.tracing.tracer."""
from openlayer.lib.tracing import tracer
from openlayer.lib.tracing.enums import StepType
from openlayer.lib import update_trace_user_session
step_inputs = {"messages": trace_data["messages"], "model": trace_data["model_name"]}
if trace_data.get("files"):
step_inputs["files"] = trace_data["files"]
step_metadata = {
"request_id": trace_data.get("request_id"),
"task": trace_data.get("task"),
"tags": trace_data["tags"],
"interface": "open-webui",
}
with tracer.create_step(
name="open-webui-request",
step_type=StepType.USER_CALL,
inputs=step_inputs,
metadata=step_metadata,
) as parent_step:
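            # Backdate the step to the inlet timestamp so the span covers the
            # full request rather than just this outlet call.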
parent_step.start_time = trace_data["start_time"]
parent_step.end_time = time.time()
parent_step.latency = latency_ms
try:
update_trace_user_session(
user_id=trace_data["user_id"],
session_id=trace_data["session_id"],
)
except Exception as e:
logger.error("Failed to set user/session context: %s", e)
provider = trace_data.get("provider") or "unknown"
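            # Nest the model call as a chat-completion step inside the request step.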
with tracer.create_step(
name="LLM Chat Completion",
step_type=StepType.CHAT_COMPLETION,
inputs={"messages": trace_data["messages"], "model": trace_data["model_id"]},
metadata={"model_id": trace_data["model_id"], "model_name": trace_data["model_name"]},
) as llm_step:
llm_step.provider = provider
llm_step.model = trace_data["model_id"]
llm_step.start_time = trace_data["start_time"]
llm_step.end_time = time.time()
llm_step.latency = latency_ms
if prompt_tokens is not None:
llm_step.prompt_tokens = prompt_tokens
if completion_tokens is not None:
llm_step.completion_tokens = completion_tokens
if total_tokens is not None:
llm_step.tokens = total_tokens
llm_step.log(
output=output,
tokens=total_tokens,
metadata={
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
},
)
parent_step.log(
output=output,
metadata={
"user_id": trace_data["user_id"],
"session_id": trace_data["session_id"],
"message_count": trace_data.get("message_count", 0),
},
)