Overview
The transport layer provides a runtime-agnostic abstraction for network connections. It handles raw byte transmission without knowledge of WhatsApp’s protocol framing.
The transport system consists of two main traits:
- Transport - Represents an active connection for sending/receiving raw bytes
- TransportFactory - Creates new transport instances and event streams
Transport Trait
The Transport trait represents an active network connection as a simple byte pipe.
use async_trait::async_trait;
#[async_trait]
pub trait Transport: Send + Sync {
/// Sends raw data to the server
async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error>;
/// Closes the connection
async fn disconnect(&self);
}
Methods
send
Sends raw bytes through the transport. The caller is responsible for any protocol framing.
async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error>;
Parameters:
Returns:
Ok(()) on success
Err(anyhow::Error) on failure
Example:
let data = vec![1, 2, 3, 4];
transport.send(data).await?;
disconnect
Gracefully closes the connection.
async fn disconnect(&self);
Example:
transport.disconnect().await;
TransportFactory Trait
Creates new transport instances and associated event streams.
#[async_trait]
pub trait TransportFactory: Send + Sync {
/// Creates a new transport and returns it along with a stream of events
async fn create_transport(
&self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>;
}
Methods
create_transport
Establishes a new connection and returns both the transport handle and an event receiver.
async fn create_transport(
&self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>;
Returns:
Arc<dyn Transport> - The transport instance for sending data
async_channel::Receiver<TransportEvent> - Stream of transport events
Example:
let factory = TokioWebSocketTransportFactory::new();
let (transport, events) = factory.create_transport().await?;
// Use transport to send data
transport.send(data).await?;
// Listen for events
while let Ok(event) = events.recv().await {
match event {
TransportEvent::Connected => println!("Connected"),
TransportEvent::DataReceived(bytes) => println!("Received {} bytes", bytes.len()),
TransportEvent::Disconnected => break,
}
}
TransportEvent
Events produced by the transport layer:
pub enum TransportEvent {
/// The transport has successfully connected
Connected,
/// Raw data has been received from the server
DataReceived(Bytes),
/// The connection was lost
Disconnected,
}
Event Types
Connected
Emitted immediately after successful connection establishment.
TransportEvent::Connected
DataReceived
Emitted when raw data is received from the server.
TransportEvent::DataReceived(bytes)
Fields:
bytes: Bytes - Raw data received (from the bytes crate)
Disconnected
Emitted when the connection is closed (gracefully or due to error).
TransportEvent::Disconnected
Tokio WebSocket transport
The default transport implementation using tokio-websockets for async WebSocket connections.
Features
- Async I/O - Built on Tokio runtime
- TLS support - Uses rustls with webpki-roots for certificate validation
- Split architecture - Separate read/write paths for efficiency
- Generic over streams - Works with any
AsyncRead + AsyncWrite stream type
- Automatic reconnection - Handled by higher-level Client code
- Development mode - Optional
danger-skip-tls-verify feature
TokioWebSocketTransportFactory
The default factory handles DNS resolution, TCP connection, and TLS. For custom connection logic, use from_websocket directly.
use whatsapp_rust_tokio_transport::TokioWebSocketTransportFactory;
// Default - connects to WhatsApp Web
let factory = TokioWebSocketTransportFactory::new();
// Custom URL
let factory = TokioWebSocketTransportFactory::new()
.with_url("wss://custom-endpoint.example.com/ws");
// Custom TLS connector (e.g. custom CA certificates, client certs)
let factory = TokioWebSocketTransportFactory::new()
.with_connector(my_custom_connector);
with_connector
pub fn with_connector(self, connector: Connector) -> Self
Uses a custom TLS Connector instead of the built-in default. This is the primary extension point for custom TLS configuration — for example, adding custom CA certificates or client certificates.
For full proxy support, implement TransportFactory directly and use from_websocket instead.
Parameters:
connector - A tokio_websockets::Connector (re-exported as whatsapp_rust::transport::Connector)
Example — custom CA certificate:
use whatsapp_rust_tokio_transport::{TokioWebSocketTransportFactory, Connector, default_tls_connector};
use tokio_rustls::TlsConnector;
use rustls::RootCertStore;
use std::sync::Arc;
// Build a custom TLS config with your own CA
let mut root_store = RootCertStore::empty();
root_store.add_parsable_certificates(my_ca_certs);
let config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
let connector = Connector::Rustls(TlsConnector::from(Arc::new(config)));
let factory = TokioWebSocketTransportFactory::new()
.with_connector(connector);
Use default_tls_connector() to inspect or replicate the default TLS configuration as a starting point before customizing.
default_tls_connector
pub fn default_tls_connector() -> Connector
Returns the default TLS connector used by TokioWebSocketTransportFactory. Uses rustls with webpki_roots for certificate validation and ring as the crypto provider.
This is useful as a starting point when you need to inspect or replicate the default TLS configuration before customizing it via with_connector.
Usage with Bot builder
use whatsapp_rust::Bot;
use whatsapp_rust_tokio_transport::TokioWebSocketTransportFactory;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let factory = TokioWebSocketTransportFactory::new();
let mut bot = Bot::builder()
.with_backend(backend)
.with_transport_factory(factory)
.with_http_client(http_client)
.on_event(|event, client| async move { /* handle events */ })
.build()
.await?;
Ok(())
}
from_websocket
Wraps an already-upgraded WebSocketStream into a Transport + event channel. This is useful when you need custom connection strategies such as IPv4 preference, TCP keepalive tuning, or connecting through a proxy.
pub fn from_websocket<S>(
ws: WebSocketStream<S>,
) -> (Arc<dyn Transport>, async_channel::Receiver<TransportEvent>)
where
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
Parameters:
ws - An already-connected WebSocketStream over any async stream type
Returns:
Arc<dyn Transport> - The transport instance for sending data
async_channel::Receiver<TransportEvent> - Stream of transport events
The function splits the WebSocket into read/write halves, spawns a background read pump task, and synchronously enqueues a Connected event before the read pump starts — ensuring it always precedes any DataReceived events.
Example — IPv4-only connection with TCP keepalive:
use whatsapp_rust_tokio_transport::from_websocket;
use tokio::net::TcpStream;
use tokio_websockets::ClientBuilder;
// Custom TCP connection with keepalive
let tcp = TcpStream::connect("web.whatsapp.com:443").await?;
tcp.set_nodelay(true)?;
let sock_ref = socket2::SockRef::from(&tcp);
let keepalive = socket2::TcpKeepalive::new()
.with_time(Duration::from_secs(30))
.with_interval(Duration::from_secs(10));
sock_ref.set_tcp_keepalive(&keepalive)?;
// Upgrade to WebSocket (TLS handled externally)
let (ws, _response) = ClientBuilder::from_uri("wss://web.whatsapp.com/ws/chat".parse()?)
.connect_on(tcp)
.await?;
// Wrap into a Transport
let (transport, events) = from_websocket(ws);
Example — using with a custom TransportFactory:
use whatsapp_rust_tokio_transport::from_websocket;
use wacore::net::{Transport, TransportEvent, TransportFactory};
struct MyTransportFactory;
#[async_trait]
impl TransportFactory for MyTransportFactory {
async fn create_transport(
&self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error> {
// Your custom connection logic here
let ws = establish_custom_websocket().await?;
Ok(from_websocket(ws))
}
}
TokioWebSocketTransportFactory itself delegates to from_websocket internally. Use the factory when the default DNS/TCP/TLS behavior is sufficient, and from_websocket when you need full control over the connection.
Proxy support
The transport layer provides two approaches for proxy support, depending on your needs:
Option 1: Implement TransportFactory directly — for full control over the connection (SOCKS5, HTTP CONNECT, etc.). Establish a WebSocket through the proxy yourself, then wrap it with from_websocket:
use whatsapp_rust_tokio_transport::from_websocket;
use wacore::net::{Transport, TransportEvent, TransportFactory};
struct ProxyTransportFactory {
proxy_url: String,
}
#[async_trait]
impl TransportFactory for ProxyTransportFactory {
async fn create_transport(
&self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error> {
// 1. Connect to proxy
let proxy_stream = connect_to_proxy(&self.proxy_url).await?;
// 2. Upgrade to WebSocket through the proxy
let (ws, _) = tokio_websockets::ClientBuilder::from_uri(
"wss://web.whatsapp.com/ws/chat".parse()?
)
.connect_on(proxy_stream)
.await?;
// 3. Wrap into Transport
Ok(from_websocket(ws))
}
}
Option 2: Use with_connector — for custom TLS only (no proxy routing). The factory still handles DNS and TCP, but you control the TLS layer:
let factory = TokioWebSocketTransportFactory::new()
.with_connector(my_custom_tls_connector);
See custom backends — proxy and custom TLS for a complete guide with examples.
TLS configuration
By default, the transport validates TLS certificates using webpki-roots:
let mut root_store = rustls::RootCertStore::empty();
root_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
Development mode (skip TLS verification)
Only use for development and testing.
Enable the danger-skip-tls-verify feature to disable certificate verification:
[dependencies]
whatsapp-rust-tokio-transport = { version = "*", features = ["danger-skip-tls-verify"] }
This allows connecting through MITM proxies or self-signed certificates.
Connection settings
The default WebSocket URL is:
pub const WHATSAPP_WEB_WS_URL: &str = "wss://web.whatsapp.com/ws/chat";
The Client wraps create_transport() in a 20-second timeout matching WhatsApp Web’s connect timeout. The transport itself does not enforce this timeout — it is applied at the client layer. If you use a custom transport, be aware that the client will abort the connection attempt after 20 seconds regardless of the transport’s own timeout behavior.
Internal architecture
The transport is generic over any AsyncRead + AsyncWrite stream, split into separate read/write paths:
let (sink, stream) = ws.split();
// Sink - wrapped in Arc<Mutex> for sending
let sink: Arc<Mutex<Option<Sink<S>>>> = Arc::new(Mutex::new(Some(sink)));
// Stream - moved to read_pump task with shutdown signal
tokio::task::spawn(read_pump(stream, event_tx, shutdown_rx));
The read pump uses tokio::select! with a shutdown watch channel to ensure clean termination:
async fn read_pump<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
mut stream: SplitStream<WebSocketStream<S>>,
tx: async_channel::Sender<TransportEvent>,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
loop {
tokio::select! {
biased;
_ = shutdown.changed() => break,
next = stream.next() => match next {
Some(Ok(msg)) if msg.is_binary() => {
let payload = msg.into_payload();
// Shutdown-aware send to prevent blocking on full channel
tokio::select! {
biased;
_ = shutdown.changed() => break,
r = tx.send(TransportEvent::DataReceived(Bytes::from(payload))) => {
if r.is_err() { break; }
}
}
}
Some(Ok(msg)) if msg.is_close() => break,
Some(Err(e)) => break,
None => break,
},
}
}
let _ = tx.send(TransportEvent::Disconnected).await;
}
Implementing Custom Transports
You can implement custom transports for different runtimes or protocols.
Example: Mock Transport for Testing
use async_trait::async_trait;
use std::sync::Arc;
use wacore::net::{Transport, TransportEvent, TransportFactory};
/// A mock transport that does nothing
pub struct MockTransport;
#[async_trait]
impl Transport for MockTransport {
async fn send(&self, _data: Vec<u8>) -> Result<(), anyhow::Error> {
// Silently succeed
Ok(())
}
async fn disconnect(&self) {
// Nothing to do
}
}
/// Factory for creating mock transports
pub struct MockTransportFactory;
impl MockTransportFactory {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl TransportFactory for MockTransportFactory {
async fn create_transport(
&self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>
{
let (_tx, rx) = async_channel::bounded(1);
Ok((Arc::new(MockTransport), rx))
}
}
Example: TCP Transport (No TLS)
use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
use tokio::sync::Mutex;
pub struct TcpTransport {
writer: Arc<Mutex<tokio::io::WriteHalf<TcpStream>>>,
}
#[async_trait]
impl Transport for TcpTransport {
async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error> {
let mut writer = self.writer.lock().await;
writer.write_all(&data).await?;
Ok(())
}
async fn disconnect(&self) {
// TCP disconnect handled by drop
}
}
pub struct TcpTransportFactory {
address: String,
}
impl TcpTransportFactory {
pub fn new(address: impl Into<String>) -> Self {
Self {
address: address.into(),
}
}
}
#[async_trait]
impl TransportFactory for TcpTransportFactory {
async fn create_transport(
&self,
) -> Result<(Arc<dyn Transport>, async_channel::Receiver<TransportEvent>), anyhow::Error>
{
let stream = TcpStream::connect(&self.address).await?;
let (reader, writer) = tokio::io::split(stream);
let (event_tx, event_rx) = async_channel::bounded(100);
let transport = Arc::new(TcpTransport {
writer: Arc::new(Mutex::new(writer)),
});
// Spawn read task
tokio::task::spawn(async move {
let mut reader = reader;
let mut buf = vec![0u8; 4096];
event_tx.send(TransportEvent::Connected).await.ok();
loop {
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let data = bytes::Bytes::copy_from_slice(&buf[..n]);
if event_tx.send(TransportEvent::DataReceived(data)).await.is_err() {
break;
}
}
Err(_) => break,
}
}
event_tx.send(TransportEvent::Disconnected).await.ok();
});
Ok((transport, event_rx))
}
}
Best Practices
- Thread Safety - Transport must be
Send + Sync
- Error Handling - Return descriptive errors from
send()
- Graceful Shutdown - Implement proper cleanup in
disconnect()
- Event Channel Size - Use bounded channels with reasonable capacity
- Read Task - Spawn a separate task for receiving data
- Resource Cleanup - Ensure sockets/resources are closed on drop
Testing Transports
Unit Test Example
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_mock_transport() {
let factory = MockTransportFactory::new();
let (transport, events) = factory.create_transport().await.unwrap();
// Should succeed without error
transport.send(vec![1, 2, 3]).await.unwrap();
// Disconnect should be no-op
transport.disconnect().await;
}
#[tokio::test]
async fn test_websocket_transport() {
let factory = TokioWebSocketTransportFactory::new();
let (transport, mut events) = factory.create_transport().await.unwrap();
// Should receive Connected event
match events.recv().await {
Ok(TransportEvent::Connected) => {},
other => panic!("Expected Connected, got {:?}", other),
}
transport.disconnect().await;
}
}
See Also