Monitor Class
The Monitor class manages buffering, batching, and flushing of traces and spans to the Adaline API. It handles automatic retries, background flushing, and failure tracking.
Overview
The Monitor acts as a central coordinator for all observability operations:
- Buffers traces and spans in memory
- Batches multiple entries for efficient API calls
- Flushes automatically using an async background loop
- Retries failed requests with exponential backoff
- Drops entries that fail after retries, following OpenTelemetry error-handling principles
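The retry-then-drop behavior can be illustrated with a small standalone sketch. This is not the library's internal code; `send_with_retry` and its parameters are hypothetical, showing only the exponential-backoff pattern the Monitor applies before dropping an entry:

```python
import asyncio
import random


async def send_with_retry(send, entry, max_retries=3, base_delay=0.5):
    """Illustrative retry helper: exponential backoff with jitter.

    `send` is any async callable that raises on failure. Entries that
    still fail after `max_retries` attempts are dropped, mirroring
    OpenTelemetry's error-handling guidance.
    """
    for attempt in range(max_retries):
        try:
            await send(entry)
            return True  # sent successfully
        except Exception:
            if attempt == max_retries - 1:
                return False  # give up: caller drops the entry
            # back off: base, 2x base, 4x base, ... plus a little jitter
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, 0.1))
```

A dropped return value (`False`) is where the Monitor would increment its `dropped_count`.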
Creation
Create a Monitor using the Adaline.init_monitor() method:
```python
from adaline.main import Adaline

adaline = Adaline()
monitor = adaline.init_monitor(
    project_id="your-project-id",
    flush_interval_seconds=5,
    max_buffer_size=100
)
```
Properties
buffer
In-memory list storing trace and span entries waiting to be flushed. Each entry is a dict with `ready`, `data`, and `category` keys.
Example:

```python
print(f"Buffer size: {len(monitor.buffer)}")

for entry in monitor.buffer:
    if entry["category"] == "trace":
        print("Trace:", entry["data"].trace.trace.name)
    else:
        print("Span:", entry["data"].span.span.name)
```
sent_count
Number of entries successfully sent to the API.
dropped_count
Number of entries dropped due to buffer overflow or send failure.
Example:

```python
print(f"Sent: {monitor.sent_count}, Dropped: {monitor.dropped_count}")
```
project_id
The project ID that all traces and spans are associated with.
default_content
```python
default_content: LogSpanContent
```

Default span content used when no explicit content is provided. Defaults to `LogSpanContent(actual_instance=LogSpanOtherContent(type="Other", input="{}", output="{}"))`.
Methods
log_trace()
Create a new trace and add it to the buffer.
```python
log_trace(
    *,
    name: str,
    status: str = "unknown",
    session_id: Optional[str] = None,
    reference_id: Optional[str] = None,
    tags: Optional[List[str]] = None,
    attributes: Optional[Dict[str, Any]] = None
) -> Trace
```
Parameters
- `name` — Human-readable name for this trace (e.g., "User Login", "Generate Report").
- `status` — Initial status: `'success' | 'failure' | 'aborted' | 'cancelled' | 'pending' | 'unknown'`. Defaults to `'unknown'`.
- `session_id` — Session identifier to group related traces.
- `reference_id` — Custom reference ID. A UUID is auto-generated if not provided.
- `tags` — List of tags for categorization and filtering.
- `attributes` — Key-value metadata for additional context. Values must be `str`, `int`, `float`, or `bool`.
Returns
The `Trace` object that was created and added to the buffer. Call `trace.end()` on it to mark it ready for flushing.
Examples
```python
# Basic usage
trace = monitor.log_trace(name="API Request")
trace.end()
```
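The `attributes` parameter only accepts `str`, `int`, `float`, or `bool` values, so it can help to check payloads before logging. The following validator is a hypothetical helper (not part of the SDK) that mirrors that documented constraint:

```python
from typing import Any, Dict

# Types the `attributes` parameter accepts, per the parameter notes above
ALLOWED_ATTRIBUTE_TYPES = (str, int, float, bool)


def validate_attributes(attributes: Dict[str, Any]) -> None:
    """Raise TypeError if any attribute value has an unsupported type.

    Hypothetical helper mirroring log_trace's documented constraint
    that values must be str, int, float, or bool.
    """
    for key, value in attributes.items():
        if not isinstance(value, ALLOWED_ATTRIBUTE_TYPES):
            raise TypeError(
                f"attribute {key!r} has unsupported type {type(value).__name__}"
            )
```

Calling this before `monitor.log_trace(..., attributes=attrs)` surfaces type mistakes at the call site rather than at flush time.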
flush()
Manually flush all ready entries in the buffer to the API. This is an async method.
This method is automatically called in a background loop (based on flush_interval_seconds). Manual calls are typically only needed during shutdown or testing.
Behavior
- Skips if a flush is already in progress
- Filters for entries marked as ready (via `trace.end()` or `span.end()`)
- Sends each entry to the API concurrently with automatic retry
- Updates `trace_id` on traces after successful creation
- Removes successfully flushed entries from the buffer
- Increments `dropped_count` for entries that fail after retries
Example
```python
trace = monitor.log_trace(name="Important Event")
trace.end()
await monitor.flush()
```
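The filter-send-prune behavior described above can be sketched in isolation. This is a simplified, hypothetical model (retries omitted for brevity), not the Monitor's actual implementation:

```python
import asyncio


async def flush_ready(buffer, send):
    """Illustrative flush: send ready entries concurrently, drop failures.

    Returns (remaining_buffer, sent, dropped). Entries not yet marked
    ready stay in the buffer; successfully sent entries are removed;
    failed entries are dropped and counted.
    """
    ready = [e for e in buffer if e["ready"]]
    # send all ready entries concurrently, collecting exceptions
    results = await asyncio.gather(
        *(send(e) for e in ready), return_exceptions=True
    )
    sent = sum(1 for r in results if not isinstance(r, Exception))
    dropped = len(ready) - sent
    remaining = [e for e in buffer if not e["ready"]]
    return remaining, sent, dropped
```

In the real Monitor, `sent` and `dropped` correspond to the `sent_count` and `dropped_count` properties.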
stop()
Stop the background flush loop and cancel the flush task.
After calling stop(), the monitor will no longer automatically flush. You should call flush() before stop() to send remaining entries.
Example
```python
await monitor.flush()
monitor.stop()
```
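To guarantee the flush-before-stop ordering even when application code raises, the shutdown pair can be wrapped in an async context manager. `monitored` is a hypothetical convenience wrapper, not an SDK API; it works with any object exposing async `flush()` and sync `stop()`:

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def monitored(monitor):
    """Hypothetical helper: flush remaining entries, then stop the
    background loop, even if the wrapped block raises."""
    try:
        yield monitor
    finally:
        await monitor.flush()
        monitor.stop()
```

Usage: `async with monitored(monitor): ...` replaces the manual flush/stop calls at shutdown.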
Complete Examples
Basic Usage
```python
import asyncio

from adaline.main import Adaline


async def main():
    adaline = Adaline()
    monitor = adaline.init_monitor(
        project_id="my-project",
        flush_interval_seconds=5,
        max_buffer_size=100
    )

    async def handle_request(user_id: str):
        trace = monitor.log_trace(
            name="User Request",
            session_id=user_id,
            tags=["api"]
        )
        span = trace.log_span(
            name="Process Data",
            tags=["processing"]
        )
        await process_data()  # placeholder for your application logic
        span.update({"status": "success"})
        span.end()
        trace.update({"status": "success"})
        trace.end()

    await handle_request("user-123")
    await monitor.flush()
    monitor.stop()


asyncio.run(main())
```
Production Setup with Health Monitoring
```python
import asyncio

from adaline.main import Adaline


async def main():
    adaline = Adaline(debug=True)
    monitor = adaline.init_monitor(
        project_id="production-app",
        flush_interval_seconds=5,
        max_buffer_size=200
    )

    # Periodic health check
    async def health_check():
        while True:
            print(f"Monitor Health: buffer={len(monitor.buffer)}, "
                  f"sent={monitor.sent_count}, dropped={monitor.dropped_count}")
            if monitor.dropped_count > 0:
                print("WARNING: Some telemetry entries were dropped")
            await asyncio.sleep(60)

    health_task = asyncio.create_task(health_check())
    try:
        # ... your application logic ...
        pass
    finally:
        health_task.cancel()
        await monitor.flush()
        monitor.stop()


asyncio.run(main())
```