Sovereign Intelligence Engine
The Sovereign Intelligence Engine is the unified signal bus that connects all 30 innovation features into an emergent, cross-feature intelligence layer. It is the architectural heart of PiSovereign’s Generation 2 feature set.
Architecture Overview
┌──────────────────────────────────────────────────────────┐
│ Sovereign Intelligence Engine │
│ │
│ tokio::sync::broadcast<SovereignSignal> │
│ capacity: 1024 (configurable) │
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────────┐ │
│ │ Publish │ │ Publish │ │ Publish │ │ Subscribe │ │
│ │ (async) │ │ (async) │ │ (async) │ │ (async) │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └─────┬─────┘ │
│ │ │ │ │ │
└────────┼─────────────┼─────────────┼───────────────┼─────┘
│ │ │ │
EnergyMonitor AffectiveState ImmuneSystem PreCacheService
CognitiveLoad PromptEvolution ModelRouting FederatedSync
MeshNetwork PrivacyBudget DecisionProof KnowledgeDecay
Signal Flow
Every feature service receives a broadcast::Sender<SovereignSignal> at construction time. Services that need to react to cross-feature events also spawn a Tokio task that listens on a broadcast::Receiver.
Example cross-feature flow:
EnergyMonitorAdapterreads Pi 5 thermal sensor → publishesThermalUpdate { headroom: Hot, temp_celsius: 68.0 }PreCachePredictionServicereceives signal → pauses pre-generation (saving CPU)NeuroplasticRoutingServicereceives signal → biases toward smaller model (phi3-mini)EnergySchedulerServicereceives signal → pauses all background work
All of this happens without any service knowing about the others — pure event-driven coordination.
SovereignSignal Enum
Defined in crates/domain/src/entities/sovereign_signal.rs:
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SovereignSignal {
/// Energy/thermal state change
ThermalUpdate {
headroom: ThermalHeadroom,
temp_celsius: f32,
},
/// New energy consumption reading
EnergyUpdate {
watts: f32,
source: String,
},
/// Cognitive load estimation result
CognitiveLoadEstimate {
user_id: UserId,
load: f32,
format: ResponseFormat,
},
/// Affective state detection result
AffectiveStateEstimate {
user_id: UserId,
state: AffectiveState,
confidence: f32,
},
/// Immune system audit event
ImmuneAlert {
attack_category: String,
blocked: bool,
immunity_score: u8,
},
/// Model routing decision made
ModelRoutingDecision {
user_id: UserId,
model: String,
method: RoutingMethod,
},
/// Prompt genome evolved
PromptEvolution {
genome_id: Uuid,
generation: u32,
fitness: f64,
},
/// Privacy budget consumption
PrivacyBudgetUpdate {
user_id: UserId,
remaining_epsilon: f64,
status: BudgetStatus,
},
/// Federated sync completed
FederatedSyncComplete {
peer_count: usize,
delta_norm: f64,
},
/// Knowledge node decay event
KnowledgeDecayEvent {
node_id: Uuid,
retention: f64,
pruned: bool,
},
}
Port Traits
SovereignSignalBusPort
/// Port for publishing signals to the Sovereign Intelligence Engine bus.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait SovereignSignalBusPort: Send + Sync {
/// Publish a signal to all subscribers.
fn publish(&self, signal: SovereignSignal) -> Result<(), DomainError>;
/// Get the number of active subscribers.
fn subscriber_count(&self) -> usize;
}
SovereignSignalSubscriberPort
/// Port for subscribing to signals from the Sovereign Intelligence Engine bus.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait SovereignSignalSubscriberPort: Send + Sync {
/// Subscribe and receive the next signal. Returns None if the bus is closed.
async fn recv(&mut self) -> Option<SovereignSignal>;
}
Signal Persistence
Signals are persisted to a time-series table for historical analysis and ML training:
-- migrations/21_sovereign_signals.sql
CREATE TABLE sovereign_signals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
signal_type TEXT NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
) PARTITION BY RANGE (created_at);
-- Monthly partitioning with automatic creation
-- 90-day retention policy (configurable)
CREATE INDEX idx_sovereign_signals_type_time
ON sovereign_signals (signal_type, created_at DESC);
Historical signals enable:
- Pattern detection: Recurring thermal throttling at certain times
- Cross-feature correlation: Privacy budget exhaustion patterns vs. query volume
- Anomaly detection: Unusual signal frequency indicating attack or malfunction
Feature Toggles
Every feature connected to the signal bus respects its own enabled configuration flag. Disabled features neither publish nor subscribe.
[sovereign_intelligence]
enabled = true
signal_bus_capacity = 1024 # broadcast channel capacity
signal_persistence = true # persist signals to DB
signal_retention_days = 90 # cleanup older signals
When sovereign_intelligence.enabled = false, the bus is replaced with a no-op implementation — all feature services still function independently, they simply don’t coordinate.
Adding a New Signal
To connect a new feature to the bus:
- Add a variant to
SovereignSignalincrates/domain/src/entities/sovereign_signal.rs - Inject the bus into your service via
Arc<dyn SovereignSignalBusPort> - Publish at the appropriate point:
self.signal_bus.publish(SovereignSignal::YourVariant { ... })?; - Subscribe (optional): Spawn a Tokio task in your service’s constructor that calls
subscriber.recv().awaitin a loop and matches on relevant signal types - Test: Use
MockSovereignSignalBusPortin unit tests
Dependency Injection
The bus is constructed once in the presentation layer and passed to all services:
// In presentation_http startup
let (signal_tx, _) = tokio::sync::broadcast::channel(config.signal_bus_capacity);
let signal_bus: Arc<dyn SovereignSignalBusPort> = Arc::new(
BroadcastSignalBusAdapter::new(signal_tx.clone())
);
// Each service that publishes gets Arc::clone(&signal_bus)
// Each service that subscribes gets signal_tx.subscribe() wrapped in adapter
This follows the existing Arc<dyn PortTrait> dependency injection pattern used throughout PiSovereign.
See Also
- Innovation Features — All 30 features with configuration details
- Architecture — Clean Architecture layers
- INNOVATION_IDEAS.md — Full implementation specification