Skip to main content

Overview

This guide covers implementing custom backends for storage, network transport, and HTTP operations in whatsapp-rust. The library uses trait-based abstractions to allow full customization.

Storage Backend Architecture

The storage backend is split into four domain-grouped traits:
  1. SignalStore - Signal protocol cryptography (identities, sessions, keys)
  2. AppSyncStore - WhatsApp app state synchronization
  3. ProtocolStore - WhatsApp Web protocol alignment (SKDM, LID mapping, device registry)
  4. DeviceStore - Device persistence operations
See the trait definitions in `wacore::store::traits` for the complete list of methods on each trait.

Backend Trait

Any type that implements all four traits and is also `Send + Sync` automatically implements `Backend`:
use wacore::store::traits::*;

/// Umbrella trait for a complete storage backend.
///
/// It declares no methods of its own; it only bundles the four domain store
/// traits together with `Send + Sync`.
pub trait Backend:
    SignalStore + AppSyncStore + ProtocolStore + DeviceStore + Send + Sync
{
}

/// Blanket implementation: every type that satisfies all of the bounds is a
/// `Backend`, so implementors never write this impl by hand.
impl<T: SignalStore + AppSyncStore + ProtocolStore + DeviceStore + Send + Sync> Backend for T {}

Implementing a Custom Storage Backend

Step 1: Define Your Store

use std::sync::Arc;
use async_trait::async_trait;
use wacore::store::traits::*;
use wacore::store::error::{Result, StoreError};

/// Example storage backend that keeps its state behind one shared connection.
///
/// Cloning the store clones only the `Arc` handle, so every clone refers to
/// the same underlying connection.
#[derive(Clone)]
pub struct MyCustomStore {
    // Your storage implementation (database connection, file handle, etc.).
    // `MyConnection` is not defined in this guide; substitute your own type.
    connection: Arc<MyConnection>,
}

impl MyCustomStore {
    /// Connects using `connection_string` and wraps the resulting connection
    /// in an `Arc` so the store can be cloned cheaply.
    ///
    /// # Errors
    /// Propagates any error returned by `MyConnection::connect`.
    pub async fn new(connection_string: &str) -> Result<Self> {
        let connection = MyConnection::connect(connection_string)
            .await
            .map(Arc::new)?;
        Ok(Self { connection })
    }
}

Step 2: Implement SignalStore

Handle Signal protocol cryptographic operations:
#[async_trait]
impl SignalStore for MyCustomStore {
    // ---------------------------------------------------------------------
    // Identities
    // ---------------------------------------------------------------------

    /// Stores (or overwrites) the 32-byte identity key for `address`.
    async fn put_identity(&self, address: &str, key: [u8; 32]) -> Result<()> {
        let sql = "INSERT OR REPLACE INTO identities (address, key) VALUES (?, ?)";
        self.connection.execute(sql, &[address, &key[..]]).await?;
        Ok(())
    }

    /// Returns the identity key stored for `address`, or `None` if no row matches.
    async fn load_identity(&self, address: &str) -> Result<Option<Vec<u8>>> {
        let sql = "SELECT key FROM identities WHERE address = ?";
        let found = self.connection.query_optional(sql, &[address]).await?;
        Ok(found.map(|row| row.get(0)))
    }

    /// Deletes the identity row for `address`.
    async fn delete_identity(&self, address: &str) -> Result<()> {
        let sql = "DELETE FROM identities WHERE address = ?";
        self.connection.execute(sql, &[address]).await?;
        Ok(())
    }

    // ---------------------------------------------------------------------
    // Sessions
    // ---------------------------------------------------------------------

    /// Returns the serialized session record for `address`, if one is stored.
    async fn get_session(&self, address: &str) -> Result<Option<Vec<u8>>> {
        let sql = "SELECT record FROM sessions WHERE address = ?";
        let found = self.connection.query_optional(sql, &[address]).await?;
        Ok(found.map(|row| row.get(0)))
    }

    /// Stores (or overwrites) the serialized session record for `address`.
    async fn put_session(&self, address: &str, session: &[u8]) -> Result<()> {
        let sql = "INSERT OR REPLACE INTO sessions (address, record) VALUES (?, ?)";
        self.connection.execute(sql, &[address, session]).await?;
        Ok(())
    }

    /// Deletes the session row for `address`.
    async fn delete_session(&self, address: &str) -> Result<()> {
        let sql = "DELETE FROM sessions WHERE address = ?";
        self.connection.execute(sql, &[address]).await?;
        Ok(())
    }

    // ---------------------------------------------------------------------
    // Pre-keys
    // ---------------------------------------------------------------------

    /// Stores (or overwrites) pre-key `id` together with its `uploaded` flag.
    async fn store_prekey(&self, id: u32, record: &[u8], uploaded: bool) -> Result<()> {
        let sql = "INSERT OR REPLACE INTO prekeys (id, record, uploaded) VALUES (?, ?, ?)";
        // The flag is bound as an integer (0 or 1).
        self.connection
            .execute(sql, &[&id, record, &(uploaded as i32)])
            .await?;
        Ok(())
    }

    /// Returns the serialized record of pre-key `id`, if one is stored.
    async fn load_prekey(&self, id: u32) -> Result<Option<Vec<u8>>> {
        let sql = "SELECT record FROM prekeys WHERE id = ?";
        let found = self.connection.query_optional(sql, &[&id]).await?;
        Ok(found.map(|row| row.get(0)))
    }

    /// Deletes pre-key `id`.
    async fn remove_prekey(&self, id: u32) -> Result<()> {
        let sql = "DELETE FROM prekeys WHERE id = ?";
        self.connection.execute(sql, &[&id]).await?;
        Ok(())
    }

    // ---------------------------------------------------------------------
    // Signed pre-keys
    // ---------------------------------------------------------------------

    /// Stores (or overwrites) signed pre-key `id`.
    async fn store_signed_prekey(&self, id: u32, record: &[u8]) -> Result<()> {
        let sql = "INSERT OR REPLACE INTO signed_prekeys (id, record) VALUES (?, ?)";
        self.connection.execute(sql, &[&id, record]).await?;
        Ok(())
    }

    /// Returns the serialized record of signed pre-key `id`, if one is stored.
    async fn load_signed_prekey(&self, id: u32) -> Result<Option<Vec<u8>>> {
        let sql = "SELECT record FROM signed_prekeys WHERE id = ?";
        let found = self.connection.query_optional(sql, &[&id]).await?;
        Ok(found.map(|row| row.get(0)))
    }

    /// Returns every stored signed pre-key as `(id, record)` pairs.
    async fn load_all_signed_prekeys(&self) -> Result<Vec<(u32, Vec<u8>)>> {
        let sql = "SELECT id, record FROM signed_prekeys";
        let all_rows = self.connection.query(sql, &[]).await?;
        let pairs = all_rows
            .into_iter()
            .map(|row| (row.get(0), row.get(1)))
            .collect();
        Ok(pairs)
    }

    /// Deletes signed pre-key `id`.
    async fn remove_signed_prekey(&self, id: u32) -> Result<()> {
        let sql = "DELETE FROM signed_prekeys WHERE id = ?";
        self.connection.execute(sql, &[&id]).await?;
        Ok(())
    }

    // ---------------------------------------------------------------------
    // Sender keys
    // ---------------------------------------------------------------------

    /// Stores (or overwrites) the sender-key record for `address`.
    async fn put_sender_key(&self, address: &str, record: &[u8]) -> Result<()> {
        let sql = "INSERT OR REPLACE INTO sender_keys (address, record) VALUES (?, ?)";
        self.connection.execute(sql, &[address, record]).await?;
        Ok(())
    }

    /// Returns the sender-key record for `address`, if one is stored.
    async fn get_sender_key(&self, address: &str) -> Result<Option<Vec<u8>>> {
        let sql = "SELECT record FROM sender_keys WHERE address = ?";
        let found = self.connection.query_optional(sql, &[address]).await?;
        Ok(found.map(|row| row.get(0)))
    }

    /// Deletes the sender-key row for `address`.
    async fn delete_sender_key(&self, address: &str) -> Result<()> {
        let sql = "DELETE FROM sender_keys WHERE address = ?";
        self.connection.execute(sql, &[address]).await?;
        Ok(())
    }
}
See the `SignalStore` trait in `wacore::store::traits` for the full list of required methods.

Step 3: Implement AppSyncStore

Handle WhatsApp app state synchronization:
use wacore::appstate::hash::HashState;
use wacore_appstate::processor::AppStateMutationMAC;

#[async_trait]
impl AppSyncStore for MyCustomStore {
    /// Looks up the app-state sync key stored under `key_id`.
    ///
    /// Returns `None` when no row matches.
    async fn get_sync_key(&self, key_id: &[u8]) -> Result<Option<AppStateSyncKey>> {
        let row = self.connection.query_optional(
            "SELECT key_data, fingerprint, timestamp FROM app_state_sync_keys WHERE key_id = ?",
            &[key_id],
        ).await?;

        Ok(row.map(|r| AppStateSyncKey {
            key_data: r.get(0),
            fingerprint: r.get(1),
            timestamp: r.get(2),
        }))
    }

    /// Stores (or overwrites) the app-state sync key for `key_id`.
    async fn set_sync_key(&self, key_id: &[u8], key: AppStateSyncKey) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO app_state_sync_keys (key_id, key_data, fingerprint, timestamp) VALUES (?, ?, ?, ?)",
            &[key_id, &key.key_data[..], &key.fingerprint[..], &key.timestamp],
        ).await?;
        Ok(())
    }

    /// Returns the stored hash state for collection `name`.
    ///
    /// Falls back to `HashState::default()` when nothing is stored yet.
    async fn get_version(&self, name: &str) -> Result<HashState> {
        let row = self.connection.query_optional(
            "SELECT version, hash FROM app_state_versions WHERE name = ?",
            &[name],
        ).await?;

        Ok(row.map(|r| HashState {
            version: r.get(0),
            hash: r.get(1),
        }).unwrap_or_default())
    }

    /// Stores (or overwrites) the hash state for collection `name`.
    async fn set_version(&self, name: &str, state: HashState) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO app_state_versions (name, version, hash) VALUES (?, ?, ?)",
            &[name, &state.version, &state.hash[..]],
        ).await?;
        Ok(())
    }

    /// Records the MACs of a batch of mutations for collection `name`.
    ///
    /// The whole batch is written inside one transaction so a failure part-way
    /// through cannot leave only some of the MACs stored.
    async fn put_mutation_macs(
        &self,
        name: &str,
        version: u64,
        mutations: &[AppStateMutationMAC],
    ) -> Result<()> {
        // Nothing to write: skip opening a transaction at all.
        if mutations.is_empty() {
            return Ok(());
        }

        let tx = self.connection.begin_transaction().await?;
        for mutation in mutations {
            tx.execute(
                "INSERT INTO mutation_macs (name, version, index_mac, value_mac) VALUES (?, ?, ?, ?)",
                &[name, &version, &mutation.index_mac[..], &mutation.value_mac[..]],
            ).await?;
        }
        tx.commit().await?;
        Ok(())
    }

    /// Returns the value MAC stored for (`name`, `index_mac`), if any.
    async fn get_mutation_mac(&self, name: &str, index_mac: &[u8]) -> Result<Option<Vec<u8>>> {
        let row = self.connection.query_optional(
            "SELECT value_mac FROM mutation_macs WHERE name = ? AND index_mac = ?",
            &[name, index_mac],
        ).await?;
        Ok(row.map(|r| r.get(0)))
    }

    /// Deletes the MAC rows of collection `name` whose index MAC is listed in
    /// `index_macs`.
    ///
    /// Runs in one transaction so the deletions are applied all-or-nothing.
    async fn delete_mutation_macs(&self, name: &str, index_macs: &[Vec<u8>]) -> Result<()> {
        if index_macs.is_empty() {
            return Ok(());
        }

        let tx = self.connection.begin_transaction().await?;
        for index_mac in index_macs {
            tx.execute(
                "DELETE FROM mutation_macs WHERE name = ? AND index_mac = ?",
                &[name, &index_mac[..]],
            ).await?;
        }
        tx.commit().await?;
        Ok(())
    }
}
See the `AppSyncStore` trait in `wacore::store::traits` for the full list of required methods.

Step 4: Implement ProtocolStore

Handle WhatsApp Web protocol alignment:
use wacore_binary::jid::Jid;

#[async_trait]
impl ProtocolStore for MyCustomStore {
    // --- SKDM Tracking ---

    /// Returns the device JIDs recorded as SKDM recipients for `group_jid`.
    ///
    /// # Errors
    /// Returns `StoreError::Parse` if a stored value is not a valid JID.
    async fn get_skdm_recipients(&self, group_jid: &str) -> Result<Vec<Jid>> {
        let rows = self.connection.query(
            "SELECT device_jid FROM skdm_recipients WHERE group_jid = ?",
            &[group_jid],
        ).await?;

        // Collecting into `Result` stops at the first JID that fails to parse.
        rows.into_iter()
            .map(|r| {
                let jid_str: String = r.get(0);
                jid_str
                    .parse::<Jid>()
                    .map_err(|e| StoreError::Parse(e.to_string()))
            })
            .collect()
    }

    /// Records `device_jids` as SKDM recipients for `group_jid`.
    ///
    /// Already-recorded pairs are ignored. The batch is written inside one
    /// transaction so it is applied all-or-nothing.
    async fn add_skdm_recipients(&self, group_jid: &str, device_jids: &[Jid]) -> Result<()> {
        if device_jids.is_empty() {
            return Ok(());
        }

        let tx = self.connection.begin_transaction().await?;
        for device_jid in device_jids {
            tx.execute(
                "INSERT OR IGNORE INTO skdm_recipients (group_jid, device_jid) VALUES (?, ?)",
                &[group_jid, &device_jid.to_string()],
            ).await?;
        }
        tx.commit().await?;
        Ok(())
    }

    /// Removes every SKDM recipient recorded for `group_jid`.
    async fn clear_skdm_recipients(&self, group_jid: &str) -> Result<()> {
        self.connection.execute(
            "DELETE FROM skdm_recipients WHERE group_jid = ?",
            &[group_jid],
        ).await?;
        Ok(())
    }

    // --- LID-PN Mapping ---

    /// Looks up the mapping entry whose LID equals `lid`.
    async fn get_lid_mapping(&self, lid: &str) -> Result<Option<LidPnMappingEntry>> {
        let row = self.connection.query_optional(
            "SELECT lid, phone_number, created_at, updated_at, learning_source FROM lid_pn_mappings WHERE lid = ?",
            &[lid],
        ).await?;

        Ok(row.map(|r| LidPnMappingEntry {
            lid: r.get(0),
            phone_number: r.get(1),
            created_at: r.get(2),
            updated_at: r.get(3),
            learning_source: r.get(4),
        }))
    }

    /// Looks up the mapping entry for phone number `phone`.
    ///
    /// If several rows share the phone number, the most recently updated one
    /// is returned.
    async fn get_pn_mapping(&self, phone: &str) -> Result<Option<LidPnMappingEntry>> {
        let row = self.connection.query_optional(
            "SELECT lid, phone_number, created_at, updated_at, learning_source FROM lid_pn_mappings WHERE phone_number = ? ORDER BY updated_at DESC LIMIT 1",
            &[phone],
        ).await?;

        Ok(row.map(|r| LidPnMappingEntry {
            lid: r.get(0),
            phone_number: r.get(1),
            created_at: r.get(2),
            updated_at: r.get(3),
            learning_source: r.get(4),
        }))
    }

    /// Stores (or overwrites) a LID/phone-number mapping entry.
    async fn put_lid_mapping(&self, entry: &LidPnMappingEntry) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO lid_pn_mappings (lid, phone_number, created_at, updated_at, learning_source) VALUES (?, ?, ?, ?, ?)",
            &[&entry.lid, &entry.phone_number, &entry.created_at, &entry.updated_at, &entry.learning_source],
        ).await?;
        Ok(())
    }

    /// Returns every stored LID/phone-number mapping entry.
    async fn get_all_lid_mappings(&self) -> Result<Vec<LidPnMappingEntry>> {
        let rows = self.connection.query(
            "SELECT lid, phone_number, created_at, updated_at, learning_source FROM lid_pn_mappings",
            &[],
        ).await?;

        Ok(rows.into_iter().map(|r| LidPnMappingEntry {
            lid: r.get(0),
            phone_number: r.get(1),
            created_at: r.get(2),
            updated_at: r.get(3),
            learning_source: r.get(4),
        }).collect())
    }

    // ... Implement remaining methods (base_key, device_list, forget_marks, tc_token)
}
See the `ProtocolStore` trait in `wacore::store::traits` for the full list of required methods.

Step 5: Implement DeviceStore

Handle device data persistence:
use wacore::store::Device;

#[async_trait]
impl DeviceStore for MyCustomStore {
    /// Serializes `device` and persists it in the single device row (`id = 1`).
    ///
    /// Uses an upsert rather than a bare `UPDATE`, so the data is not silently
    /// dropped when the row has not been created yet.
    ///
    /// # Errors
    /// Returns `StoreError::Serialization` if the device cannot be encoded.
    async fn save(&self, device: &Device) -> Result<()> {
        // Serialize device data (use your preferred format)
        let serialized = bincode::serialize(device)
            .map_err(|e| StoreError::Serialization(e.to_string()))?;

        self.connection.execute(
            "INSERT OR REPLACE INTO devices (id, data) VALUES (1, ?)",
            &[&serialized[..]],
        ).await?;
        Ok(())
    }

    /// Loads the persisted device, if any.
    ///
    /// Returns `None` when the row is missing or still holds the empty
    /// placeholder written by `create`.
    ///
    /// # Errors
    /// Returns `StoreError::Deserialization` if the stored bytes cannot be decoded.
    async fn load(&self) -> Result<Option<Device>> {
        let row = self.connection.query_optional(
            "SELECT data FROM devices WHERE id = 1",
            &[],
        ).await?;

        let Some(r) = row else {
            return Ok(None);
        };

        let data: Vec<u8> = r.get(0);
        // `create` inserts an empty blob; that is "no device yet", not corrupt data.
        if data.is_empty() {
            return Ok(None);
        }

        let device = bincode::deserialize(&data)
            .map_err(|e| StoreError::Deserialization(e.to_string()))?;
        Ok(Some(device))
    }

    /// Reports whether the device row (`id = 1`) exists.
    async fn exists(&self) -> Result<bool> {
        let row = self.connection.query_optional(
            "SELECT 1 FROM devices WHERE id = 1",
            &[],
        ).await?;
        Ok(row.is_some())
    }

    /// Inserts the device row with an empty placeholder payload and returns its id.
    async fn create(&self) -> Result<i32> {
        self.connection.execute(
            "INSERT INTO devices (id, data) VALUES (1, ?)",
            &[&Vec::<u8>::new()],
        ).await?;
        Ok(1)
    }

    /// Optional hook for snapshotting the database while debugging.
    ///
    /// This example implementation intentionally does nothing.
    async fn snapshot_db(&self, _name: &str, _extra_content: Option<&[u8]>) -> Result<()> {
        Ok(())
    }
}
See the `DeviceStore` trait in `wacore::store::traits` for the full list of required methods.

Using Your Custom Backend

use whatsapp_rust::bot::Bot;

// Build the custom store, then share it behind an `Arc`.
let store = MyCustomStore::new("my://connection").await?;
let backend = Arc::new(store);

// Plug the backend, transport factory and HTTP client into the bot builder.
let builder = Bot::builder()
    .with_backend(backend)
    .with_transport_factory(transport_factory)
    .with_http_client(http_client);

let bot = builder.build().await?;

Custom Transport Backend

Transport Trait

Implement the Transport trait for custom network transports:
use async_trait::async_trait;
use wacore::net::Transport;

/// Example transport type; implements `Transport` below.
pub struct MyCustomTransport {
    // Your transport implementation (e.g. the write half of your connection).
}

#[async_trait]
impl Transport for MyCustomTransport {
    /// Sends one raw byte payload over the transport.
    ///
    /// Stub: replace the body with a write to your connection.
    async fn send(&self, data: Vec<u8>) -> anyhow::Result<()> {
        // Write `data` to the underlying connection here.
        Ok(())
    }

    /// Closes the connection.
    ///
    /// Stub: replace the body with your shutdown logic.
    async fn disconnect(&self) {
        // Tear down the underlying connection here.
    }
}
See the `Transport` trait in `wacore::net` for the full trait definition.

Transport Factory

Implement TransportFactory to create transport instances:
use wacore::net::{TransportFactory, TransportEvent};
use async_channel::Receiver;
use std::sync::Arc;

/// Factory that creates transports connected to a fixed URL.
#[derive(Debug, Clone)]
pub struct MyCustomTransportFactory {
    /// Address every created transport connects to.
    url: String,
}

impl MyCustomTransportFactory {
    /// Creates a factory for `url`.
    ///
    /// Accepts anything convertible into a `String` (`String`, `&str`, ...),
    /// so callers holding a string slice do not need to allocate first.
    pub fn new(url: impl Into<String>) -> Self {
        Self { url: url.into() }
    }
}

#[async_trait]
impl TransportFactory for MyCustomTransportFactory {
    /// Connects a new transport and returns it together with the receiving
    /// end of its event channel.
    ///
    /// `TransportEvent::Connected` is queued before the read pump is started,
    /// so it is always the first event the receiver observes.
    ///
    /// # Errors
    /// Fails if the connection cannot be established or the initial event
    /// cannot be queued.
    async fn create_transport(
        &self,
    ) -> Result<(Arc<dyn Transport>, Receiver<TransportEvent>), anyhow::Error> {
        // Maximum number of events buffered between the read pump and the consumer.
        const EVENT_CHANNEL_CAPACITY: usize = 10000;

        // Create the event channel.
        let (event_tx, event_rx) = async_channel::bounded(EVENT_CHANNEL_CAPACITY);

        // Connect to the server.
        let transport = MyCustomTransport::connect(&self.url).await?;

        // Announce the connection first. This must happen before the sender is
        // moved into the read pump, and it keeps `Connected` ahead of any data.
        event_tx.send(TransportEvent::Connected).await?;

        // Hand the sender to the read pump task, which owns it from here on.
        tokio::spawn(read_pump(transport.reader(), event_tx));

        Ok((Arc::new(transport), event_rx))
    }
}
See the `TransportFactory` trait in `wacore::net` for the full trait definition.

Transport Events

/// Events a transport reports over its event channel.
pub enum TransportEvent {
    /// The transport has been established.
    Connected,
    /// One frame of data was read from the transport.
    // NOTE(review): `Bytes` is presumably `bytes::Bytes` — confirm the import.
    DataReceived(Bytes),
    /// The transport has stopped delivering data.
    Disconnected,
}

// Read pump example
/// Forwards frames from `reader` to `event_tx` until reading fails or the
/// receiving side of the channel is gone.
///
/// A final `TransportEvent::Disconnected` is always attempted on exit; its
/// send result is deliberately ignored because the receiver may already be
/// closed.
async fn read_pump(
    mut reader: impl AsyncRead,
    event_tx: async_channel::Sender<TransportEvent>,
) {
    // Any read error ends the pump.
    while let Ok(frame) = read_frame(&mut reader).await {
        let delivered = event_tx.send(TransportEvent::DataReceived(frame)).await;
        if delivered.is_err() {
            // Nobody is listening any more.
            break;
        }
    }

    let _ = event_tx.send(TransportEvent::Disconnected).await;
}

Custom HTTP Client

Implement the HttpClient trait for custom HTTP operations:
use async_trait::async_trait;
// NOTE(review): `HttpStreamingResponse` is assumed to live next to `HttpResponse` — confirm the path.
use whatsapp_rust::http::{HttpClient, HttpRequest, HttpResponse, HttpStreamingResponse};

/// Example HTTP client type.
///
/// Derives `Clone` because `HttpClient::clone_box` is implemented by cloning
/// `self` into a new box.
#[derive(Clone)]
pub struct MyCustomHttpClient {
    // Your HTTP client implementation (fields must be `Clone` as well).
}

#[async_trait]
impl HttpClient for MyCustomHttpClient {
    /// Executes `request` and returns the fully buffered response.
    ///
    /// `send_request` stands for your own async helper on `MyCustomHttpClient`
    /// that performs the call and yields `status_code` and `body`.
    async fn execute(&self, request: HttpRequest) -> Result<HttpResponse, anyhow::Error> {
        let response = self.send_request(&request).await?;

        Ok(HttpResponse {
            status_code: response.status_code,
            body: response.body,
        })
    }

    /// Executes `request` and returns a response whose body is read lazily
    /// through a boxed reader.
    ///
    /// `send_streaming_request` stands for your own blocking helper on
    /// `MyCustomHttpClient` that yields `status_code` and `body_reader`.
    fn execute_streaming(&self, request: HttpRequest) -> Result<HttpStreamingResponse, anyhow::Error> {
        // The response has to be obtained here before its parts can be used.
        let response = self.send_streaming_request(&request)?;

        Ok(HttpStreamingResponse {
            status_code: response.status_code,
            body: Box::new(response.body_reader),
        })
    }

    /// Returns a boxed copy of this client.
    ///
    /// Requires `MyCustomHttpClient: Clone`.
    fn clone_box(&self) -> Box<dyn HttpClient> {
        Box::new(self.clone())
    }
}

SQLite Reference Implementation

The library includes a full SQLite implementation you can use as a reference: `whatsapp_rust::store::SqliteStore`.

Key Features

  • Diesel ORM with migrations
  • Connection pooling (r2d2)
  • WAL mode for concurrency
  • Prepared statements for performance
  • Transaction support
  • Database snapshotting for debugging

Using SQLite Store

use whatsapp_rust::store::SqliteStore;

// Build the bundled SQLite store from the `whatsapp.db` path, then share it behind an `Arc`.
let sqlite_store = SqliteStore::new("whatsapp.db").await?;
let backend = Arc::new(sqlite_store);

Best Practices

1. Use Connection Pooling
// ✅ Good: build the pool once and hand it to the store, so operations reuse pooled connections.
// NOTE(review): Step 1 defines `MyCustomStore::new` as an async fn taking `&str`;
// this snippet assumes a constructor that accepts a pool instead.
let pool = Pool::new(config)?;
let backend = MyCustomStore::new(pool);

// ❌ Bad: give the store only a connection string, so it must create connections per operation.
let backend = MyCustomStore::new(connection_string);
2. Implement Proper Error Handling
use wacore::store::error::{Result, StoreError};

/// Loads the session record for `address`.
///
/// A missing row is an expected outcome and is reported as `Ok(None)`; only
/// genuine database failures become `StoreError::Database`.
async fn load_session(&self, address: &str) -> Result<Option<Vec<u8>>> {
    // `query_optional` is the single-row lookup (`query` returns many rows),
    // so "not found" arrives as `None` instead of as an error to be classified.
    let row = self
        .connection
        .query_optional("SELECT ...", &[address])
        .await
        .map_err(|e| StoreError::Database(e.to_string()))?;

    Ok(row.map(|r| r.get(0)))
}
3. Use Transactions for Batch Operations
// Writes a batch of mutation MACs inside a single transaction.
// The SQL text and parameter list are elided (`"INSERT ..."`, `&[...]`); fill them in.
async fn put_mutation_macs(
    &self,
    name: &str,
    version: u64,
    mutations: &[AppStateMutationMAC],
) -> Result<()> {
    // Open the transaction before the first write.
    let tx = self.connection.begin_transaction().await?;
    
    for mutation in mutations {
        // A failing insert returns early via `?`, before `commit` is reached.
        tx.execute("INSERT ...", &[...]).await?;
    }
    
    // Commit only after every insert has succeeded.
    tx.commit().await?;
    Ok(())
}
4. Implement Database Migrations

Use a migration system (e.g., Diesel migrations) to manage schema changes:
-- migrations/2024-01-01-000000_create_sessions/up.sql
-- Creates the `sessions` table; `IF NOT EXISTS` makes re-running the script harmless.
CREATE TABLE IF NOT EXISTS sessions (
    -- Lookup key: one row per address.
    address TEXT PRIMARY KEY,
    -- Serialized session record; must always be present.
    record BLOB NOT NULL
);

Next Steps