Architecture

🏗️ System design and architectural patterns in PiSovereign

This document explains the architectural decisions, design patterns, and structure of PiSovereign.


Overview

PiSovereign follows Clean Architecture, in the closely related Hexagonal Architecture (Ports & Adapters) style, to achieve:

  • Independence from frameworks - Business logic doesn’t depend on Axum, SQLite, or any external library
  • Testability - Core logic can be tested without infrastructure
  • Flexibility - Adapters can be swapped without changing business rules
  • Maintainability - Clear boundaries between concerns
┌──────────────────────────────────────────────────────────────────┐
│                     External World                               │
│  (HTTP Clients, WhatsApp, Email Servers, AI Hardware)            │
└──────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────────┐
│                   Presentation Layer                             │
│  ┌─────────────────┐          ┌─────────────────┐                │
│  │ presentation_   │          │ presentation_   │                │
│  │     http        │          │     cli         │                │
│  │  (Axum API)     │          │  (Clap CLI)     │                │
│  └─────────────────┘          └─────────────────┘                │
└──────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌──────────────────────────────────────────────────────────────────┐
│                   Application Layer                              │
│  ┌───────────────────────────────────────────────────────────┐   │
│  │                    application                            │   │
│  │  (Services, Use Cases, Orchestration, Port Definitions)   │   │
│  └───────────────────────────────────────────────────────────┘   │
└──────────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┼───────────────┐
              ▼               ▼               ▼
┌──────────────────┐ ┌──────────────┐ ┌──────────────────────────┐
│   Domain Layer   │ │  AI Layer    │ │   Infrastructure Layer   │
│  ┌────────────┐  │ │ ┌──────────┐ │ │ ┌──────────────────────┐ │
│  │   domain   │  │ │ │ ai_core  │ │ │ │    infrastructure    │ │
│  │ (Entities, │  │ │ │(Inference│ │ │ │  (Adapters, Repos,   │ │
│  │  Values,   │  │ │ │ Engine)  │ │ │ │  Cache, DB, Vault)   │ │
│  │ Commands)  │  │ │ └──────────┘ │ │ └──────────────────────┘ │
│  └────────────┘  │ │ ┌──────────┐ │ │                          │
│                  │ │ │ai_speech │ │ │   ┌──────────────────┐   │
│                  │ │ │(STT/TTS) │ │ │   │  integration_*   │   │
│                  │ │ └──────────┘ │ │   │ (WhatsApp, Mail, │   │
│                  │ │              │ │   │  Calendar, etc.) │   │
│                  │ └──────────────┘ │   └──────────────────┘   │
└──────────────────┘                  └──────────────────────────┘

Clean Architecture

Layer Responsibilities

| Layer | Crates | Responsibility |
|---|---|---|
| Domain | domain | Core business entities, value objects, commands, domain errors |
| Application | application | Use cases, service orchestration, port definitions |
| Infrastructure | infrastructure, integration_* | Adapters for external systems (DB, cache, APIs) |
| AI | ai_core, ai_speech | AI-specific logic (inference, speech processing) |
| Presentation | presentation_http, presentation_cli | User interfaces (REST API, CLI) |

Dependency Rule

Inner layers NEVER depend on outer layers

domain          → (no dependencies on other PiSovereign crates)
application     → domain
ai_core         → domain, application (ports)
ai_speech       → domain, application (ports)
infrastructure  → domain, application (ports)
integration_*   → domain, application (ports)
presentation_*  → domain, application, infrastructure, ai_*, integration_*

This means:

  • domain knows nothing about databases, HTTP, or external services
  • application defines what it needs via ports (traits), not how it’s done
  • Only presentation crates wire everything together
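
The dependency rule can be checked mechanically. The following is a minimal sketch, not part of PiSovereign: it assigns each crate a hypothetical layer rank taken from the list above and reports any edge that does not point strictly inward. The function names and the "strictly lower rank" reading of the rule are assumptions made for illustration.

```rust
// Hypothetical layer ranks derived from the dependency list above;
// a lower rank means closer to the core.
fn layer_rank(krate: &str) -> Option<u8> {
    match krate {
        "domain" => Some(0),
        "application" => Some(1),
        "infrastructure" | "ai_core" | "ai_speech" => Some(2),
        k if k.starts_with("integration_") => Some(2),
        k if k.starts_with("presentation_") => Some(3),
        _ => None,
    }
}

/// Returns every edge `(from, to)` that points outward or sideways,
/// i.e. where `from` is not strictly further out than `to`.
fn violations<'a>(edges: &[(&'a str, &'a str)]) -> Vec<(&'a str, &'a str)> {
    edges
        .iter()
        .copied()
        .filter(|(from, to)| match (layer_rank(from), layer_rank(to)) {
            (Some(f), Some(t)) => f <= t,
            // Unknown crates are reported so they get classified explicitly.
            _ => true,
        })
        .collect()
}
```

A check like this could run in CI against the edges extracted from the workspace manifests; how those edges are collected is left open here.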

Crate Dependencies

Dependency Graph

graph TB
    subgraph "Presentation"
        HTTP[presentation_http]
        CLI[presentation_cli]
    end
    
    subgraph "Integration"
        WA[integration_whatsapp]
        PM[integration_email]
        CAL[integration_caldav]
        WX[integration_weather]
    end
    
    subgraph "Infrastructure"
        INFRA[infrastructure]
    end
    
    subgraph "AI"
        CORE[ai_core]
        SPEECH[ai_speech]
    end
    
    subgraph "Core"
        APP[application]
        DOM[domain]
    end
    
    HTTP --> APP
    HTTP --> INFRA
    HTTP --> CORE
    HTTP --> SPEECH
    HTTP --> WA
    HTTP --> PM
    HTTP --> CAL
    HTTP --> WX
    
    CLI --> APP
    CLI --> INFRA
    
    WA --> APP
    WA --> DOM
    
    PM --> APP
    PM --> DOM
    
    CAL --> APP
    CAL --> DOM
    
    WX --> APP
    WX --> DOM
    
    INFRA --> APP
    INFRA --> DOM
    
    CORE --> APP
    CORE --> DOM
    
    SPEECH --> APP
    SPEECH --> DOM
    
    APP --> DOM

Workspace Structure

PiSovereign/
├── Cargo.toml              # Workspace manifest
├── crates/
│   ├── domain/             # Core business logic (no external deps)
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs
│   │       ├── entities/   # User, Conversation, Message, etc.
│   │       ├── values/     # UserId, MessageContent, etc.
│   │       ├── commands/   # UserCommand, SystemCommand
│   │       └── errors.rs   # Domain errors
│   │
│   ├── application/        # Use cases and ports
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs
│   │       ├── services/   # ConversationService, CommandService, etc.
│   │       └── ports/      # Trait definitions (InferencePort, etc.)
│   │
│   ├── infrastructure/     # Framework-dependent implementations
│   │   ├── Cargo.toml
│   │   └── src/
│   │       ├── lib.rs
│   │       ├── adapters/   # VaultSecretStore, etc.
│   │       ├── cache/      # MokaCache, RedbCache
│   │       ├── persistence/ # SQLite repositories
│   │       └── telemetry/  # OpenTelemetry setup
│   │
│   ├── ai_core/            # Inference engine
│   │   └── src/
│   │       ├── hailo/      # Hailo-Ollama client
│   │       └── selector/   # Model routing
│   │
│   ├── ai_speech/          # Speech processing
│   │   └── src/
│   │       ├── providers/  # Hybrid, Local, OpenAI
│   │       └── converter/  # Audio format conversion
│   │
│   ├── integration_*/      # External service adapters
│   │
│   └── presentation_*/     # User interfaces

Port/Adapter Pattern

Ports (Interfaces)

Ports are traits defined in application/src/ports/ that describe what the application needs:

// application/src/ports/inference.rs
use async_trait::async_trait;
use futures::stream::BoxStream;

#[async_trait]
pub trait InferencePort: Send + Sync {
    async fn generate(
        &self,
        prompt: &str,
        options: InferenceOptions,
    ) -> Result<InferenceResponse, InferenceError>;

    // Returns a boxed stream rather than `impl Stream`, so the trait stays
    // usable as `dyn InferencePort` and compiles under `#[async_trait]`.
    async fn generate_stream(
        &self,
        prompt: &str,
        options: InferenceOptions,
    ) -> Result<BoxStream<'static, Result<String, InferenceError>>, InferenceError>;

    async fn health_check(&self) -> Result<bool, InferenceError>;
}

// application/src/ports/secret_store.rs
#[async_trait]
pub trait SecretStore: Send + Sync {
    // `Ok(None)` means the secret does not exist; `Err` means the lookup failed.
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError>;
    async fn health_check(&self) -> Result<bool, SecretError>;
}

// application/src/ports/memory_context.rs (RAG context injection)
#[async_trait]
pub trait MemoryContextPort: Send + Sync {
    async fn retrieve_context(
        &self,
        user_id: &UserId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<MemoryContext>, MemoryError>;
}

// application/src/ports/embedding.rs (vector embeddings)
#[async_trait]
pub trait EmbeddingPort: Send + Sync {
    async fn embed(&self, text: &str) -> Result<Vec<f32>, EmbeddingError>;
}

// application/src/ports/encryption.rs (content encryption at rest)
// Synchronous on purpose: no I/O is involved, so no `#[async_trait]`.
pub trait EncryptionPort: Send + Sync {
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, EncryptionError>;
    fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, EncryptionError>;
}

Adapters (Implementations)

Adapters implement ports and live in infrastructure/ or integration_*/:

// infrastructure/src/adapters/vault_secret_store.rs
pub struct VaultSecretStore {
    client: VaultClient,
    mount_path: String,
}

#[async_trait]
impl SecretStore for VaultSecretStore {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError> {
        let full_path = format!("{}/{}", self.mount_path, path);
        self.client.read_secret(&full_path).await
    }
    
    async fn health_check(&self) -> Result<bool, SecretError> {
        self.client.health().await
    }
}

// infrastructure/src/adapters/env_secret_store.rs
pub struct EnvironmentSecretStore {
    prefix: Option<String>,
}

#[async_trait]
impl SecretStore for EnvironmentSecretStore {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError> {
        // Convert "database/password" to "DATABASE_PASSWORD"
        let env_key = self.path_to_env_var(path);
        Ok(std::env::var(&env_key).ok())
    }
    
    async fn health_check(&self) -> Result<bool, SecretError> {
        Ok(true) // Environment is always available
    }
}
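
The `path_to_env_var` helper is referenced above but not shown. A minimal sketch of what such a mapping might look like follows; the free-function signature, the handling of the optional prefix, and the treatment of non-alphanumeric characters are assumptions, and only the `"database/password"` to `"DATABASE_PASSWORD"` example comes from the comment above.

```rust
/// Maps a secret path to an environment variable name.
/// Assumption: every non-alphanumeric character becomes '_' and an
/// optional prefix is prepended, separated by an underscore.
fn path_to_env_var(prefix: Option<&str>, path: &str) -> String {
    let key: String = path
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() {
                c.to_ascii_uppercase()
            } else {
                '_'
            }
        })
        .collect();
    match prefix {
        Some(p) if !p.is_empty() => format!("{}_{}", p.to_ascii_uppercase(), key),
        _ => key,
    }
}
```

With a prefix of `PISOVEREIGN`, this sketch would look up `PISOVEREIGN_DATABASE_PASSWORD`; whether the real adapter applies the prefix this way is not stated in this document.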

Example: Secret Store

The ChainedSecretStore shows how adapters compose: it is itself a SecretStore that tries each configured store in order and returns the first secret found.

// infrastructure/src/adapters/chained_secret_store.rs
pub struct ChainedSecretStore {
    stores: Vec<Box<dyn SecretStore>>,
}

impl ChainedSecretStore {
    pub fn new() -> Self {
        Self { stores: Vec::new() }
    }
    
    pub fn add_store(mut self, store: impl SecretStore + 'static) -> Self {
        self.stores.push(Box::new(store));
        self
    }
}

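#[async_trait]
impl SecretStore for ChainedSecretStore {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError> {
        // Return the first hit; a store that errors or has no value is skipped.
        for store in &self.stores {
            if let Ok(Some(secret)) = store.get_secret(path).await {
                return Ok(Some(secret));
            }
        }
        Ok(None)
    }

    // Required by the trait. Assumption: the chain counts as healthy when
    // at least one underlying store reports healthy.
    async fn health_check(&self) -> Result<bool, SecretError> {
        for store in &self.stores {
            if matches!(store.health_check().await, Ok(true)) {
                return Ok(true);
            }
        }
        Ok(false)
    }
}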

Usage (wired in the presentation layer):

// Wiring in presentation layer
let secret_store = ChainedSecretStore::new()
    .add_store(VaultSecretStore::new(vault_config)?)
    .add_store(EnvironmentSecretStore::new(Some("PISOVEREIGN")));

let command_service = CommandService::new(
    Arc::new(secret_store),  // Injected as trait object
    // ... other dependencies
);
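
Because the service only sees the port, the chain's first-match behaviour can be exercised without Vault or real environment variables. The sketch below is a simplified, synchronous stand-in: `SyncSecretStore`, `InMemoryStore`, `FailingStore`, and `lookup` are hypothetical names introduced for illustration, and errors are plain strings rather than `SecretError`.

```rust
use std::collections::HashMap;

// Synchronous stand-in for the async `SecretStore` port (assumption:
// simplified so the sketch needs no async runtime).
trait SyncSecretStore {
    fn get_secret(&self, path: &str) -> Result<Option<String>, String>;
}

// Hypothetical in-memory adapter, useful as a test double.
struct InMemoryStore(HashMap<String, String>);

impl InMemoryStore {
    fn with(entries: &[(&str, &str)]) -> Self {
        Self(
            entries
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        )
    }
}

impl SyncSecretStore for InMemoryStore {
    fn get_secret(&self, path: &str) -> Result<Option<String>, String> {
        Ok(self.0.get(path).cloned())
    }
}

// Adapter that always fails, standing in for an unreachable backend.
struct FailingStore;

impl SyncSecretStore for FailingStore {
    fn get_secret(&self, _path: &str) -> Result<Option<String>, String> {
        Err("backend unreachable".to_string())
    }
}

// Same lookup rule as `ChainedSecretStore`: the first `Ok(Some(_))` wins,
// errors and misses fall through to the next store.
fn lookup(stores: &[Box<dyn SyncSecretStore>], path: &str) -> Option<String> {
    stores
        .iter()
        .find_map(|store| store.get_secret(path).ok().flatten())
}
```

One consequence worth noting: a failing store is indistinguishable from a store without the secret, so a missing secret is reported as absent even when an earlier store was merely unreachable.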

Data Flow

Example: Intent Routing Pipeline

User input is routed through a multi-stage pipeline that minimizes LLM calls. Each stage acts as a progressively more expensive filter:

1. User Input: "Hey, it's Andreas. I'm naming you Macci."
   │
   ▼
2. Conversational Filter (zero LLM cost)
   │  Regex-based detection of greetings, introductions, small talk.
   │  If matched → skip to chat (no workflow/intent parsing).
   │
   ▼ (not conversational)
3. Quick Pattern Matching
   │  Regex patterns for well-known commands (e.g., "remind me",
   │  "search for", "send email"). Fast, deterministic.
   │
   ▼ (no quick match)
4. Guarded Workflow Detection
   │  Only invoked when input has ≥8 words AND contains ≥2
   │  workflow-hint keywords ("create", "plan", "distribute", etc.).
   │  Uses LLM to detect multi-step workflows.
   │
   ▼ (not a workflow)
5. LLM Intent Parsing
   │  Full LLM-based intent classification with confidence score.
   │  Post-validated by keyword presence per intent category.
   │  Intents below 0.7 confidence are downgraded to chat.
   │
   ▼
6. Dispatch to appropriate handler or fall through to chat
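
The two numeric guards in the pipeline (stage 4's word and keyword thresholds, stage 5's confidence cut-off) can be sketched as plain functions. This is an illustrative sketch only: the function names are hypothetical, the keyword list contains just the three hints named above (the real list is longer), and counting distinct keywords is an assumption about how "≥2 keywords" is interpreted.

```rust
// Partial keyword list: the document names only these three hints.
const WORKFLOW_HINTS: [&str; 3] = ["create", "plan", "distribute"];
// Thresholds taken from the pipeline description above.
const MIN_WORDS: usize = 8;
const MIN_HINTS: usize = 2;
const MIN_CONFIDENCE: f32 = 0.7;

/// Stage 4 guard: only longer inputs with at least two distinct hints
/// are worth an LLM workflow-detection call.
fn passes_workflow_guard(input: &str) -> bool {
    // Normalise each word: strip surrounding punctuation, lower-case.
    let words: Vec<String> = input
        .split_whitespace()
        .map(|w| w.trim_matches(|c: char| !c.is_alphanumeric()).to_lowercase())
        .collect();
    let hints = WORKFLOW_HINTS
        .iter()
        .filter(|&&hint| words.iter().any(|w| w.as_str() == hint))
        .count();
    words.len() >= MIN_WORDS && hints >= MIN_HINTS
}

/// Stage 5 post-check: low-confidence intents are downgraded to chat.
fn resolve_intent(intent: &str, confidence: f32) -> &str {
    if confidence < MIN_CONFIDENCE {
        "chat"
    } else {
        intent
    }
}
```

Both checks are cheap string and number comparisons, which is the point of the guard: the LLM is consulted only after they pass.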

Example: Chat Request

1. HTTP Request arrives at /v1/chat
   │
   ▼
2. presentation_http extracts request, validates auth
   │
   ▼
3. Calls ConversationService.send_message() [application layer]
   │
   ▼
4. ConversationService:
   ├── Loads conversation from ConversationRepository [port]
   ├── Calls InferencePort.generate() [port]
   └── Saves message via ConversationRepository [port]
   │
   ▼
5. InferencePort implementation (ai_core::HailoClient):
   ├── Sends request to Hailo-Ollama
   └── Returns response
   │
   ▼
6. Response flows back through layers
   │
   ▼
7. HTTP Response returned to client
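
Step 4 of this flow is where the ports pay off: the service can be driven end to end with in-memory doubles. The sketch below is a synchronous simplification with hypothetical method signatures (`load`, `save`, and a one-argument `generate`); only the names `ConversationService`, `ConversationRepository`, `InferencePort`, and `send_message` come from the flow above.

```rust
use std::cell::RefCell;

// Synchronous, simplified stand-ins for the async ports (assumption).
trait ConversationRepository {
    fn load(&self, conversation_id: &str) -> Vec<String>;
    fn save(&self, conversation_id: &str, message: String);
}

trait InferencePort {
    fn generate(&self, prompt: &str) -> Result<String, String>;
}

// Application-layer service: depends only on the two traits.
struct ConversationService<R, I> {
    repo: R,
    inference: I,
}

impl<R: ConversationRepository, I: InferencePort> ConversationService<R, I> {
    fn send_message(&self, conversation_id: &str, text: &str) -> Result<String, String> {
        // Load history through the repository port.
        let mut history = self.repo.load(conversation_id);
        history.push(format!("user: {text}"));
        // Call the inference port with the accumulated context.
        let reply = self.inference.generate(&history.join("\n"))?;
        // Persist both sides of the exchange.
        self.repo.save(conversation_id, format!("user: {text}"));
        self.repo.save(conversation_id, format!("assistant: {reply}"));
        Ok(reply)
    }
}

// Test doubles standing in for the database and the inference backend.
// This repository ignores the conversation id and keeps a single history.
#[derive(Default)]
struct InMemoryRepo(RefCell<Vec<String>>);

impl ConversationRepository for InMemoryRepo {
    fn load(&self, _conversation_id: &str) -> Vec<String> {
        self.0.borrow().clone()
    }
    fn save(&self, _conversation_id: &str, message: String) {
        self.0.borrow_mut().push(message);
    }
}

struct EchoInference;

impl InferencePort for EchoInference {
    fn generate(&self, prompt: &str) -> Result<String, String> {
        // Echo the last line of the prompt so the output is predictable.
        let last = prompt.lines().last().unwrap_or_default();
        Ok(format!("echo[{last}]"))
    }
}
```

Swapping `EchoInference` for a real adapter changes nothing in `ConversationService`, which is the property the layering is meant to guarantee.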

Example: WhatsApp Voice Message

1. WhatsApp Webhook POST to /v1/webhooks/whatsapp
   │
   ▼
2. integration_whatsapp validates signature, parses message
   │
   ▼
3. VoiceMessageService.process() [application layer]
   │
   ├── Download audio via WhatsAppPort
   ├── Convert format via AudioConverter [ai_speech]
   ├── Transcribe via SpeechPort (STT)
   ├── Process text via CommandService
   ├── (Optional) Synthesize via SpeechPort (TTS)
   └── Send response via WhatsAppPort
   │
   ▼
4. Response sent back to user via WhatsApp

Key Design Decisions

1. Async-First

All I/O operations are async using Tokio:

#[async_trait]
pub trait InferencePort: Send + Sync {
    async fn generate(&self, ...) -> Result<..., ...>;
}

Rationale: Async I/O lets a small number of threads serve many concurrent requests, which maximizes throughput on limited Raspberry Pi resources.

2. Error Handling via thiserror

Each layer defines its own error types:

// domain/src/errors.rs
#[derive(Debug, thiserror::Error)]
pub enum DomainError {
    #[error("Invalid message content: {0}")]
    InvalidContent(String),
}

// application/src/errors.rs
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    #[error("Domain error: {0}")]
    Domain(#[from] DomainError),
    #[error("Inference failed: {0}")]
    Inference(String),
}

Rationale: Clear error boundaries, easy conversion between layers.
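
To make the "easy conversion" concrete, the sketch below spells out by hand roughly what the `thiserror` derives provide for the two enums above, using only the standard library. The `validate` and `handle` functions and the blank-content rule are hypothetical; in addition to what is shown, `#[from]` also wires up `Error::source()`.

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
enum DomainError {
    InvalidContent(String),
}

impl fmt::Display for DomainError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DomainError::InvalidContent(msg) => write!(f, "Invalid message content: {msg}"),
        }
    }
}

impl std::error::Error for DomainError {}

#[derive(Debug, PartialEq)]
enum ServiceError {
    Domain(DomainError),
    Inference(String),
}

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServiceError::Domain(inner) => write!(f, "Domain error: {inner}"),
            ServiceError::Inference(msg) => write!(f, "Inference failed: {msg}"),
        }
    }
}

impl std::error::Error for ServiceError {}

// The conversion that `#[from]` generates on the `Domain` variant.
impl From<DomainError> for ServiceError {
    fn from(err: DomainError) -> Self {
        ServiceError::Domain(err)
    }
}

// Domain-level validation (hypothetical rule: content must not be blank).
fn validate(content: &str) -> Result<&str, DomainError> {
    if content.trim().is_empty() {
        Err(DomainError::InvalidContent("empty message".into()))
    } else {
        Ok(content)
    }
}

// Application-level caller: `?` converts DomainError into ServiceError.
fn handle(content: &str) -> Result<usize, ServiceError> {
    let valid = validate(content)?;
    Ok(valid.len())
}
```

The `?` in `handle` is the whole integration cost: the application layer never matches on domain errors unless it wants to.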

3. Feature Flags

Optional features reduce binary size:

# Cargo.toml
[features]
default = ["http"]
http = ["axum", "tower", ...]
cli = ["clap", ...]
speech = ["whisper", "piper", ...]

Rationale: Raspberry Pi has limited storage; include only what’s needed.

4. Configuration via config Crate

Layered configuration (defaults → file → env vars):

let config = Config::builder()
    .add_source(config::File::with_name("config"))
    .add_source(config::Environment::with_prefix("PISOVEREIGN"))
    .build()?;

Rationale: Flexibility for different deployment scenarios.
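
The precedence rule (later sources override earlier ones) can be illustrated without the `config` crate. The sketch below is a standard-library model of the idea, not the crate's implementation: `merge_layers` and `env_layer` are hypothetical helpers, and the lower-casing and `_` prefix separator are assumptions about how environment keys are normalized.

```rust
use std::collections::HashMap;

type Layer = HashMap<String, String>;

/// Merges layers in order; later layers override earlier ones.
fn merge_layers(layers: &[Layer]) -> Layer {
    let mut merged = Layer::new();
    for layer in layers {
        for (key, value) in layer {
            merged.insert(key.clone(), value.clone());
        }
    }
    merged
}

/// Turns `PISOVEREIGN_PORT=9000` into `port -> 9000` and drops variables
/// without the prefix. Assumption: keys are lower-cased and the prefix
/// separator is `_`.
fn env_layer<I>(prefix: &str, vars: I) -> Layer
where
    I: IntoIterator<Item = (String, String)>,
{
    let prefix = format!("{prefix}_");
    vars.into_iter()
        .filter_map(|(k, v)| {
            k.strip_prefix(prefix.as_str())
                .map(|rest| (rest.to_lowercase(), v))
        })
        .collect()
}
```

In a real deployment the `vars` argument would be `std::env::vars()`; passing it in explicitly keeps the sketch deterministic.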

5. Multi-Layer Caching

Request → L1 (Moka, in-memory) → L2 (Redb, persistent) → L3 (Semantic, pgvector) → Backend

| Layer | Type | Storage | Match Method | Use Case |
|---|---|---|---|---|
| L1 | MokaCache | In-memory | Exact string | Hot data, sub-ms access |
| L2 | RedbCache | Disk | Exact string | Warm data, persists across restarts |
| L3 | PgSemanticCache | PostgreSQL/pgvector | Cosine similarity | Semantically equivalent queries |

Decorator Chain Order:

SanitizedInferencePort (outermost)
  └─ CachedInferenceAdapter (exact L1+L2)
       └─ SemanticCachedInferenceAdapter (similarity L3)
            └─ DegradedInferenceAdapter
                 └─ OllamaInferenceAdapter (innermost)

Rationale: Minimize latency and reduce load on inference engine. The semantic layer catches queries that are phrased differently but mean the same thing, significantly improving cache hit rates.
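
The decorator chain works because every layer both wraps and implements the same port. A minimal sketch of one exact-match layer follows; it is synchronous and uses a plain `HashMap`, and the names `Inference`, `CountingBackend`, and `Cached` are hypothetical stand-ins for the adapters listed above.

```rust
use std::cell::{Cell, RefCell};
use std::collections::HashMap;

// Synchronous stand-in for the async inference port (assumption).
trait Inference {
    fn generate(&self, prompt: &str) -> String;
}

// Innermost adapter: counts how often the backend is actually hit.
struct CountingBackend {
    calls: Cell<u32>,
}

impl Inference for CountingBackend {
    fn generate(&self, prompt: &str) -> String {
        self.calls.set(self.calls.get() + 1);
        format!("answer to: {prompt}")
    }
}

// Exact-match cache decorator: wraps any `Inference` and is itself one,
// so decorators can be stacked in any order.
struct Cached<I> {
    inner: I,
    entries: RefCell<HashMap<String, String>>,
}

impl<I: Inference> Cached<I> {
    fn new(inner: I) -> Self {
        Self {
            inner,
            entries: RefCell::new(HashMap::new()),
        }
    }
}

impl<I: Inference> Inference for Cached<I> {
    fn generate(&self, prompt: &str) -> String {
        // Clone the hit out first so the read borrow ends before any write.
        let cached = self.entries.borrow().get(prompt).cloned();
        if let Some(hit) = cached {
            return hit;
        }
        let fresh = self.inner.generate(prompt);
        self.entries
            .borrow_mut()
            .insert(prompt.to_string(), fresh.clone());
        fresh
    }
}
```

Stacking a second decorator is just another wrapper, for example `Cached::new(Cached::new(backend))`; a semantic layer would differ only in how it decides that two prompts match.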

6. In-Process Event Bus

Post-processing work (fact extraction, audit logging, metrics) runs asynchronously via an in-process event bus backed by tokio::sync::broadcast:

ChatService / AgentService
        │
        ▼ publish(DomainEvent)
 ┌──────────────────────┐
 │  TokioBroadcastBus   │
 └──────────────────────┘
    │    │    │    │
    ▼    ▼    ▼    ▼
  Fact  Audit Conv Metrics
  Ext.  Log   Pers. Handler

Key properties:

  • Fire-and-forget: handlers never block the response path
  • DomainEvent enum defined in the domain layer (7 variants)
  • EventBusPort / EventSubscriberPort defined in application ports
  • TokioBroadcastEventBus adapter in infrastructure
  • Handlers spawned conditionally based on available dependencies
  • Channel overflow → slow subscribers receive a Lagged error and skip the oldest events; the publisher is never blocked and never fails because of it

Rationale: Moves 100–500 ms of per-request post-processing off the critical path, crucial on resource-constrained Raspberry Pi hardware.
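
The fan-out shape of the bus can be sketched with the standard library alone. This is not the `TokioBroadcastEventBus` implementation: it uses one unbounded `std::sync::mpsc` channel per subscriber as a stand-in for `tokio::sync::broadcast`, so it never lags, and the `MessageReceived` variant is a hypothetical example rather than one of the seven real variants.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

// Hypothetical event; the real `DomainEvent` enum lives in the domain crate.
#[derive(Clone, Debug, PartialEq)]
enum DomainEvent {
    MessageReceived { conversation_id: String },
}

// Fan-out bus: one std mpsc channel per subscriber (assumption made to
// keep the sketch free of an async runtime).
#[derive(Default)]
struct EventBus {
    subscribers: Vec<Sender<DomainEvent>>,
}

impl EventBus {
    fn subscribe(&mut self) -> Receiver<DomainEvent> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Fire-and-forget: sending never blocks, and a subscriber that has
    /// gone away is silently ignored.
    fn publish(&self, event: &DomainEvent) {
        for tx in &self.subscribers {
            let _ = tx.send(event.clone());
        }
    }
}
```

Each handler (fact extraction, audit log, metrics) would own one receiver and drain it on its own task, which is what keeps post-processing off the request path.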

7. Agentic Multi-Agent Orchestration

Complex tasks can be decomposed into parallel sub-tasks, each executed by an independent AI agent:

User Request: "Plan my trip to Berlin next week"
        │
        ▼ POST /v1/agentic/tasks
 ┌──────────────────────────┐
 │  AgenticOrchestrator     │
 │  (application service)   │
 └──────────────────────────┘
    │        │        │
    ▼        ▼        ▼
 SubAgent  SubAgent  SubAgent
 (weather) (calendar)(transit)
    │        │        │
    └────────┴────────┘
             │
             ▼
      Aggregated Result

Key properties:

  • Wave-based parallel execution with configurable concurrency limits
  • Dependency tracking between sub-tasks
  • Individual sub-agent timeouts and total task timeouts
  • Real-time progress via SSE streaming (/v1/agentic/tasks/{id}/stream)
  • Task cancellation support
  • Approval gates for sensitive operations
  • Domain entities in domain, orchestration in application, event bus in infrastructure, REST handlers in presentation_http, UI in presentation_web
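
Wave-based execution with dependency tracking amounts to grouping sub-tasks by topological level: a wave contains every task whose dependencies finished in earlier waves. The sketch below illustrates that grouping only; `plan_waves` is a hypothetical helper, and concurrency limits, timeouts, and approval gates are deliberately left out.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Groups tasks into waves: every task in a wave depends only on tasks
/// from earlier waves. Returns `None` on a cycle or unknown dependency.
fn plan_waves<'a>(deps: &BTreeMap<&'a str, Vec<&'a str>>) -> Option<Vec<Vec<String>>> {
    let mut done: BTreeSet<&'a str> = BTreeSet::new();
    let mut waves = Vec::new();
    while done.len() < deps.len() {
        // Everything not yet run whose dependencies are all satisfied.
        let wave: Vec<&'a str> = deps
            .iter()
            .filter(|(task, needs)| {
                !done.contains(*task) && needs.iter().all(|n| done.contains(n))
            })
            .map(|(task, _)| *task)
            .collect();
        if wave.is_empty() {
            return None; // no progress: cycle or missing dependency
        }
        done.extend(wave.iter().copied());
        waves.push(wave.into_iter().map(String::from).collect());
    }
    Some(waves)
}
```

For the Berlin trip example, independent sub-agents such as weather and calendar would land in the first wave and run in parallel, while a sub-task that needs their output would wait for a later wave.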

Further Reading