Overview

whatsapp-rust uses a strict state management architecture to ensure consistency and prevent race conditions. All device state modifications must go through the PersistenceManager using the DeviceCommand pattern.
Critical: Never modify Device state directly. Always use DeviceCommand + PersistenceManager::process_command() for writes, or get_device_snapshot() for reads.

Architecture

The state management system has three main components plus a storage backend:
src/store/
├── persistence_manager.rs  # Central state coordinator
├── commands.rs             # DeviceCommand pattern
├── device.rs              # Device state structure
└── backend/               # Storage backend (SQLite)

Device State

The Device struct holds all client state:
pub struct Device {
    pub core: DeviceCore,     // Platform-agnostic state
    backend: Arc<dyn Backend>,
}

pub struct DeviceCore {
    pub id: Jid,              // Device JID
    pub push_name: String,
    pub platform: String,
    pub noise_key: KeyPair,   // Noise protocol keypair
    pub identity_key: IdentityKeyPair,  // Signal protocol identity
    
    pub registration_id: u32,
    pub adv_secret_key: Vec<u8>,
    
    pub edge_routing_info: Option<Vec<u8>>,
    pub last_connected: Option<i64>,
    
    // Protocol state
    pub message_queue: VecDeque<QueuedMessage>,
    pub iq_handlers: HashMap<String, IqResponseHandler>,
}
Location: src/store/device.rs, wacore/src/store/device_core.rs

PersistenceManager

The PersistenceManager is the gatekeeper for all state changes.

Architecture

pub struct PersistenceManager {
    device: Arc<RwLock<Device>>,
    backend: Arc<dyn Backend>,
    dirty: Arc<AtomicBool>,
    save_notify: Arc<Notify>,
}
Location: src/store/persistence_manager.rs:11-16

Key Methods

Read-Only Access

// Get a snapshot of device state (cheap clone)
pub async fn get_device_snapshot(&self) -> Device {
    self.device.read().await.clone()
}
Location: src/store/persistence_manager.rs:61-63

State Modification

// Modify device state with a closure
pub async fn modify_device<F, R>(&self, modifier: F) -> R
where
    F: FnOnce(&mut Device) -> R,
{
    let mut device_guard = self.device.write().await;
    let result = modifier(&mut device_guard);
    
    // Mark dirty and notify background saver
    self.dirty.store(true, Ordering::Relaxed);
    self.save_notify.notify_one();
    
    result
}
Location: src/store/persistence_manager.rs:69-80
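
The read and write paths can be tried out without tokio. The following is a minimal sketch, not the library's code: MiniDevice and MiniManager are hypothetical stand-ins, and std::sync::RwLock replaces the async tokio lock so the example stays self-contained.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

// Hypothetical, trimmed-down stand-in for the real Device.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct MiniDevice {
    pub push_name: String,
}

// Simplified manager: a blocking std RwLock replaces tokio's async RwLock.
pub struct MiniManager {
    device: RwLock<MiniDevice>,
    dirty: AtomicBool,
}

impl MiniManager {
    pub fn new() -> Self {
        Self {
            device: RwLock::new(MiniDevice::default()),
            dirty: AtomicBool::new(false),
        }
    }

    // Read path: clone the state while holding a read lock.
    pub fn get_device_snapshot(&self) -> MiniDevice {
        self.device.read().unwrap().clone()
    }

    // Write path: run the closure under the write lock, then mark dirty.
    pub fn modify_device<F, R>(&self, modifier: F) -> R
    where
        F: FnOnce(&mut MiniDevice) -> R,
    {
        let mut guard = self.device.write().unwrap();
        let result = modifier(&mut *guard);
        self.dirty.store(true, Ordering::Relaxed);
        result
    }

    pub fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Relaxed)
    }
}
```

A snapshot taken before a write keeps its old values, because it is an independent clone rather than a view into the locked state.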

Command Processing

// Process a device command (preferred for state changes)
pub async fn process_command(&self, command: DeviceCommand) {
    self.modify_device(|device| {
        // apply_command_to_device takes &mut DeviceCore, so pass the core field
        apply_command_to_device(&mut device.core, command);
    }).await;
}
Location: src/store/persistence_manager.rs:145-150

Background Saver

The persistence manager runs a background task that periodically saves dirty state:
pub fn run_background_saver(self: Arc<Self>, interval: Duration) {
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = self.save_notify.notified() => {
                    debug!("Save notification received.");
                }
                _ = sleep(interval) => {}
            }
            
            if let Err(e) = self.save_to_disk().await {
                error!("Error saving device state: {e}");
            }
        }
    });
}
How it works:
  1. Wakes up when notified OR every interval (typically 30s)
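  2. Checks if state is dirty (dirty flag); the loop above calls save_to_disk() unconditionally, so this check presumably happens inside save_to_disk()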
  3. If dirty, serializes device state and saves to database
  4. Clears dirty flag
Location: src/store/persistence_manager.rs:123-140
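
The source does not show save_to_disk(), so the following is only a sketch of how steps 2 to 4 could be done with a single atomic operation. Saver, mark_dirty, save_if_dirty, and save_count are hypothetical names, and the disk write is simulated with a counter.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

// Hypothetical saver state; `saves` counts simulated disk writes.
pub struct Saver {
    dirty: AtomicBool,
    saves: AtomicUsize,
}

impl Saver {
    pub fn new() -> Self {
        Self {
            dirty: AtomicBool::new(false),
            saves: AtomicUsize::new(0),
        }
    }

    // Called by the write path after every modification.
    pub fn mark_dirty(&self) {
        self.dirty.store(true, Ordering::Release);
    }

    // Returns true if a save was performed.
    pub fn save_if_dirty(&self) -> bool {
        // swap returns the previous value and clears the flag in one atomic step.
        if !self.dirty.swap(false, Ordering::AcqRel) {
            return false;
        }
        // A real implementation would serialize the device and write it here.
        self.saves.fetch_add(1, Ordering::Relaxed);
        true
    }

    pub fn save_count(&self) -> usize {
        self.saves.load(Ordering::Relaxed)
    }
}
```

Clearing the flag before writing, rather than after, means a modification that lands during the save sets the flag again and is picked up by the next pass. Several modifications between two passes collapse into one save.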

Initialization

pub async fn new(backend: Arc<dyn Backend>) -> Result<Self> {
    // Ensure device row exists in database
    let exists = backend.exists().await?;
    if !exists {
        let id = backend.create().await?;
        debug!("Created device row with id={id}");
    }
    
    // Load existing state or create new
    let device = if let Some(serializable_device) = backend.load().await? {
        let mut dev = Device::new(backend.clone());
        dev.load_from_serializable(serializable_device);
        dev
    } else {
        Device::new(backend.clone())
    };
    
    Ok(Self {
        device: Arc::new(RwLock::new(device)),
        backend,
        dirty: Arc::new(AtomicBool::new(false)),
        save_notify: Arc::new(Notify::new()),
    })
}
Location: src/store/persistence_manager.rs:23-55

DeviceCommand Pattern

The DeviceCommand enum defines all possible state mutations:
pub enum DeviceCommand {
    // Identity & registration
    SetId { jid: Jid },
    SetPushName { name: String },
    SetPlatform { platform: String },
    SetRegistrationId { id: u32 },
    
    // Cryptographic keys
    SetNoiseKey { keypair: KeyPair },
    SetIdentityKey { keypair: IdentityKeyPair },
    SetAdvSecretKey { key: Vec<u8> },
    
    // Connection state
    SetEdgeRoutingInfo { info: Option<Vec<u8>> },
    SetLastConnected { timestamp: i64 },
    
    // Message queue
    EnqueueMessage { message: QueuedMessage },
    DequeueMessage { id: String },
    ClearMessageQueue,
    
    // IQ handlers
    RegisterIqHandler { id: String, handler: IqResponseHandler },
    UnregisterIqHandler { id: String },
}
Location: wacore/src/store/commands.rs

Why Commands?

The command pattern provides:
  1. Type safety: All state changes are explicitly defined
  2. Auditability: Easy to log/trace state mutations
  3. Testability: Commands can be tested in isolation
  4. Consistency: Single code path for all modifications
  5. Future compatibility: Easy to add undo/redo or migration logic
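
To illustrate the auditability point, here is a small sketch that records every command before applying it. NameCommand and AuditedState are hypothetical types invented for this example. The sketch assumes commands can derive Debug, which the source does not confirm for the real DeviceCommand.

```rust
// Hypothetical two-variant command enum, modelled on DeviceCommand.
#[derive(Debug, Clone, PartialEq)]
pub enum NameCommand {
    SetPushName { name: String },
    SetPlatform { platform: String },
}

// Hypothetical state that keeps an in-memory audit trail.
#[derive(Debug, Default)]
pub struct AuditedState {
    pub push_name: String,
    pub platform: String,
    pub audit_log: Vec<String>,
}

impl AuditedState {
    // Single entry point: log the command, then apply it.
    pub fn process(&mut self, command: NameCommand) {
        self.audit_log.push(format!("{:?}", command));
        match command {
            NameCommand::SetPushName { name } => self.push_name = name,
            NameCommand::SetPlatform { platform } => self.platform = platform,
        }
    }
}
```

Because every mutation passes through process(), the log is complete by construction. With ad hoc field writes there would be no single place to hook the logging.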

Applying Commands

Commands are applied via pattern matching:
pub fn apply_command_to_device(device: &mut DeviceCore, command: DeviceCommand) {
    match command {
        DeviceCommand::SetId { jid } => {
            device.id = jid;
        }
        DeviceCommand::SetPushName { name } => {
            device.push_name = name;
        }
        DeviceCommand::SetEdgeRoutingInfo { info } => {
            device.edge_routing_info = info;
        }
        DeviceCommand::EnqueueMessage { message } => {
            device.message_queue.push_back(message);
        }
        DeviceCommand::DequeueMessage { id } => {
            device.message_queue.retain(|msg| msg.id != id);
        }
        // ... handle all variants
    }
}
Location: wacore/src/store/commands.rs:50-150
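
Because applying a command is a plain function over mutable state, it can be tested without a PersistenceManager, a lock, or a database. The sketch below mirrors the pattern with hypothetical stand-ins (MiniCore, MiniCommand, apply) and a QueuedMessage reduced to just an id.

```rust
use std::collections::VecDeque;

// Reduced stand-in for the real QueuedMessage.
#[derive(Debug, Clone, PartialEq)]
pub struct QueuedMessage {
    pub id: String,
}

// Hypothetical subset of DeviceCore.
#[derive(Debug, Default, PartialEq)]
pub struct MiniCore {
    pub push_name: String,
    pub message_queue: VecDeque<QueuedMessage>,
}

// Hypothetical subset of DeviceCommand.
pub enum MiniCommand {
    SetPushName { name: String },
    EnqueueMessage { message: QueuedMessage },
    DequeueMessage { id: String },
    ClearMessageQueue,
}

// Exhaustive match: adding a variant without handling it fails to compile.
pub fn apply(core: &mut MiniCore, command: MiniCommand) {
    match command {
        MiniCommand::SetPushName { name } => core.push_name = name,
        MiniCommand::EnqueueMessage { message } => core.message_queue.push_back(message),
        MiniCommand::DequeueMessage { id } => core.message_queue.retain(|msg| msg.id != id),
        MiniCommand::ClearMessageQueue => core.message_queue.clear(),
    }
}
```

A unit test then builds a default MiniCore, applies a sequence of commands, and asserts on the resulting fields.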

Usage Patterns

Reading Device State

// In async function
let device = persistence_manager.get_device_snapshot().await;
println!("Device JID: {}", device.core.id);
println!("Push name: {}", device.core.push_name);
get_device_snapshot() returns a cloned Device. This is efficient because most fields are cheap to clone (strings, numbers). Large data like cryptographic keys use Arc internally.

Modifying Device State (Simple)

For simple state changes, use commands:
use wacore::store::commands::DeviceCommand;

// Update push name
persistence_manager.process_command(
    DeviceCommand::SetPushName { 
        name: "My New Name".to_string() 
    }
).await;

// Update edge routing info
persistence_manager.process_command(
    DeviceCommand::SetEdgeRoutingInfo { 
        info: Some(routing_data) 
    }
).await;

Modifying Device State (Complex)

For complex logic involving multiple fields or conditionals:
persistence_manager.modify_device(|device| {
    // Complex mutation logic
    if device.core.message_queue.len() > 100 {
        device.core.message_queue.drain(0..50);
    }
    
    device.core.last_connected = Some(Utc::now().timestamp());
    
    // Return result if needed
    device.core.message_queue.len()
}).await;
Keep the closure passed to modify_device as short as possible. It holds the write lock on the device state, so every other read and write waits until it returns.

Concurrency Patterns

RwLock Semantics

The Device is protected by a tokio::sync::RwLock:
  • Multiple readers: get_device_snapshot() can be called concurrently
  • Single writer: modify_device() blocks all other access
  • Writer priority: Pending writes block new reads (avoids writer starvation)
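
The shared/exclusive behaviour can be observed with non-blocking lock attempts. This sketch uses std::sync::RwLock as a stand-in so it runs without an async runtime. tokio::sync::RwLock also has try_read and try_write methods. rwlock_demo is a hypothetical helper, and the sketch does not demonstrate the write-preferring queueing.

```rust
use std::sync::RwLock;

// Returns (second_read_ok, write_while_reading_ok, write_after_release_ok).
pub fn rwlock_demo() -> (bool, bool, bool) {
    let lock = RwLock::new(0u32);

    // Hold one read guard, then probe the lock without blocking.
    let first_reader = lock.read().unwrap();
    let second_read_ok = lock.try_read().is_ok();
    let write_while_reading_ok = lock.try_write().is_ok();

    // Once the reader is gone, a writer can get exclusive access.
    drop(first_reader);
    let write_after_release_ok = lock.try_write().is_ok();

    (second_read_ok, write_while_reading_ok, write_after_release_ok)
}
```

The second read succeeds alongside the first, the write attempt is refused while any reader is active, and it succeeds after the reader is dropped.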

Chat Locks

For per-chat operations (like sending messages), the Client provides chat-level locks:
pub struct Client {
    pub(crate) chat_locks: DashMap<String, Arc<Mutex<()>>>,
    // ...
}

impl Client {
    pub async fn with_chat_lock<F, Fut, R>(
        &self,
        chat_jid: &str,
        f: F,
    ) -> R
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = R>,
    {
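        // Clone the Arc so the DashMap entry guard is dropped before awaiting;
        // holding it across an await could block other tasks that touch the map.
        let lock = self.chat_locks
            .entry(chat_jid.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();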
        
        let _guard = lock.lock().await;
        f().await
    }
}
Usage:
// Serialize operations for a specific chat
client.with_chat_lock(&chat_jid, || async {
    // Send message, update state, etc.
    client.send_message(&chat_jid, "Hello").await?;
    Ok(())
}).await?;
Location: src/client.rs
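
The per-chat locking idea can be sketched with the standard library alone. This is an analogue, not the Client code: ChatLocks, lock_for, and serialized_increments are hypothetical. A Mutex<HashMap> replaces DashMap, threads replace async tasks, and the chat ID is a made-up value.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

pub struct ChatLocks {
    locks: Mutex<HashMap<String, Arc<Mutex<()>>>>,
}

impl ChatLocks {
    pub fn new() -> Self {
        Self { locks: Mutex::new(HashMap::new()) }
    }

    // Fetch or create the lock for one chat; the map lock is released on return.
    fn lock_for(&self, chat_jid: &str) -> Arc<Mutex<()>> {
        let mut map = self.locks.lock().unwrap();
        let lock = map
            .entry(chat_jid.to_string())
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .clone();
        lock
    }

    // Run `f` while holding only that chat's lock.
    pub fn with_chat_lock<R>(&self, chat_jid: &str, f: impl FnOnce() -> R) -> R {
        let lock = self.lock_for(chat_jid);
        let _guard = lock.lock().unwrap();
        f()
    }
}

// Several threads do a non-atomic read-modify-write under the same chat lock.
pub fn serialized_increments(threads: usize, per_thread: usize) -> usize {
    let locks = Arc::new(ChatLocks::new());
    let counter = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..threads)
        .map(|_| {
            let locks = Arc::clone(&locks);
            let counter = Arc::clone(&counter);
            thread::spawn(move || {
                for _ in 0..per_thread {
                    locks.with_chat_lock("chat@example", || {
                        // Correct only because the chat lock serializes this section.
                        let current = counter.load(Ordering::Relaxed);
                        counter.store(current + 1, Ordering::Relaxed);
                    });
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    counter.load(Ordering::Relaxed)
}
```

No increment is lost, since each load/store pair runs with the chat's lock held. Operations on different chat IDs take different locks and do not wait on each other.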

Blocking Operations

CPU-heavy or blocking operations must use spawn_blocking to avoid stalling the async runtime:
use tokio::task::spawn_blocking;

// Bad: Blocks async runtime
let encrypted = expensive_crypto_operation(&data);

// Good: Offloads to thread pool
let encrypted = spawn_blocking(move || {
    expensive_crypto_operation(&data)
}).await?;
For more details on async patterns, see the Architecture guide.

Storage Backend

Backend Trait

The Backend trait abstracts storage implementation:
#[async_trait]
pub trait Backend: Send + Sync {
    async fn exists(&self) -> Result<bool>;
    async fn create(&self) -> Result<i32>;
    async fn load(&self) -> Result<Option<SerializableDevice>>;
    async fn save(&self, device: &SerializableDevice) -> Result<()>;
    
    // Sender key distribution (group chat)
    async fn get_skdm_recipients(&self, group_jid: &str) -> Result<Vec<Jid>>;
    async fn add_skdm_recipients(&self, group_jid: &str, device_jids: &[Jid]) -> Result<()>;
    async fn clear_skdm_recipients(&self, group_jid: &str) -> Result<()>;
    
    // Optional: Database snapshots for debugging
    #[cfg(feature = "debug-snapshots")]
    async fn snapshot_db(&self, name: &str, extra_content: Option<&[u8]>) -> Result<()>;
}
Location: src/store/traits.rs
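
One benefit of putting storage behind a trait is that tests can swap in an in-memory store. The sketch below is a deliberately simplified, synchronous analogue: MiniBackend, StoredDevice, MemoryBackend, and load_or_init are hypothetical. It leaves out async_trait and the SKDM methods, and it uses String as the error type.

```rust
use std::sync::Mutex;

// Reduced stand-in for SerializableDevice.
#[derive(Clone, Debug, PartialEq)]
pub struct StoredDevice {
    pub push_name: String,
}

// Synchronous, trimmed-down analogue of the Backend trait.
pub trait MiniBackend: Send + Sync {
    fn exists(&self) -> Result<bool, String>;
    fn create(&self) -> Result<i32, String>;
    fn load(&self) -> Result<Option<StoredDevice>, String>;
    fn save(&self, device: &StoredDevice) -> Result<(), String>;
}

#[derive(Default)]
struct Row {
    created: bool,
    data: Option<StoredDevice>,
}

// In-memory backend holding a single device row.
#[derive(Default)]
pub struct MemoryBackend {
    row: Mutex<Row>,
}

impl MiniBackend for MemoryBackend {
    fn exists(&self) -> Result<bool, String> {
        Ok(self.row.lock().unwrap().created)
    }
    fn create(&self) -> Result<i32, String> {
        self.row.lock().unwrap().created = true;
        Ok(1)
    }
    fn load(&self) -> Result<Option<StoredDevice>, String> {
        Ok(self.row.lock().unwrap().data.clone())
    }
    fn save(&self, device: &StoredDevice) -> Result<(), String> {
        self.row.lock().unwrap().data = Some(device.clone());
        Ok(())
    }
}

// Mirrors the initialization order: ensure the row exists, then load or default.
pub fn load_or_init(backend: &dyn MiniBackend) -> Result<StoredDevice, String> {
    if !backend.exists()? {
        backend.create()?;
    }
    Ok(backend.load()?.unwrap_or(StoredDevice { push_name: String::new() }))
}
```

Code written against the trait, such as load_or_init here, runs unchanged against either the in-memory store or a database-backed one.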

SQLite Implementation

pub struct SqliteStore {
    conn: Arc<Mutex<SqliteConnection>>,
    device_id: String,
}

impl SqliteStore {
    pub fn new(db_path: &str) -> Result<Self>
    pub fn new_for_device(db_path: &str, device_id: &str) -> Result<Self>
}
Tables:
CREATE TABLE devices (
    device_id TEXT PRIMARY KEY,
    jid TEXT,
    push_name TEXT,
    platform TEXT,
    noise_key BLOB,
    identity_key BLOB,
    registration_id INTEGER,
    adv_secret_key BLOB,
    edge_routing_info BLOB,
    last_connected INTEGER
);

CREATE TABLE skdm_recipients (
    group_jid TEXT,
    device_jid TEXT,
    PRIMARY KEY (group_jid, device_jid)
);

CREATE TABLE signal_sessions (
    our_jid TEXT,
    their_jid TEXT,
    record BLOB,
    PRIMARY KEY (our_jid, their_jid)
);

CREATE TABLE signal_prekeys (
    our_jid TEXT,
    prekey_id INTEGER,
    record BLOB,
    PRIMARY KEY (our_jid, prekey_id)
);

CREATE TABLE signal_sender_keys (
    our_jid TEXT,
    group_id TEXT,
    sender_id TEXT,
    record BLOB,
    PRIMARY KEY (our_jid, group_id, sender_id)
);
Location: src/store/sqlite.rs, schema in migrations/

Serialization

Before saving, Device is flattened into a SerializableDevice; key material such as noise_key is encoded with bincode:
pub struct SerializableDevice {
    pub jid: String,
    pub push_name: String,
    pub platform: String,
    pub noise_key: Vec<u8>,
    pub identity_key: Vec<u8>,
    pub registration_id: u32,
    pub adv_secret_key: Vec<u8>,
    pub edge_routing_info: Option<Vec<u8>>,
    pub last_connected: Option<i64>,
}

impl Device {
    pub fn to_serializable(&self) -> SerializableDevice {
        SerializableDevice {
            jid: self.core.id.to_string(),
            push_name: self.core.push_name.clone(),
            noise_key: bincode::serialize(&self.core.noise_key).unwrap(),
            // ...
        }
    }
    
    pub fn load_from_serializable(&mut self, data: SerializableDevice) {
        self.core.id = data.jid.parse().unwrap();
        self.core.push_name = data.push_name;
        self.core.noise_key = bincode::deserialize(&data.noise_key).unwrap();
        // ...
    }
}
Location: src/store/device.rs:100-200
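
The unwrap() calls above panic if a stored row is malformed. As a sketch of a fallible alternative, the example below returns a typed error instead. Everything in it is hypothetical: LoadError, SerializedMini, LoadedMini, and load_checked are invented here. The user@server JID split and the 32-byte key length are assumptions made for illustration, and bincode is not used.

```rust
use std::convert::TryFrom;

#[derive(Debug, PartialEq)]
pub enum LoadError {
    InvalidJid(String),
    InvalidKeyLength(usize),
}

// Hypothetical stored form: strings and raw bytes only.
pub struct SerializedMini {
    pub jid: String,
    pub noise_key: Vec<u8>,
}

// Hypothetical in-memory form with validated fields.
#[derive(Debug, PartialEq)]
pub struct LoadedMini {
    pub user: String,
    pub server: String,
    pub noise_key: [u8; 32],
}

// Validate each field and report what was wrong instead of panicking.
pub fn load_checked(data: SerializedMini) -> Result<LoadedMini, LoadError> {
    let (user, server) = data
        .jid
        .split_once('@')
        .ok_or_else(|| LoadError::InvalidJid(data.jid.clone()))?;

    let noise_key = <[u8; 32]>::try_from(data.noise_key.as_slice())
        .map_err(|_| LoadError::InvalidKeyLength(data.noise_key.len()))?;

    Ok(LoadedMini {
        user: user.to_string(),
        server: server.to_string(),
        noise_key,
    })
}
```

With this shape, the caller that loads the row at startup can decide whether a corrupt row should abort, be reported, or be replaced with a fresh device.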

Debugging State

Database Snapshots

The debug-snapshots feature enables database snapshots for debugging:
// In error handler
if let Err(e) = decrypt_message(...) {
    persistence_manager.create_snapshot(
        "decrypt_error",
        Some(error_details.as_bytes())
    ).await?;
    
    return Err(e);
}
This creates a timestamped copy of the database:
chats.db
chats_snapshot_decrypt_error_20260228_143022.db
chats_snapshot_decrypt_error_20260228_143022.txt  (metadata)
Location: src/store/persistence_manager.rs:99-121

Logging

State changes are logged at debug level:
RUST_LOG=whatsapp_rust::store=debug cargo run
Output:
[DEBUG] PersistenceManager: Ensuring device row exists.
[DEBUG] PersistenceManager: Loaded existing device data (PushName: 'Alice')
[DEBUG] Device state is dirty, saving to disk.
[DEBUG] Device state saved successfully.

Best Practices

1. Always Use Commands for State Changes

// Bad: Direct modification
persistence_manager.modify_device(|device| {
    device.core.push_name = "New Name".to_string();
}).await;

// Good: Use command
persistence_manager.process_command(
    DeviceCommand::SetPushName { name: "New Name".to_string() }
).await;

2. Minimize Lock Duration

// Bad: Long lock duration
persistence_manager.modify_device(|device| {
    let data = expensive_calculation(&device.core.id);  // Blocks all access!
    device.core.some_field = data;
}).await;

// Good: Release lock during expensive operation
// (some_field and SetSomeField are placeholders that do not appear in the definitions above)
let device = persistence_manager.get_device_snapshot().await;
let data = expensive_calculation(&device.core.id);
persistence_manager.process_command(
    DeviceCommand::SetSomeField { data }
).await;

3. Use Chat Locks for Chat-Specific Operations

// Ensures message send + state update are atomic per-chat
client.with_chat_lock(&chat_jid, || async {
    let msg_id = client.send_message(&chat_jid, text).await?;
    client.persistence_manager.process_command(
        DeviceCommand::EnqueueMessage { message: msg_id }
    ).await;
    Ok(())
}).await?;

4. Offload Heavy Operations

use tokio::task::spawn_blocking;

// Crypto operations should use spawn_blocking
let ciphertext = spawn_blocking(move || {
    encrypt_message(&plaintext, &key)
}).await??;

References

  • Implementation: src/store/persistence_manager.rs
  • Commands: wacore/src/store/commands.rs
  • Device structure: src/store/device.rs, wacore/src/store/device_core.rs
  • Backend trait: src/store/traits.rs
  • SQLite backend: src/store/sqlite.rs
  • AGENTS.md: State management guidelines (section 3)