Real-world examples, from simple single-span workflows to complex multi-span applications.
import json
import os
import uuid

from openai import OpenAI
class RAGPipeline:
    """Retrieval-augmented generation workflow sent through the Adaline gateway.

    Each call to ``query`` runs three steps (query embedding, vector search,
    response generation). Every gateway request of one call carries the same
    ``adaline-trace-reference-id`` and a step-specific ``adaline-span-name``.
    """

    def __init__(self):
        """Create the gateway-backed OpenAI client and the shared Adaline headers.

        Keys and IDs are read from the environment variables
        ``OPENAI_API_KEY``, ``ADALINE_API_KEY``, ``ADALINE_PROJECT_ID`` and
        ``ADALINE_PROMPT_ID``.
        """
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/openai/",
        )
        self.base_headers = {
            "adaline-api-key": os.getenv("ADALINE_API_KEY"),
            "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
            "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID"),
        }

    def query(self, user_question: str, user_id: str) -> str:
        """Answer ``user_question`` using retrieved context.

        Args:
            user_question: The question to answer.
            user_id: Identifier recorded in the trace attributes of each step.

        Returns:
            The text content of the chat completion produced in the last step.
        """
        # One trace ID per call; every step below reuses it.
        trace_id = str(uuid.uuid4())

        # Step 1: Generate embedding for user question
        query_embedding = self._generate_embedding(
            user_question, trace_id, user_id, "query-embedding"
        )

        # Step 2: Simulate vector search
        relevant_docs = self._vector_search(
            query_embedding, trace_id, user_id, "vector-search"
        )

        # Step 3: Generate final response using retrieved context
        return self._generate_response(
            user_question, relevant_docs, trace_id, user_id, "response-generation"
        )

    def _span_headers(self, trace_id: str, user_id: str, span_name: str, status: str) -> dict:
        """Return a copy of the base headers extended with per-span trace headers.

        JSON-valued headers are serialized with ``json.dumps`` so that values
        such as ``user_id`` are escaped correctly instead of being pasted into
        a hand-written JSON template.
        """
        headers = dict(self.base_headers)  # copy: never mutate the shared dict
        headers.update({
            "adaline-trace-status": status,
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "rag-pipeline",
            "adaline-span-name": span_name,
            "adaline-trace-attributes": json.dumps(
                [{"operation": "create", "key": "user_id", "value": user_id}]
            ),
            "adaline-span-tags": json.dumps(["production-v1.3"]),
        })
        return headers

    def _generate_embedding(self, text: str, trace_id: str, user_id: str, span_name: str) -> list:
        """Embed ``text`` with ``text-embedding-3-small`` and return the vector.

        This is the only step that also sends ``adaline-trace-tags``.
        """
        headers = self._span_headers(trace_id, user_id, span_name, "pending")
        # The previous f-string used unescaped braces here, which raised
        # ValueError ("Invalid format specifier") before any request was sent.
        headers["adaline-trace-tags"] = json.dumps(
            [{"operation": "create", "tag": "production-v1.3"}]
        )
        response = self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
            extra_headers=headers,
        )
        return response.data[0].embedding

    def _vector_search(self, embedding: list, trace_id: str, user_id: str, span_name: str) -> str:
        """Return mock context for ``embedding``.

        Simulated step: no request is made, so no headers are built. When this
        is replaced by a real call that goes through the gateway, pass
        ``self._span_headers(trace_id, user_id, span_name, "pending")`` as the
        request's extra headers.
        """
        # Mock relevant documents (replace with actual vector search)
        return "Relevant context: AI models are trained on large datasets and use neural networks..."

    def _generate_response(self, question: str, context: str, trace_id: str, user_id: str, span_name: str) -> str:
        """Ask ``gpt-4`` to answer ``question`` using ``context``.

        This last step sends ``adaline-trace-status: success`` and additionally
        records ``user_id`` as a text span variable.
        """
        headers = self._span_headers(trace_id, user_id, span_name, "success")
        headers["adaline-span-variables"] = json.dumps(
            {"user_id": {"modality": "text", "value": user_id}}
        )
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": f"Answer the question using this context: {context}"},
                {"role": "user", "content": question},
            ],
            extra_headers=headers,
        )
        return response.choices[0].message.content
# Usage
if __name__ == "__main__":
    # Guarded so that importing this module does not trigger live gateway calls.
    rag = RAGPipeline()
    answer = rag.query("How do neural networks learn?", user_id="user123")
    print(answer)
import os
import uuid
from openai import OpenAI
from anthropic import Anthropic
class ContentGenerator:
    """Multi-provider article workflow sent through the Adaline gateway.

    ``generate_article`` runs three phases: research embeddings (OpenAI),
    synthesis (Anthropic) and review (OpenAI). Every request of one run
    carries the same ``adaline-trace-reference-id``.
    """

    def __init__(self):
        """Create both gateway-backed clients and the shared Adaline headers.

        Keys and IDs are read from the environment variables
        ``OPENAI_API_KEY``, ``ANTHROPIC_API_KEY``, ``ADALINE_API_KEY``,
        ``ADALINE_PROJECT_ID`` and ``ADALINE_PROMPT_ID``.
        """
        self.openai_client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/openai/",
        )
        self.anthropic_client = Anthropic(
            api_key=os.getenv("ANTHROPIC_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/anthropic/",
        )
        self.base_headers = {
            "adaline-api-key": os.getenv("ADALINE_API_KEY"),
            "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
            "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID"),
        }

    def generate_article(self, topic: str, target_audience: str, user_id: str) -> dict:
        """Run the research, synthesis and review phases for ``topic``.

        Returns:
            A dict with the keys ``topic``, ``research``, ``content``,
            ``final_content`` and ``workflow_id``.
        """
        trace_id = str(uuid.uuid4())
        workflow_data = {
            "topic": topic,
            "target_audience": target_audience,
            "user_id": user_id,
            "workflow_id": f"content-gen-{trace_id[:8]}",
        }

        # Step 1: Research phase - multiple embedding calls for topic analysis
        research_data = self._research_topic(topic, trace_id, workflow_data)
        # Step 2: Content synthesis using research
        content = self._synthesize_content(research_data, trace_id, workflow_data)
        # Step 3: Review and enhancement
        final_content = self._review_content(content, trace_id, workflow_data)

        return {
            "topic": topic,
            "research": research_data,
            "content": content,
            "final_content": final_content,
            "workflow_id": workflow_data["workflow_id"],
        }

    def _build_headers(self, trace_id: str, span_name: str, status: str,
                       attributes: dict, variables: dict = None) -> dict:
        """Return a copy of the base headers extended with trace/span headers.

        Args:
            trace_id: Value for ``adaline-trace-reference-id``.
            span_name: Value for ``adaline-span-name``.
            status: Value for ``adaline-trace-status``.
            attributes: Mapping of attribute key to value. Each entry becomes a
                ``{"operation": "create", "key": ..., "value": "<str>"}`` item
                in ``adaline-trace-attributes``; values are stringified.
            variables: Optional mapping of variable name to text value, sent as
                ``adaline-span-variables``. The header is omitted when falsy.

        All JSON-valued headers are produced with ``json.dumps``. The earlier
        hand-written f-string templates had unbalanced braces (a SyntaxError)
        and did not escape quotes or newlines in interpolated text.
        """
        headers = dict(self.base_headers)  # copy: never mutate the shared dict
        headers.update({
            "adaline-trace-status": status,
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "content-generation",
            "adaline-span-name": span_name,
            "adaline-trace-attributes": json.dumps([
                {"operation": "create", "key": key, "value": str(value)}
                for key, value in attributes.items()
            ]),
        })
        if variables:
            headers["adaline-span-variables"] = json.dumps({
                name: {"modality": "text", "value": value}
                for name, value in variables.items()
            })
        return headers

    def _research_topic(self, topic: str, trace_id: str, workflow_data: dict) -> dict:
        """Embed three research questions about ``topic``, one span per question.

        Returns:
            ``{"queries": [...], "embeddings": [{"query": ..., "embedding": ...}]}``.
        """
        research_queries = [
            f"What are the key concepts in {topic}?",
            f"What are recent developments in {topic}?",
            f"What challenges exist in {topic}?",
        ]
        total_queries = len(research_queries)

        embeddings = []
        for index, query in enumerate(research_queries, start=1):
            headers = self._build_headers(
                trace_id,
                span_name=f"research-embedding-{index}",
                status="pending",
                attributes={
                    "query_type": "research",
                    "query_index": index,
                    "total_queries": total_queries,
                },
            )
            headers["adaline-trace-tags"] = json.dumps(["production-v1.3"])
            response = self.openai_client.embeddings.create(
                model="text-embedding-3-small",
                input=query,
                extra_headers=headers,
            )
            embeddings.append({
                "query": query,
                "embedding": response.data[0].embedding,
            })
        return {"queries": research_queries, "embeddings": embeddings}

    def _synthesize_content(self, research_data: dict, trace_id: str, workflow_data: dict) -> str:
        """Draft the article with Claude, based on the research questions.

        Reads ``research_data["queries"]`` plus ``topic`` and
        ``target_audience`` from ``workflow_data``; returns the text of the
        first content block of the response.
        """
        topic = workflow_data["topic"]
        audience = workflow_data["target_audience"]
        headers = self._build_headers(
            trace_id,
            span_name="content-synthesis",
            status="pending",
            attributes={
                "phase": "synthesis",
                "research_queries_count": len(research_data["queries"]),
            },
            variables={"topic": topic, "target_audience": audience},
        )
        research_context = "\n".join(f"- {query}" for query in research_data["queries"])
        response = self.anthropic_client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2000,
            messages=[
                {"role": "user", "content": f"Write a comprehensive article about {topic} for {audience}. Base it on these research questions:\n{research_context}"}
            ],
            extra_headers=headers,
        )
        return response.content[0].text

    def _review_content(self, content: str, trace_id: str, workflow_data: dict) -> str:
        """Have ``gpt-4`` review and enhance ``content``; return the revised text.

        This last phase sends ``adaline-trace-status: success``.
        """
        # json.dumps escapes the quotes and newlines of the article so the
        # variable stays valid JSON on a single header line.
        # NOTE(review): the whole article is sent as a header value; long
        # articles may exceed header size limits -- confirm against the gateway.
        headers = self._build_headers(
            trace_id,
            span_name="content-review",
            status="success",
            attributes={
                "phase": "review",
                "content_length": len(content),
                "review_type": "enhancement",
            },
            variables={"content": content},
        )
        response = self.openai_client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "Review and enhance the content for clarity, engagement, and accuracy."},
                {"role": "user", "content": content},
            ],
            extra_headers=headers,
        )
        return response.choices[0].message.content
# Usage
if __name__ == "__main__":
    # Guarded so that importing this module does not trigger live gateway calls.
    generator = ContentGenerator()
    result = generator.generate_article(
        "Machine Learning in Healthcare",
        "healthcare professionals",
        user_id="user456",
    )
    print(f"Generated article: {result['workflow_id']}")
import os
import uuid
import json
from openai import OpenAI
class ConversationalAgent:
    """Tool-calling chat agent whose requests go through the Adaline gateway.

    ``chat`` asks the model which tools to call, runs the (simulated) tools
    locally, asks the model for a final answer, and then sends a status-update
    request. All requests of one ``chat`` call share one trace reference ID.
    """

    def __init__(self):
        """Create the gateway-backed client, shared headers and tool schemas.

        Keys and IDs are read from the environment variables
        ``OPENAI_API_KEY``, ``ADALINE_API_KEY``, ``ADALINE_PROJECT_ID`` and
        ``ADALINE_PROMPT_ID``.
        """
        self.client = OpenAI(
            api_key=os.getenv("OPENAI_API_KEY"),
            base_url="https://gateway.adaline.ai/v1/openai/",
        )
        self.base_headers = {
            "adaline-api-key": os.getenv("ADALINE_API_KEY"),
            "adaline-project-id": os.getenv("ADALINE_PROJECT_ID"),
            "adaline-prompt-id": os.getenv("ADALINE_PROMPT_ID"),
        }
        self.tools = [
            {
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "description": "Get current weather for a location",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {"type": "string", "description": "City name"}
                        },
                        "required": ["location"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "calculate",
                    "description": "Perform mathematical calculations",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "expression": {"type": "string", "description": "Math expression to evaluate"}
                        },
                        "required": ["expression"],
                    },
                },
            },
        ]

    def chat(self, user_message: str, user_id: str) -> str:
        """Answer ``user_message``, calling tools when the model requests them.

        Returns:
            The assistant's reply, or ``"I encountered an error: ..."`` if any
            step raised an exception.
        """
        trace_id = str(uuid.uuid4())
        try:
            # Step 1: Initial query processing with tool selection
            response = self._process_query(user_message, trace_id, user_id)
            assistant_message = response.choices[0].message

            if assistant_message.tool_calls:
                # Step 2: Handle tool calls
                tool_results = self._execute_tools(
                    assistant_message.tool_calls, trace_id, user_id
                )
                # Step 3: Generate final response with tool results
                reply = self._generate_final_response(
                    user_message, assistant_message, tool_results, trace_id, user_id
                )
            else:
                # Direct response without tools
                reply = assistant_message.content

            # Update trace status to completed
            self._update_trace_status(trace_id, user_id, "completed", "success")
            return reply
        except Exception as e:
            # Update trace status to failed. This is itself a gateway request,
            # so it is best-effort: if it fails too (e.g. the gateway is
            # unreachable), still return the original error to the caller
            # instead of raising from inside the handler.
            try:
                self._update_trace_status(trace_id, user_id, "failed", f"error: {str(e)}")
            except Exception:
                pass
            return f"I encountered an error: {str(e)}"

    def _span_headers(self, trace_id: str, span_name: str, span_tags: list,
                      span_attributes: dict, trace_status: str = None) -> dict:
        """Return a copy of the base headers extended with trace/span headers.

        ``span_tags`` and ``span_attributes`` are serialized with
        ``json.dumps`` so interpolated text (tool names, error messages) is
        escaped correctly. ``adaline-trace-status`` is only included when
        ``trace_status`` is given.
        """
        headers = dict(self.base_headers)  # copy: never mutate the shared dict
        headers.update({
            "adaline-trace-reference-id": trace_id,
            "adaline-trace-name": "conversational-agent",
            "adaline-span-name": span_name,
            "adaline-span-tags": json.dumps(span_tags),
            "adaline-span-attributes": json.dumps(span_attributes),
        })
        if trace_status is not None:
            headers["adaline-trace-status"] = trace_status
        return headers

    def _process_query(self, user_message: str, trace_id: str, user_id: str):
        """Send the user message with the tool schemas; return the raw completion."""
        headers = self._span_headers(
            trace_id,
            "query-processing",
            ["tool_selection", "intent_analysis"],
            {
                "user_id": user_id,
                "message_length": len(user_message),
                "tools_available": len(self.tools),
            },
            trace_status="in_progress",
        )
        return self.client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a helpful assistant with access to tools. Use them when appropriate."},
                {"role": "user", "content": user_message},
            ],
            tools=self.tools,
            tool_choice="auto",
            extra_headers=headers,
        )

    @staticmethod
    def _run_tool(name: str, arguments: str) -> str:
        """Simulate one tool call; ``arguments`` is the JSON argument string."""
        if name == "get_weather":
            args = json.loads(arguments)
            return f"Weather in {args['location']}: 72°F, sunny"
        if name == "calculate":
            args = json.loads(arguments)
            try:
                # SECURITY: eval() executes arbitrary Python, and the
                # expression is model-generated (therefore untrusted) input.
                # Replace this with a restricted arithmetic evaluator before
                # using it outside a demo.
                return str(eval(args["expression"]))
            except Exception:
                return "Error in calculation"
        return "Tool not implemented"

    def _execute_tools(self, tool_calls, trace_id: str, user_id: str) -> list:
        """Run each requested tool locally and collect the results.

        Tools are simulated in-process, so no gateway request is made and no
        headers are built here. If a tool is changed to call the gateway,
        build its headers with ``self._span_headers(...)`` using a span name
        such as ``tool-execution-<n>``.

        Returns:
            A list of ``{"tool_call_id": ..., "result": ...}`` dicts in the
            same order as ``tool_calls``.
        """
        return [
            {
                "tool_call_id": tool_call.id,
                "result": self._run_tool(
                    tool_call.function.name, tool_call.function.arguments
                ),
            }
            for tool_call in tool_calls
        ]

    def _generate_final_response(self, original_message: str, assistant_message, tool_results: list, trace_id: str, user_id: str) -> str:
        """Ask the model for a final answer given the tool results."""
        headers = self._span_headers(
            trace_id,
            "response-generation",
            ["final_response", "tool_integration"],
            {"tools_used": len(tool_results), "response_type": "tool_assisted"},
        )
        # Conversation so far: system prompt, user turn, the assistant turn
        # that requested the tools, then one "tool" message per result.
        messages = [
            {"role": "system", "content": "You are a helpful assistant. Provide a natural response using the tool results."},
            {"role": "user", "content": original_message},
            assistant_message,
        ]
        messages.extend(
            {
                "role": "tool",
                "tool_call_id": tool_result["tool_call_id"],
                "content": tool_result["result"],
            }
            for tool_result in tool_results
        )
        response = self.client.chat.completions.create(
            model="gpt-4",
            messages=messages,
            extra_headers=headers,
        )
        return response.choices[0].message.content

    def _update_trace_status(self, trace_id: str, user_id: str, status: str, details: str):
        """Send a one-token request whose headers carry the final trace status.

        ``details`` may be an arbitrary exception message; it is JSON-escaped
        by ``_span_headers`` rather than pasted into a JSON template.
        """
        headers = self._span_headers(
            trace_id,
            "status-update",
            ["status_update", status],
            {"final_status": status, "details": details},
            trace_status=status,
        )
        # Make a minimal API call to update trace status
        self.client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": "Status update"}],
            max_tokens=1,
            extra_headers=headers,
        )
# Usage
if __name__ == "__main__":
    # Guarded so that importing this module does not trigger live gateway calls.
    agent = ConversationalAgent()
    response = agent.chat("What's the weather in New York and what's 15 * 24?", user_id="user789")
    print(response)
# Create the trace ID once per workflow run, then send this same value with
# every span that belongs to the workflow.
trace_id = f"{uuid.uuid4()}"
# Give each step of the workflow its own descriptive span name, for example:
#   "adaline-span-name": "query-embedding"
#   "adaline-span-name": "vector-search"
#   "adaline-span-name": "response-generation"
# Update the trace status as the workflow progresses:
#   "adaline-trace-status": "in_progress"  # During execution
#   "adaline-trace-status": "completed"    # On success
#   "adaline-trace-status": "failed"       # On error
Was this page helpful?