Multi-Step Workflows
Learn how to implement Proxy in real-world scenarios, from simple single API calls to complex multi-step workflows with automatic trace and span generation.
Understanding Traces and Spans
- Trace: A complete request journey through your application
- Span: Individual operations within a trace (e.g., each AI API call)
- Automatic Generation: Proxy creates these automatically without manual instrumentation
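Because Proxy instruments each request automatically, a single call through the gateway already produces a trace containing one span. Below is a minimal sketch of that pattern, using the gateway base URL and the adaline-* headers that appear throughout the examples in this guide:

import os
import uuid
from openai import OpenAI

# Route the request through the Adaline gateway; the call becomes a span
# inside the trace identified by adaline-trace-reference-id.
client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url="https://gateway.adaline.ai/v1/openai/"
)

trace_id = str(uuid.uuid4())  # one trace per logical request
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Hello"}],
    extra_headers={
        "adaline-api-key": os.getenv("ADALINE_API_KEY"),
        "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
        "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID"),
        "adaline-trace-reference-id": trace_id,
        "adaline-trace-name": "single-call-demo",
        "adaline-span-name": "chat-completion"
    }
)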
Headers Reference
Proxy relies heavily on headers to mutate the traces and spans created in Adaline. Refer to the Headers Reference for details beyond the examples below.
Example 1: Simple RAG Pipeline
A Retrieval-Augmented Generation (RAG) system that combines embedding generation, vector search, and chat completion in a single trace.
import os
import uuid
from openai import OpenAI
class RAGPipeline:
def __init__(self):
self.client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url="https://gateway.adaline.ai/v1/openai/"
)
self.base_headers = {
"adaline-api-key": os.getenv("ADALINE_API_KEY"),
"adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
"adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID")
}
def query(self, user_question: str, user_id: str) -> str:
# Generate unique trace ID for this RAG workflow
trace_id = str(uuid.uuid4())
# Step 1: Generate embedding for user question
query_embedding = self._generate_embedding(
user_question, trace_id, user_id, "query-embedding"
)
# Step 2: Simulate vector search
relevant_docs = self._vector_search(
query_embedding, trace_id, user_id, "vector-search"
)
# Step 3: Generate final response using retrieved context
response = self._generate_response(
user_question, relevant_docs, trace_id, user_id, "response-generation"
)
return response
def _generate_embedding(self, text: str, trace_id: str, user_id: str, span_name: str) -> list:
headers = self.base_headers.copy()
headers.update({
"adaline-trace-status": "pending",
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "rag-pipeline",
"adaline-span-name": span_name,
"adaline-trace-tags": f'[{"operation": "create", "tag": "production-v1.3"}]',
"adaline-trace-attributes": f'[{{"operation": "create", "key": "user_id", "value": "{user_id}"}}]',
"adaline-span-tags": f'["production-v1.3"]'
})
response = self.client.embeddings.create(
model="text-embedding-3-small",
input=text,
extra_headers=headers
)
return response.data[0].embedding
def _vector_search(self, embedding: list, trace_id: str, user_id: str, span_name: str) -> str:
        # Simulated vector search: in a real implementation, query your vector database.
        # Note: the headers below only create a span when attached to a proxied API call.
headers = self.base_headers.copy()
headers.update({
"adaline-trace-status": "pending",
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "rag-pipeline",
"adaline-span-name": span_name,
"adaline-trace-attributes": f'[{{"operation": "create", "key": "user_id", "value": "{user_id}"}}]',
"adaline-span-tags": f'["production-v1.3"]'
})
# Mock relevant documents (replace with actual vector search)
return "Relevant context: AI models are trained on large datasets and use neural networks..."
def _generate_response(self, question: str, context: str, trace_id: str, user_id: str, span_name: str) -> str:
headers = self.base_headers.copy()
headers.update({
"adaline-trace-status": "success",
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "rag-pipeline",
"adaline-span-name": span_name,
"adaline-trace-attributes": f'[{{"operation": "create", "key": "user_id", "value": "{user_id}"}}]',
"adaline-span-tags": f'["production-v1.3"]',
"adaline-span-variables": f'{{"user_id": {{"modality": "text", "value": "{user_id}"}}}}'
})
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": f"Answer the question using this context: {context}"},
{"role": "user", "content": question}
],
extra_headers=headers
)
return response.choices[0].message.content
# Usage
rag = RAGPipeline()
answer = rag.query("How do neural networks learn?", user_id="user123")
print(answer)
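Hand-assembling JSON header values in f-strings is easy to get wrong once values contain quotes or newlines. One option is to build the same payloads with json.dumps; the helpers below are a sketch (hypothetical, not part of the Proxy API) that mirrors the header formats used above:

import json

def trace_attributes(**attrs) -> str:
    # Serialize key/value pairs into the create-operation list format
    # used by adaline-trace-attributes; json.dumps handles escaping.
    return json.dumps(
        [{"operation": "create", "key": k, "value": str(v)} for k, v in attrs.items()]
    )

def span_variables(**variables) -> str:
    # Serialize text variables into the adaline-span-variables object format.
    return json.dumps(
        {k: {"modality": "text", "value": str(v)} for k, v in variables.items()}
    )

# Usage inside a headers dict:
# "adaline-trace-attributes": trace_attributes(user_id=user_id),
# "adaline-span-variables": span_variables(user_id=user_id),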
Example 2: Multi-Step Content Generation
A content generation workflow that combines multiple embedding calls and chat completions across providers, tracking span attributes and variables.
import json
import os
import uuid
from openai import OpenAI
from anthropic import Anthropic
class ContentGenerator:
def __init__(self):
self.openai_client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url="https://gateway.adaline.ai/v1/openai/"
)
self.anthropic_client = Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
base_url="https://gateway.adaline.ai/v1/anthropic/"
)
self.base_headers = {
"adaline-api-key": os.getenv("ADALINE_API_KEY"),
"adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
"adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID")
}
def generate_article(self, topic: str, target_audience: str, user_id: str) -> dict:
trace_id = str(uuid.uuid4())
workflow_data = {
"topic": topic,
"target_audience": target_audience,
"user_id": user_id,
"workflow_id": f"content-gen-{trace_id[:8]}"
}
# Step 1: Research phase - multiple embedding calls for topic analysis
research_data = self._research_topic(topic, trace_id, workflow_data)
# Step 2: Content synthesis using research
content = self._synthesize_content(research_data, trace_id, workflow_data)
# Step 3: Review and enhancement
final_content = self._review_content(content, trace_id, workflow_data)
return {
"topic": topic,
"research": research_data,
"content": content,
"final_content": final_content,
"workflow_id": workflow_data["workflow_id"]
}
def _research_topic(self, topic: str, trace_id: str, workflow_data: dict) -> dict:
research_queries = [
f"What are the key concepts in {topic}?",
f"What are recent developments in {topic}?",
f"What challenges exist in {topic}?"
]
embeddings = []
for i, query in enumerate(research_queries):
headers = self.base_headers.copy()
headers.update({
"adaline-trace-status": "pending",
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "content-generation",
"adaline-trace-tags": f'["production-v1.3"]',
"adaline-trace-attributes": f'[{{"operation": "create", "key": "query_type", "value": "research"}, {{"operation": "create", "key": "query_index", "value": "{i+1}"}, {{"operation": "create", "key": "total_queries", "value": "{len(research_queries)}"}}]',
"adaline-span-name": f"research-embedding-{i+1}",
})
response = self.openai_client.embeddings.create(
model="text-embedding-3-small",
input=query,
extra_headers=headers
)
embeddings.append({
"query": query,
"embedding": response.data[0].embedding
})
return {"queries": research_queries, "embeddings": embeddings}
def _synthesize_content(self, research_data: dict, trace_id: str, workflow_data: dict) -> str:
headers = self.base_headers.copy()
headers.update({
"adaline-trace-status": "pending",
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "content-generation",
"adaline-span-name": "content-synthesis",
"adaline-trace-attributes": f'[{"operation": "create", "key": "phase", "value": "synthesis"}, {"operation": "create", "key": "research_queries_count", "value": "{len(research_data["queries"])}"}]',
"adaline-span-variables": f'{{"topic": {{"modality": "text", "value": "{workflow_data["topic"]}"}}, "target_audience": {{"modality": "text", "value": "{workflow_data["target_audience"]}"}}}}'
})
research_context = "\n".join([f"- {query}" for query in research_data["queries"]])
response = self.anthropic_client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=2000,
messages=[
{"role": "user", "content": f"Write a comprehensive article about {workflow_data['topic']} for {workflow_data['target_audience']}. Base it on these research questions:\n{research_context}"}
],
extra_headers=headers
)
return response.content[0].text
def _review_content(self, content: str, trace_id: str, workflow_data: dict) -> str:
headers = self.base_headers.copy()
headers.update({
"adaline-trace-status": "success",
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "content-generation",
"adaline-span-name": "content-review",
"adaline-trace-attributes": f'[{{"operation": "create", "key": "phase", "value": "review"}, {{"operation": "create", "key": "content_length", "value": "{len(content)}"}, {{"operation": "create", "key": "review_type", "value": "enhancement"}}]',
"adaline-span-variables": f'{{"content": {{"modality": "text", "value": "{content}"}}}}'
})
response = self.openai_client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "Review and enhance the content for clarity, engagement, and accuracy."},
{"role": "user", "content": content}
],
extra_headers=headers
)
return response.choices[0].message.content
# Usage
generator = ContentGenerator()
result = generator.generate_article(
"Machine Learning in Healthcare",
"healthcare professionals",
user_id="user456"
)
print(f"Generated article: {result['workflow_id']}")
Example 3: Conversational Agent with Tools
A conversational agent that uses tool calling, with trace status updates and comprehensive error handling.
import os
import uuid
import json
from openai import OpenAI
class ConversationalAgent:
def __init__(self):
self.client = OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url="https://gateway.adaline.ai/v1/openai/"
)
self.base_headers = {
"adaline-api-key": os.getenv("ADALINE_API_KEY"),
"adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
"adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID")
}
self.tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "City name"}
},
"required": ["location"]
}
}
},
{
"type": "function",
"function": {
"name": "calculate",
"description": "Perform mathematical calculations",
"parameters": {
"type": "object",
"properties": {
"expression": {"type": "string", "description": "Math expression to evaluate"}
},
"required": ["expression"]
}
}
}
]
def chat(self, user_message: str, user_id: str) -> str:
trace_id = str(uuid.uuid4())
try:
# Step 1: Initial query processing with tool selection
response = self._process_query(user_message, trace_id, user_id)
# Step 2: Handle tool calls if needed
if response.choices[0].message.tool_calls:
tool_results = self._execute_tools(response.choices[0].message.tool_calls, trace_id, user_id)
# Step 3: Generate final response with tool results
final_response = self._generate_final_response(
user_message, response.choices[0].message, tool_results, trace_id, user_id
)
# Update trace status to completed
self._update_trace_status(trace_id, user_id, "completed", "success")
return final_response
else:
# Direct response without tools
self._update_trace_status(trace_id, user_id, "completed", "success")
return response.choices[0].message.content
except Exception as e:
# Update trace status to failed
self._update_trace_status(trace_id, user_id, "failed", f"error: {str(e)}")
return f"I encountered an error: {str(e)}"
def _process_query(self, user_message: str, trace_id: str, user_id: str):
headers = self.base_headers.copy()
headers.update({
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "conversational-agent",
"adaline-trace-status": "in_progress",
"adaline-span-name": "query-processing",
"adaline-span-tags": '["tool_selection", "intent_analysis"]',
"adaline-span-attributes": f'{{"user_id": "{user_id}", "message_length": {len(user_message)}, "tools_available": {len(self.tools)}}}'
})
response = self.client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a helpful assistant with access to tools. Use them when appropriate."},
{"role": "user", "content": user_message}
],
tools=self.tools,
tool_choice="auto",
extra_headers=headers
)
return response
def _execute_tools(self, tool_calls, trace_id: str, user_id: str) -> list:
results = []
for i, tool_call in enumerate(tool_calls):
headers = self.base_headers.copy()
headers.update({
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "conversational-agent",
"adaline-span-name": f"tool-execution-{i+1}",
"adaline-span-tags": f'["tool_call", "{tool_call.function.name}"]',
"adaline-span-attributes": f'{{"tool_name": "{tool_call.function.name}", "tool_call_id": "{tool_call.id}", "execution_order": {i+1}}}'
})
# Simulate tool execution
if tool_call.function.name == "get_weather":
args = json.loads(tool_call.function.arguments)
result = f"Weather in {args['location']}: 72°F, sunny"
elif tool_call.function.name == "calculate":
args = json.loads(tool_call.function.arguments)
try:
                    result = str(eval(args["expression"]))  # Note: unsafe on untrusted input; see the safe evaluator sketched after this example
                except Exception:
result = "Error in calculation"
else:
result = "Tool not implemented"
results.append({
"tool_call_id": tool_call.id,
"result": result
})
return results
def _generate_final_response(self, original_message: str, assistant_message, tool_results: list, trace_id: str, user_id: str) -> str:
headers = self.base_headers.copy()
headers.update({
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "conversational-agent",
"adaline-span-name": "response-generation",
"adaline-span-tags": '["final_response", "tool_integration"]',
"adaline-span-attributes": f'{{"tools_used": {len(tool_results)}, "response_type": "tool_assisted"}}'
})
# Build messages with tool results
messages = [
{"role": "system", "content": "You are a helpful assistant. Provide a natural response using the tool results."},
{"role": "user", "content": original_message},
assistant_message,
]
# Add tool results
for tool_result in tool_results:
messages.append({
"role": "tool",
"tool_call_id": tool_result["tool_call_id"],
"content": tool_result["result"]
})
response = self.client.chat.completions.create(
model="gpt-4",
messages=messages,
extra_headers=headers
)
return response.choices[0].message.content
def _update_trace_status(self, trace_id: str, user_id: str, status: str, details: str):
headers = self.base_headers.copy()
headers.update({
"adaline-trace-reference-id": trace_id,
"adaline-trace-name": "conversational-agent",
"adaline-trace-status": status,
"adaline-span-name": "status-update",
"adaline-span-tags": f'["status_update", "{status}"]',
"adaline-span-attributes": f'{{"final_status": "{status}", "details": "{details}"}}'
})
# Make a minimal API call to update trace status
self.client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Status update"}],
max_tokens=1,
extra_headers=headers
)
# Usage
agent = ConversationalAgent()
response = agent.chat("What's the weather in New York and what's 15 * 24?", user_id="user789")
print(response)
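The calculate tool above uses eval for brevity, which is unsafe on untrusted input. A minimal safe replacement, assuming only basic arithmetic needs to be supported, walks the parsed AST and rejects everything else:

import ast
import operator

# Whitelisted binary operators for the calculate tool.
_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
    ast.Pow: operator.pow
}

def safe_eval(expression: str) -> float:
    # Evaluate only numeric literals, unary minus, and whitelisted operators.
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.USub):
            return -_eval(node.operand)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        raise ValueError(f"Unsupported expression: {expression!r}")
    return _eval(ast.parse(expression, mode="eval"))

print(safe_eval("15 * 24"))  # 360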
Best Practices for Complex Workflows
1. Consistent Trace Management
Use a single trace ID across all related operations:
trace_id = str(uuid.uuid4())
# Use the same trace_id for all spans in the workflow
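In larger applications, threading trace_id through every function signature gets noisy. One option (a sketch of an application-side pattern, not a Proxy feature) is a contextvars holder that any nested call can read:

import contextvars
import uuid

# Hypothetical helper: stores the current trace ID for the active request.
current_trace_id = contextvars.ContextVar("current_trace_id", default=None)

def start_trace() -> str:
    trace_id = str(uuid.uuid4())
    current_trace_id.set(trace_id)
    return trace_id

def trace_headers(span_name: str) -> dict:
    # Merge these into your base headers before each proxied call.
    return {
        "adaline-trace-reference-id": current_trace_id.get(),
        "adaline-span-name": span_name
    }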
2. Meaningful Span Names
Use descriptive span names that indicate the operation:
"adaline-span-name": "query-embedding"
"adaline-span-name": "vector-search"
"adaline-span-name": "response-generation"
3. Error Handling with Trace Status
Update the trace status to reflect workflow state:
"adaline-trace-status": "in_progress" # During execution
"adaline-trace-status": "completed" # On success
"adaline-trace-status": "failed" # On error