Overview
whatsapp-rust uses a strict state management architecture to ensure consistency and prevent race conditions. All device state modifications must go through the PersistenceManager using the DeviceCommand pattern.
Critical: Never modify Device state directly. Always use DeviceCommand + PersistenceManager::process_command() for writes, or get_device_snapshot() for reads.
Architecture
The state management system has three main components:
src/store/
├── persistence_manager.rs # Central state coordinator
├── commands.rs # DeviceCommand pattern
├── device.rs # Device state structure
└── backend/ # Storage backend (SQLite)
Device State
The Device struct holds all client state:
pub struct Device {
pub core: DeviceCore, // Platform-agnostic state
backend: Arc<dyn Backend>,
}
pub struct DeviceCore {
pub id: Jid, // Device JID
pub push_name: String,
pub platform: String,
pub noise_key: KeyPair, // Noise protocol keypair
pub identity_key: IdentityKeyPair, // Signal protocol identity
pub registration_id: u32,
pub adv_secret_key: Vec<u8>,
pub edge_routing_info: Option<Vec<u8>>,
pub last_connected: Option<i64>,
// Protocol state
pub message_queue: VecDeque<QueuedMessage>,
pub iq_handlers: HashMap<String, IqResponseHandler>,
}
Location: src/store/device.rs, wacore/src/store/device_core.rs
PersistenceManager
The PersistenceManager is the gatekeeper for all state changes.
Architecture
pub struct PersistenceManager {
device: Arc<RwLock<Device>>,
backend: Arc<dyn Backend>,
dirty: Arc<AtomicBool>,
save_notify: Arc<Notify>,
}
Location: src/store/persistence_manager.rs:11-16
Key Methods
Read-Only Access
// Get a snapshot of device state (cheap clone)
pub async fn get_device_snapshot(&self) -> Device {
self.device.read().await.clone()
}
Location: src/store/persistence_manager.rs:61-63
State Modification
// Modify device state with a closure
pub async fn modify_device<F, R>(&self, modifier: F) -> R
where
F: FnOnce(&mut Device) -> R,
{
let mut device_guard = self.device.write().await;
let result = modifier(&mut device_guard);
// Mark dirty and notify background saver
self.dirty.store(true, Ordering::Relaxed);
self.save_notify.notify_one();
result
}
Location: src/store/persistence_manager.rs:69-80
Command Processing
// Process a device command (preferred for state changes)
pub async fn process_command(&self, command: DeviceCommand) {
self.modify_device(|device| {
apply_command_to_device(device, command);
}).await;
}
Location: src/store/persistence_manager.rs:145-150
Background Saver
The persistence manager runs a background task that periodically saves dirty state:
pub fn run_background_saver(self: Arc<Self>, interval: Duration) {
tokio::spawn(async move {
loop {
tokio::select! {
_ = self.save_notify.notified() => {
debug!("Save notification received.");
}
_ = sleep(interval) => {}
}
if let Err(e) = self.save_to_disk().await {
error!("Error saving device state: {e}");
}
}
});
}
How it works:
- Wakes up when notified OR every
interval (typically 30s)
- Checks if state is dirty (
dirty flag)
- If dirty, serializes device state and saves to database
- Clears dirty flag
Location: src/store/persistence_manager.rs:123-140
Initialization
pub async fn new(backend: Arc<dyn Backend>) -> Result<Self> {
// Ensure device row exists in database
let exists = backend.exists().await?;
if !exists {
let id = backend.create().await?;
debug!("Created device row with id={id}");
}
// Load existing state or create new
let device = if let Some(serializable_device) = backend.load().await? {
let mut dev = Device::new(backend.clone());
dev.load_from_serializable(serializable_device);
dev
} else {
Device::new(backend.clone())
};
Ok(Self {
device: Arc::new(RwLock::new(device)),
backend,
dirty: Arc::new(AtomicBool::new(false)),
save_notify: Arc::new(Notify::new()),
})
}
Location: src/store/persistence_manager.rs:23-55
DeviceCommand Pattern
The DeviceCommand enum defines all possible state mutations:
pub enum DeviceCommand {
// Identity & registration
SetId { jid: Jid },
SetPushName { name: String },
SetPlatform { platform: String },
SetRegistrationId { id: u32 },
// Cryptographic keys
SetNoiseKey { keypair: KeyPair },
SetIdentityKey { keypair: IdentityKeyPair },
SetAdvSecretKey { key: Vec<u8> },
// Connection state
SetEdgeRoutingInfo { info: Option<Vec<u8>> },
SetLastConnected { timestamp: i64 },
// Message queue
EnqueueMessage { message: QueuedMessage },
DequeueMessage { id: String },
ClearMessageQueue,
// IQ handlers
RegisterIqHandler { id: String, handler: IqResponseHandler },
UnregisterIqHandler { id: String },
}
Location: wacore/src/store/commands.rs
Why Commands?
The command pattern provides:
- Type safety: All state changes are explicitly defined
- Auditability: Easy to log/trace state mutations
- Testability: Commands can be tested in isolation
- Consistency: Single code path for all modifications
- Future compatibility: Easy to add undo/redo or migration logic
Applying Commands
Commands are applied via pattern matching:
pub fn apply_command_to_device(device: &mut DeviceCore, command: DeviceCommand) {
match command {
DeviceCommand::SetId { jid } => {
device.id = jid;
}
DeviceCommand::SetPushName { name } => {
device.push_name = name;
}
DeviceCommand::SetEdgeRoutingInfo { info } => {
device.edge_routing_info = info;
}
DeviceCommand::EnqueueMessage { message } => {
device.message_queue.push_back(message);
}
DeviceCommand::DequeueMessage { id } => {
device.message_queue.retain(|msg| msg.id != id);
}
// ... handle all variants
}
}
Location: wacore/src/store/commands.rs:50-150
Usage Patterns
Reading Device State
// In async function
let device = persistence_manager.get_device_snapshot().await;
println!("Device JID: {}", device.core.id);
println!("Push name: {}", device.core.push_name);
get_device_snapshot() returns a cloned Device. This is efficient because most fields are cheap to clone (strings, numbers). Large data like cryptographic keys use Arc internally.
Modifying Device State (Simple)
For simple state changes, use commands:
use wacore::store::commands::DeviceCommand;
// Update push name
persistence_manager.process_command(
DeviceCommand::SetPushName {
name: "My New Name".to_string()
}
).await;
// Update edge routing info
persistence_manager.process_command(
DeviceCommand::SetEdgeRoutingInfo {
info: Some(routing_data)
}
).await;
Modifying Device State (Complex)
For complex logic involving multiple fields or conditionals:
persistence_manager.modify_device(|device| {
// Complex mutation logic
if device.core.message_queue.len() > 100 {
device.core.message_queue.drain(0..50);
}
device.core.last_connected = Some(Utc::now().timestamp());
// Return result if needed
device.core.message_queue.len()
}).await;
Keep the closure passed to modify_device as short as possible. It holds a write lock on the device state, blocking all other modifications.
Concurrency Patterns
RwLock Semantics
The Device is protected by a tokio::sync::RwLock:
- Multiple readers:
get_device_snapshot() can be called concurrently
- Single writer:
modify_device() blocks all other access
- Writer priority: Pending writes block new reads (avoid reader starvation)
Chat Locks
For per-chat operations (like sending messages), the Client provides chat-level locks:
pub struct Client {
pub(crate) chat_locks: DashMap<String, Arc<Mutex<()>>>,
// ...
}
impl Client {
pub async fn with_chat_lock<F, Fut, R>(
&self,
chat_jid: &str,
f: F,
) -> R
where
F: FnOnce() -> Fut,
Fut: Future<Output = R>,
{
let lock = self.chat_locks
.entry(chat_jid.to_string())
.or_insert_with(|| Arc::new(Mutex::new(())));
let _guard = lock.lock().await;
f().await
}
}
Usage:
// Serialize operations for a specific chat
client.with_chat_lock(&chat_jid, || async {
// Send message, update state, etc.
client.send_message(&chat_jid, "Hello").await?;
Ok(())
}).await?;
Location: src/client.rs
Blocking Operations
CPU-heavy or blocking operations must use spawn_blocking to avoid stalling the async runtime:
use tokio::task::spawn_blocking;
// Bad: Blocks async runtime
let encrypted = expensive_crypto_operation(&data);
// Good: Offloads to thread pool
let encrypted = spawn_blocking(move || {
expensive_crypto_operation(&data)
}).await?;
For more details on async patterns, see the Architecture guide.
Storage Backend
Backend Trait
The Backend trait abstracts storage implementation:
#[async_trait]
pub trait Backend: Send + Sync {
async fn exists(&self) -> Result<bool>;
async fn create(&self) -> Result<i32>;
async fn load(&self) -> Result<Option<SerializableDevice>>;
async fn save(&self, device: &SerializableDevice) -> Result<()>;
// Sender key distribution (group chat)
async fn get_skdm_recipients(&self, group_jid: &str) -> Result<Vec<Jid>>;
async fn add_skdm_recipients(&self, group_jid: &str, device_jids: &[Jid]) -> Result<()>;
async fn clear_skdm_recipients(&self, group_jid: &str) -> Result<()>;
// Optional: Database snapshots for debugging
#[cfg(feature = "debug-snapshots")]
async fn snapshot_db(&self, name: &str, extra_content: Option<&[u8]>) -> Result<()>;
}
Location: src/store/traits.rs
SQLite Implementation
pub struct SqliteStore {
conn: Arc<Mutex<SqliteConnection>>,
device_id: String,
}
impl SqliteStore {
pub fn new(db_path: &str) -> Result<Self>
pub fn new_for_device(db_path: &str, device_id: &str) -> Result<Self>
}
Tables:
CREATE TABLE devices (
device_id TEXT PRIMARY KEY,
jid TEXT,
push_name TEXT,
platform TEXT,
noise_key BLOB,
identity_key BLOB,
registration_id INTEGER,
adv_secret_key BLOB,
edge_routing_info BLOB,
last_connected INTEGER
);
CREATE TABLE skdm_recipients (
group_jid TEXT,
device_jid TEXT,
PRIMARY KEY (group_jid, device_jid)
);
CREATE TABLE signal_sessions (
our_jid TEXT,
their_jid TEXT,
record BLOB,
PRIMARY KEY (our_jid, their_jid)
);
CREATE TABLE signal_prekeys (
our_jid TEXT,
prekey_id INTEGER,
record BLOB,
PRIMARY KEY (our_jid, prekey_id)
);
CREATE TABLE signal_sender_keys (
our_jid TEXT,
group_id TEXT,
sender_id TEXT,
record BLOB,
PRIMARY KEY (our_jid, group_id, sender_id)
);
Location: src/store/sqlite.rs, schema in migrations/
Serialization
Device state is serialized to bincode format:
pub struct SerializableDevice {
pub jid: String,
pub push_name: String,
pub platform: String,
pub noise_key: Vec<u8>,
pub identity_key: Vec<u8>,
pub registration_id: u32,
pub adv_secret_key: Vec<u8>,
pub edge_routing_info: Option<Vec<u8>>,
pub last_connected: Option<i64>,
}
impl Device {
pub fn to_serializable(&self) -> SerializableDevice {
SerializableDevice {
jid: self.core.id.to_string(),
push_name: self.core.push_name.clone(),
noise_key: bincode::serialize(&self.core.noise_key).unwrap(),
// ...
}
}
pub fn load_from_serializable(&mut self, data: SerializableDevice) {
self.core.id = data.jid.parse().unwrap();
self.core.push_name = data.push_name;
self.core.noise_key = bincode::deserialize(&data.noise_key).unwrap();
// ...
}
}
Location: src/store/device.rs:100-200
Debugging State
Database Snapshots
The debug-snapshots feature enables database snapshots for debugging:
// In error handler
if let Err(e) = decrypt_message(...) {
persistence_manager.create_snapshot(
"decrypt_error",
Some(error_details.as_bytes())
).await?;
return Err(e);
}
This creates a timestamped copy of the database:
chats.db
chats_snapshot_decrypt_error_20260228_143022.db
chats_snapshot_decrypt_error_20260228_143022.txt (metadata)
Location: src/store/persistence_manager.rs:99-121
Logging
State changes are logged at debug level:
RUST_LOG=whatsapp_rust::store=debug cargo run
Output:
[DEBUG] PersistenceManager: Ensuring device row exists.
[DEBUG] PersistenceManager: Loaded existing device data (PushName: 'Alice')
[DEBUG] Device state is dirty, saving to disk.
[DEBUG] Device state saved successfully.
Best Practices
1. Always Use Commands for State Changes
// Bad: Direct modification
persistence_manager.modify_device(|device| {
device.core.push_name = "New Name".to_string();
}).await;
// Good: Use command
persistence_manager.process_command(
DeviceCommand::SetPushName { name: "New Name".to_string() }
).await;
2. Minimize Lock Duration
// Bad: Long lock duration
persistence_manager.modify_device(|device| {
let data = expensive_calculation(&device.core.id); // Blocks all access!
device.core.some_field = data;
}).await;
// Good: Release lock during expensive operation
let device = persistence_manager.get_device_snapshot().await;
let data = expensive_calculation(&device.core.id);
persistence_manager.process_command(
DeviceCommand::SetSomeField { data }
).await;
3. Use Chat Locks for Chat-Specific Operations
// Ensures message send + state update are atomic per-chat
client.with_chat_lock(&chat_jid, || async {
let msg_id = client.send_message(&chat_jid, text).await?;
client.persistence_manager.process_command(
DeviceCommand::EnqueueMessage { message: msg_id }
).await;
Ok(())
}).await?;
4. Offload Heavy Operations
use tokio::task::spawn_blocking;
// Crypto operations should use spawn_blocking
let ciphertext = spawn_blocking(move || {
encrypt_message(&plaintext, &key)
}).await??;
References
- Implementation:
src/store/persistence_manager.rs
- Commands:
wacore/src/store/commands.rs
- Device structure:
src/store/device.rs, wacore/src/store/device_core.rs
- Backend trait:
src/store/traits.rs
- SQLite backend:
src/store/sqlite.rs
- AGENTS.md: State management guidelines (section 3)