Sovereign Intelligence Engine

The Sovereign Intelligence Engine is the unified signal bus that connects all 30 innovation features into an emergent, cross-feature intelligence layer. It is the architectural heart of PiSovereign’s Generation 2 feature set.


Architecture Overview

┌──────────────────────────────────────────────────────────┐
│                Sovereign Intelligence Engine              │
│                                                          │
│   tokio::sync::broadcast<SovereignSignal>                │
│   capacity: 1024 (configurable)                          │
│                                                          │
│   ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌───────────┐ │
│   │ Publish  │  │ Publish  │  │ Publish  │  │ Subscribe │ │
│   │ (async)  │  │ (async)  │  │ (async)  │  │ (async)   │ │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘  └─────┬─────┘ │
│        │             │             │               │     │
└────────┼─────────────┼─────────────┼───────────────┼─────┘
         │             │             │               │
    EnergyMonitor  AffectiveState  ImmuneSystem  PreCacheService
    CognitiveLoad  PromptEvolution ModelRouting  FederatedSync
    MeshNetwork    PrivacyBudget   DecisionProof KnowledgeDecay

Signal Flow

Every feature service receives a handle to the bus at construction time: a broadcast::Sender<SovereignSignal>, wrapped behind SovereignSignalBusPort as described under Dependency Injection. Services that need to react to cross-feature events also spawn a Tokio task that listens on a broadcast::Receiver.

Example cross-feature flow:

  1. EnergyMonitorAdapter reads Pi 5 thermal sensor → publishes ThermalUpdate { headroom: Hot, temp_celsius: 68.0 }
  2. PreCachePredictionService receives signal → pauses pre-generation (saving CPU)
  3. NeuroplasticRoutingService receives signal → biases toward smaller model (phi3-mini)
  4. EnergySchedulerService receives signal → pauses all background work

All of this happens without any service knowing about the others: coordination is purely event-driven. Steps 2 to 4 run in independent subscriber tasks, so their relative order is not guaranteed.
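The flow above can be sketched as a pure decision function that a subscriber might call from its receive loop. This is a minimal illustration, not the project's code: the enum is a reduced stand-in, the Cool and Warm variants are assumed (only Hot appears in the text), and PreCacheAction and pre_cache_reaction are hypothetical names.

```rust
/// Simplified stand-in for the domain type. Only `Hot` appears in the text
/// above; `Cool` and `Warm` are assumed for illustration.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ThermalHeadroom {
    Cool,
    Warm,
    Hot,
}

/// Reduced copy of the signal enum with just two variants.
#[derive(Clone, Debug, PartialEq)]
pub enum SovereignSignal {
    ThermalUpdate { headroom: ThermalHeadroom, temp_celsius: f32 },
    EnergyUpdate { watts: f32, source: String },
}

/// Hypothetical reaction a pre-caching subscriber could take.
#[derive(Debug, PartialEq)]
pub enum PreCacheAction {
    Pause,
    Resume,
    Ignore,
}

/// Decide how to react to one signal. The subscriber never references the
/// publisher; it only pattern-matches on the variants it cares about.
pub fn pre_cache_reaction(signal: &SovereignSignal) -> PreCacheAction {
    match signal {
        SovereignSignal::ThermalUpdate { headroom: ThermalHeadroom::Hot, .. } => {
            PreCacheAction::Pause
        }
        SovereignSignal::ThermalUpdate { .. } => PreCacheAction::Resume,
        _ => PreCacheAction::Ignore,
    }
}
```

Keeping the decision separate from the receive loop means it can be unit-tested without a running channel or runtime.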


SovereignSignal Enum

Defined in crates/domain/src/entities/sovereign_signal.rs:

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SovereignSignal {
    /// Energy/thermal state change
    ThermalUpdate {
        headroom: ThermalHeadroom,
        temp_celsius: f32,
    },
    /// New energy consumption reading
    EnergyUpdate {
        watts: f32,
        source: String,
    },
    /// Cognitive load estimation result
    CognitiveLoadEstimate {
        user_id: UserId,
        load: f32,
        format: ResponseFormat,
    },
    /// Affective state detection result
    AffectiveStateEstimate {
        user_id: UserId,
        state: AffectiveState,
        confidence: f32,
    },
    /// Immune system audit event
    ImmuneAlert {
        attack_category: String,
        blocked: bool,
        immunity_score: u8,
    },
    /// Model routing decision made
    ModelRoutingDecision {
        user_id: UserId,
        model: String,
        method: RoutingMethod,
    },
    /// Prompt genome evolved
    PromptEvolution {
        genome_id: Uuid,
        generation: u32,
        fitness: f64,
    },
    /// Privacy budget consumption
    PrivacyBudgetUpdate {
        user_id: UserId,
        remaining_epsilon: f64,
        status: BudgetStatus,
    },
    /// Federated sync completed
    FederatedSyncComplete {
        peer_count: usize,
        delta_norm: f64,
    },
    /// Knowledge node decay event
    KnowledgeDecayEvent {
        node_id: Uuid,
        retention: f64,
        pruned: bool,
    },
}

Port Traits

SovereignSignalBusPort

/// Port for publishing signals to the Sovereign Intelligence Engine bus.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait SovereignSignalBusPort: Send + Sync {
    /// Publish a signal to all subscribers.
    fn publish(&self, signal: SovereignSignal) -> Result<(), DomainError>;

    /// Get the number of active subscribers.
    fn subscriber_count(&self) -> usize;
}

SovereignSignalSubscriberPort

/// Port for subscribing to signals from the Sovereign Intelligence Engine bus.
#[async_trait]
#[cfg_attr(test, automock)]
pub trait SovereignSignalSubscriberPort: Send + Sync {
    /// Wait for and return the next signal. Returns None if the bus is closed.
    /// Subscribing itself happens when the receiver is created, not here.
    async fn recv(&mut self) -> Option<SovereignSignal>;
}

Signal Persistence

Signals are persisted to a time-series table for historical analysis and ML training:

-- migrations/21_sovereign_signals.sql
CREATE TABLE sovereign_signals (
    id          UUID NOT NULL DEFAULT gen_random_uuid(),
    signal_type TEXT NOT NULL,
    payload     JSONB NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires the partition key in every unique constraint
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

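-- Monthly partitions, created automatically. PostgreSQL does not create range
-- partitions on its own; the mechanism (scheduled job or extension) is not shown here.
-- 90-day retention policy (configurable via signal_retention_days)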
CREATE INDEX idx_sovereign_signals_type_time
    ON sovereign_signals (signal_type, created_at DESC);

Historical signals enable:

  • Pattern detection: Recurring thermal throttling at certain times
  • Cross-feature correlation: Privacy budget exhaustion patterns vs. query volume
  • Anomaly detection: Unusual signal frequency indicating attack or malfunction
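One way to fill the signal_type column is to derive it from the enum variant. The sketch below assumes the column stores the same snake_case tag that the serde attributes on SovereignSignal produce; the source does not confirm this, and the signal_type method is a hypothetical helper. The enum is a reduced stand-in with most payload fields omitted.

```rust
/// Reduced copy of the signal enum; most payload fields are omitted for brevity.
#[derive(Clone, Debug)]
pub enum SovereignSignal {
    ThermalUpdate { temp_celsius: f32 },
    ImmuneAlert { blocked: bool },
    PrivacyBudgetUpdate { remaining_epsilon: f64 },
}

impl SovereignSignal {
    /// Value for the `signal_type` column. Assumed to equal the snake_case
    /// `type` tag that `#[serde(tag = "type", rename_all = "snake_case")]`
    /// emits for the real enum.
    pub fn signal_type(&self) -> &'static str {
        match self {
            Self::ThermalUpdate { .. } => "thermal_update",
            Self::ImmuneAlert { .. } => "immune_alert",
            Self::PrivacyBudgetUpdate { .. } => "privacy_budget_update",
        }
    }
}
```

Because the match is exhaustive, adding a variant without naming its signal type fails to compile, which keeps the column values and the enum in step.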

Feature Toggles

Every feature connected to the signal bus respects its own enabled configuration flag. Disabled features neither publish nor subscribe.

[sovereign_intelligence]
enabled = true
signal_bus_capacity = 1024    # broadcast channel capacity
signal_persistence = true     # persist signals to DB
signal_retention_days = 90    # cleanup older signals

When sovereign_intelligence.enabled = false, the bus is replaced with a no-op implementation. All feature services still function independently; they simply don’t coordinate.
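A no-op bus could look roughly like the following. This is a sketch under stated assumptions: NoopSignalBus is a hypothetical name, the trait is restated without the async_trait and automock attributes, and DomainError and SovereignSignal are reduced stand-ins so the example compiles on its own.

```rust
/// Stand-in for the domain error type, defined here only to keep the sketch
/// self-contained.
#[derive(Debug, PartialEq)]
pub struct DomainError(pub String);

/// Reduced copy of the signal enum with a single variant.
#[derive(Clone, Debug)]
pub enum SovereignSignal {
    EnergyUpdate { watts: f32, source: String },
}

/// Publishing port, restated from the Port Traits section without attributes.
pub trait SovereignSignalBusPort: Send + Sync {
    fn publish(&self, signal: SovereignSignal) -> Result<(), DomainError>;
    fn subscriber_count(&self) -> usize;
}

/// Hypothetical no-op bus: accepts every signal and discards it.
#[derive(Debug, Default)]
pub struct NoopSignalBus;

impl SovereignSignalBusPort for NoopSignalBus {
    fn publish(&self, _signal: SovereignSignal) -> Result<(), DomainError> {
        // Dropping the signal is the whole point; publishers see success.
        Ok(())
    }

    fn subscriber_count(&self) -> usize {
        0
    }
}
```

Returning Ok(()) rather than an error lets publishers keep their `?` call sites unchanged whether or not the engine is enabled.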


Adding a New Signal

To connect a new feature to the bus:

  1. Add a variant to SovereignSignal in crates/domain/src/entities/sovereign_signal.rs
  2. Inject the bus into your service via Arc<dyn SovereignSignalBusPort>
  3. Publish at the appropriate point: self.signal_bus.publish(SovereignSignal::YourVariant { ... })?;
  4. Subscribe (optional): Spawn a Tokio task in your service’s constructor that calls subscriber.recv().await in a loop and matches on relevant signal types
  5. Test: Use MockSovereignSignalBusPort in unit tests
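Steps 1 to 3 and 5 can be sketched end to end as follows. Everything specific here is hypothetical: the BatteryLow variant, BatteryMonitorService, the 20 percent threshold, and RecordingSignalBus, a hand-written test double used in place of the generated MockSovereignSignalBusPort so the example needs only the standard library. The trait and error type are reduced stand-ins.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for the domain error type.
#[derive(Debug, PartialEq)]
pub struct DomainError(pub String);

#[derive(Clone, Debug, PartialEq)]
pub enum SovereignSignal {
    /// Step 1: hypothetical new variant.
    BatteryLow { percent: u8 },
}

pub trait SovereignSignalBusPort: Send + Sync {
    fn publish(&self, signal: SovereignSignal) -> Result<(), DomainError>;
    fn subscriber_count(&self) -> usize;
}

/// Step 2: hypothetical service with the bus injected behind the port.
pub struct BatteryMonitorService {
    signal_bus: Arc<dyn SovereignSignalBusPort>,
}

impl BatteryMonitorService {
    pub fn new(signal_bus: Arc<dyn SovereignSignalBusPort>) -> Self {
        Self { signal_bus }
    }

    /// Step 3: publish only when the reading drops below an assumed threshold.
    pub fn record_reading(&self, percent: u8) -> Result<(), DomainError> {
        if percent < 20 {
            self.signal_bus.publish(SovereignSignal::BatteryLow { percent })?;
        }
        Ok(())
    }
}

/// Step 5: test double that records every published signal.
#[derive(Default)]
pub struct RecordingSignalBus {
    published: Mutex<Vec<SovereignSignal>>,
}

impl RecordingSignalBus {
    /// Snapshot of everything published so far.
    pub fn published(&self) -> Vec<SovereignSignal> {
        self.published.lock().unwrap().clone()
    }
}

impl SovereignSignalBusPort for RecordingSignalBus {
    fn publish(&self, signal: SovereignSignal) -> Result<(), DomainError> {
        self.published
            .lock()
            .map_err(|e| DomainError(e.to_string()))?
            .push(signal);
        Ok(())
    }

    fn subscriber_count(&self) -> usize {
        0
    }
}
```

In a test, construct the service with an Arc of RecordingSignalBus, drive it with readings, and assert on published(). A generated mock would express the same check as an expectation on publish.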

Dependency Injection

The bus is constructed once in the presentation layer and passed to all services:

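// In presentation_http startup.
// broadcast::channel panics if the capacity is 0, so validate the config value first.
// The initial receiver is dropped here; subscribers are created later via subscribe().
let (signal_tx, _) =
    tokio::sync::broadcast::channel::<SovereignSignal>(config.signal_bus_capacity);
let signal_bus: Arc<dyn SovereignSignalBusPort> = Arc::new(
    BroadcastSignalBusAdapter::new(signal_tx.clone())
);

// Each service that publishes gets Arc::clone(&signal_bus)
// Each service that subscribes gets signal_tx.subscribe() wrapped in adapter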

This follows the existing Arc<dyn PortTrait> dependency injection pattern used throughout PiSovereign.


See Also