Integrations
WebSocket
New to acton-service?
Start with the homepage to understand what acton-service is, then explore Core Concepts for foundational explanations. See Dual HTTP+gRPC for protocol multiplexing basics. Check the Glossary for technical term definitions.
Build real-time applications with WebSocket support for bidirectional communication, room-based messaging, and efficient broadcasting.
Overview
acton-service provides production-ready WebSocket support with:
- Bidirectional messaging - Full-duplex communication over a single connection
- Room/channel support - Actor-based room management for chat-like scenarios
- Broadcasting - Efficient message distribution to multiple connections
- Same-port coexistence - WebSocket upgrades seamlessly from HTTP
- Connection management - Unique connection IDs and lifecycle handling
Actor-Based Room Management
Room management is powered by acton-reactive actors that handle join/leave operations, message broadcasting, and connection lifecycle. The RoomManager actor ensures thread-safe room operations. See Reactive Architecture for implementation details.
Installation
Enable the WebSocket feature:
[dependencies]
acton-service = { version = "0.8", features = ["websocket"] }
Or add to existing features:
[dependencies]
acton-service = { version = "0.8", features = ["http", "websocket", "observability"] }
Quick Start
Basic WebSocket Handler
use acton_service::prelude::*;
use acton_service::websocket::{WebSocket, WebSocketUpgrade, Message};
use futures::{SinkExt, StreamExt};
async fn ws_handler(ws: WebSocketUpgrade) -> impl IntoResponse {
ws.on_upgrade(handle_socket)
}
async fn handle_socket(mut socket: WebSocket) {
// Send a welcome message
let _ = socket.send(Message::Text("Welcome!".into())).await;
// Echo incoming messages
while let Some(Ok(msg)) = socket.next().await {
match msg {
Message::Text(text) => {
let _ = socket.send(Message::Text(text)).await;
}
Message::Ping(data) => {
let _ = socket.send(Message::Pong(data)).await;
}
Message::Close(_) => break,
_ => {}
}
}
}
#[tokio::main]
async fn main() -> Result<()> {
let routes = VersionedApiBuilder::new()
.with_base_path("/api")
.add_version(ApiVersion::V1, |router| {
router.route("/ws", get(ws_handler))
})
.build_routes();
ServiceBuilder::new()
.with_routes(routes)
.build()
.serve()
.await
}
Testing
# Install websocat
cargo install websocat
# Connect to WebSocket endpoint
websocat ws://localhost:8080/api/v1/ws
Configuration
WebSocket configuration is optional with sensible defaults:
# config.toml
[websocket]
# Maximum message size (default: 64KB)
max_message_size_bytes = 65536
# Ping interval to keep connection alive (default: 30s)
ping_interval_secs = 30
# Pong timeout before considering connection dead (default: 10s)
pong_timeout_secs = 10
[websocket.rooms]
# Enable room management (default: true)
enabled = true
# Maximum members per room (default: 1000)
max_members = 1000
# Maximum rooms a connection can join (default: 10)
max_rooms_per_connection = 10
# Room idle timeout before cleanup (default: 3600s / 1 hour)
idle_timeout_secs = 3600
Environment Variable Override
ACTON_WEBSOCKET_MAX_MESSAGE_SIZE_BYTES=131072 cargo run
Connection Management
ConnectionId
Each WebSocket connection gets a unique identifier:
use acton_service::websocket::ConnectionId;
async fn handle_socket(socket: WebSocket) {
let connection_id = ConnectionId::new();
tracing::info!(connection_id = %connection_id, "New connection");
// Use connection_id for tracking, logging, room membership, etc.
}
WebSocketConnection
Track connection state with sender channel:
use acton_service::websocket::{ConnectionId, WebSocketConnection, Message};
use tokio::sync::mpsc;
let connection_id = ConnectionId::new();
let (tx, rx) = mpsc::channel::<Message>(32);
let connection = WebSocketConnection::new(connection_id, tx);
// Send message to this connection
connection.send(Message::Text("Hello".into())).await;
Broadcasting
The Broadcaster manages multiple connections and enables efficient message distribution.
Setup
use acton_service::websocket::{Broadcaster, ConnectionId, Message};
use std::sync::Arc;
// Create broadcaster as shared state
let broadcaster = Arc::new(Broadcaster::new());
// Register connections
broadcaster.register(connection_id, sender_channel).await;
// Unregister on disconnect
broadcaster.unregister(&connection_id).await;
Broadcast Patterns
use acton_service::websocket::{Broadcaster, BroadcastTarget, Message};
// Broadcast to all connected clients
broadcaster.broadcast_all(Message::Text("Hello everyone!".into())).await;
// Broadcast to specific connections
broadcaster.broadcast_to(
&[connection_id_1, connection_id_2],
Message::Text("Private message".into()),
).await;
// Broadcast to all except specified (useful for echo prevention)
broadcaster.broadcast_except(
&[sender_connection_id],
Message::Text("Message from another user".into()),
).await;
Complete Broadcasting Example
use acton_service::prelude::*;
use acton_service::websocket::{
Broadcaster, ConnectionId, Message, WebSocket, WebSocketUpgrade,
};
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
async fn ws_handler(
ws: WebSocketUpgrade,
Extension(broadcaster): Extension<Arc<Broadcaster>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, broadcaster))
}
async fn handle_socket(socket: WebSocket, broadcaster: Arc<Broadcaster>) {
let (mut sender, mut receiver) = socket.split();
let connection_id = ConnectionId::new();
// Create channel for this connection
let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(32);
// Register with broadcaster
broadcaster.register(connection_id, tx.clone()).await;
// Forward messages from channel to WebSocket
let send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if sender.send(msg).await.is_err() {
break;
}
}
});
// Process incoming messages
while let Some(Ok(msg)) = receiver.next().await {
match msg {
Message::Text(text) => {
// Broadcast to all other connections
broadcaster
.broadcast_except(
&[connection_id],
Message::Text(text),
)
.await;
}
Message::Close(_) => break,
_ => {}
}
}
// Cleanup
broadcaster.unregister(&connection_id).await;
send_task.abort();
}
#[tokio::main]
async fn main() -> Result<()> {
let broadcaster = Arc::new(Broadcaster::new());
let routes = VersionedApiBuilder::new()
.with_base_path("/api")
.add_version(ApiVersion::V1, |router| {
router
.route("/ws", get(ws_handler))
.layer(Extension(broadcaster.clone()))
})
.build_routes();
ServiceBuilder::new()
.with_routes(routes)
.build()
.serve()
.await
}
Room Management
For chat-like applications, use the actor-based RoomManager:
Room Types
use acton_service::websocket::{RoomId, RoomMember, Room};
// Room identifier
let room_id = RoomId::new("general");
// Room member with sender channel
let member = RoomMember::new(connection_id, sender_channel);
// Room with members and metadata
let room = Room::new(room_id);
RoomManager Actor
The RoomManager handles room operations through actor messages:
use acton_service::websocket::{
RoomManager, SharedRoomManager,
JoinRoomRequest, LeaveRoomRequest, BroadcastToRoom,
ConnectionDisconnected, GetRoomInfo,
};
use acton_reactive::prelude::*;
// Spawn the room manager actor
let room_manager: SharedRoomManager = RoomManager::spawn().await?;
// Join a room
room_manager.send(JoinRoomRequest {
room_id: RoomId::new("general"),
connection_id,
sender: tx.clone(),
}).await;
// Leave a room
room_manager.send(LeaveRoomRequest {
room_id: RoomId::new("general"),
connection_id,
}).await;
// Broadcast to room members
room_manager.send(BroadcastToRoom {
room_id: RoomId::new("general"),
message: Message::Text("Hello room!".into()),
exclude: Some(connection_id), // Exclude sender
}).await;
// Handle connection disconnect (leaves all rooms)
room_manager.send(ConnectionDisconnected {
connection_id,
}).await;
// Get room information
let info = room_manager.ask(GetRoomInfo {
room_id: RoomId::new("general"),
}).await?;
if let Some(room_info) = info {
println!("Room has {} members", room_info.member_count);
}
Chat Server Example
Complete chat server with room support:
use acton_service::prelude::*;
use acton_service::websocket::{
Broadcaster, ConnectionId, Message, WebSocket, WebSocketUpgrade,
};
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum IncomingMessage {
Join { room: String },
Leave { room: String },
Message { room: String, content: String },
}
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OutgoingMessage {
Joined { room: String },
Left { room: String },
Message { room: String, content: String, from: String },
Error { message: String },
System { message: String },
}
async fn ws_handler(
ws: WebSocketUpgrade,
Extension(broadcaster): Extension<Arc<Broadcaster>>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, broadcaster))
}
async fn handle_socket(socket: WebSocket, broadcaster: Arc<Broadcaster>) {
let (mut sender, mut receiver) = socket.split();
let connection_id = ConnectionId::new();
let (tx, mut rx) = tokio::sync::mpsc::channel::<Message>(32);
broadcaster.register(connection_id, tx.clone()).await;
// Send welcome
let welcome = OutgoingMessage::System {
message: format!("Connected as {}", connection_id),
};
let _ = sender
.send(Message::Text(serde_json::to_string(&welcome).unwrap().into()))
.await;
// Forward task
let send_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if sender.send(msg).await.is_err() {
break;
}
}
});
// Process messages
while let Some(Ok(Message::Text(text))) = receiver.next().await {
match serde_json::from_str::<IncomingMessage>(&text) {
Ok(IncomingMessage::Join { room }) => {
let response = OutgoingMessage::Joined { room };
let _ = tx
.send(Message::Text(serde_json::to_string(&response).unwrap().into()))
.await;
}
Ok(IncomingMessage::Message { room, content }) => {
let broadcast = OutgoingMessage::Message {
room,
content,
from: connection_id.to_string(),
};
let _ = broadcaster
.broadcast_except(
&[connection_id],
Message::Text(serde_json::to_string(&broadcast).unwrap().into()),
)
.await;
}
Ok(IncomingMessage::Leave { room }) => {
let response = OutgoingMessage::Left { room };
let _ = tx
.send(Message::Text(serde_json::to_string(&response).unwrap().into()))
.await;
}
Err(e) => {
let error = OutgoingMessage::Error {
message: format!("Invalid message: {}", e),
};
let _ = tx
.send(Message::Text(serde_json::to_string(&error).unwrap().into()))
.await;
}
}
}
broadcaster.unregister(&connection_id).await;
send_task.abort();
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.init();
let broadcaster = Arc::new(Broadcaster::new());
let routes = VersionedApiBuilder::new()
.with_base_path("/api")
.add_version(ApiVersion::V1, |router| {
router
.route("/ws", get(ws_handler))
.layer(Extension(broadcaster.clone()))
})
.build_routes();
let mut config = Config::<()>::default();
config.service.name = "chat-server".to_string();
config.service.port = 8080;
tracing::info!("Chat server starting on ws://localhost:8080/api/v1/ws");
ServiceBuilder::new()
.with_config(config)
.with_routes(routes)
.build()
.serve()
.await
}
Testing the Chat Server
# Terminal 1: Start server
cargo run --example chat-server --features websocket
# Terminal 2: Connect client 1
websocat ws://localhost:8080/api/v1/ws
{"type": "join", "room": "general"}
{"type": "message", "room": "general", "content": "Hello!"}
# Terminal 3: Connect client 2
websocat ws://localhost:8080/api/v1/ws
{"type": "join", "room": "general"}
# Receives: {"type":"message","room":"general","content":"Hello!","from":"..."}
Message Types
WebSocket messages use Axum's Message enum:
use acton_service::websocket::Message;
match message {
// UTF-8 text message
Message::Text(text) => {
println!("Received: {}", text);
}
// Binary data
Message::Binary(data) => {
println!("Binary: {} bytes", data.len());
}
// Ping - respond with Pong
Message::Ping(data) => {
let _ = socket.send(Message::Pong(data)).await;
}
// Pong - response to our Ping
Message::Pong(_) => {
// Connection is alive
}
// Close request
Message::Close(frame) => {
if let Some(cf) = frame {
println!("Close: {} - {}", cf.code, cf.reason);
}
}
}
Best Practices
Use Structured Message Formats
Define clear message types with serde:
// Good - typed messages with serde
#[derive(Serialize, Deserialize)]
#[serde(tag = "type")]
enum WsMessage {
Chat { content: String },
Presence { user_id: String, status: String },
Error { code: u32, message: String },
}
// Parse incoming
let msg: WsMessage = serde_json::from_str(&text)?;
// Send outgoing
let json = serde_json::to_string(&msg)?;
socket.send(Message::Text(json.into())).await?;
Handle Connection Lifecycle
Always clean up on disconnect:
async fn handle_socket(socket: WebSocket, state: Arc<AppState>) {
let connection_id = ConnectionId::new();
// Register
state.broadcaster.register(connection_id, tx).await;
// ... handle messages ...
// Always cleanup, even on error
state.broadcaster.unregister(&connection_id).await;
state.room_manager.send(ConnectionDisconnected { connection_id }).await;
}
Implement Heartbeats
Keep connections alive with ping/pong:
use tokio::time::{interval, Duration};
let mut ping_interval = interval(Duration::from_secs(30));
loop {
tokio::select! {
_ = ping_interval.tick() => {
if socket.send(Message::Ping(vec![])).await.is_err() {
break; // Connection lost
}
}
msg = receiver.next() => {
match msg {
Some(Ok(Message::Pong(_))) => {
// Connection confirmed alive
}
// ... handle other messages
}
}
}
}
Limit Message Size
Validate incoming message sizes:
if text.len() > config.websocket.max_message_size_bytes {
let error = OutgoingMessage::Error {
message: "Message too large".to_string(),
};
let _ = tx.send(Message::Text(serde_json::to_string(&error)?.into())).await;
continue;
}
Use Appropriate Channel Buffer Sizes
Balance memory usage and throughput:
// Small buffer for low-traffic connections
let (tx, rx) = mpsc::channel::<Message>(16);
// Larger buffer for high-throughput scenarios
let (tx, rx) = mpsc::channel::<Message>(256);
Error Handling
Handle WebSocket errors gracefully:
while let Some(result) = receiver.next().await {
match result {
Ok(Message::Text(text)) => {
// Handle message
}
Ok(Message::Close(frame)) => {
tracing::info!("Client closed connection");
break;
}
Err(e) => {
tracing::warn!(error = %e, "WebSocket error");
break; // Exit on error
}
_ => {}
}
}
Combining with Other Features
WebSocket + JWT Authentication
Authenticate during the upgrade:
use acton_service::middleware::Claims;
async fn authenticated_ws_handler(
ws: WebSocketUpgrade,
claims: Claims, // Extracted by JWT middleware
Extension(broadcaster): Extension<Arc<Broadcaster>>,
) -> impl IntoResponse {
let user_id = claims.sub.clone();
ws.on_upgrade(move |socket| handle_authenticated_socket(socket, user_id, broadcaster))
}
async fn handle_authenticated_socket(
socket: WebSocket,
user_id: String,
broadcaster: Arc<Broadcaster>,
) {
tracing::info!(user_id = %user_id, "Authenticated WebSocket connection");
// ... handle connection with known user identity
}
WebSocket + Database
Access database in WebSocket handlers:
async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
) -> impl IntoResponse {
ws.on_upgrade(move |socket| handle_socket(socket, state))
}
async fn handle_socket(socket: WebSocket, state: AppState) {
let db = state.db().await.expect("Database available");
// Load user's chat history, etc.
let history = sqlx::query!("SELECT * FROM messages LIMIT 50")
.fetch_all(db)
.await
.unwrap_or_default();
// ... send history to client
}
Troubleshooting
Connection Immediately Closes
Cause: Handler panics or returns early.
Solution: Add error handling and logging:
async fn handle_socket(socket: WebSocket) {
tracing::info!("New connection");
// ... your code ...
tracing::info!("Connection closed");
}
Messages Not Broadcasting
Cause: Connection not registered with broadcaster.
Solution: Verify registration:
broadcaster.register(connection_id, tx.clone()).await;
tracing::debug!(connection_id = %connection_id, "Registered with broadcaster");
Memory Growing Unbounded
Cause: Connections not unregistered on disconnect.
Solution: Always clean up:
// Use Drop guard or explicit cleanup
broadcaster.unregister(&connection_id).await;
"ws" Feature Not Found
Cause: Axum's ws feature not enabled.
Solution: Ensure workspace Cargo.toml has:
axum = { version = "0.8", features = ["macros", "ws"] }
Related Features
- gRPC Guide - Bidirectional streaming alternative
- Events (NATS) - Pub/sub messaging between services
- Token Authentication - Authenticate WebSocket connections
- Reactive Architecture - Actor-based room management details
- Observability - Trace WebSocket connections