Skip to content

Session Management Flow

This document explains the session management flows used by the SessionManager and CacheManager to handle conversation history, execution results, and session lifecycle across the Aurite framework.

Overview

The session management system provides persistent storage and retrieval of execution sessions through a two-tier architecture: the SessionManager provides high-level session operations while the CacheManager handles low-level file-based storage with in-memory caching. Sessions support both agent conversations and workflow executions with comprehensive metadata tracking.

Core Session Operations

The session management system supports four primary operations with different flows based on the operation type and session characteristics.

Objective: Create new sessions with proper metadata initialization and storage backend setup.

flowchart TD
    A[Session Creation Request] --> B[Session ID Generation]
    B --> C[Metadata Extraction]
    C --> D[Session Data Assembly]
    D --> E[CacheManager Storage]
    E --> F[In-Memory Cache Update]
    F --> G[File Persistence]
    G --> H[Session Created]

    style A fill:#2196F3,stroke:#1976D2,stroke-width:2px,color:#fff
    style H fill:#4CAF50,stroke:#388E3C,stroke-width:2px,color:#fff
    style E fill:#FF9800,stroke:#F57C00,stroke-width:2px,color:#fff
    style G fill:#9C27B0,stroke:#7B1FA2,stroke-width:2px,color:#fff

Phase 1: Session Data Assembly

# SessionManager._save_result
now = datetime.utcnow().isoformat()
existing_data = self._cache.get_result(session_id) or {}

metadata = self._extract_metadata(execution_result)
session_data = {
    "session_id": session_id,
    "base_session_id": base_session_id,
    "execution_result": execution_result,
    "result_type": result_type,  # "agent" or "workflow"
    "created_at": existing_data.get("created_at", now),
    "last_updated": now,
    **metadata,  # name, message_count, agents_involved
}

Phase 2: Metadata Extraction

# Different extraction logic based on result type
if is_workflow:
    name = execution_result.get("workflow_name")
    # Extract message count from all step results
    for step in execution_result.get("step_results", []):
        if "conversation_history" in step_result:
            message_count += len(step_result.get("conversation_history", []))

    # Track agents involved in workflow
    agent_session_id = step_result.get("session_id")
    agent_name_in_step = step_result.get("agent_name")
    if agent_session_id and agent_name_in_step:
        agents_involved[agent_session_id] = agent_name_in_step
else:  # Agent result
    name = execution_result.get("agent_name")
    message_count = len(execution_result.get("conversation_history", []))

Phase 3: Storage Persistence

# CacheManager.save_result
# Update in-memory cache first
self._result_cache[session_id] = session_data

# Persist to disk with error handling
session_file = self._get_session_file(session_id)
with open(session_file, "w") as f:
    json.dump(session_data, f, indent=2)

Session File Structure:

{
  "session_id": "agent-a1b2c3d4",
  "base_session_id": "agent-a1b2c3d4",
  "execution_result": {
    "status": "success",
    "conversation_history": [...],
    "agent_name": "weather_agent"
  },
  "result_type": "agent",
  "created_at": "2025-01-09T19:08:48.959750",
  "last_updated": "2025-01-09T19:08:52.329089",
  "name": "weather_agent",
  "message_count": 4
}

Objective: Retrieve session data with support for partial ID matching and metadata validation.

flowchart TD
    A[Session Retrieval Request] --> B[Direct ID Lookup]
    B --> C{Session Found?}
    C -->|Yes| D[Return Session Data]
    C -->|No| E[Base Session ID Search]
    E --> F{Matches Found?}
    F -->|None| G[Return Not Found]
    F -->|One| H[Return Matched Session]
    F -->|Multiple| I[Primary Session Resolution]
    I --> J{Primary Found?}
    J -->|Yes| H
    J -->|No| K[Return Ambiguous Error]

    style A fill:#2196F3,stroke:#1976D2,stroke-width:2px,color:#fff
    style D fill:#4CAF50,stroke:#388E3C,stroke-width:2px,color:#fff
    style H fill:#4CAF50,stroke:#388E3C,stroke-width:2px,color:#fff
    style G fill:#F44336,stroke:#D32F2F,stroke-width:2px,color:#fff
    style K fill:#F44336,stroke:#D32F2F,stroke-width:2px,color:#fff

Phase 1: Direct Lookup

# SessionManager.get_full_session_details
execution_result = self.get_session_result(session_id)
metadata_model = self.get_session_metadata(session_id)

if execution_result is None:
    # Proceed to base session ID search
    all_sessions_result = self.get_sessions_list(limit=10000, offset=0)
    matching_sessions = [
        s for s in all_sessions_result["sessions"]
        if s.base_session_id and s.base_session_id == session_id
    ]

Phase 2: Cache Lookup with Disk Fallback

# CacheManager.get_result
# Check memory cache first
if session_id in self._result_cache:
    return self._result_cache[session_id]

# Try to load from disk if not in memory
session_file = self._get_session_file(session_id)
if session_file.exists():
    with open(session_file, "r") as f:
        data = json.load(f)
    self._result_cache[session_id] = data
    return data

Phase 3: Primary Session Resolution

# Handle multiple matches by finding primary session
if len(matching_sessions) > 1:
    # Primary session doesn't have suffix like -0, -1
    primary_match = [
        s for s in matching_sessions
        if not (s.session_id[-2] == "-" and s.session_id[-1].isdigit())
    ]
    if len(primary_match) == 1:
        matched_session_id = primary_match[0].session_id
    else:
        # Ambiguous case - return error
        session_ids = [s.session_id for s in matching_sessions]
        raise HTTPException(
            status_code=400,
            detail=f"Ambiguous partial ID '{session_id}'. Multiple sessions found: {session_ids[:5]}"
        )

Objective: Update existing sessions with new conversation messages or execution results while preserving metadata.

flowchart TD
    A[Session Update Request] --> B[Load Existing Session]
    B --> C[Update Session Data]
    C --> D[Refresh Metadata]
    D --> E[Update Timestamps]
    E --> F[Cache Update]
    F --> G[File Persistence]
    G --> H[Session Updated]

    style A fill:#2196F3,stroke:#1976D2,stroke-width:2px,color:#fff
    style H fill:#4CAF50,stroke:#388E3C,stroke-width:2px,color:#fff
    style C fill:#FF9800,stroke:#F57C00,stroke-width:2px,color:#fff
    style D fill:#FF9800,stroke:#F57C00,stroke-width:2px,color:#fff

Phase 1: Message Addition (Streaming)

# SessionManager.add_message_to_history
existing_history = self.get_session_history(session_id) or []
updated_history = existing_history + [message]
self.save_conversation_history(session_id, updated_history, agent_name)

Phase 2: Complete Result Update

# SessionManager.save_agent_result / save_workflow_result
def _save_result(self, session_id: str, execution_result: Dict[str, Any],
                 result_type: str, base_session_id: Optional[str] = None):
    now = datetime.utcnow().isoformat()
    existing_data = self._cache.get_result(session_id) or {}

    # Preserve creation timestamp, update last_updated
    session_data = {
        "session_id": session_id,
        "base_session_id": base_session_id,
        "execution_result": execution_result,
        "result_type": result_type,
        "created_at": existing_data.get("created_at", now),  # Preserve original
        "last_updated": now,  # Always update
        **self._extract_metadata(execution_result),
    }
    self._cache.save_result(session_id, session_data)

Phase 3: Metadata Refresh

# Automatic metadata extraction on update
def _extract_metadata(self, execution_result: Dict[str, Any]) -> Dict[str, Any]:
    message_count = 0
    name = None
    agents_involved: Dict[str, str] = {}
    is_workflow = "step_results" in execution_result

    # Extract current metadata from execution result
    if is_workflow:
        name = execution_result.get("workflow_name")
        # Recalculate message count from all steps
        for step in execution_result.get("step_results", []):
            if "conversation_history" in step_result:
                message_count += len(step_result.get("conversation_history", []))
    else:
        name = execution_result.get("agent_name")
        message_count = len(execution_result.get("conversation_history", []))

    return {"name": name, "message_count": message_count, "agents_involved": agents_involved}

Objective: Delete sessions with cascading cleanup for workflow hierarchies and parent-child relationships.

flowchart TD
    A[Session Deletion Request] --> B[Load Session Metadata]
    B --> C{Session Type?}
    C -->|Workflow| D[Find Child Agent Sessions]
    C -->|Child Agent| E[Update Parent Workflow]
    C -->|Standalone Agent| F[Direct Deletion]
    D --> G[Delete Child Sessions]
    G --> F
    E --> H[Remove from Parent Agents List]
    H --> F
    F --> I[Delete from Cache]
    I --> J[Delete File]
    J --> K[Session Deleted]

    style A fill:#2196F3,stroke:#1976D2,stroke-width:2px,color:#fff
    style K fill:#4CAF50,stroke:#388E3C,stroke-width:2px,color:#fff
    style D fill:#FF9800,stroke:#F57C00,stroke-width:2px,color:#fff
    style E fill:#FF9800,stroke:#F57C00,stroke-width:2px,color:#fff
    style G fill:#9C27B0,stroke:#7B1FA2,stroke-width:2px,color:#fff

Phase 1: Workflow Cascade Deletion

# SessionManager.delete_session
session_to_delete = self.get_session_metadata(session_id)

# Case 1: Deleting a workflow - cascade to child agents
if session_to_delete.is_workflow:
    all_sessions = self.get_sessions_list(limit=10000)["sessions"]
    child_agent_sessions = [
        s for s in all_sessions
        if not s.is_workflow
        and s.base_session_id == session_to_delete.base_session_id
        and s.session_id != session_to_delete.session_id
    ]
    for child in child_agent_sessions:
        self._cache.delete_session(child.session_id)
        logger.info(f"Cascading delete: removed child agent session '{child.session_id}'")

Phase 2: Parent Workflow Update

# Case 2: Deleting a child agent - update parent workflow
elif session_to_delete.base_session_id and session_to_delete.base_session_id != session_id:
    all_sessions = self.get_sessions_list(limit=10000)["sessions"]
    parent_workflows = [
        s for s in all_sessions
        if s.is_workflow and s.base_session_id == session_to_delete.base_session_id
    ]
    for parent in parent_workflows:
        parent_data = self._cache.get_result(parent.session_id)
        if parent_data and parent_data.get("agents_involved") and session_id in parent_data["agents_involved"]:
            del parent_data["agents_involved"][session_id]
            self._cache.save_result(parent.session_id, parent_data)

Phase 3: Physical Deletion

# CacheManager.delete_session
# Remove from memory cache
session_exists_in_mem = self._result_cache.pop(session_id, None) is not None

# Remove from disk
session_file = self._get_session_file(session_id)
session_exists_on_disk = session_file.exists()
if session_exists_on_disk:
    session_file.unlink()

return session_exists_in_mem or session_exists_on_disk

Session Lifecycle Management

Session ID Generation Patterns

The session management system supports different ID generation patterns based on execution context:

Agent Sessions:

  • Standalone: agent-{uuid4().hex[:8]} (e.g., agent-a1b2c3d4)
  • User Provided: Prefixed with agent- if not already prefixed
  • Workflow Context: Uses workflow's base session ID without additional prefixing

Workflow Sessions:

  • Standalone: workflow-{uuid4().hex[:8]} (e.g., workflow-x9y8z7w6)
  • User Provided: Prefixed with workflow- if not already prefixed
  • Base Session Tracking: Original session ID preserved for step coordination

Session Metadata Validation

Pydantic Model Validation:

# SessionManager._validate_and_transform_metadata
def _validate_and_transform_metadata(self, session_data: Dict[str, Any]) -> SessionMetadata:
    session_id = session_data.get("session_id", "N/A")
    result_type = session_data.get("result_type")
    is_workflow = result_type == "workflow"

    # Extract name with fallback handling
    name = session_data.get("name")
    if not name:
        type_str = "Workflow" if is_workflow else "Agent"
        logger.warning(f"{type_str} session '{session_id}' is missing a name. Using placeholder.")
        name = f"Untitled {type_str} ({session_id[:8]})"

    return SessionMetadata(
        session_id=session_id,
        name=name,
        created_at=session_data.get("created_at"),
        last_updated=session_data.get("last_updated"),
        message_count=session_data.get("message_count"),
        is_workflow=is_workflow,
        agents_involved=session_data.get("agents_involved"),
        base_session_id=session_data.get("base_session_id"),
    )

Backwards Compatibility:

# Handle legacy sessions without message_count
if "message_count" not in session_data:
    metadata = self._extract_metadata(session_data.get("execution_result", {}))
    session_data.update(metadata)

Session Filtering and Pagination

Query Processing:

# SessionManager.get_sessions_list
def get_sessions_list(self, agent_name: Optional[str] = None,
                     workflow_name: Optional[str] = None,
                     limit: int = 50, offset: int = 0) -> Dict[str, Any]:
    # Load and validate all sessions
    all_validated_sessions: List[SessionMetadata] = []
    cache_dir = self._cache.get_cache_dir()

    for session_file in cache_dir.glob("*.json"):
        try:
            with open(session_file, "r") as f:
                session_data = json.load(f)

            # Ensure backwards compatibility
            if "message_count" not in session_data:
                metadata = self._extract_metadata(session_data.get("execution_result", {}))
                session_data.update(metadata)

            model = self._validate_and_transform_metadata(session_data)
            all_validated_sessions.append(model)
        except (json.JSONDecodeError, ValidationError) as e:
            logger.warning(f"Skipping invalid session file: {session_file}")

Filtering Logic:

# Apply filtering based on request parameters
filtered_sessions: List[SessionMetadata] = []
if workflow_name:
    # Only return parent workflow sessions
    filtered_sessions = [s for s in all_validated_sessions
                        if s.is_workflow and s.name == workflow_name]
elif agent_name:
    # Only return direct agent runs (not workflow steps)
    filtered_sessions = [s for s in all_validated_sessions
                        if not s.is_workflow and s.name == agent_name]
else:
    filtered_sessions = all_validated_sessions

# Sort by last_updated descending and apply pagination
filtered_sessions.sort(key=lambda x: x.last_updated or "", reverse=True)
total = len(filtered_sessions)
paginated_sessions = filtered_sessions[offset:offset + limit]

Storage Architecture

Two-Tier Storage Design

SessionManager (High-Level):

  • Provides business logic for session operations
  • Handles metadata extraction and validation
  • Manages session relationships and cascading operations
  • Implements filtering, pagination, and search functionality

CacheManager (Low-Level):

  • Handles file I/O operations with error handling
  • Maintains in-memory cache for performance
  • Manages session file naming and sanitization
  • Provides atomic read/write operations

File System Organization

Cache Directory Structure:

.aurite_cache/
├── agent-a1b2c3d4.json          # Agent session
├── workflow-x9y8z7w6.json       # Workflow session
├── workflow-x9y8z7w6-0.json     # Workflow step 0
├── workflow-x9y8z7w6-1.json     # Workflow step 1
└── ...

Session ID Sanitization:

# CacheManager._get_session_file
def _get_session_file(self, session_id: str) -> Path:
    # Sanitize session_id to prevent directory traversal
    safe_session_id = "".join(c for c in session_id if c.isalnum() or c in "-_")
    return self._cache_dir / f"{safe_session_id}.json"

Performance Optimizations

In-Memory Caching:

  • All sessions loaded into memory on startup
  • Memory cache updated immediately on write operations
  • Disk operations performed asynchronously when possible

Lazy Loading:

  • Sessions loaded from disk only when not in memory cache
  • Failed disk reads don't prevent memory cache operations
  • Graceful degradation on file system errors

Cleanup and Retention

Retention Policy Implementation

Age-Based Cleanup:

# SessionManager.cleanup_old_sessions
cutoff_date = datetime.utcnow() - timedelta(days=days)

for session in all_sessions:
    last_updated_str = session.last_updated
    if last_updated_str:
        last_updated = datetime.fromisoformat(last_updated_str.replace("Z", "+00:00")).replace(tzinfo=None)
        if last_updated < cutoff_date:
            sessions_to_delete.add(session.session_id)

Count-Based Cleanup:

# Keep only the most recent max_sessions
excess_count = len(sessions_kept) - max_sessions
if excess_count > 0:
    # Sessions already sorted oldest to newest
    for i in range(excess_count):
        sessions_to_delete.add(sessions_kept[i].session_id)

Cascading Cleanup:

  • Workflow deletion automatically removes child agent sessions
  • Parent workflow metadata updated when child agents are deleted
  • Orphaned sessions cleaned up during retention policy execution

Integration with AuriteEngine

Session Creation Integration

Agent Execution:

# AuriteEngine integration points
if agent_instance.config.include_history and final_session_id and self._session_manager:
    self._session_manager.save_agent_result(
        session_id=final_session_id,
        agent_result=run_result,
        base_session_id=final_base_session_id
    )

Workflow Execution:

# Workflow result persistence
if result.session_id and self._session_manager:
    self._session_manager.save_workflow_result(
        session_id=result.session_id,
        workflow_result=result,
        base_session_id=base_session_id
    )

History Loading Integration

Pre-Execution History Loading:

# AuriteEngine._prepare_agent_for_run
if effective_include_history and session_id and self._session_manager:
    history = self._session_manager.get_session_history(session_id)
    if history:
        initial_messages.extend(history)

# Immediate message addition for streaming
self._session_manager.add_message_to_history(
    session_id=session_id,
    message=current_user_message,
    agent_name=agent_name,
)

References

  • Implementation: src/aurite/lib/storage/sessions/session_manager.py - Main SessionManager implementation
  • Storage Backend: src/aurite/lib/storage/sessions/cache_manager.py - CacheManager file operations
  • Data Models: src/aurite/lib/models/api/responses.py - Session metadata and response models
  • API Integration: src/aurite/bin/api/routes/execution_routes.py - Session management endpoints
  • Engine Integration: AuriteEngine Execution Flow - Session integration patterns