Skip to main content

Overview

This guide covers implementing custom backends for storage, network transport, and HTTP operations in whatsapp-rust. The library uses trait-based abstractions to allow full customization.

Storage Backend Architecture

The storage backend is split into four domain-grouped traits:
  1. SignalStore - Signal protocol cryptography (identities, sessions, keys)
  2. AppSyncStore - WhatsApp app state synchronization
  3. ProtocolStore - WhatsApp Web protocol alignment (SKDM, LID mapping, device registry)
  4. DeviceStore - Device persistence operations
See Store API reference for the complete trait definitions.

Backend Trait

Any type implementing all four traits automatically implements Backend:
use wacore::store::traits::*;

pub trait Backend: SignalStore + AppSyncStore + ProtocolStore + DeviceStore + Send + Sync {}

impl<T> Backend for T 
where 
    T: SignalStore + AppSyncStore + ProtocolStore + DeviceStore + Send + Sync 
{}

Implementing a Custom Storage Backend

Step 1: Define Your Store

use std::sync::Arc;
use async_trait::async_trait;
use wacore::store::traits::*;
use wacore::store::error::{Result, StoreError};

#[derive(Clone)]
pub struct MyCustomStore {
    // Your storage implementation (database connection, file handle, etc.)
    connection: Arc<MyConnection>,
}

impl MyCustomStore {
    pub async fn new(connection_string: &str) -> Result<Self> {
        let connection = MyConnection::connect(connection_string).await?;
        Ok(Self {
            connection: Arc::new(connection),
        })
    }
}

Step 2: Implement SignalStore

Handle Signal protocol cryptographic operations:
use bytes::Bytes;

#[async_trait]
impl SignalStore for MyCustomStore {
    // --- Identity Operations ---
    
    async fn put_identity(&self, address: &str, key: [u8; 32]) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO identities (address, key) VALUES (?, ?)",
            &[address, &key[..]],
        ).await?;
        Ok(())
    }
    
    async fn load_identity(&self, address: &str) -> Result<Option<[u8; 32]>> {
        let row = self.connection.query_optional(
            "SELECT key FROM identities WHERE address = ?",
            &[address],
        ).await?;
        Ok(row.map(|r| {
            let bytes: Vec<u8> = r.get(0);
            let mut arr = [0u8; 32];
            arr.copy_from_slice(&bytes);
            arr
        }))
    }
    
    async fn delete_identity(&self, address: &str) -> Result<()> {
        self.connection.execute(
            "DELETE FROM identities WHERE address = ?",
            &[address],
        ).await?;
        Ok(())
    }
    
    // --- Session Operations ---
    
    async fn get_session(&self, address: &str) -> Result<Option<Bytes>> {
        let row = self.connection.query_optional(
            "SELECT record FROM sessions WHERE address = ?",
            &[address],
        ).await?;
        Ok(row.map(|r| Bytes::from(r.get::<Vec<u8>>(0))))
    }
    
    async fn put_session(&self, address: &str, session: &[u8]) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO sessions (address, record) VALUES (?, ?)",
            &[address, session],
        ).await?;
        Ok(())
    }
    
    async fn delete_session(&self, address: &str) -> Result<()> {
        self.connection.execute(
            "DELETE FROM sessions WHERE address = ?",
            &[address],
        ).await?;
        Ok(())
    }
    
    // --- PreKey Operations ---
    
    async fn store_prekey(&self, id: u32, record: &[u8], uploaded: bool) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO prekeys (id, record, uploaded) VALUES (?, ?, ?)",
            &[&id, record, &(uploaded as i32)],
        ).await?;
        Ok(())
    }
    
    async fn load_prekey(&self, id: u32) -> Result<Option<Bytes>> {
        let row = self.connection.query_optional(
            "SELECT record FROM prekeys WHERE id = ?",
            &[&id],
        ).await?;
        Ok(row.map(|r| Bytes::from(r.get::<Vec<u8>>(0))))
    }

    // Override load_prekeys_batch for better performance (default loops over load_prekey)
    async fn load_prekeys_batch(&self, ids: &[u32]) -> Result<Vec<(u32, Bytes)>> {
        let placeholders = ids.iter().map(|_| "?").collect::<Vec<_>>().join(",");
        let query = format!("SELECT id, record FROM prekeys WHERE id IN ({})", placeholders);
        let rows = self.connection.query_all(&query, ids).await?;
        Ok(rows.iter().map(|r| (r.get(0), Bytes::from(r.get::<Vec<u8>>(1)))).collect())
    }
    
    async fn remove_prekey(&self, id: u32) -> Result<()> {
        self.connection.execute(
            "DELETE FROM prekeys WHERE id = ?",
            &[&id],
        ).await?;
        Ok(())
    }
    
    async fn get_max_prekey_id(&self) -> Result<u32> {
        let row = self.connection.query_optional(
            "SELECT MAX(id) FROM prekeys",
            &[],
        ).await?;
        Ok(row.and_then(|r| r.get(0)).unwrap_or(0))
    }
    
    // --- Signed PreKey Operations ---
    
    async fn store_signed_prekey(&self, id: u32, record: &[u8]) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO signed_prekeys (id, record) VALUES (?, ?)",
            &[&id, record],
        ).await?;
        Ok(())
    }
    
    async fn load_signed_prekey(&self, id: u32) -> Result<Option<Vec<u8>>> {
        let row = self.connection.query_optional(
            "SELECT record FROM signed_prekeys WHERE id = ?",
            &[&id],
        ).await?;
        Ok(row.map(|r| r.get(0)))
    }
    
    async fn load_all_signed_prekeys(&self) -> Result<Vec<(u32, Vec<u8>)>> {
        let rows = self.connection.query(
            "SELECT id, record FROM signed_prekeys",
            &[],
        ).await?;
        Ok(rows.into_iter().map(|r| (r.get(0), r.get(1))).collect())
    }
    
    async fn remove_signed_prekey(&self, id: u32) -> Result<()> {
        self.connection.execute(
            "DELETE FROM signed_prekeys WHERE id = ?",
            &[&id],
        ).await?;
        Ok(())
    }
    
    // --- Sender Key Operations ---
    
    async fn put_sender_key(&self, address: &str, record: &[u8]) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO sender_keys (address, record) VALUES (?, ?)",
            &[address, record],
        ).await?;
        Ok(())
    }
    
    async fn get_sender_key(&self, address: &str) -> Result<Option<Vec<u8>>> {
        let row = self.connection.query_optional(
            "SELECT record FROM sender_keys WHERE address = ?",
            &[address],
        ).await?;
        Ok(row.map(|r| r.get(0)))
    }
    
    async fn delete_sender_key(&self, address: &str) -> Result<()> {
        self.connection.execute(
            "DELETE FROM sender_keys WHERE address = ?",
            &[address],
        ).await?;
        Ok(())
    }
}
See Store API reference for all SignalStore methods.

Step 3: Implement AppSyncStore

Handle WhatsApp app state synchronization:
use wacore::appstate::hash::HashState;
use wacore_appstate::processor::AppStateMutationMAC;

#[async_trait]
impl AppSyncStore for MyCustomStore {
    async fn get_sync_key(&self, key_id: &[u8]) -> Result<Option<AppStateSyncKey>> {
        let row = self.connection.query_optional(
            "SELECT key_data, fingerprint, timestamp FROM app_state_sync_keys WHERE key_id = ?",
            &[key_id],
        ).await?;
        
        Ok(row.map(|r| AppStateSyncKey {
            key_data: r.get(0),
            fingerprint: r.get(1),
            timestamp: r.get(2),
        }))
    }
    
    async fn set_sync_key(&self, key_id: &[u8], key: AppStateSyncKey) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO app_state_sync_keys (key_id, key_data, fingerprint, timestamp) VALUES (?, ?, ?, ?)",
            &[key_id, &key.key_data[..], &key.fingerprint[..], &key.timestamp],
        ).await?;
        Ok(())
    }
    
    async fn get_version(&self, name: &str) -> Result<HashState> {
        let row = self.connection.query_optional(
            "SELECT version, hash FROM app_state_versions WHERE name = ?",
            &[name],
        ).await?;
        
        Ok(row.map(|r| HashState {
            version: r.get(0),
            hash: r.get(1),
        }).unwrap_or_default())
    }
    
    async fn set_version(&self, name: &str, state: HashState) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO app_state_versions (name, version, hash) VALUES (?, ?, ?)",
            &[name, &state.version, &state.hash[..]],
        ).await?;
        Ok(())
    }
    
    async fn put_mutation_macs(
        &self,
        name: &str,
        version: u64,
        mutations: &[AppStateMutationMAC],
    ) -> Result<()> {
        for mutation in mutations {
            self.connection.execute(
                "INSERT INTO mutation_macs (name, version, index_mac, value_mac) VALUES (?, ?, ?, ?)",
                &[name, &version, &mutation.index_mac[..], &mutation.value_mac[..]],
            ).await?;
        }
        Ok(())
    }
    
    async fn get_mutation_mac(&self, name: &str, index_mac: &[u8]) -> Result<Option<Vec<u8>>> {
        let row = self.connection.query_optional(
            "SELECT value_mac FROM mutation_macs WHERE name = ? AND index_mac = ?",
            &[name, index_mac],
        ).await?;
        Ok(row.map(|r| r.get(0)))
    }
    
    async fn delete_mutation_macs(&self, name: &str, index_macs: &[Vec<u8>]) -> Result<()> {
        for index_mac in index_macs {
            self.connection.execute(
                "DELETE FROM mutation_macs WHERE name = ? AND index_mac = ?",
                &[name, &index_mac[..]],
            ).await?;
        }
        Ok(())
    }
    
    async fn get_latest_sync_key_id(&self) -> Result<Option<Vec<u8>>> {
        let row = self.connection.query_optional(
            "SELECT key_id FROM app_state_sync_keys ORDER BY timestamp DESC LIMIT 1",
            &[],
        ).await?;
        Ok(row.map(|r| r.get(0)))
    }
}
See Store API reference for all AppSyncStore methods.

Step 4: Implement ProtocolStore

Handle WhatsApp Web protocol alignment:
use wacore_binary::jid::Jid;

#[async_trait]
impl ProtocolStore for MyCustomStore {
    // --- Per-Device Sender Key Tracking ---
    
    async fn get_sender_key_devices(&self, group_jid: &str) -> Result<Vec<(String, bool)>> {
        let rows = self.connection.query(
            "SELECT device_jid, has_key FROM sender_key_devices WHERE group_jid = ?",
            &[group_jid],
        ).await?;
        
        Ok(rows.into_iter()
            .map(|r| (r.get::<String>(0), r.get::<i32>(1) != 0))
            .collect())
    }
    
    async fn set_sender_key_status(&self, group_jid: &str, entries: &[(&str, bool)]) -> Result<()> {
        for (device_jid, has_key) in entries {
            self.connection.execute(
                "INSERT OR REPLACE INTO sender_key_devices (group_jid, device_jid, has_key, updated_at) VALUES (?, ?, ?, strftime('%s', 'now'))",
                &[group_jid, device_jid, &(*has_key as i32)],
            ).await?;
        }
        Ok(())
    }
    
    async fn clear_sender_key_devices(&self, group_jid: &str) -> Result<()> {
        self.connection.execute(
            "DELETE FROM sender_key_devices WHERE group_jid = ?",
            &[group_jid],
        ).await?;
        Ok(())
    }
    
    // --- LID-PN Mapping ---
    
    async fn get_lid_mapping(&self, lid: &str) -> Result<Option<LidPnMappingEntry>> {
        let row = self.connection.query_optional(
            "SELECT lid, phone_number, created_at, updated_at, learning_source FROM lid_pn_mappings WHERE lid = ?",
            &[lid],
        ).await?;
        
        Ok(row.map(|r| LidPnMappingEntry {
            lid: r.get(0),
            phone_number: r.get(1),
            created_at: r.get(2),
            updated_at: r.get(3),
            learning_source: r.get(4),
        }))
    }
    
    async fn get_pn_mapping(&self, phone: &str) -> Result<Option<LidPnMappingEntry>> {
        let row = self.connection.query_optional(
            "SELECT lid, phone_number, created_at, updated_at, learning_source FROM lid_pn_mappings WHERE phone_number = ? ORDER BY updated_at DESC LIMIT 1",
            &[phone],
        ).await?;
        
        Ok(row.map(|r| LidPnMappingEntry {
            lid: r.get(0),
            phone_number: r.get(1),
            created_at: r.get(2),
            updated_at: r.get(3),
            learning_source: r.get(4),
        }))
    }
    
    async fn put_lid_mapping(&self, entry: &LidPnMappingEntry) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO lid_pn_mappings (lid, phone_number, created_at, updated_at, learning_source) VALUES (?, ?, ?, ?, ?)",
            &[&entry.lid, &entry.phone_number, &entry.created_at, &entry.updated_at, &entry.learning_source],
        ).await?;
        Ok(())
    }
    
    async fn get_all_lid_mappings(&self) -> Result<Vec<LidPnMappingEntry>> {
        let rows = self.connection.query(
            "SELECT lid, phone_number, created_at, updated_at, learning_source FROM lid_pn_mappings",
            &[],
        ).await?;
        
        Ok(rows.into_iter().map(|r| LidPnMappingEntry {
            lid: r.get(0),
            phone_number: r.get(1),
            created_at: r.get(2),
            updated_at: r.get(3),
            learning_source: r.get(4),
        }).collect())
    }
    
    // --- Sent Message Store (retry support) ---

    async fn store_sent_message(
        &self,
        chat_jid: &str,
        message_id: &str,
        payload: &[u8],
    ) -> Result<()> {
        self.connection.execute(
            "INSERT OR REPLACE INTO sent_messages (chat_jid, message_id, payload, created_at) VALUES (?, ?, ?, strftime('%s', 'now'))",
            &[chat_jid, message_id, payload],
        ).await?;
        Ok(())
    }

    async fn take_sent_message(
        &self,
        chat_jid: &str,
        message_id: &str,
    ) -> Result<Option<Vec<u8>>> {
        // Atomic read-and-delete
        let row = self.connection.query_optional(
            "DELETE FROM sent_messages WHERE chat_jid = ? AND message_id = ? RETURNING payload",
            &[chat_jid, message_id],
        ).await?;
        Ok(row.map(|r| r.get(0)))
    }

    async fn delete_expired_sent_messages(&self, cutoff_timestamp: i64) -> Result<u32> {
        let count = self.connection.execute(
            "DELETE FROM sent_messages WHERE created_at < ?",
            &[&cutoff_timestamp],
        ).await?;
        Ok(count as u32)
    }

    // ... Implement remaining methods (base_key, device_list, tc_token)
}
See Store API reference for all ProtocolStore methods.

Step 5: Implement DeviceStore

Handle device data persistence:
use wacore::store::Device;

#[async_trait]
impl DeviceStore for MyCustomStore {
    async fn save(&self, device: &Device) -> Result<()> {
        // Serialize device data (use your preferred format)
        let serialized = bincode::serialize(device)
            .map_err(|e| StoreError::Serialization(e.to_string()))?;
        
        self.connection.execute(
            "UPDATE devices SET data = ? WHERE id = 1",
            &[&serialized[..]],
        ).await?;
        Ok(())
    }
    
    async fn load(&self) -> Result<Option<Device>> {
        let row = self.connection.query_optional(
            "SELECT data FROM devices WHERE id = 1",
            &[],
        ).await?;
        
        match row {
            Some(r) => {
                let data: Vec<u8> = r.get(0);
                let device = bincode::deserialize(&data)
                    .map_err(|e| StoreError::Deserialization(e.to_string()))?;
                Ok(Some(device))
            }
            None => Ok(None),
        }
    }
    
    async fn exists(&self) -> Result<bool> {
        let row = self.connection.query_optional(
            "SELECT 1 FROM devices WHERE id = 1",
            &[],
        ).await?;
        Ok(row.is_some())
    }
    
    async fn create(&self) -> Result<i32> {
        self.connection.execute(
            "INSERT INTO devices (id, data) VALUES (1, ?)",
            &[&Vec::<u8>::new()],
        ).await?;
        Ok(1)
    }
    
    async fn snapshot_db(&self, name: &str, extra_content: Option<&[u8]>) -> Result<()> {
        // Optional: Implement database snapshotting for debugging
        Ok(())
    }
}
See Store API reference for all DeviceStore methods.

Using your custom backend

use whatsapp_rust::bot::Bot;
use whatsapp_rust::TokioRuntime;

let backend = Arc::new(MyCustomStore::new("my://connection").await?);

let bot = Bot::builder()
    .with_backend(backend)
    .with_transport_factory(transport_factory)
    .with_http_client(http_client)
    .with_runtime(TokioRuntime)
    .build()
    .await?;

Custom Runtime

The Runtime trait is the foundation of the runtime-agnostic architecture. It abstracts async task spawning, sleeping, blocking operations, and cooperative yielding. Implement it to use a different async runtime (e.g., async-std, smol, or a WASM executor).

The Runtime trait

use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use async_trait::async_trait;
use wacore::runtime::{AbortHandle, Runtime};

pub struct MyRuntime;

#[async_trait]
impl Runtime for MyRuntime {
    fn spawn(
        &self,
        future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    ) -> AbortHandle {
        // Spawn a task on your runtime and return an AbortHandle
        // that cancels the task when called
        let handle = my_runtime::spawn(future);
        AbortHandle::new(move || handle.cancel())
    }

    fn sleep(&self, duration: Duration) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        // Return a future that completes after the given duration
        Box::pin(my_runtime::sleep(duration))
    }

    fn spawn_blocking(
        &self,
        f: Box<dyn FnOnce() + Send + 'static>,
    ) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        // Offload a blocking closure to a thread pool
        Box::pin(async {
            let _ = my_runtime::spawn_blocking(f).await;
        })
    }

    fn yield_now(&self) -> Option<Pin<Box<dyn Future<Output = ()> + Send>>> {
        // Return Some(future) if cooperative yielding is needed,
        // or None if the runtime handles this automatically
        // (e.g., multi-threaded runtimes don't need explicit yielding)
        None
    }

    fn yield_frequency(&self) -> u32 {
        // How many items to process before yielding in tight loops.
        // Default is 10. Single-threaded runtimes should return 1
        // to avoid starving the event loop.
        10
    }
}

AbortHandle

AbortHandle is a type-erased cancellation handle marked #[must_use]. Dropping an AbortHandle aborts the spawned task, ensuring cleanup. Call .detach() to prevent automatic cancellation for fire-and-forget tasks where the task should run to completion even if the parent scope is dropped.
Because AbortHandle is #[must_use], the compiler warns if you discard the return value of Runtime::spawn(). You must either store the handle (to abort later), call .detach() (to let the task run freely), or explicitly call .abort().
// Store the handle to abort later
let handle = runtime.spawn(Box::pin(my_task()));
// ...
handle.abort(); // Cancel the task

// Detach for fire-and-forget tasks
runtime.spawn(Box::pin(background_work())).detach();

Using your runtime

use whatsapp_rust::bot::Bot;

let mut bot = Bot::builder()
    .with_backend(backend)
    .with_transport_factory(transport_factory)
    .with_http_client(http_client)
    .with_runtime(MyRuntime)
    .build()
    .await?;
To target WASM, disable all default features and implement Runtime without Send bounds. On target_arch = "wasm32", the trait automatically removes Send requirements.
If you are implementing a single-threaded runtime, override yield_frequency() to return 1 and have yield_now() return Some(future). This ensures the event loop is not starved during tight processing loops (e.g., decoding incoming frames).

Proxy and custom TLS

whatsapp-rust has two separate network layers, each with its own extension point for proxy and TLS customization:
LayerPurposeExtension point
WebSocket transportWhatsApp protocol connectionTransportFactory trait / with_connector
HTTP clientMedia upload/download, version fetchingHttpClient trait / UreqHttpClient::with_agent
To route all traffic through a proxy, you need to configure both layers.

Custom TLS for the WebSocket transport

Use TokioWebSocketTransportFactory::with_connector to supply a custom TLS Connector — for example, adding custom CA certificates or client certificates:
use whatsapp_rust_tokio_transport::{TokioWebSocketTransportFactory, Connector};
use tokio_rustls::TlsConnector;
use rustls::RootCertStore;
use std::sync::Arc;

let mut root_store = RootCertStore::empty();
root_store.add_parsable_certificates(my_ca_certs);

let tls_config = rustls::ClientConfig::builder()
    .with_root_certificates(root_store)
    .with_no_client_auth();

let connector = Connector::Rustls(TlsConnector::from(Arc::new(tls_config)));

let factory = TokioWebSocketTransportFactory::new()
    .with_connector(connector);
Use default_tls_connector() to inspect or replicate the default TLS configuration as a starting point.

Full proxy support for the WebSocket transport

For full proxy support (SOCKS5, HTTP CONNECT, etc.), implement the TransportFactory trait directly. Establish the WebSocket connection through your proxy, then wrap it with from_websocket:
use whatsapp_rust_tokio_transport::from_websocket;
use wacore::net::{Transport, TransportEvent, TransportFactory};
use async_trait::async_trait;
use std::sync::Arc;

struct ProxyTransportFactory {
    proxy_addr: String,
    target_url: String,
}

#[async_trait]
impl TransportFactory for ProxyTransportFactory {
    async fn create_transport(
        &self,
    ) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error> {
        // 1. Connect to proxy (example: SOCKS5)
        let proxy_stream = tokio_socks::tcp::Socks5Stream::connect(
            &self.proxy_addr,
            "web.whatsapp.com:443",
        ).await?;

        // 2. Upgrade to TLS + WebSocket through the proxy
        let (ws, _) = tokio_websockets::ClientBuilder::from_uri(
            self.target_url.parse()?,
        )
        .connect_on(proxy_stream)
        .await?;

        // 3. Wrap the WebSocket into a Transport + event channel
        Ok(from_websocket(ws))
    }
}

Proxy support for the HTTP client

Use UreqHttpClient::with_agent to supply a pre-configured ureq::Agent with proxy settings:
use whatsapp_rust_ureq_http_client::UreqHttpClient;
use ureq::config::Config;

let agent: ureq::Agent = Config::builder()
    .proxy(ureq::Proxy::new("socks5://127.0.0.1:1080")?)
    .build()
    .into();

let http_client = UreqHttpClient::with_agent(agent);
Alternatively, implement the HttpClient trait directly for full control over HTTP behavior.

Combining both layers

To route all traffic through a proxy, configure both the transport factory and the HTTP client:
use whatsapp_rust::bot::Bot;
use whatsapp_rust::TokioRuntime;

let mut bot = Bot::builder()
    .with_backend(backend)
    .with_transport_factory(ProxyTransportFactory {
        proxy_addr: "127.0.0.1:1080".to_string(),
        target_url: "wss://web.whatsapp.com/ws/chat".to_string(),
    })
    .with_http_client(UreqHttpClient::with_agent(proxy_agent))
    .with_runtime(TokioRuntime)
    .on_event(|event, client| async move { /* ... */ })
    .build()
    .await?;
The WebSocket transport handles the persistent WhatsApp protocol connection, while the HTTP client handles media operations (upload/download) and version fetching. Both must be configured separately for full proxy coverage.

Custom Transport Backend

Transport Trait

Implement the Transport trait for custom network transports:
use async_trait::async_trait;
use wacore::net::Transport;

pub struct MyCustomTransport {
    // Your transport implementation
}

#[async_trait]
impl Transport for MyCustomTransport {
    async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
        // Send raw bytes through your transport
        Ok(())
    }
    
    async fn disconnect(&self) {
        // Close the connection
    }
}
See Transport API reference for full trait details.

Transport Factory

Implement TransportFactory to create transport instances:
use wacore::net::{TransportFactory, TransportEvent};
use async_channel::Receiver;
use std::sync::Arc;

pub struct MyCustomTransportFactory {
    url: String,
}

impl MyCustomTransportFactory {
    pub fn new(url: String) -> Self {
        Self { url }
    }
}

#[async_trait]
impl TransportFactory for MyCustomTransportFactory {
    async fn create_transport(
        &self,
    ) -> Result<(Arc<dyn Transport>, Receiver<TransportEvent>), anyhow::Error> {
        // Create transport and event channel
        let (event_tx, event_rx) = async_channel::bounded(10000);
        
        // Connect to server
        let transport = MyCustomTransport::connect(&self.url).await?;
        
        // Spawn read pump task
        tokio::spawn(read_pump(transport.reader(), event_tx));
        
        // Send Connected event
        event_tx.send(TransportEvent::Connected).await?;
        
        Ok((Arc::new(transport), event_rx))
    }
}
See Transport API reference for factory details.

Transport Events

pub enum TransportEvent {
    Connected,
    DataReceived(Bytes),
    Disconnected,
}

// Read pump example
async fn read_pump(
    mut reader: impl AsyncRead,
    event_tx: async_channel::Sender<TransportEvent>,
) {
    loop {
        match read_frame(&mut reader).await {
            Ok(data) => {
                if event_tx.send(TransportEvent::DataReceived(data)).await.is_err() {
                    break;
                }
            }
            Err(_) => break,
        }
    }
    let _ = event_tx.send(TransportEvent::Disconnected).await;
}

Custom HTTP Client

Implement the HttpClient trait for custom HTTP operations:
use whatsapp_rust::http::{HttpClient, HttpRequest, HttpResponse};
use wacore::net::StreamingHttpResponse;
use async_trait::async_trait;

pub struct MyCustomHttpClient {
    // Your HTTP client implementation
}

#[async_trait]
impl HttpClient for MyCustomHttpClient {
    async fn execute(&self, request: HttpRequest) -> Result<HttpResponse, anyhow::Error> {
        // Execute HTTP request
        let response = self.send_request(&request).await?;
        
        Ok(HttpResponse {
            status_code: response.status_code,
            body: response.body,
        })
    }

    // Override supports_streaming if you implement execute_streaming.
    // When false (the default), download_to_writer falls back to buffered downloads.
    fn supports_streaming(&self) -> bool {
        true
    }
    
    fn execute_streaming(&self, request: HttpRequest) -> Result<StreamingHttpResponse, anyhow::Error> {
        // Return a streaming response with a reader
        Ok(StreamingHttpResponse {
            status_code: response.status_code,
            body: Box::new(response.body_reader),
        })
    }
}
If your HTTP client doesn’t support streaming, you only need to implement execute. The supports_streaming method defaults to false, and download_to_writer will automatically use a buffered fallback.

SQLite Reference Implementation

The library includes a full SQLite implementation you can use as reference: See Store API reference for the SQLite implementation details.

Key Features

  • Diesel ORM with migrations
  • Connection pooling (r2d2)
  • WAL mode for concurrency
  • Prepared statements for performance
  • Transaction support
  • Database snapshotting for debugging

Using SQLite store

use whatsapp_rust::store::SqliteStore;

let backend = Arc::new(SqliteStore::new("whatsapp.db").await?);
SQLite is bundled by default via the bundled-sqlite feature on whatsapp-rust-sqlite-storage. No system SQLite installation is required.

Custom Cache Store

The pluggable cache store adapter lets you replace the default in-process moka caches with an external backend like Redis or Memcached. This is useful for sharing cache state across multiple client instances or for deployments where in-process memory is limited.

The CacheStore trait

Implement the CacheStore trait from wacore::store::cache:
use async_trait::async_trait;
use std::time::Duration;
// You can import from either location:
// use wacore::store::cache::CacheStore;
use whatsapp_rust::CacheStore;

pub struct MyRedisCacheStore {
    client: redis::aio::ConnectionManager,
}

impl MyRedisCacheStore {
    pub async fn new(url: &str) -> Self {
        let client = redis::Client::open(url).unwrap();
        let conn = client.get_connection_manager().await.unwrap();
        Self { client: conn }
    }
}

#[async_trait]
impl CacheStore for MyRedisCacheStore {
    async fn get(&self, namespace: &str, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
        let redis_key = format!("{namespace}:{key}");
        let result: Option<Vec<u8>> = redis::cmd("GET")
            .arg(&redis_key)
            .query_async(&mut self.client.clone())
            .await?;
        Ok(result)
    }

    async fn set(
        &self,
        namespace: &str,
        key: &str,
        value: &[u8],
        ttl: Option<Duration>,
    ) -> anyhow::Result<()> {
        let redis_key = format!("{namespace}:{key}");
        match ttl {
            Some(duration) => {
                redis::cmd("SET")
                    .arg(&redis_key)
                    .arg(value)
                    .arg("EX")
                    .arg(duration.as_secs())
                    .query_async(&mut self.client.clone())
                    .await?;
            }
            None => {
                redis::cmd("SET")
                    .arg(&redis_key)
                    .arg(value)
                    .query_async(&mut self.client.clone())
                    .await?;
            }
        }
        Ok(())
    }

    async fn delete(&self, namespace: &str, key: &str) -> anyhow::Result<()> {
        let redis_key = format!("{namespace}:{key}");
        redis::cmd("DEL")
            .arg(&redis_key)
            .query_async(&mut self.client.clone())
            .await?;
        Ok(())
    }

    async fn clear(&self, namespace: &str) -> anyhow::Result<()> {
        // Use SCAN to find and delete all keys with the namespace prefix
        let pattern = format!("{namespace}:*");
        let mut cursor = 0u64;
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100)
                .query_async(&mut self.client.clone())
                .await?;

            if !keys.is_empty() {
                redis::cmd("DEL")
                    .arg(&keys)
                    .query_async(&mut self.client.clone())
                    .await?;
            }

            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }
        Ok(())
    }

    async fn entry_count(&self, namespace: &str) -> anyhow::Result<u64> {
        // Approximate count using SCAN (expensive — use only for diagnostics)
        let pattern = format!("{namespace}:*");
        let mut count = 0u64;
        let mut cursor = 0u64;
        loop {
            let (next_cursor, keys): (u64, Vec<String>) = redis::cmd("SCAN")
                .arg(cursor)
                .arg("MATCH")
                .arg(&pattern)
                .arg("COUNT")
                .arg(100)
                .query_async(&mut self.client.clone())
                .await?;
            count += keys.len() as u64;
            cursor = next_cursor;
            if cursor == 0 {
                break;
            }
        }
        Ok(count)
    }
}

Plugging in your cache store

Use CacheStores to assign your custom backend to specific caches:
use whatsapp_rust::{CacheConfig, CacheStores};
use std::sync::Arc;

let redis = Arc::new(MyRedisCacheStore::new("redis://localhost:6379").await);

// Route only group and device caches to Redis
let bot = Bot::builder()
    .with_backend(backend)
    .with_transport_factory(transport)
    .with_http_client(http_client)
    .with_runtime(TokioRuntime)
    .with_cache_config(CacheConfig {
        cache_stores: CacheStores {
            group_cache: Some(redis.clone()),
            device_registry_cache: Some(redis.clone()),
            ..Default::default()
        },
        ..Default::default()
    })
    .build()
    .await?;
Or route all pluggable caches to the same backend:
let config = CacheConfig {
    cache_stores: CacheStores::all(redis.clone()),
    ..Default::default()
};

Available namespaces

The following namespaces are used internally by the client:
NamespaceCacheDescription
"group"group_cacheGroup metadata
"device_registry"device_registry_cacheDevice registry entries
"lid_pn_by_lid"lid_pn_cacheLID-to-phone bidirectional mappings

Design considerations

  • Error handling is best-effort. Cache misses and failures are logged as warnings but don’t break the client — it falls back to fetching from the authoritative source.
  • Serialization uses serde_json. Values are serialized to JSON bytes on the custom-store path. The moka path has zero serialization overhead.
  • TTL is forwarded from CacheEntryConfig. Your implementation receives the same TTL configured in CacheConfig.
  • Coordination caches cannot be externalized. Session locks, message queues, and enqueue locks hold live Rust objects (mutexes, channels) and always stay in-process.
  • invalidate_all() requires tokio-runtime. The synchronous invalidate_all() method on TypedCache spawns a fire-and-forget task via Tokio for custom backends. Without the tokio-runtime feature, the clear is skipped with a warning. Use the async clear() method instead if you disable tokio-runtime.

Best Practices

1
Use Connection Pooling
2
// ✅ Good: Use connection pool
let pool = Pool::new(config)?;
let backend = MyCustomStore::new(pool);

// ❌ Bad: Create connections per operation
let backend = MyCustomStore::new(connection_string);
3
Implement Proper Error Handling
4
use wacore::store::error::{Result, StoreError};

async fn load_session(&self, address: &str) -> Result<Option<Vec<u8>>> {
    match self.connection.query("SELECT ...", &[address]).await {
        Ok(row) => Ok(Some(row.get(0))),
        Err(e) if is_not_found(&e) => Ok(None),
        Err(e) => Err(StoreError::Database(e.to_string())),
    }
}
5
Use Transactions for Batch Operations
6
async fn put_mutation_macs(
    &self,
    name: &str,
    version: u64,
    mutations: &[AppStateMutationMAC],
) -> Result<()> {
    let tx = self.connection.begin_transaction().await?;
    
    for mutation in mutations {
        tx.execute("INSERT ...", &[...]).await?;
    }
    
    tx.commit().await?;
    Ok(())
}
7
Implement Database Migrations
8
Use a migration system (e.g., Diesel migrations) to manage schema changes:
9
-- migrations/2024-01-01-000000_create_sessions/up.sql
CREATE TABLE IF NOT EXISTS sessions (
    address TEXT PRIMARY KEY,
    record BLOB NOT NULL
);

Next Steps