Documentation Index: fetch the complete documentation index at https://www.adaline.ai/docs/llms.txt
Use this file to discover all available pages before exploring further.
Python SDK
The Python SDK provides a complete toolkit for integrating Adaline’s LLM deployment and observability features into your AI applications. The SDK is fully async and uses asyncio throughout.
Installation
pip install adaline-client adaline-api
Quick Start
import asyncio

from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent


async def main():
    """Fetch a deployment, log one trace with an LLM span, then flush the monitor."""
    adaline = Adaline()

    # Resolve the latest deployment of the prompt in the given environment.
    deployment = await adaline.get_latest_deployment(
        prompt_id="your-prompt-id",
        deployment_environment_id="your-environment-id",
    )

    # Buffered monitor: flushes every second or once 1000 entries accumulate.
    monitor = adaline.init_monitor(
        project_id="your-project-id",
        flush_interval_seconds=1,
        max_buffer_size=1000,
    )

    trace = monitor.log_trace(
        name="Chat Completion",
        session_id="user-session-123",
        tags=["production"],
    )

    llm_span = trace.log_span(
        name="OpenAI GPT-4 Call",
        prompt_id=deployment.prompt_id,
        deployment_id=deployment.id,
    )

    # ... make your LLM call here ...

    llm_span.update({
        "status": "success",
        "content": LogSpanContent(
            actual_instance=LogSpanModelContent(
                type="Model",
                provider=deployment.prompt.config.provider_name,
                model=deployment.prompt.config.model,
                input=str(deployment.prompt.messages),
                output="LLM response here",
            )
        ),
    })
    # End the span before ending the trace (matches the RAG example below).
    llm_span.end()

    trace.update({"status": "success"})
    trace.end()

    # Flush buffered logs before stopping the monitor.
    await monitor.flush()
    monitor.stop()


asyncio.run(main())
Type Definitions
The SDK uses types from the adaline_api package:
from adaline_api.models.deployment import Deployment
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_model_stream_content import LogSpanModelStreamContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_function_content import LogSpanFunctionContent
from adaline_api.models.log_span_tool_content import LogSpanToolContent
from adaline_api.models.log_span_guardrail_content import LogSpanGuardrailContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
from adaline_api.models.log_span_other_content import LogSpanOtherContent
Resource management
Beyond deployments and monitoring, Adaline exposes seven namespace clients that cover the platform’s full REST surface. Each method is async, retries 5xx responses, aborts on 4xx, and uses keyword-only arguments:
import asyncio

from adaline.main import Adaline
from adaline_api.models.add_dataset_rows_request import AddDatasetRowsRequest
from adaline_api.models.create_dataset_request import CreateDatasetRequest
from adaline_api.models.create_evaluation_request import CreateEvaluationRequest


async def main():
    """Tour the namespace clients: projects, prompts, datasets, rows, evaluations."""
    adaline = Adaline()

    # Projects and prompts
    projects = await adaline.projects.list()
    prompts = await adaline.prompts.list(project_id="project_abc123")

    # Datasets: CRUD + columns + rows + dynamic resolution
    dataset = await adaline.datasets.create(
        dataset=CreateDatasetRequest(project_id="project_abc123", title="Eval set"),
    )
    # "columnName" keys each row's values by column name rather than column id.
    await adaline.datasets.rows.create(
        dataset_id=dataset.id,
        values_by="columnName",
        rows=[{"values": {"question": "How do I reset my password?"}}],
    )

    # Evaluations: kick off a run, then poll for results
    evaluation = await adaline.prompts.evaluations.create(
        prompt_id="prompt_abc123",
        evaluation=CreateEvaluationRequest(
            dataset_id=dataset.id,
            evaluator_ids=["evaluator_abc123"],
            deployment_id="deploy_abc123",
        ),
    )
    results = await adaline.init_evaluation_results(
        prompt_id="prompt_abc123",
        evaluation_id=evaluation.id,
        expand="row",
        refresh_interval=30,
    )
    page = await results.get()


asyncio.run(main())
See the per-client pages for full method lists.
Need the same retry behavior on a raw call? Use the exported with_retry helper:
from adaline.clients import with_retry
response = await with_retry(
lambda : adaline.deployments_api.get_deployment( "prompt_abc123" , "deploy_xyz789" )
)
Error Handling
The SDK uses automatic retry logic with exponential backoff for API calls:
5xx errors: automatically retried with exponential backoff (1s, 2s, 4s, … capped at 10s) within a 20s total budget
4xx errors: fail immediately (no retry)
Failed flush entries are dropped and counted via monitor.dropped_count. Successfully sent entries are tracked via monitor.sent_count.
Real-World Example: RAG Pipeline
import asyncio
from adaline.main import Adaline
from adaline_api.models.log_span_content import LogSpanContent
from adaline_api.models.log_span_model_content import LogSpanModelContent
from adaline_api.models.log_span_embeddings_content import LogSpanEmbeddingsContent
from adaline_api.models.log_span_retrieval_content import LogSpanRetrievalContent
import openai
import json
async def answer_question(
    adaline: Adaline,
    monitor,
    session_id: str,
    question: str,
):
    """Answer *question* via a RAG pipeline, logging each stage as a span.

    Stages: query embedding -> vector search -> LLM completion. On any
    exception the trace is marked "failure" with the error message and the
    exception is re-raised; the trace is always ended in the ``finally``.

    Args:
        adaline: Client used to resolve the answer-generation deployment.
        monitor: Monitor that buffers and submits the trace.
        session_id: Session identifier attached to the trace.
        question: End-user question to answer.

    Returns:
        The generated answer text from the LLM completion.
    """
    trace = monitor.log_trace(
        name="RAG Query",
        session_id=session_id,
        tags=["rag", "qa"],
    )
    try:
        # Step 1: Generate embedding
        embed_span = trace.log_span(
            name="Generate Query Embedding",
            tags=["embedding"],
        )
        client = openai.AsyncOpenAI()
        embedding_response = await client.embeddings.create(
            model="text-embedding-3-large",
            input=question,
        )
        embed_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanEmbeddingsContent(
                    type="Embeddings",
                    input=json.dumps({"model": "text-embedding-3-large", "input": question}),
                    output=json.dumps({"dimensions": len(embedding_response.data[0].embedding)}),
                )
            ),
        })
        embed_span.end()

        # Step 2: Retrieve documents
        retrieval_span = trace.log_span(
            name="Vector Search",
            tags=["retrieval"],
        )
        # ... perform vector search ...
        search_results = {"ids": ["doc1", "doc2"], "scores": [0.95, 0.87]}
        retrieval_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanRetrievalContent(
                    type="Retrieval",
                    input=json.dumps({"query": question, "top_k": 5}),
                    output=json.dumps(search_results),
                )
            ),
        })
        retrieval_span.end()

        # Step 3: Get LLM deployment and generate answer
        deployment = await adaline.get_latest_deployment(
            prompt_id="rag-answer-prompt",
            deployment_environment_id="environment_abc123",
        )
        llm_span = trace.log_span(
            name="Generate Answer",
            prompt_id=deployment.prompt_id,
            deployment_id=deployment.id,
            run_evaluation=True,
            tags=["llm"],
        )
        # Prompt template messages first, then the user's question.
        messages = [
            *[{"role": m.role, "content": m.content} for m in deployment.prompt.messages],
            {"role": "user", "content": question},
        ]
        completion = await client.chat.completions.create(
            model=deployment.prompt.config.model,
            messages=messages,
        )
        llm_span.update({
            "status": "success",
            "content": LogSpanContent(
                actual_instance=LogSpanModelContent(
                    type="Model",
                    provider=deployment.prompt.config.provider_name,
                    model=deployment.prompt.config.model,
                    input=json.dumps(messages),
                    output=json.dumps(completion.choices[0].message.model_dump()),
                )
            ),
        })
        llm_span.end()

        trace.update({"status": "success"})
        return completion.choices[0].message.content
    except Exception as e:
        trace.update({"status": "failure", "attributes": {"error": str(e)}})
        raise
    finally:
        trace.end()
async def main():
    """Run one RAG query against a monitored project, then flush its logs."""
    adaline = Adaline()
    monitor = adaline.init_monitor(project_id="rag-system")
    answer = await answer_question(
        adaline,
        monitor,
        session_id="session-1",
        question="How does authentication work?",
    )
    print(answer)
    # Flush buffered logs before stopping the monitor.
    await monitor.flush()
    monitor.stop()


asyncio.run(main())
API Reference
Adaline Class — core client for deployments and monitoring.
Monitor Class — buffered log submission with automatic flushing.
Trace Class — high-level operation tracking.
Span Class — granular operation tracking with content types.