# Architecture

*System design and architectural patterns in PiSovereign*

This document explains the architectural decisions, design patterns, and structure of PiSovereign.
## Table of Contents

- [Overview](#overview)
- [Clean Architecture](#clean-architecture)
- [Crate Dependencies](#crate-dependencies)
- [Workspace Structure](#workspace-structure)
- [Port/Adapter Pattern](#portadapter-pattern)
- [Data Flow](#data-flow)
- [Key Design Decisions](#key-design-decisions)
- [Further Reading](#further-reading)

## Overview
PiSovereign follows Clean Architecture (also known as Hexagonal Architecture or Ports & Adapters) to achieve:
- **Independence from frameworks** - Business logic doesn't depend on Axum, SQLite, or any external library
- **Testability** - Core logic can be tested without infrastructure
- **Flexibility** - Adapters can be swapped without changing business rules
- **Maintainability** - Clear boundaries between concerns
```
┌───────────────────────────────────────────────────────────────────┐
│                          External World                           │
│       (HTTP Clients, WhatsApp, Email Servers, AI Hardware)        │
└───────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────────┐
│                        Presentation Layer                         │
│   ┌───────────────────┐          ┌───────────────────┐            │
│   │  presentation_    │          │  presentation_    │            │
│   │  http             │          │  cli              │            │
│   │  (Axum API)       │          │  (Clap CLI)       │            │
│   └───────────────────┘          └───────────────────┘            │
└───────────────────────────────────────────────────────────────────┘
                                │
                                ▼
┌───────────────────────────────────────────────────────────────────┐
│                        Application Layer                          │
│  ┌───────────────────────────────────────────────────────────┐    │
│  │                       application                         │    │
│  │  (Services, Use Cases, Orchestration, Port Definitions)   │    │
│  └───────────────────────────────────────────────────────────┘    │
└───────────────────────────────────────────────────────────────────┘
                                │
               ┌────────────────┼────────────────┐
               ▼                ▼                ▼
┌────────────────────┐  ┌──────────────┐  ┌──────────────────────────┐
│    Domain Layer    │  │   AI Layer   │  │   Infrastructure Layer   │
│  ┌──────────────┐  │  │ ┌──────────┐ │  │ ┌──────────────────────┐ │
│  │   domain     │  │  │ │ ai_core  │ │  │ │    infrastructure    │ │
│  │  (Entities,  │  │  │ │(Inference│ │  │ │  (Adapters, Repos,   │ │
│  │   Values,    │  │  │ │ Engine)  │ │  │ │  Cache, DB, Vault)   │ │
│  │  Commands)   │  │  │ └──────────┘ │  │ └──────────────────────┘ │
│  └──────────────┘  │  │ ┌──────────┐ │  │                          │
│                    │  │ │ai_speech │ │  │ ┌────────────────────┐   │
│                    │  │ │(STT/TTS) │ │  │ │   integration_*    │   │
│                    │  │ └──────────┘ │  │ │ (WhatsApp, Mail,   │   │
│                    │  │              │  │ │  Calendar, etc.)   │   │
│                    │  └──────────────┘  │ └────────────────────┘   │
└────────────────────┘                    └──────────────────────────┘
```
## Clean Architecture

### Layer Responsibilities
| Layer | Crates | Responsibility |
|---|---|---|
| Domain | `domain` | Core business entities, value objects, commands, domain errors |
| Application | `application` | Use cases, service orchestration, port definitions |
| Infrastructure | `infrastructure`, `integration_*` | Adapters for external systems (DB, cache, APIs) |
| AI | `ai_core`, `ai_speech` | AI-specific logic (inference, speech processing) |
| Presentation | `presentation_http`, `presentation_cli` | User interfaces (REST API, CLI) |
### Dependency Rule

**Inner layers NEVER depend on outer layers:**

```
domain          →  (no dependencies on other PiSovereign crates)
application     →  domain
ai_core         →  domain, application (ports)
ai_speech       →  domain, application (ports)
infrastructure  →  domain, application (ports)
integration_*   →  domain, application (ports)
presentation_*  →  domain, application, infrastructure, ai_*, integration_*
```
This means:

- `domain` knows nothing about databases, HTTP, or external services
- `application` defines what it needs via ports (traits), not how it's done
- Only `presentation` crates wire everything together
## Crate Dependencies

### Dependency Graph
```mermaid
graph TB
    subgraph "Presentation"
        HTTP[presentation_http]
        CLI[presentation_cli]
    end

    subgraph "Integration"
        WA[integration_whatsapp]
        PM[integration_email]
        CAL[integration_caldav]
        WX[integration_weather]
    end

    subgraph "Infrastructure"
        INFRA[infrastructure]
    end

    subgraph "AI"
        CORE[ai_core]
        SPEECH[ai_speech]
    end

    subgraph "Core"
        APP[application]
        DOM[domain]
    end

    HTTP --> APP
    HTTP --> INFRA
    HTTP --> CORE
    HTTP --> SPEECH
    HTTP --> WA
    HTTP --> PM
    HTTP --> CAL
    HTTP --> WX

    CLI --> APP
    CLI --> INFRA

    WA --> APP
    WA --> DOM
    PM --> APP
    PM --> DOM
    CAL --> APP
    CAL --> DOM
    WX --> APP
    WX --> DOM

    INFRA --> APP
    INFRA --> DOM

    CORE --> APP
    CORE --> DOM
    SPEECH --> APP
    SPEECH --> DOM

    APP --> DOM
```
## Workspace Structure

```
PiSovereign/
├── Cargo.toml               # Workspace manifest
└── crates/
    ├── domain/              # Core business logic (no external deps)
    │   ├── Cargo.toml
    │   └── src/
    │       ├── lib.rs
    │       ├── entities/    # User, Conversation, Message, etc.
    │       ├── values/      # UserId, MessageContent, etc.
    │       ├── commands/    # UserCommand, SystemCommand
    │       └── errors.rs    # Domain errors
    │
    ├── application/         # Use cases and ports
    │   ├── Cargo.toml
    │   └── src/
    │       ├── lib.rs
    │       ├── services/    # ConversationService, CommandService, etc.
    │       └── ports/       # Trait definitions (InferencePort, etc.)
    │
    ├── infrastructure/      # Framework-dependent implementations
    │   ├── Cargo.toml
    │   └── src/
    │       ├── lib.rs
    │       ├── adapters/    # VaultSecretStore, etc.
    │       ├── cache/       # MokaCache, RedbCache
    │       ├── persistence/ # SQLite repositories
    │       └── telemetry/   # OpenTelemetry setup
    │
    ├── ai_core/             # Inference engine
    │   └── src/
    │       ├── hailo/       # Hailo-Ollama client
    │       └── selector/    # Model routing
    │
    ├── ai_speech/           # Speech processing
    │   └── src/
    │       ├── providers/   # Hybrid, Local, OpenAI
    │       └── converter/   # Audio format conversion
    │
    ├── integration_*/       # External service adapters
    │
    └── presentation_*/      # User interfaces
```
## Port/Adapter Pattern

### Ports (Interfaces)

Ports are traits defined in `application/src/ports/` that describe what the application needs:
```rust
// application/src/ports/inference.rs
#[async_trait]
pub trait InferencePort: Send + Sync {
    async fn generate(
        &self,
        prompt: &str,
        options: InferenceOptions,
    ) -> Result<InferenceResponse, InferenceError>;

    // Boxed stream: #[async_trait] methods cannot return `impl Trait`
    async fn generate_stream(
        &self,
        prompt: &str,
        options: InferenceOptions,
    ) -> Result<BoxStream<'static, Result<String, InferenceError>>, InferenceError>;

    async fn health_check(&self) -> Result<bool, InferenceError>;
}

// application/src/ports/secret_store.rs
#[async_trait]
pub trait SecretStore: Send + Sync {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError>;
    async fn health_check(&self) -> Result<bool, SecretError>;
}

// application/src/ports/memory_context.rs - RAG context injection
#[async_trait]
pub trait MemoryContextPort: Send + Sync {
    async fn retrieve_context(
        &self,
        user_id: &UserId,
        query: &str,
        limit: usize,
    ) -> Result<Vec<MemoryContext>, MemoryError>;
}

// application/src/ports/embedding.rs - Vector embeddings
#[async_trait]
pub trait EmbeddingPort: Send + Sync {
    async fn embed(&self, text: &str) -> Result<Vec<f32>, EmbeddingError>;
}

// application/src/ports/encryption.rs - Content encryption at rest
pub trait EncryptionPort: Send + Sync {
    fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>, EncryptionError>;
    fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>, EncryptionError>;
}
```
### Adapters (Implementations)

Adapters implement ports and live in `infrastructure/` or `integration_*/`:
```rust
// infrastructure/src/adapters/vault_secret_store.rs
pub struct VaultSecretStore {
    client: VaultClient,
    mount_path: String,
}

#[async_trait]
impl SecretStore for VaultSecretStore {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError> {
        let full_path = format!("{}/{}", self.mount_path, path);
        self.client.read_secret(&full_path).await
    }

    async fn health_check(&self) -> Result<bool, SecretError> {
        self.client.health().await
    }
}

// infrastructure/src/adapters/env_secret_store.rs
pub struct EnvironmentSecretStore {
    prefix: Option<String>,
}

#[async_trait]
impl SecretStore for EnvironmentSecretStore {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError> {
        // Convert "database/password" to "DATABASE_PASSWORD"
        let env_key = self.path_to_env_var(path);
        Ok(std::env::var(&env_key).ok())
    }

    async fn health_check(&self) -> Result<bool, SecretError> {
        Ok(true) // Environment is always available
    }
}
```
### Example: Secret Store

The `ChainedSecretStore` demonstrates the adapter pattern:
```rust
// infrastructure/src/adapters/chained_secret_store.rs
pub struct ChainedSecretStore {
    stores: Vec<Box<dyn SecretStore>>,
}

impl ChainedSecretStore {
    pub fn new() -> Self {
        Self { stores: Vec::new() }
    }

    pub fn add_store(mut self, store: impl SecretStore + 'static) -> Self {
        self.stores.push(Box::new(store));
        self
    }
}

#[async_trait]
impl SecretStore for ChainedSecretStore {
    async fn get_secret(&self, path: &str) -> Result<Option<String>, SecretError> {
        // First store that yields the secret wins
        for store in &self.stores {
            if let Ok(Some(secret)) = store.get_secret(path).await {
                return Ok(Some(secret));
            }
        }
        Ok(None)
    }

    // health_check omitted for brevity
}
```
Usage in application:

```rust
// Wiring in presentation layer
let secret_store = ChainedSecretStore::new()
    .add_store(VaultSecretStore::new(vault_config)?)
    .add_store(EnvironmentSecretStore::new(Some("PISOVEREIGN")));

let command_service = CommandService::new(
    Arc::new(secret_store), // Injected as trait object
    // ... other dependencies
);
```
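Because services only see the `SecretStore` trait, tests can substitute an in-memory double without touching Vault or environment variables. The sketch below is simplified to synchronous methods with illustrative types (the real port is async and returns `Result`); it shows the testability payoff of the port abstraction, not the project's actual test code:

```rust
use std::collections::HashMap;

// Simplified synchronous stand-in for the SecretStore port
// (the real trait is async via #[async_trait]).
pub trait SecretStore {
    fn get_secret(&self, path: &str) -> Option<String>;
}

// Test double: an in-memory store, no Vault or env needed.
pub struct InMemorySecretStore {
    secrets: HashMap<String, String>,
}

impl InMemorySecretStore {
    pub fn new(pairs: &[(&str, &str)]) -> Self {
        Self {
            secrets: pairs
                .iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        }
    }
}

impl SecretStore for InMemorySecretStore {
    fn get_secret(&self, path: &str) -> Option<String> {
        self.secrets.get(path).cloned()
    }
}

// Mirrors ChainedSecretStore: the first store with a hit wins.
pub struct ChainedStore {
    stores: Vec<Box<dyn SecretStore>>,
}

impl ChainedStore {
    pub fn new() -> Self {
        Self { stores: Vec::new() }
    }
    pub fn add_store(mut self, store: impl SecretStore + 'static) -> Self {
        self.stores.push(Box::new(store));
        self
    }
}

impl SecretStore for ChainedStore {
    fn get_secret(&self, path: &str) -> Option<String> {
        self.stores.iter().find_map(|s| s.get_secret(path))
    }
}

fn main() {
    let chained = ChainedStore::new()
        .add_store(InMemorySecretStore::new(&[("database/password", "hunter2")]))
        .add_store(InMemorySecretStore::new(&[("api/key", "abc123")]));

    assert_eq!(chained.get_secret("database/password").as_deref(), Some("hunter2"));
    assert_eq!(chained.get_secret("api/key").as_deref(), Some("abc123")); // falls through
    assert_eq!(chained.get_secret("missing"), None);
}
```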
## Data Flow

### Example: Intent Routing Pipeline

User input is routed through a multi-stage pipeline that minimizes LLM calls. Each stage acts as a progressively more expensive filter:
```
1. User Input: "Hey, it's Andreas. I'm naming you Macci."
        │
        ▼
2. Conversational Filter (zero LLM cost)
   │  Regex-based detection of greetings, introductions, small talk.
   │  If matched → skip to chat (no workflow/intent parsing).
        │
        ▼ (not conversational)
3. Quick Pattern Matching
   │  Regex patterns for well-known commands (e.g., "remind me",
   │  "search for", "send email"). Fast, deterministic.
        │
        ▼ (no quick match)
4. Guarded Workflow Detection
   │  Only invoked when input has ≥8 words AND contains ≥2
   │  workflow-hint keywords ("create", "plan", "distribute", etc.).
   │  Uses LLM to detect multi-step workflows.
        │
        ▼ (not a workflow)
5. LLM Intent Parsing
   │  Full LLM-based intent classification with confidence score.
   │  Post-validated by keyword presence per intent category.
   │  Intents below 0.7 confidence are downgraded to chat.
        │
        ▼
6. Dispatch to appropriate handler or fall through to chat
```
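The staged routing can be sketched as a plain function in which each `return` is an early exit from a cheaper stage. The keyword lists, thresholds, and the `Route` enum below are illustrative stand-ins (the real pipeline uses regexes and LLM calls), but the guard conditions mirror the stages above:

```rust
#[derive(Debug, PartialEq)]
enum Route {
    Chat,                         // small talk, or low-confidence fallback
    QuickCommand(&'static str),   // matched a well-known command pattern
    WorkflowDetection,            // escalate to LLM workflow detection
    LlmIntentParsing,             // escalate to full LLM intent parsing
}

// Hypothetical keyword lists standing in for the real regexes.
const GREETINGS: [&str; 4] = ["hey", "hello", "hi", "thanks"];
const WORKFLOW_HINTS: [&str; 4] = ["create", "plan", "distribute", "organize"];

fn route(input: &str) -> Route {
    let lower = input.to_lowercase();
    let words: Vec<&str> = lower.split_whitespace().collect();

    // Stage 2: conversational filter (zero LLM cost).
    if words
        .first()
        .map_or(false, |w| GREETINGS.contains(&w.trim_matches(|c: char| !c.is_alphanumeric())))
    {
        return Route::Chat;
    }

    // Stage 3: quick pattern matching for well-known commands.
    if lower.contains("remind me") {
        return Route::QuickCommand("reminder");
    }
    if lower.contains("send email") {
        return Route::QuickCommand("email");
    }

    // Stage 4 guard: >=8 words AND >=2 workflow-hint keywords.
    let hint_count = words.iter().filter(|w| WORKFLOW_HINTS.contains(*w)).count();
    if words.len() >= 8 && hint_count >= 2 {
        return Route::WorkflowDetection;
    }

    // Stage 5: fall through to full LLM intent parsing.
    Route::LlmIntentParsing
}

fn main() {
    assert_eq!(route("Hey, it's Andreas. I'm naming you Macci."), Route::Chat);
    assert_eq!(route("remind me to water the plants"), Route::QuickCommand("reminder"));
    assert_eq!(
        route("plan and create a schedule to distribute the weekly report"),
        Route::WorkflowDetection
    );
    assert_eq!(route("what is the capital of France"), Route::LlmIntentParsing);
}
```

The ordering matters: every stage that matches short-circuits the more expensive stages below it, which is exactly how the pipeline keeps most requests away from the LLM.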
### Example: Chat Request

```
1. HTTP Request arrives at /v1/chat
        │
        ▼
2. presentation_http extracts request, validates auth
        │
        ▼
3. Calls ConversationService.send_message()  [application layer]
        │
        ▼
4. ConversationService:
   ├── Loads conversation from ConversationRepository  [port]
   ├── Calls InferencePort.generate()                  [port]
   └── Saves message via ConversationRepository        [port]
        │
        ▼
5. InferencePort implementation (ai_core::HailoClient):
   ├── Sends request to Hailo-Ollama
   └── Returns response
        │
        ▼
6. Response flows back through layers
        │
        ▼
7. HTTP Response returned to client
```
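Step 4 is where the dependency rule pays off: the service orchestrates entirely through ports and never touches SQLite or the Hailo client directly. Below is a simplified synchronous sketch; the trait shapes and in-memory test doubles are illustrative, not the project's actual signatures:

```rust
use std::collections::HashMap;

// Stand-ins for the repository and inference ports (real ones are async).
trait ConversationRepository {
    fn load(&mut self, id: u64) -> Vec<String>;
    fn save(&mut self, id: u64, message: String);
}

trait Inference {
    fn generate(&self, prompt: &str) -> String;
}

// The service depends only on trait bounds, not concrete adapters.
struct ConversationService<R: ConversationRepository, I: Inference> {
    repo: R,
    inference: I,
}

impl<R: ConversationRepository, I: Inference> ConversationService<R, I> {
    fn send_message(&mut self, conversation_id: u64, user_message: &str) -> String {
        // Load history through the repository port
        let history = self.repo.load(conversation_id);
        // Call the inference port
        let prompt = format!("{}\nuser: {}", history.join("\n"), user_message);
        let reply = self.inference.generate(&prompt);
        // Persist both sides of the exchange
        self.repo.save(conversation_id, format!("user: {user_message}"));
        self.repo.save(conversation_id, format!("assistant: {reply}"));
        reply
    }
}

// Test doubles standing in for SQLite and Hailo-Ollama.
struct InMemoryRepo {
    data: HashMap<u64, Vec<String>>,
}

impl ConversationRepository for InMemoryRepo {
    fn load(&mut self, id: u64) -> Vec<String> {
        self.data.entry(id).or_default().clone()
    }
    fn save(&mut self, id: u64, message: String) {
        self.data.entry(id).or_default().push(message);
    }
}

struct EchoInference;

impl Inference for EchoInference {
    fn generate(&self, prompt: &str) -> String {
        format!("echo: {}", prompt.lines().last().unwrap_or(""))
    }
}

fn main() {
    let mut svc = ConversationService {
        repo: InMemoryRepo { data: HashMap::new() },
        inference: EchoInference,
    };
    assert_eq!(svc.send_message(1, "hello"), "echo: user: hello");
    assert_eq!(svc.repo.load(1).len(), 2); // user + assistant messages stored
}
```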
### Example: WhatsApp Voice Message

```
1. WhatsApp Webhook POST to /v1/webhooks/whatsapp
        │
        ▼
2. integration_whatsapp validates signature, parses message
        │
        ▼
3. VoiceMessageService.process()  [application layer]
        │
        ├── Download audio via WhatsAppPort
        ├── Convert format via AudioConverter  [ai_speech]
        ├── Transcribe via SpeechPort (STT)
        ├── Process text via CommandService
        ├── (Optional) Synthesize via SpeechPort (TTS)
        └── Send response via WhatsAppPort
        │
        ▼
4. Response sent back to user via WhatsApp
```
## Key Design Decisions

### 1. Async-First

All I/O operations are async using Tokio:
```rust
#[async_trait]
pub trait InferencePort: Send + Sync {
    async fn generate(&self, ...) -> Result<..., ...>;
}
```

**Rationale:** Maximizes throughput on limited Raspberry Pi resources.
### 2. Error Handling via `thiserror`

Each layer defines its own error types:
```rust
// domain/src/errors.rs
#[derive(Debug, thiserror::Error)]
pub enum DomainError {
    #[error("Invalid message content: {0}")]
    InvalidContent(String),
}

// application/src/errors.rs
#[derive(Debug, thiserror::Error)]
pub enum ServiceError {
    #[error("Domain error: {0}")]
    Domain(#[from] DomainError),

    #[error("Inference failed: {0}")]
    Inference(String),
}
```

**Rationale:** Clear error boundaries, easy conversion between layers.
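What `#[from]` buys is the automatic conversion that lets `?` carry an error across a layer boundary. The sketch below hand-writes roughly what thiserror generates, using only the standard library (the real code uses the derives above; the `validate`/`handle` functions are illustrative):

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
enum DomainError {
    InvalidContent(String),
}

#[derive(Debug, PartialEq)]
enum ServiceError {
    Domain(DomainError),
    Inference(String),
}

// This is what thiserror's #[from] attribute produces: it lets `?`
// convert a DomainError into a ServiceError automatically.
impl From<DomainError> for ServiceError {
    fn from(e: DomainError) -> Self {
        ServiceError::Domain(e)
    }
}

impl fmt::Display for ServiceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ServiceError::Domain(e) => write!(f, "Domain error: {e:?}"),
            ServiceError::Inference(msg) => write!(f, "Inference failed: {msg}"),
        }
    }
}

// Domain-layer validation returns a domain error...
fn validate(content: &str) -> Result<(), DomainError> {
    if content.is_empty() {
        return Err(DomainError::InvalidContent("empty message".into()));
    }
    Ok(())
}

// ...and the application layer propagates it with `?`,
// crossing the layer boundary via the From impl above.
fn handle(content: &str) -> Result<(), ServiceError> {
    validate(content)?;
    Ok(())
}

fn main() {
    assert!(handle("hello").is_ok());
    assert_eq!(
        handle(""),
        Err(ServiceError::Domain(DomainError::InvalidContent("empty message".into())))
    );
}
```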
### 3. Feature Flags

Optional features reduce binary size:
```toml
# Cargo.toml
[features]
default = ["http"]
http = ["axum", "tower", ...]
cli = ["clap", ...]
speech = ["whisper", "piper", ...]
```

**Rationale:** Raspberry Pi has limited storage; include only what's needed.
### 4. Configuration via the `config` Crate

Layered configuration (defaults → file → env vars):

```rust
let config = Config::builder()
    .add_source(config::File::with_name("config"))
    .add_source(config::Environment::with_prefix("PISOVEREIGN"))
    .build()?;
```

**Rationale:** Flexibility for different deployment scenarios.
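The precedence rule (later sources override earlier ones) can be shown with a std-only lookup; the real code delegates this to the `config` crate, and the maps below are illustrative stand-ins for the defaults, file, and environment layers:

```rust
use std::collections::HashMap;

// Later layers override earlier ones: search from the last layer backwards.
fn layered(sources: &[HashMap<String, String>], key: &str) -> Option<String> {
    sources.iter().rev().find_map(|layer| layer.get(key).cloned())
}

fn main() {
    let defaults = HashMap::from([("port".to_string(), "8080".to_string())]);
    let file = HashMap::from([("port".to_string(), "9090".to_string())]);
    let env: HashMap<String, String> = HashMap::new();

    // The env layer is empty, so the file value overrides the default.
    assert_eq!(layered(&[defaults, file, env], "port").as_deref(), Some("9090"));
}
```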
### 5. Multi-Layer Caching

```
Request → L1 (Moka, in-memory) → L2 (Redb, persistent) → L3 (Semantic, pgvector) → Backend
```
| Layer | Type | Storage | Match Method | Use Case |
|---|---|---|---|---|
| L1 | MokaCache | In-memory | Exact string | Hot data, sub-ms access |
| L2 | RedbCache | Disk | Exact string | Warm data, persists across restarts |
| L3 | PgSemanticCache | PostgreSQL/pgvector | Cosine similarity | Semantically equivalent queries |
**Decorator Chain Order:**

```
SanitizedInferencePort                  (outermost)
└─ CachedInferenceAdapter               (exact L1+L2)
   └─ SemanticCachedInferenceAdapter    (similarity L3)
      └─ DegradedInferenceAdapter
         └─ OllamaInferenceAdapter      (innermost)
```
**Rationale:** Minimize latency and reduce load on the inference engine. The semantic layer catches queries that are phrased differently but mean the same thing, significantly improving cache hit rates.
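The exact-match layers can be sketched as a decorator over the inference port: check L1, then L2 (promoting hits back into L1), and only on a full miss call the inner adapter. This is a synchronous sketch with `HashMap`s standing in for Moka and Redb; the counting backend exists only to make the cache behavior observable:

```rust
use std::cell::RefCell;
use std::collections::HashMap;

trait Inference {
    fn generate(&self, prompt: &str) -> String;
}

// Stand-in backend that counts how often it is actually called.
struct CountingBackend {
    calls: RefCell<u32>,
}

impl Inference for CountingBackend {
    fn generate(&self, prompt: &str) -> String {
        *self.calls.borrow_mut() += 1;
        format!("answer to: {prompt}")
    }
}

// Decorator: exact-match L1 + L2 in front of any Inference impl.
struct CachedInference<I: Inference> {
    l1: RefCell<HashMap<String, String>>, // in-memory (Moka stand-in)
    l2: RefCell<HashMap<String, String>>, // persistent (Redb stand-in)
    inner: I,
}

impl<I: Inference> Inference for CachedInference<I> {
    fn generate(&self, prompt: &str) -> String {
        if let Some(hit) = self.l1.borrow().get(prompt) {
            return hit.clone(); // L1 hit: fastest path
        }
        if let Some(hit) = self.l2.borrow().get(prompt).cloned() {
            self.l1.borrow_mut().insert(prompt.to_string(), hit.clone()); // promote to L1
            return hit;
        }
        let answer = self.inner.generate(prompt); // full miss: hit the backend
        self.l1.borrow_mut().insert(prompt.to_string(), answer.clone());
        self.l2.borrow_mut().insert(prompt.to_string(), answer.clone());
        answer
    }
}

fn main() {
    let cached = CachedInference {
        l1: RefCell::new(HashMap::new()),
        l2: RefCell::new(HashMap::new()),
        inner: CountingBackend { calls: RefCell::new(0) },
    };
    assert_eq!(cached.generate("hello"), "answer to: hello");
    assert_eq!(cached.generate("hello"), "answer to: hello"); // served from L1
    assert_eq!(*cached.inner.calls.borrow(), 1); // backend called exactly once
}
```

Because the decorator implements the same trait it wraps, layers compose freely, which is how the chain above stacks sanitization, exact caching, semantic caching, and degradation around one backend.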
### 6. In-Process Event Bus

Post-processing work (fact extraction, audit logging, metrics) runs asynchronously via an in-process event bus backed by `tokio::sync::broadcast`:
```
ChatService / AgentService
            │
            ▼ publish(DomainEvent)
  ┌──────────────────────┐
  │  TokioBroadcastBus   │
  └──────────────────────┘
     │      │      │      │
     ▼      ▼      ▼      ▼
   Fact   Audit   Conv  Metrics
   Ext.   Log    Pers.  Handler
```
Key properties:

- Fire-and-forget: handlers never block the response path
- `DomainEvent` enum defined in the domain layer (7 variants)
- `EventBusPort`/`EventSubscriberPort` defined in application ports
- `TokioBroadcastEventBus` adapter in infrastructure
- Handlers spawned conditionally based on available dependencies
- Channel overflow → `Lagged` warning, not data loss for the publisher
**Rationale:** Moves 100–500 ms of per-request post-processing off the critical path, crucial on resource-constrained Raspberry Pi hardware.
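A std-only sketch of the fire-and-forget shape (the real bus uses `tokio::sync::broadcast` with several subscribers): the publisher returns immediately after `send`, and the handler does its work on another thread. The `publish_and_observe` helper and its single `DomainEvent` variant are illustrative; the helper waits for the side effect only so the example is checkable:

```rust
use std::sync::mpsc;
use std::thread;

#[derive(Debug, Clone)]
enum DomainEvent {
    MessageStored { user: String },
}

fn publish_and_observe() -> String {
    let (bus_tx, bus_rx) = mpsc::channel::<DomainEvent>();
    let (done_tx, done_rx) = mpsc::channel::<String>();

    // Subscriber runs on its own thread, off the response path.
    let handler = thread::spawn(move || {
        for event in bus_rx {
            match event {
                // Fact extraction / audit logging would happen here.
                DomainEvent::MessageStored { user } => {
                    done_tx.send(format!("audited: {user}")).unwrap();
                }
            }
        }
    });

    // Publisher: fire-and-forget; it returns right after send().
    bus_tx
        .send(DomainEvent::MessageStored { user: "andreas".into() })
        .unwrap();
    drop(bus_tx); // closing the channel ends the handler loop

    let observed = done_rx.recv().unwrap(); // test hook: wait for the side effect
    handler.join().unwrap();
    observed
}

fn main() {
    assert_eq!(publish_and_observe(), "audited: andreas");
}
```

Note one behavioral difference from this sketch: `mpsc` buffers without bound, while `broadcast` has a fixed capacity, so a slow subscriber sees a `Lagged` error rather than stalling the publisher.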
### 7. Agentic Multi-Agent Orchestration

Complex tasks can be decomposed into parallel sub-tasks, each executed by an independent AI agent:
```
User Request: "Plan my trip to Berlin next week"
            │
            ▼ POST /v1/agentic/tasks
  ┌──────────────────────────┐
  │   AgenticOrchestrator    │
  │  (application service)   │
  └──────────────────────────┘
       │         │         │
       ▼         ▼         ▼
   SubAgent  SubAgent  SubAgent
  (weather) (calendar) (transit)
       │         │         │
       └─────────┴─────────┘
                 │
                 ▼
         Aggregated Result
```
Key properties:

- Wave-based parallel execution with configurable concurrency limits
- Dependency tracking between sub-tasks
- Individual sub-agent timeouts and total task timeouts
- Real-time progress via SSE streaming (`/v1/agentic/tasks/{id}/stream`)
- Task cancellation support
- Approval gates for sensitive operations
- Domain entities in `domain`, orchestration in `application`, event bus in `infrastructure`, REST handlers in `presentation_http`, UI in `presentation_web`
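Wave-based scheduling amounts to topologically layering the sub-task graph: a task joins a wave once all of its dependencies completed in earlier waves, and every task within a wave can run in parallel. A sketch with illustrative task names (the real orchestrator also enforces concurrency limits, timeouts, and cancellation):

```rust
use std::collections::{HashMap, HashSet};

// Group sub-tasks into execution "waves": a task is ready for the next
// wave when every one of its dependencies ran in an earlier wave.
fn waves<'a>(deps: &HashMap<&'a str, Vec<&'a str>>) -> Vec<Vec<&'a str>> {
    let mut done: HashSet<&'a str> = HashSet::new();
    let mut result = Vec::new();
    while done.len() < deps.len() {
        let mut wave: Vec<&'a str> = deps
            .iter()
            .filter(|(t, d)| !done.contains(*t) && d.iter().all(|x| done.contains(x)))
            .map(|(t, _)| *t)
            .collect();
        // An empty wave with tasks remaining means a dependency cycle.
        assert!(!wave.is_empty(), "dependency cycle in sub-task graph");
        wave.sort(); // deterministic ordering for the sketch
        done.extend(wave.iter().copied());
        result.push(wave);
    }
    result
}

fn main() {
    let deps = HashMap::from([
        ("weather", vec![]),
        ("calendar", vec![]),
        ("itinerary", vec!["weather", "calendar"]),
    ]);
    // weather and calendar run in parallel; itinerary waits for both.
    assert_eq!(waves(&deps), vec![vec!["calendar", "weather"], vec!["itinerary"]]);
}
```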
## Further Reading
- Crate Reference - Detailed documentation of each crate
- API Reference - REST API documentation
- Contributing - How to contribute