Python SDK
The Python SDK provides a complete toolkit for integrating Adaline’s LLM deployment and observability features into your AI applications. The SDK is fully async-native, built on asyncio.
Installation
Copy
Ask AI
pip install adaline-client adaline-api
Quick Start
Copy
Ask AI
import asyncio
import json

from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent


async def main():
    """Fetch the latest deployment, record one trace/span, and flush telemetry."""
    # Initialize client (API key read from the ADALINE_API_KEY env var)
    adaline = Adaline()

    # Get the latest deployment for a prompt in a given environment
    deployment = await adaline.get_latest_deployment(
        prompt_id="your-prompt-id",
        deployment_environment_id="your-deployment-environment-id",
    )

    # Initialize monitoring (background buffer flushed every 5s or at 100 entries)
    monitor = adaline.init_monitor(
        project_id="your-project-id",
        flush_interval_seconds=5,
        max_buffer_size=100,
    )

    # Create a trace and a span tied to the deployment
    trace = monitor.log_trace(
        name="User Request",
        session_id="session-123",
    )
    span = trace.log_span(
        name="LLM Call",
        prompt_id=deployment.prompt_id,
        deployment_id=deployment.id,
    )

    # Call your LLM provider here and capture its response.
    # (Placeholder — replace with a real provider call.)
    response = {"role": "assistant", "content": "..."}

    # Update the span with the response
    span.update({
        "status": "success",
        "content": LogSpanContent(
            actual_instance=LogSpanModelContent(
                type="Model",
                provider=deployment.prompt.config.provider_name,
                model=deployment.prompt.config.model,
                input=json.dumps(str(deployment.prompt.messages)),
                output=json.dumps(str(response)),
            )
        ),
    })
    span.end()
    trace.end()

    # Flush any buffered logs, then stop the background worker
    await monitor.flush()
    monitor.stop()


asyncio.run(main())
Type Definitions
The SDK uses types from the adaline-api package:
Copy
Ask AI
# API types (deployment, messages, content, logging)
from adaline_api.models.deployment import Deployment
from adaline_api.models.deployment_prompt import DeploymentPrompt
from adaline_api.models.message import Message
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_other_content import LogSpanOtherContent
# SDK classes
from adaline.main import Adaline, Controller
from adaline.logs import Monitor, Trace, Span
Error Handling
The SDK uses automatic retry logic with exponential backoff via the tenacity library:
- 5xx errors: Automatically retried with exponential backoff (1s, 2s, 4s, … capped at 10s, 20s total budget)
- 4xx errors: Fail immediately (no retry)
- Network errors: Retried with exponential backoff
If the buffer overflows, dropped log entries are counted in monitor.dropped_count. Following the OpenTelemetry error handling principle, telemetry failures never propagate to your application.
Async-Native Design
All deployment methods are async and must be awaited:
Copy
Ask AI
import asyncio

from adaline.main import Adaline


async def main():
    """Demonstrate which SDK calls are coroutines and which are plain methods."""
    adaline = Adaline()

    # Every deployment accessor is a coroutine — always await it.
    pinned_deployment = await adaline.get_deployment(
        prompt_id="...",
        deployment_id="...",
    )
    latest_deployment = await adaline.get_latest_deployment(
        prompt_id="...",
        deployment_environment_id="...",
    )

    # A controller keeps the latest deployment cached and refreshed.
    controller = await adaline.init_latest_deployment(
        prompt_id="...",
        deployment_environment_id="...",
    )
    cached_deployment = await controller.get()
    await controller.stop()  # Controller.stop() is async too

    # Monitor mixes both: flush() is awaited, stop() is a plain call.
    monitor = adaline.init_monitor(project_id="...")
    await monitor.flush()
    monitor.stop()


asyncio.run(main())
Real-World Examples
Example 1: RAG Pipeline
Copy
Ask AI
import asyncio
import json

from openai import AsyncOpenAI

from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent

adaline = Adaline()
openai = AsyncOpenAI()

# NOTE: replace with your vector database client — the original example used
# `vector_db` without ever defining it.
vector_db = ...  # e.g. a Pinecone/Weaviate/pgvector client exposing an async query()


async def answer_question(session_id: str, question: str):
    """Answer a question via a three-step RAG pipeline, tracing every step.

    Steps: (1) embed the query, (2) retrieve documents from the vector DB,
    (3) generate an answer with the deployed prompt. Each step is logged as a
    span; the trace is ended and telemetry flushed even on failure.
    """
    monitor = adaline.init_monitor(project_id="rag-system")
    trace = monitor.log_trace(
        name="RAG Query",
        session_id=session_id,
        tags=["rag", "qa"],
    )
    try:
        # Step 1: Generate embedding for the query
        embed_span = trace.log_span(
            name="Generate Query Embedding",
            tags=["embedding"],
        )
        embedding_response = await openai.embeddings.create(
            model="text-embedding-3-large",
            input=question,
        )
        embedding = embedding_response.data[0].embedding
        embed_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanEmbeddingsContent(
                    type="Embeddings",
                    input=json.dumps({"query": question}),
                    output=json.dumps({"dimensions": len(embedding)}),
                )
            ),
        })
        embed_span.end()

        # Step 2: Retrieve documents by vector similarity
        retrieval_span = trace.log_span(
            name="Vector Search",
            tags=["retrieval"],
        )
        results = await vector_db.query(embedding=embedding, top_k=5)
        retrieval_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": question, "top_k": 5}),
                    output=json.dumps({"document_ids": results.ids}),
                )
            ),
        })
        retrieval_span.end()

        # Step 3: Generate the answer using the deployed prompt
        deployment = await adaline.get_latest_deployment(
            prompt_id="rag-answer-prompt",
            deployment_environment_id="environment_abc123",
        )
        llm_span = trace.log_span(
            name="Generate Answer",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["llm", "answer"],
        )
        context = "\n\n".join(results.documents)
        completion = await openai.chat.completions.create(
            model=deployment.prompt.config.model,
            messages=[
                *deployment.prompt.messages,
                {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
            ],
            **deployment.prompt.config.settings,
        )
        answer = completion.choices[0].message.content
        llm_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps(str(deployment.prompt.messages)),
                    output=json.dumps(completion.choices[0].message.model_dump()),
                )
            ),
            "attributes": {"context_docs": len(results.ids)},
        })
        llm_span.end()

        trace.update({"status": "success"})
        return answer
    except Exception as error:
        # Record the failure on the trace but let the caller handle it.
        trace.update({"status": "failure", "attributes": {"error": str(error)}})
        raise
    finally:
        # Always close the trace and drain the telemetry buffer.
        trace.end()
        await monitor.flush()
        monitor.stop()
Example 2: Multi-Step Agent
Copy
Ask AI
import asyncio
import json

from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
from adaline_api.models.log_span_other_content import LogSpanOtherContent


async def run_agent():
    """Trace a three-stage agent (classify → retrieve → generate) over several queries."""
    adaline = Adaline(debug=True)

    # Controller keeps the newest deployment cached, refreshed every 60s.
    controller = await adaline.init_latest_deployment(
        prompt_id="agent-prompt",
        deployment_environment_id="environment_abc123",
        refresh_interval=60,
    )
    monitor = adaline.init_monitor(
        project_id="agent-project",
        flush_interval_seconds=5,
    )

    session_id = "demo-session"
    queries = [
        "What are the latest AI trends?",
        "Compare Python and Rust for ML workloads",
    ]

    for query in queries:
        deployment = await controller.get()
        trace = monitor.log_trace(
            name="Agent Query",
            session_id=session_id,
            tags=["agent", "research"],
            attributes={"query_length": len(query)},
        )

        # Stage 1 — classify the user's intent with a small model.
        intent_span = trace.log_span(
            name="Intent Classification",
            tags=["model", "classification"],
        )
        intent_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider="openai",
                    model="gpt-4o-mini",
                    input=json.dumps({"query": query}),
                    output=json.dumps({"intent": "research"}),
                )
            ),
        })
        intent_span.end()

        # Stage 2 — pull supporting knowledge from the store.
        retrieval_span = trace.log_span(
            name="Knowledge Retrieval",
            tags=["retrieval"],
        )
        retrieval_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": query}),
                    output=json.dumps({"chunks": ["result1", "result2"]}),
                )
            ),
        })
        retrieval_span.end()

        # Stage 3 — generate the final answer with the deployed prompt,
        # flagging the span for evaluation.
        response_span = trace.log_span(
            name="Response Generation",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["model", "generation"],
        )
        response_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps({"query": query, "context": "..."}),
                    output=json.dumps({"response": "Generated answer..."}),
                )
            ),
        })
        response_span.end()

        trace.update({"status": "success"})
        trace.end()

    # Drain telemetry and shut down background workers.
    await monitor.flush()
    monitor.stop()
    await controller.stop()


asyncio.run(run_agent())