Overview
whatsapp-rust uses WebSocket for transport and the Noise Protocol for encryption. All messages are encrypted at the transport layer before being sent over the network.
Architecture
The WebSocket handling system has several layers:
```text
src/
├── handshake.rs        # Noise protocol handshake
├── socket/
│   ├── noise_socket.rs # Encrypted communication layer
│   ├── mod.rs          # Re-exports
│   └── error.rs        # Socket errors
├── transport/          # WebSocket transport abstraction
└── client.rs           # High-level client orchestration
```
Noise Protocol Handshake
The handshake establishes an encrypted channel using the Noise XX pattern. A 20-second timeout (NOISE_HANDSHAKE_RESPONSE_TIMEOUT) is applied when waiting for the server’s handshake response, ensuring the client does not hang indefinitely if the server is unresponsive.
Handshake State
```rust
pub struct HandshakeState {
    noise_key: KeyPair,       // Client's static Curve25519 keypair
    client_payload: Vec<u8>,  // Client identification data
    pattern: &'static str,    // "Noise_XX_25519_AESGCM_SHA256"
    prologue: &'static [u8],  // "WA" header
}
```
Location: wacore/src/handshake/state.rs
Handshake Process
```rust
// From src/handshake.rs:29-104
pub async fn do_handshake(
    device: &Device,
    transport: Arc<dyn Transport>,
    transport_events: &mut async_channel::Receiver<TransportEvent>,
) -> Result<Arc<NoiseSocket>>
```
Step-by-step process:

1. **Prepare the client payload:**

   ```rust
   let client_payload = device.core.get_client_payload().encode_to_vec();
   ```

   The payload contains:
   - Client version
   - Platform information
   - Device details

2. **Initialize the Noise state:**

   ```rust
   let mut handshake_state = HandshakeState::new(
       device.core.noise_key.clone(),
       client_payload,
       NOISE_START_PATTERN, // "Noise_XX_25519_AESGCM_SHA256"
       &WA_CONN_HEADER,     // [0x57, 0x41] ("WA")
   )?;
   ```

3. **Send ClientHello:**

   ```rust
   let client_hello_bytes = handshake_state.build_client_hello()?;
   // Optionally include edge routing for faster reconnection
   let (header, used_edge_routing) =
       build_handshake_header(device.core.edge_routing_info.as_deref());
   let framed = wacore::framing::encode_frame(
       &client_hello_bytes,
       Some(&header),
   )?;
   transport.send(framed).await?;
   ```

4. **Receive ServerHello (with 20s timeout):**

   ```rust
   const NOISE_HANDSHAKE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(20);

   let resp_frame = loop {
       match timeout(NOISE_HANDSHAKE_RESPONSE_TIMEOUT, transport_events.recv()).await {
           Ok(Ok(TransportEvent::DataReceived(data))) => {
               frame_decoder.feed(&data);
               if let Some(frame) = frame_decoder.decode_frame() {
                   break frame;
               }
           }
           _ => { /* handle timeout, disconnect, and channel errors */ }
       }
   };
   ```

5. **Send ClientFinish:**

   ```rust
   let client_finish_bytes = handshake_state
       .read_server_hello_and_build_client_finish(&resp_frame)?;
   let framed = wacore::framing::encode_frame(&client_finish_bytes, None)?;
   transport.send(framed).await?;
   ```

6. **Complete the handshake:**

   ```rust
   let (write_key, read_key) = handshake_state.finish()?;
   info!("Handshake complete, switching to encrypted communication");
   Ok(Arc::new(NoiseSocket::new(transport, write_key, read_key)))
   ```
Location: src/handshake.rs:29-104
Edge Routing Pre-Intro
For optimized reconnection, the client can include edge routing info in the initial frame:
```rust
pub fn build_handshake_header(
    edge_routing_info: Option<&[u8]>,
) -> (Vec<u8>, bool) {
    let mut header = WA_CONN_HEADER.to_vec(); // [0x57, 0x41]
    if let Some(info) = edge_routing_info {
        if info.len() <= 8192 { // Max size
            header.extend_from_slice(info);
            return (header, true);
        }
    }
    (header, false)
}
```
Location: wacore/src/handshake/edge_routing.rs
Handshake Errors
```rust
// (thiserror derive and #[error(...)] attributes elided)
pub enum HandshakeError {
    Transport(#[from] anyhow::Error),
    Core(#[from] CoreHandshakeError),
    Timeout,
    UnexpectedEvent(String),
}
```
Location: src/handshake.rs:15-25
NoiseSocket
The NoiseSocket provides encrypted send/receive operations after handshake.
Architecture
```rust
pub struct NoiseSocket {
    read_key: Arc<NoiseCipher>,
    read_counter: Arc<AtomicU32>,
    send_job_tx: mpsc::Sender<SendJob>,
    sender_task_handle: JoinHandle<()>,
}

struct SendJob {
    plaintext_buf: Vec<u8>,
    out_buf: Vec<u8>,
    response_tx: oneshot::Sender<SendResult>,
}
```
Location: src/socket/noise_socket.rs:21-32
Design Patterns
Dedicated Sender Task
The socket uses a dedicated task for sending to ensure frame ordering:
```rust
impl NoiseSocket {
    pub fn new(
        transport: Arc<dyn Transport>,
        write_key: NoiseCipher,
        read_key: NoiseCipher,
    ) -> Self {
        let (send_job_tx, send_job_rx) = mpsc::channel::<SendJob>(32);

        // Spawn the dedicated sender task
        let sender_task_handle = tokio::spawn(
            Self::sender_task(transport, write_key, send_job_rx)
        );

        Self {
            read_key: Arc::new(read_key),
            read_counter: Arc::new(AtomicU32::new(0)),
            send_job_tx,
            sender_task_handle,
        }
    }
}
```
Why a dedicated task?
- Ordering guarantee: Frames must be sent with sequential counters
- Non-blocking: Callers don’t block on encryption or network I/O
- Backpressure: Channel capacity (32) prevents unbounded queuing
Location: src/socket/noise_socket.rs:34-62
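The mpsc-plus-oneshot request/response shape can be sketched with std threads standing in for the tokio task and channels. `ordered_send` is a hypothetical name for illustration, not the library's API; the point is that a single consumer owning the counter yields strictly sequential numbering no matter how jobs are submitted:

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of the dedicated-sender pattern. Each job carries its payload plus a
// one-shot response channel, mirroring SendJob's response_tx.
fn ordered_send(payloads: Vec<Vec<u8>>) -> Vec<u32> {
    let (job_tx, job_rx) = mpsc::channel::<(Vec<u8>, mpsc::Sender<u32>)>();

    // The single consumer owns the write counter
    let sender_task = thread::spawn(move || {
        let mut write_counter: u32 = 0;
        while let Ok((_payload, resp_tx)) = job_rx.recv() {
            // Encryption and transport.send() would happen here
            let _ = resp_tx.send(write_counter);
            write_counter += 1;
        }
    });

    let mut counters = Vec::new();
    for payload in payloads {
        let (resp_tx, resp_rx) = mpsc::channel();
        job_tx.send((payload, resp_tx)).unwrap();
        counters.push(resp_rx.recv().unwrap());
    }
    drop(job_tx); // closing the channel lets the sender task exit
    sender_task.join().unwrap();
    counters
}
```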
Sender Task Implementation
```rust
async fn sender_task(
    transport: Arc<dyn Transport>,
    write_key: Arc<NoiseCipher>,
    mut send_job_rx: mpsc::Receiver<SendJob>,
) {
    let mut write_counter: u32 = 0;
    while let Some(job) = send_job_rx.recv().await {
        let result = Self::process_send_job(
            &transport,
            &write_key,
            &mut write_counter,
            job.plaintext_buf,
            job.out_buf,
        ).await;
        // Return buffers to the caller for reuse
        let _ = job.response_tx.send(result);
    }
}
```
Location: src/socket/noise_socket.rs:64-89
Encryption
Small Messages (≤16KB)
Encrypted inline to avoid thread pool overhead:
```rust
if plaintext_buf.len() <= INLINE_ENCRYPT_THRESHOLD {
    // Copy to the output buffer and encrypt in place
    out_buf.clear();
    out_buf.extend_from_slice(&plaintext_buf);
    plaintext_buf.clear();
    write_key.encrypt_in_place_with_counter(counter, &mut out_buf)?;
    // out_buf now holds the ciphertext; frame it for the wire
    // (simplified from the original, which frames via encode_frame_into)
    let framed = wacore::framing::encode_frame(&out_buf, None)?;
}
```
Location: src/socket/noise_socket.rs:103-130
Large Messages (>16KB)
Offloaded to blocking thread pool:
```rust
else {
    let plaintext_arc = Arc::new(plaintext_buf);
    let plaintext_arc_for_task = plaintext_arc.clone();
    let write_key = write_key.clone(); // Arc clone moved into the closure

    // Offload encryption to the blocking thread pool
    let spawn_result = tokio::task::spawn_blocking(move || {
        write_key.encrypt_with_counter(
            counter,
            &plaintext_arc_for_task[..],
        )
    }).await;

    // Recover the original buffer for reuse
    plaintext_buf = Arc::try_unwrap(plaintext_arc)
        .unwrap_or_else(|arc| (*arc).clone());
    let ciphertext = spawn_result??;

    // Frame and send
    wacore::framing::encode_frame_into(&ciphertext, None, &mut out_buf)?;
    transport.send(out_buf).await?;
}
```
Location: src/socket/noise_socket.rs:131-164
The 16KB threshold is based on benchmarking: smaller messages are cheaper to encrypt inline (no task-spawn overhead), while larger messages are worth offloading so encryption does not block the async executor.
Send API
```rust
pub async fn encrypt_and_send(
    &self,
    plaintext_buf: Vec<u8>,
    out_buf: Vec<u8>,
) -> SendResult {
    let (response_tx, response_rx) = oneshot::channel();
    let job = SendJob {
        plaintext_buf,
        out_buf,
        response_tx,
    };

    // Hand the job to the dedicated sender task
    if let Err(send_err) = self.send_job_tx.send(job).await {
        let job = send_err.0;
        return Err(EncryptSendError::channel_closed(
            job.plaintext_buf,
            job.out_buf,
        ));
    }

    // Wait for the result (and the returned buffers)
    match response_rx.await {
        Ok(result) => result,
        Err(_) => Err(EncryptSendError::channel_closed(
            Vec::new(), Vec::new(),
        )),
    }
}
```
Buffer Management:
The API returns both buffers for reuse:
```rust
type SendResult = Result<(Vec<u8>, Vec<u8>), EncryptSendError>;
//                       ^^^^^^^^^^^^^^^^^^
//                       (plaintext_buf, out_buf) returned for reuse
```

This enables the client to reuse buffers across multiple sends:

```rust
let mut plaintext_buf = Vec::with_capacity(1024);
let mut out_buf = Vec::with_capacity(1056); // plaintext + 32 bytes overhead
loop {
    plaintext_buf.clear();
    marshal_to_vec(&message, &mut plaintext_buf)?;
    (plaintext_buf, out_buf) =
        noise_socket.encrypt_and_send(plaintext_buf, out_buf).await?;
}
```
Location: src/socket/noise_socket.rs:173-200
Decryption
```rust
pub fn decrypt_frame(&self, ciphertext: &[u8]) -> Result<Vec<u8>> {
    let counter = self.read_counter.fetch_add(1, Ordering::SeqCst);
    self.read_key
        .decrypt_with_counter(counter, ciphertext)
        .map_err(|e| SocketError::Crypto(e.to_string()))
}
```
Decryption is synchronous because:
- Frames arrive sequentially in the transport receiver
- Decryption is fast (AES-GCM hardware acceleration)
- No ordering concerns (unlike send)
Location: src/socket/noise_socket.rs:202-207
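The per-direction counters matter because each counter value must map to a unique AES-GCM nonce. Per the Noise specification's AESGCM rule, the 12-byte nonce is 32 zero bits followed by the 64-bit big-endian counter; the sketch below illustrates that construction (widening the doc's u32 counter to u64) and is not copied from whatsapp-rust's internals:

```rust
// Noise spec, AESGCM: nonce = 4 zero bytes || 64-bit big-endian counter.
// Reusing a (key, nonce) pair would break GCM, which is why send and receive
// each keep their own monotonically increasing counter.
fn gcm_nonce(counter: u64) -> [u8; 12] {
    let mut nonce = [0u8; 12];
    nonce[4..].copy_from_slice(&counter.to_be_bytes());
    nonce
}
```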
Cleanup
```rust
impl Drop for NoiseSocket {
    fn drop(&mut self) {
        // Abort the sender task to prevent resource leaks
        self.sender_task_handle.abort();
    }
}
```
Location: src/socket/noise_socket.rs:210-217
Frame Protocol
Messages are framed before encryption:
```rust
pub fn encode_frame(
    payload: &[u8],
    header: Option<&[u8]>,
) -> Result<Vec<u8>> {
    let mut output = Vec::new();
    if let Some(header) = header {
        output.extend_from_slice(header);
    }
    // 3-byte big-endian length
    let len = payload.len() as u32;
    output.push(((len >> 16) & 0xFF) as u8);
    output.push(((len >> 8) & 0xFF) as u8);
    output.push((len & 0xFF) as u8);
    output.extend_from_slice(payload);
    Ok(output)
}
```
Location: wacore/src/framing/mod.rs:10-30
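The 3-byte length prefix can be round-tripped in isolation; this standalone sketch (`encode_len3`/`decode_len3` are illustrative names) shows why the maximum frame size is bounded at 2^24 − 1 bytes (~16MB):

```rust
// Encode a payload length as the frame protocol's 3-byte big-endian prefix.
fn encode_len3(len: usize) -> [u8; 3] {
    assert!(len < 1 << 24, "frame too large for a 3-byte length");
    [
        ((len >> 16) & 0xFF) as u8,
        ((len >> 8) & 0xFF) as u8,
        (len & 0xFF) as u8,
    ]
}

// Decode it back, padding with a leading zero byte to form a u32.
fn decode_len3(b: [u8; 3]) -> usize {
    u32::from_be_bytes([0, b[0], b[1], b[2]]) as usize
}
```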
```text
┌────────────────┬─────────────────┬────────────────────┐
│ Optional Header│ Length (3 bytes)│ Payload            │
│ (handshake)    │ (big-endian)    │ (encrypted)        │
└────────────────┴─────────────────┴────────────────────┘
```
Frame length limits:
- Maximum frame size: 16MB (enforced by framing layer)
- Typical message frame: < 1KB
- Media messages: 100KB - 2MB (encrypted metadata)
Frame Decoder
```rust
pub struct FrameDecoder {
    buffer: Vec<u8>,
}

impl FrameDecoder {
    pub fn feed(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    pub fn decode_frame(&mut self) -> Option<Vec<u8>> {
        if self.buffer.len() < 3 {
            return None; // Need length header
        }
        let len = u32::from_be_bytes([
            0,
            self.buffer[0],
            self.buffer[1],
            self.buffer[2],
        ]) as usize;
        if self.buffer.len() < 3 + len {
            return None; // Incomplete frame
        }
        let frame = self.buffer[3..3 + len].to_vec();
        self.buffer.drain(..3 + len);
        Some(frame)
    }
}
```
Location: wacore/src/framing/decoder.rs
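The decoder's job is reassembling frames split across partial TCP reads. Restating the decoder above as a self-contained sketch (`demo_partial_feed` is an illustrative driver, not library code):

```rust
#[derive(Default)]
struct FrameDecoder {
    buffer: Vec<u8>,
}

impl FrameDecoder {
    fn feed(&mut self, data: &[u8]) {
        self.buffer.extend_from_slice(data);
    }

    fn decode_frame(&mut self) -> Option<Vec<u8>> {
        if self.buffer.len() < 3 {
            return None; // need the length header first
        }
        let len = u32::from_be_bytes([0, self.buffer[0], self.buffer[1], self.buffer[2]]) as usize;
        if self.buffer.len() < 3 + len {
            return None; // incomplete frame, wait for more data
        }
        let frame = self.buffer[3..3 + len].to_vec();
        self.buffer.drain(..3 + len);
        Some(frame)
    }
}

// Feed a 3-byte frame ("abc") split across two reads.
fn demo_partial_feed() -> (Option<Vec<u8>>, Option<Vec<u8>>) {
    let mut d = FrameDecoder::default();
    d.feed(&[0, 0, 3, b'a']);     // header says len 3, only 1 payload byte yet
    let first = d.decode_frame(); // None: incomplete
    d.feed(&[b'b', b'c']);
    let second = d.decode_frame(); // Some(b"abc")
    (first, second)
}
```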
Transport Abstraction
The transport layer abstracts WebSocket implementation:
```rust
#[async_trait]
pub trait Transport: Send + Sync {
    async fn send(&self, data: Vec<u8>) -> Result<(), anyhow::Error>;
    async fn disconnect(&self);
}

pub enum TransportEvent {
    Connected,
    Disconnected,
    DataReceived(Vec<u8>),
}
```
Location: src/transport/mod.rs
WebSocket implementation
The transport is generic over any AsyncRead + AsyncWrite stream. The from_websocket function wraps an already-upgraded WebSocketStream into a Transport + event channel:
```rust
pub fn from_websocket<S>(
    ws: WebSocketStream<S>,
) -> (Arc<dyn Transport>, async_channel::Receiver<TransportEvent>)
where
    S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
```
Internally, the WebSocket is split into a write half (guarded by Arc<Mutex>) and a read half (moved to a spawned read_pump task). A watch channel coordinates graceful shutdown between the transport and the read pump.
TokioWebSocketTransportFactory handles the default DNS/TCP/TLS connection and delegates to from_websocket. For custom connection strategies (IPv4 preference, TCP keepalive, proxies), call from_websocket directly.
Location: transports/tokio-transport/src/lib.rs
Connection Lifecycle
Connect timeout
Both the transport connection and the version fetch are wrapped in a 20-second timeout (TRANSPORT_CONNECT_TIMEOUT), matching WhatsApp Web’s MQTT CONNECT_TIMEOUT and DGW connectTimeoutMs defaults. Without this, a dead network would block on the OS TCP SYN timeout (~60-75s).
```rust
const TRANSPORT_CONNECT_TIMEOUT: Duration = Duration::from_secs(20);
```

The client runs the version fetch and transport connection in parallel using tokio::join!, both under this timeout:

```rust
let version_future = tokio::time::timeout(
    TRANSPORT_CONNECT_TIMEOUT,
    resolve_and_update_version(&persistence_manager, &http_client, override_version),
);
let transport_future = tokio::time::timeout(
    TRANSPORT_CONNECT_TIMEOUT,
    transport_factory.create_transport(),
);
let (version_result, transport_result) = tokio::join!(version_future, transport_future);
```
If either times out, the connection attempt fails with a descriptive error (e.g., "Transport connect timed out after 20s").
Location: src/client.rs:108-877
### 1. Connect
```rust
impl Client {
    pub async fn connect(&self) -> Result<()> {
        // Transport connection under the 20s timeout
        // (the version fetch runs in parallel; elided here)
        let (transport, mut events) = tokio::time::timeout(
            TRANSPORT_CONNECT_TIMEOUT,
            self.transport_factory.create_transport(),
        ).await??;

        // Perform the Noise handshake
        let device = self.persistence_manager.get_device_snapshot().await;
        let noise_socket = do_handshake(&device, transport, &mut events).await?;

        // Store the socket and start receivers
        self.noise_socket.store(Some(Arc::clone(&noise_socket)));
        self.start_frame_receiver(events, noise_socket).await;
        Ok(())
    }
}
```
### 2. Message loop (read loop)
The read_messages_loop runs on the run() caller’s task — not spawned — so the keepalive loop (which runs in a separate spawned task) is never blocked by frame processing. This eliminates a class of bugs where a long-running batch of frames (e.g., offline sync) could starve the keepalive timer.
```rust
async fn read_messages_loop(self: &Arc<Self>) -> Result<(), anyhow::Error> {
    let transport_events = self.transport_events.lock().await.take()
        .ok_or_else(|| anyhow!("Cannot start message loop: not connected"))?;
    let mut frame_decoder = FrameDecoder::new();

    loop {
        futures::select_biased! {
            _ = self.shutdown_notifier.listen().fuse() => {
                return Ok(());
            },
            event_result = transport_events.recv().fuse() => {
                match event_result {
                    Ok(TransportEvent::DataReceived(data)) => {
                        // Update dead-socket timer on arrival
                        self.last_data_received_ms.store(now_millis(), Ordering::Relaxed);
                        frame_decoder.feed(&data);

                        let mut frames_in_batch: u32 = 0;
                        while let Some(encrypted_frame) = frame_decoder.decode_frame() {
                            if let Some(node) = self.decrypt_frame(&encrypted_frame).await {
                                // Inline vs spawned processing (see below)
                                if is_critical(&node) {
                                    self.process_decrypted_node(node).await;
                                } else {
                                    self.runtime.spawn(/* ... */).detach();
                                }
                            }
                            // Cooperative yield every N frames
                            frames_in_batch += 1;
                            if frames_in_batch.is_multiple_of(self.runtime.yield_frequency()) {
                                if let Some(yield_fut) = self.runtime.yield_now() {
                                    yield_fut.await;
                                }
                            }
                        }

                        // Refresh timestamp after batch so keepalive sees
                        // batch completion time, not just arrival time
                        if frames_in_batch > 1 {
                            self.last_data_received_ms.store(now_millis(), Ordering::Relaxed);
                        }
                    },
                    Ok(TransportEvent::Disconnected) | Err(_) => { /* handle disconnect */ }
                    _ => {}
                }
            }
        }
    }
}
```
Key design decisions:
- **select_biased!** — the shutdown listener has priority over transport events, ensuring the loop exits promptly when shutdown_notifier fires (e.g., on stream error or disconnect)
- **Batch timestamp refresh** — after processing multiple frames, last_data_received_ms is updated again so the keepalive loop sees the batch completion time rather than the arrival time. This prevents false-positive dead-socket triggers during large offline sync batches that take seconds to drain
- **Cooperative yielding** — the loop yields to the runtime every yield_frequency() frames, preventing a large burst of frames from monopolizing the executor
Inline vs concurrent node processing
Frame decryption is always sequential (noise protocol counter ordering), but node processing uses a hybrid strategy:
| Node tag | Processing | Reason |
|---|---|---|
| success, failure, stream:error | Inline | Critical for connection state transitions |
| message | Inline | Preserves arrival order for per-chat queues (MessageHandler just enqueues + ACKs; heavy crypto runs in queue workers) |
| ib | Inline | Ensures offline sync tracking (expected count) is set up before offline messages are processed |
| Everything else | Spawned concurrently | Maximizes parallelism for non-ordering-sensitive stanzas |
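The table above amounts to a small tag classifier. A hypothetical helper mirroring it (`processed_inline` is an illustrative name, not the library's actual `is_critical` implementation):

```rust
// Tags processed inline on the read loop, per the table above; everything
// else is spawned concurrently.
fn processed_inline(tag: &str) -> bool {
    matches!(tag, "success" | "failure" | "stream:error" | "message" | "ib")
}
```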
### 3. Send Message
```rust
impl Client {
    pub async fn send_node(&self, node: &Node) -> Result<()> {
        let noise_socket = self.noise_socket.load()
            .ok_or_else(|| anyhow!("not connected"))?;
        // Marshal the node to binary with an auto-sized buffer
        let plaintext_buf = marshal_auto(node)?;
        // Encrypt and send
        self.send_raw_bytes(plaintext_buf).await
    }
}
```
The marshal_auto function automatically selects an appropriate buffer capacity based on the node’s characteristics. For nodes exceeding certain thresholds (24+ attributes, 64+ children, or 8KB+ scalar content), it pre-estimates the capacity to avoid reallocations. For typical small nodes, it uses the default 1024-byte capacity. This replaces the previous manual Vec::with_capacity(1024) + marshal_to pattern.
### 4. Disconnect
On disconnect, `cleanup_connection_state()` runs exactly once — in the `run()` method after the message loop exits — resetting all connection-scoped state:
```rust
impl Client {
    async fn cleanup_connection_state(&self) {
        self.shutdown_notifier.notify(usize::MAX);
        *self.transport.lock().await = None;
        *self.noise_socket.lock().await = None;
        self.is_connected.store(false, Ordering::Release);
        // Drop per-chat lane senders so workers exit via channel close.
        // Without this, stale workers from the old connection survive reconnects
        // holding outdated signal/crypto state.
        self.chat_lanes.invalidate_all();
        // Clear signal cache, pending retries, IQ waiters, offline sync state...
    }
}
```
Key cleanup actions include invalidating chat lanes (so stale message processing workers don’t survive with outdated crypto state), clearing the signal cache, draining IQ response waiters, and resetting offline sync state. See disconnect cleanup for the full list.
Connection state tracking
The client tracks whether the noise socket is established using a dedicated AtomicBool (is_connected) rather than probing the noise socket mutex. This design prevents a TOCTOU race where try_lock() on the mutex fails due to contention (e.g., during frame encryption), not because the socket is absent — which previously caused is_connected() to return false on live connections, silently dropping receipt acks.
State transitions:
| Event | is_connected value | Ordering |
|---|---|---|
| connect() start | false | Relaxed (reset) |
| Noise socket stored | true | Release (after socket is Some) |
| cleanup_connection_state() | false | Release (after socket is None) |
The Release/Acquire ordering ensures that any task reading is_connected() == true is guaranteed to see the noise socket as Some, and any task reading false after cleanup sees the socket as None.
```rust
// Lock-free connection check — never affected by mutex contention
pub fn is_connected(&self) -> bool {
    self.is_connected.load(Ordering::Acquire)
}
```
This is critical for the keepalive loop and stanza acknowledgment, both of which call is_connected() to decide whether to send data. Under the old try_lock() approach, concurrent send_node() calls holding the mutex would cause false negatives, leading to skipped keepalive pings or dropped ack stanzas.
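The store-then-publish discipline can be sketched with std types standing in for the client's actual fields (`Conn` and the `u32` "socket" are illustrative stand-ins, not the library's types):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

struct Conn {
    socket: Mutex<Option<u32>>, // stand-in for the noise socket slot
    is_connected: AtomicBool,
}

impl Conn {
    fn establish(&self, sock: u32) {
        *self.socket.lock().unwrap() = Some(sock);        // 1) store the socket
        self.is_connected.store(true, Ordering::Release); // 2) then publish the flag
    }

    fn teardown(&self) {
        *self.socket.lock().unwrap() = None;               // 1) clear the socket
        self.is_connected.store(false, Ordering::Release); // 2) then retract the flag
    }

    // Acquire pairs with the Release stores above: any reader seeing `true`
    // also sees the socket as Some, with no mutex involved.
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Acquire)
    }
}
```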
Error Handling
Socket Errors
```rust
pub enum SocketError {
    SocketClosed,
    NoiseHandshake(String),
    Io(String),
    Crypto(String),
}

pub enum EncryptSendError {
    Crypto { error: anyhow::Error, ... },
    Transport { error: anyhow::Error, ... },
    Framing { error: FramingError, ... },
    Join { error: JoinError, ... },
    ChannelClosed { ... },
}
```
All variants return buffers for reuse:
```rust
impl EncryptSendError {
    pub fn into_buffers(self) -> (Vec<u8>, Vec<u8>) {
        match self {
            Self::Crypto { plaintext_buf, out_buf, .. } =>
                (plaintext_buf, out_buf),
            Self::Transport { plaintext_buf, out_buf, .. } =>
                (plaintext_buf, out_buf),
            // ...
        }
    }
}
```
Location: src/socket/error.rs
Stream error handling
When the server sends a <stream:error> stanza, it is processed inline (not spawned concurrently) because stream errors are critical for connection state. The StanzaRouter dispatches the node to a StreamErrorHandler, which calls Client::handle_stream_error().
Each stream error sets is_logged_in = false and fires the shutdown_notifier to exit the keepalive loop and other background tasks.
Error code behavior:
| Code | Action | Event | Reconnects? |
|---|---|---|---|
| 401 | Disables auto-reconnect | LoggedOut | No — session invalid, must re-pair |
| 409 | Disables auto-reconnect | StreamReplaced | No — prevents displacement loop |
| 429 | Adds 5 to backoff counter | None | Yes — extended Fibonacci backoff |
| 503 | Normal handling | None | Yes — standard backoff |
| 515 | Marks as expected disconnect | None | Yes — immediate, no backoff |
| 516 | Disables auto-reconnect | LoggedOut | No — device removed |
| Unknown | Disables auto-reconnect | StreamError | No |
Processing pipeline:

```text
Server sends <stream:error code="..."/>
  → read_messages_loop (inline, not spawned)
  → StanzaRouter → StreamErrorHandler
  → Client::handle_stream_error()
      → is_logged_in = false
      → Code-specific handling (see table above)
      → shutdown_notifier fires → keepalive exits
  → run() loop checks enable_auto_reconnect
```
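The code-specific handling reduces to a small policy decision per code. A hedged sketch of that mapping (the `ReconnectPolicy` enum and function name are illustrative, not the library's API):

```rust
#[derive(Debug, PartialEq)]
enum ReconnectPolicy {
    Stop,            // auto-reconnect disabled
    Backoff,         // standard Fibonacci backoff
    ExtendedBackoff, // 429: backoff counter jumps by 5
    Immediate,       // 515: expected disconnect, reconnect at once
}

// Mirrors the stream-error table above.
fn policy_for_stream_error(code: u16) -> ReconnectPolicy {
    match code {
        401 | 409 | 516 => ReconnectPolicy::Stop,
        429 => ReconnectPolicy::ExtendedBackoff,
        515 => ReconnectPolicy::Immediate,
        503 => ReconnectPolicy::Backoff,
        _ => ReconnectPolicy::Stop, // unknown codes disable auto-reconnect
    }
}
```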
Stanza acknowledgment
The client automatically sends <ack/> nodes in response to incoming stanzas (messages, receipts, notifications, calls). The ack construction follows WhatsApp Web and whatsmeow behavior:
Ack attributes:
| Attribute | Value |
|---|---|
| class | Original stanza tag (e.g., "message", "receipt", "notification") |
| id | Copied from the incoming stanza |
| to | Flipped from the incoming from attribute |
| participant | Copied from the incoming stanza (when present) |
| from | Own device phone number JID — only included for message acks |
type attribute rules:
The type attribute is handled differently depending on the stanza:
| Stanza | type in ack | Reason |
|---|---|---|
| message | Never included | Matches whatsmeow: node.Tag != "message" guard skips type for messages |
| receipt (with type, e.g. "read") | Echoed | WA Web echoes type when explicitly present |
| receipt (without type, i.e. delivery) | Omitted | Delivery receipts have no type; including one (e.g., type="delivery") causes <stream:error> disconnections |
| notification | Echoed | Type is echoed for most notifications |
| notification type="encrypt" with <identity/> child | Omitted | WA Web specifically drops type for identity-change notifications |
Sending incorrect type attributes in ack stanzas can cause the server to issue <stream:error> disconnections. The library handles this automatically — you don’t need to build ack nodes manually.
Location: src/client.rs (build_ack_node, is_encrypt_identity_notification)
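The type-attribute rules condense into a small decision function. A hypothetical helper expressing them (`ack_type` is an illustrative name, not the library's build_ack_node):

```rust
// Decide the ack's type attribute per the table above.
fn ack_type<'a>(
    stanza_tag: &str,
    incoming_type: Option<&'a str>,
    is_encrypt_identity: bool, // notification type="encrypt" with <identity/> child
) -> Option<&'a str> {
    match stanza_tag {
        "message" => None, // never echo type for message acks
        "notification" if is_encrypt_identity => None, // identity change: drop type
        _ => incoming_type, // receipts/notifications: echo type only when present
    }
}
```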
Fibonacci backoff
The reconnection backoff follows the Fibonacci sequence, matching WhatsApp Web’s behavior:
- Sequence: 1s, 1s, 2s, 3s, 5s, 8s, 13s, 21s, 34s, 55s, 89s, 144s, ...
- Maximum: 900s (15 minutes)
- Jitter: ±10%
For rate-limited errors (429), the backoff counter is incremented by 5 before the normal increment, causing the delay to jump significantly on the next reconnection attempt.
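The capped Fibonacci schedule can be computed iteratively; this sketch omits the ±10% jitter and uses an illustrative function name, not the library's `fibonacci_backoff`:

```rust
// Delay in seconds for the nth reconnection attempt (1-based):
// 1, 1, 2, 3, 5, 8, ... capped at 900s.
fn fibonacci_backoff_secs(attempt: u32) -> u64 {
    let (mut a, mut b) = (1u64, 1u64);
    for _ in 1..attempt {
        let next = (a + b).min(900); // saturate at the 15-minute cap
        a = b;
        b = next;
    }
    a.min(900)
}
```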
Retry strategy
The run() method handles reconnection automatically:
```rust
// Simplified reconnection logic in run()
loop {
    self.connect().await;
    self.read_messages_loop().await;

    if !self.enable_auto_reconnect.load(Ordering::Relaxed) {
        break; // 401, 409, 516 — stop permanently
    }
    if self.expected_disconnect.load(Ordering::Relaxed) {
        continue; // 515 — reconnect immediately
    }

    let errors = self.auto_reconnect_errors.fetch_add(1, Ordering::Relaxed);
    let delay = fibonacci_backoff(errors + 1);
    sleep(delay).await;
}
```
Keepalive and Dead Socket Detection
The keepalive loop monitors connection health, matching WhatsApp Web’s behavior precisely.
Constants
| Constant | Value | Description |
|---|---|---|
| KEEP_ALIVE_INTERVAL_MIN | 15s | Minimum interval between pings |
| KEEP_ALIVE_INTERVAL_MAX | 30s | Maximum interval between pings |
| KEEP_ALIVE_RESPONSE_DEADLINE | 20s | Timeout waiting for pong response |
| DEAD_SOCKET_TIME | 20s | Max silence after a send before declaring the socket dead |
Timestamp safety
All timestamp conversions from now_millis() (which returns i64) to u64 are guarded with .max(0) before casting. This prevents silent wrap-around on negative clock values (e.g., from NTP corrections or virtualized environments) that would otherwise produce incorrect timestamps.
Keepalive loop behavior
The loop runs every 15-30 seconds (randomized, matching WA Web’s 15 * (1 + random()) formula) and performs these checks in order:
1. **Skip if recently active** — if data was received within KEEP_ALIVE_INTERVAL_MIN (15s), the connection is proven alive; skip the ping and reset the error counter
2. **Send keepalive ping** — the ping is sent before the dead-socket check so that a successful pong updates last_data_received_ms and prevents false-positive dead-socket detection on idle-but-healthy connections
3. **RTT-adjusted clock skew** — on pong, the server time offset is calculated using the midpoint formula (startTime + rtt/2) / 1000 - serverTime, matching WA Web's onClockSkewUpdate
4. **Skip ping when IQ pending** — if there are already pending IQ responses, the connection is implicitly being tested; skip the explicit ping
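The midpoint formula estimates the instant the server generated its timestamp as halfway through the round trip. A sketch, assuming client times in milliseconds and server time in seconds (the function name is illustrative):

```rust
// Clock skew per the midpoint formula: (startTime + rtt/2) / 1000 - serverTime.
// start_time_ms: client clock when the ping was sent; rtt_ms: measured round trip.
fn clock_skew_secs(start_time_ms: i64, rtt_ms: i64, server_time_secs: i64) -> i64 {
    (start_time_ms + rtt_ms / 2) / 1000 - server_time_secs
}
```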
Dead socket detection
Dead socket detection mirrors WA Web’s deadSocketTimer pattern:
- **Not armed** if nothing was ever sent (both timestamps are zero)
- **Cancelled** if data was received after the last send
- **Fires** if DEAD_SOCKET_TIME (20s) has elapsed since the last send with no receive
The dead-socket check runs on every keepalive tick — not just after a failed ping. This catches scenarios where pending IQs caused the ping to be skipped, or where the ping “succeeded” but the connection died immediately after. When a dead socket is detected, the client calls reconnect_immediately() and exits the keepalive loop.
WA Web uses an independent 20s deadSocketTimer armed on every send and cancelled on every receive. The keepalive loop approximates this by checking is_dead_socket(last_sent, last_recv) unconditionally each iteration.
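The three rules reduce to a pure predicate over the two timestamps. A sketch with an illustrative signature (the actual `is_dead_socket` helper's parameters may differ):

```rust
const DEAD_SOCKET_TIME_MS: u64 = 20_000;

// All times in milliseconds since some epoch; 0 means "never".
fn is_dead_socket(last_sent_ms: u64, last_recv_ms: u64, now_ms: u64) -> bool {
    if last_sent_ms == 0 && last_recv_ms == 0 {
        return false; // not armed: nothing was ever sent
    }
    if last_recv_ms >= last_sent_ms {
        return false; // cancelled: data received after the last send
    }
    now_ms.saturating_sub(last_sent_ms) >= DEAD_SOCKET_TIME_MS
}
```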
Error classification
Keepalive errors are classified exhaustively (compile-time enforced for new error variants):
| Error type | Classification | Behavior |
|---|---|---|
| Socket, Disconnected, NotConnected, InternalChannelClosed | Fatal | Exit keepalive loop immediately |
| Timeout, ServerError, ParseError | Transient | Increment error count, check dead socket |
Periodic maintenance
Approximately every 12 keepalive ticks (~5 minutes), the keepalive loop runs background cleanup of expired sent messages from the database, based on CacheConfig::sent_message_ttl_secs.
Buffer Sizing
Optimal buffer capacity based on payload characteristics:
```rust
// Encrypted size = plaintext + 16 (AES-GCM tag) + 3 (frame header)
let buffer_capacity = plaintext.len() + 32; // Extra headroom
let out_buf = Vec::with_capacity(buffer_capacity);
```
Location: src/socket/noise_socket.rs:373-407 (tests verify this formula)
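The arithmetic behind the headroom can be checked directly (`encrypted_frame_size` is an illustrative helper, not library code):

```rust
// Exact on-the-wire size per the formula above: AES-GCM tag (16 bytes)
// plus the 3-byte length header.
fn encrypted_frame_size(plaintext_len: usize) -> usize {
    plaintext_len + 16 + 3
}
```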
SIMD Encryption
The Noise cipher uses hardware AES acceleration when available:
```rust
pub struct NoiseCipher {
    cipher: Aes256Gcm, // Uses AES-NI on x86_64
}
```
Zero-Copy Patterns
```rust
// Bad: allocates a new buffer per send
let data = node.to_bytes();
socket.send(data).await?;

// Good: reuses a caller-owned buffer
let mut buf = Vec::with_capacity(1024);
marshal_to_vec(&node, &mut buf)?;
socket.send(buf).await?;
```
Testing
Mock Transport
```rust
pub struct MockTransport;

#[async_trait]
impl Transport for MockTransport {
    async fn send(&self, data: Vec<u8>) -> Result<()> {
        // Record for assertions
        Ok(())
    }
    async fn disconnect(&self) {}
}
```
Location: src/transport/mock.rs
Test Cases
Key test scenarios:
```rust
#[tokio::test]
async fn test_encrypt_and_send_returns_both_buffers() { /* ... */ }

#[tokio::test]
async fn test_concurrent_sends_maintain_order() { /* ... */ }

#[tokio::test]
async fn test_encrypted_buffer_sizing_is_sufficient() { /* ... */ }

#[tokio::test]
async fn test_handshake_with_edge_routing() { /* ... */ }
```
Location: src/socket/noise_socket.rs:219-459
References