Skip to main content

Overview

The transport layer provides a runtime-agnostic abstraction for network connections. It handles raw byte transmission without knowledge of WhatsApp’s protocol framing. The transport system consists of two main traits:
  • Transport - Represents an active connection for sending/receiving raw bytes
  • TransportFactory - Creates new transport instances and event streams

Transport Trait

The Transport trait represents an active network connection as a simple byte pipe.
use async_trait::async_trait;

#[async_trait]
pub trait Transport: Send + Sync {
    /// Sends raw data to the server
    async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error>;

    /// Closes the connection
    async fn disconnect(&self);
}

Methods

send

Sends raw bytes through the transport. The caller is responsible for any protocol framing.
async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error>;
Parameters:
  • data - Raw bytes to send
Returns:
  • Ok(()) on success
  • Err(anyhow::Error) on failure
Example:
let data = vec![1, 2, 3, 4];
transport.send(data).await?;

disconnect

Gracefully closes the connection.
async fn disconnect(&self);
Example:
transport.disconnect().await;

TransportFactory Trait

Creates new transport instances and associated event streams.
#[async_trait]
pub trait TransportFactory: Send + Sync {
    /// Creates a new transport and returns it along with a stream of events
    async fn create_transport(
        &self,
    ) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>;
}

Methods

create_transport

Establishes a new connection and returns both the transport handle and an event receiver.
async fn create_transport(
    &self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>;
Returns:
  • Arc<dyn Transport> - The transport instance for sending data
  • async_channel::Receiver<TransportEvent> - Stream of transport events
Example:
let factory = TokioWebSocketTransportFactory::new();
let (transport, events) = factory.create_transport().await?;

// Use transport to send data
transport.send(data).await?;

// Listen for events
while let Ok(event) = events.recv().await {
    match event {
        TransportEvent::Connected => println!("Connected"),
        TransportEvent::DataReceived(bytes) => println!("Received {} bytes", bytes.len()),
        TransportEvent::Disconnected => break,
    }
}

TransportEvent

Events produced by the transport layer:
pub enum TransportEvent {
    /// The transport has successfully connected
    Connected,
    
    /// Raw data has been received from the server
    DataReceived(Bytes),
    
    /// The connection was lost
    Disconnected,
}

Event Types

Connected

Emitted immediately after successful connection establishment.
TransportEvent::Connected

DataReceived

Emitted when raw data is received from the server.
TransportEvent::DataReceived(bytes)
Fields:
  • bytes: Bytes - Raw data received (from the bytes crate)

Disconnected

Emitted when the connection is closed (gracefully or due to error).
TransportEvent::Disconnected

TokioWebSocketTransport

The default implementation using tokio-websockets for async WebSocket connections.

Features

  • Async I/O - Built on Tokio runtime
  • TLS Support - Uses rustls with webpki-roots for certificate validation
  • Split Architecture - Separate read/write paths for efficiency
  • Automatic Reconnection - Handled by higher-level Client code
  • Development Mode - Optional danger-skip-tls-verify feature

Creating a Transport Factory

use whatsapp_rust::transport::TokioWebSocketTransportFactory;

// Default - connects to WhatsApp Web
let factory = TokioWebSocketTransportFactory::new();

// Custom URL (for testing/proxying)
let factory = TokioWebSocketTransportFactory::new()
    .with_url("wss://custom-endpoint.example.com/ws");

Usage with Client

use whatsapp_rust::Client;
use whatsapp_rust::transport::TokioWebSocketTransportFactory;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = TokioWebSocketTransportFactory::new();
    
    let mut client = Client::new();
    client.set_transport_factory(Box::new(factory));
    
    client.connect().await?;
    
    Ok(())
}

TLS Configuration

By default, TokioWebSocketTransport validates TLS certificates using webpki-roots:
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

let config = rustls::ClientConfig::builder()
    .with_root_certificates(root_store)
    .with_no_client_auth();

Development Mode (Skip TLS Verification)

WARNING: Only use for development/testing! Enable the danger-skip-tls-verify feature to disable certificate verification:
[dependencies]
whatsapp-rust-tokio-transport = { version = "*", features = ["danger-skip-tls-verify"] }
This allows connecting through MITM proxies or self-signed certificates.

Connection Settings

The default WebSocket URL is:
pub const WHATSAPP_WEB_WS_URL: &str = "wss://web.whatsapp.com/ws/chat";

Internal Architecture

TokioWebSocketTransport splits the WebSocket into separate read/write paths:
let (sink, stream) = websocket.split();

// Sink - wrapped in Arc<Mutex> for sending
let ws_sink: Arc<Mutex<Option<WsSink>>> = Arc::new(Mutex::new(Some(sink)));

// Stream - moved to read_pump task
tokio::task::spawn(read_pump(stream, event_tx));
The read pump continuously processes incoming messages:
async fn read_pump(mut stream: WsStream, event_tx: async_channel::Sender<TransportEvent>) {
    loop {
        match stream.next().await {
            Some(Ok(msg)) if msg.is_binary() => {
                let payload = msg.into_payload();
                event_tx.send(TransportEvent::DataReceived(Bytes::from(payload))).await;
            }
            Some(Ok(msg)) if msg.is_close() => break,
            Some(Err(e)) => break,
            None => break,
        }
    }
    event_tx.send(TransportEvent::Disconnected).await;
}

Implementing Custom Transports

You can implement custom transports for different runtimes or protocols.

Example: Mock Transport for Testing

use async_trait::async_trait;
use std::sync::Arc;
use wacore::net::{Transport, TransportEvent, TransportFactory};

/// A mock transport that does nothing
pub struct MockTransport;

#[async_trait]
impl Transport for MockTransport {
    async fn send(&self, _data: Vec<u8>) -> Result<(), anyhow::Error> {
        // Silently succeed
        Ok(())
    }

    async fn disconnect(&self) {
        // Nothing to do
    }
}

/// Factory for creating mock transports
pub struct MockTransportFactory;

impl MockTransportFactory {
    pub fn new() -> Self {
        Self
    }
}

#[async_trait]
impl TransportFactory for MockTransportFactory {
    async fn create_transport(
        &self,
    ) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>
    {
        let (_tx, rx) = async_channel::bounded(1);
        Ok((Arc::new(MockTransport), rx))
    }
}

Example: TCP Transport (No TLS)

use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct TcpTransport {
    writer: Arc<Mutex<tokio::io::WriteHalf<TcpStream>>>,
}

#[async_trait]
impl Transport for TcpTransport {
    async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
        let mut writer = self.writer.lock().await;
        writer.write_all(&data).await?;
        Ok(())
    }

    async fn disconnect(&self) {
        // TCP disconnect handled by drop
    }
}

pub struct TcpTransportFactory {
    address: String,
}

impl TcpTransportFactory {
    pub fn new(address: impl Into<String>) -> Self {
        Self {
            address: address.into(),
        }
    }
}

#[async_trait]
impl TransportFactory for TcpTransportFactory {
    async fn create_transport(
        &self,
    ) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>
    {
        let stream = TcpStream::connect(&self.address).await?;
        let (reader, writer) = tokio::io::split(stream);
        
        let (event_tx, event_rx) = async_channel::bounded(100);
        
        let transport = Arc::new(TcpTransport {
            writer: Arc::new(Mutex::new(writer)),
        });
        
        // Spawn read task
        tokio::task::spawn(async move {
            let mut reader = reader;
            let mut buf = vec![0u8; 4096];
            
            event_tx.send(TransportEvent::Connected).await.ok();
            
            loop {
                match reader.read(&mut buf).await {
                    Ok(0) => break,
                    Ok(n) => {
                        let data = bytes::Bytes::copy_from_slice(&buf[..n]);
                        if event_tx.send(TransportEvent::DataReceived(data)).await.is_err() {
                            break;
                        }
                    }
                    Err(_) => break,
                }
            }
            
            event_tx.send(TransportEvent::Disconnected).await.ok();
        });
        
        Ok((transport, event_rx))
    }
}

Best Practices

  1. Thread Safety - Transport must be Send + Sync
  2. Error Handling - Return descriptive errors from send()
  3. Graceful Shutdown - Implement proper cleanup in disconnect()
  4. Event Channel Size - Use bounded channels with reasonable capacity
  5. Read Task - Spawn a separate task for receiving data
  6. Resource Cleanup - Ensure sockets/resources are closed on drop

Testing Transports

Unit Test Example

#[cfg(test)]
mod tests {
    use super::*;
    
    #[tokio::test]
    async fn test_mock_transport() {
        let factory = MockTransportFactory::new();
        let (transport, events) = factory.create_transport().await.unwrap();
        
        // Should succeed without error
        transport.send(vec![1, 2, 3]).await.unwrap();
        
        // Disconnect should be no-op
        transport.disconnect().await;
    }
    
    #[tokio::test]
    async fn test_websocket_transport() {
        let factory = TokioWebSocketTransportFactory::new();
        let (transport, mut events) = factory.create_transport().await.unwrap();
        
        // Should receive Connected event
        match events.recv().await {
            Ok(TransportEvent::Connected) => {},
            other => panic!("Expected Connected, got {:?}", other),
        }
        
        transport.disconnect().await;
    }
}

See Also