Events (NATS)
New to acton-service?
Start with the homepage to understand what acton-service is, then explore Core Concepts for foundational explanations. See the Glossary for technical term definitions.
Build event-driven microservices with NATS JetStream for reliable message delivery, stream processing, and service decoupling.
Overview
acton-service provides production-ready NATS integration through the async-nats client, with automatic connection management, JetStream support, and health monitoring. Core NATS enables event-driven architectures with publish/subscribe messaging; JetStream adds persistent streams and at-least-once delivery on top of it.
Installation
Enable the events feature:
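[dependencies]
# Replace <version> with the acton-service release you are using
acton-service = { version = "<version>", features = ["events"] }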
Configuration
NATS configuration is read from a config file in the XDG config directory and can be overridden with environment variables:
# ~/.config/acton-service/my-service/config.toml
[nats]
url = "nats://localhost:4222"
max_reconnects = 10
optional = false # Readiness fails if NATS is unavailable
Environment Variable Override
ACTON_NATS_URL=nats://localhost:4222 cargo run
Connection Settings
The NATS connection retries and reconnects automatically. These settings control it:
- url: NATS server URL (default: nats://localhost:4222)
- max_reconnects: Maximum reconnection attempts (default: unlimited)
- reconnect_delay: Delay between reconnect attempts (default: 1s)
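The defaults and the environment override described above can be pictured with a small sketch. The `NatsSettings` struct and `with_url_override` function are hypothetical names used only for illustration; they are not acton-service's actual configuration types, and the real loader may behave differently.

```rust
use std::time::Duration;

/// Hypothetical mirror of the [nats] settings; not acton-service's actual type.
#[derive(Debug, Clone, PartialEq)]
struct NatsSettings {
    url: String,
    /// `None` stands for "unlimited" reconnection attempts.
    max_reconnects: Option<usize>,
    reconnect_delay: Duration,
}

impl Default for NatsSettings {
    fn default() -> Self {
        Self {
            url: "nats://localhost:4222".to_string(),
            max_reconnects: None,
            reconnect_delay: Duration::from_secs(1),
        }
    }
}

/// Apply an environment-style override for the URL, if one is set and non-empty.
fn with_url_override(mut settings: NatsSettings, env_url: Option<String>) -> NatsSettings {
    if let Some(url) = env_url.filter(|u| !u.is_empty()) {
        settings.url = url;
    }
    settings
}
```

A caller could feed it the real variable with `with_url_override(NatsSettings::default(), std::env::var("ACTON_NATS_URL").ok())`.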
Basic Usage
Access NATS through AppState in your application:
use acton_service::prelude::*;
// Provides `next()` on the subscriber stream (needed unless the prelude already re-exports it)
use futures::StreamExt;

async fn process_event(msg: async_nats::Message) -> Result<()> {
    let payload: serde_json::Value = serde_json::from_slice(&msg.payload)?;
    info!("Processing event: {:?}", payload);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    let config = Config::load()?;
    init_tracing(&config)?;

    let state = AppState::builder()
        .config(config.clone())
        .build()
        .await?;

    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;

    // `>` matches every subject below `events.`
    let mut subscriber = nats.subscribe("events.>").await?;
    while let Some(msg) = subscriber.next().await {
        // Log and continue so one bad event does not stop the loop
        if let Err(e) = process_event(msg).await {
            error!("Event processing failed: {}", e);
        }
    }
    Ok(())
}
Publish/Subscribe Pattern
Publishing Events
Publish events to NATS subjects:
use acton_service::prelude::*;

#[derive(Serialize)]
struct UserCreatedEvent {
    user_id: i64,
    email: String,
    created_at: String,
}

async fn create_user(
    State(state): State<AppState>,
    Json(request): Json<CreateUserRequest>,
) -> Result<Json<User>> {
    let db = state.db().await.ok_or(Error::Internal("Database not available"))?;

    // Create user in database
    let user = sqlx::query_as!(
        User,
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING *",
        request.name,
        request.email
    )
    .fetch_one(db)
    .await?;

    // Publish event
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;
    let event = UserCreatedEvent {
        user_id: user.id,
        email: user.email.clone(),
        created_at: chrono::Utc::now().to_rfc3339(),
    };
    nats.publish(
        "users.created",
        serde_json::to_vec(&event)?.into()
    ).await?;

    Ok(Json(user))
}
Subscribing to Events
Subscribe to subjects with wildcard support:
async fn subscribe_user_events(state: AppState) -> Result<()> {
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;

    // Subscribe to all user events (`*` matches exactly one token)
    let mut subscriber = nats.subscribe("users.*").await?;
    while let Some(msg) = subscriber.next().await {
        // Clone the subject so `msg` can be moved into a handler
        let subject = msg.subject.clone();
        match subject.as_str() {
            "users.created" => handle_user_created(msg).await?,
            "users.updated" => handle_user_updated(msg).await?,
            "users.deleted" => handle_user_deleted(msg).await?,
            _ => warn!("Unknown event type: {}", subject),
        }
    }
    Ok(())
}
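To make the wildcard rules concrete: in NATS, `*` matches exactly one dot-separated token, and `>` matches one or more trailing tokens and must be the last token of the pattern. The function below is a minimal sketch of that matching logic; `subject_matches` is a hypothetical helper written for illustration, not part of async-nats or acton-service (the server does this matching for you).

```rust
/// Returns true if `subject` matches the subscription `pattern`.
/// `*` matches exactly one token; `>` matches one or more trailing tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` consumes the rest of the subject, but only as the final pattern token
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` accepts any single token
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal
            (Some(p), Some(s)) if p == s => continue,
            // Both exhausted at the same time: full match
            (None, None) => return true,
            // Length mismatch or differing literal
            _ => return false,
        }
    }
}
```

With this model, `users.*` matches `users.created` but not `users.created.v2`, while `events.>` matches both `events.orders` and `events.orders.created` but not the bare subject `events`.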
JetStream Integration
JetStream adds persistence to NATS: messages are stored in streams and delivered at least once, which enables reliable delivery and stream processing:
use async_nats::jetstream;

async fn setup_jetstream(state: &AppState) -> Result<()> {
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;
    let js = jetstream::new(nats.clone());

    // Create the stream (a no-op if a stream with the same config already exists)
    js.create_stream(jetstream::stream::Config {
        name: "EVENTS".to_string(),
        subjects: vec!["events.>".to_string()],
        max_messages: 1_000_000,
        max_age: std::time::Duration::from_secs(86400 * 7), // 7 days
        ..Default::default()
    })
    .await?;
    Ok(())
}
Durable Consumers
Create durable consumers for reliable event processing:
async fn consume_events(state: AppState) -> Result<()> {
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;
    let js = jetstream::new(nats.clone());

    // Create a durable pull consumer; `messages()` needs a pull (or push)
    // consumer config rather than the generic `consumer::Config`
    let consumer = js
        .create_consumer_on_stream(
            jetstream::consumer::pull::Config {
                durable_name: Some("event-processor".to_string()),
                ack_policy: jetstream::consumer::AckPolicy::Explicit,
                ..Default::default()
            },
            "EVENTS",
        )
        .await?;

    let mut messages = consumer.messages().await?;
    while let Some(msg) = messages.next().await {
        match msg {
            Ok(msg) => {
                if let Err(e) = process_jetstream_event(&msg).await {
                    error!("Event processing failed: {}", e);
                    // Negative ack: ask the server to redeliver the message
                    msg.ack_with(jetstream::AckKind::Nak(None)).await?;
                } else {
                    msg.ack().await?;
                }
            }
            Err(e) => error!("Message receive error: {}", e),
        }
    }
    Ok(())
}
Event-Driven Architecture Patterns
Service Decoupling
Use NATS to decouple services:
// Order Service - publishes events
async fn create_order(
    State(state): State<AppState>,
    Json(order): Json<CreateOrderRequest>,
) -> Result<Json<Order>> {
    let db = state.db().await.ok_or(Error::Internal("Database not available"))?;
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;

    // Create order
    let order = insert_order(db, order).await?;

    // Publish event - other services react independently
    let event = OrderCreatedEvent {
        order_id: order.id,
        user_id: order.user_id,
        total: order.total,
    };
    nats.publish("orders.created", serde_json::to_vec(&event)?.into())
        .await?;

    Ok(Json(order))
}

// Email Service - consumes events
async fn email_worker(state: AppState) -> Result<()> {
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;
    let mut subscriber = nats.subscribe("orders.created").await?;
    while let Some(msg) = subscriber.next().await {
        let event: OrderCreatedEvent = serde_json::from_slice(&msg.payload)?;
        send_order_confirmation_email(&event).await?;
    }
    Ok(())
}
Request/Reply Pattern
Implement synchronous request/reply over NATS:
// Service A - makes request
async fn get_user_info(
    State(state): State<AppState>,
    Path(user_id): Path<i64>,
) -> Result<Json<UserInfo>> {
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;
    let request = serde_json::to_vec(&UserInfoRequest { user_id })?;

    // Send request and wait for reply (bounded by the client's request timeout)
    let reply = nats
        .request("users.info", request.into())
        .await
        .map_err(|e| Error::ServiceError(format!("User service unavailable: {}", e)))?;

    let info: UserInfo = serde_json::from_slice(&reply.payload)?;
    Ok(Json(info))
}

// Service B - handles requests
async fn handle_user_info_requests(state: AppState) -> Result<()> {
    let nats = state.nats().await.ok_or(Error::Internal("NATS not available"))?;
    let mut subscriber = nats.subscribe("users.info").await?;
    while let Some(msg) = subscriber.next().await {
        // Only messages sent as requests carry a reply subject
        if let Some(reply_subject) = msg.reply {
            let request: UserInfoRequest = serde_json::from_slice(&msg.payload)?;
            // If the lookup fails, no reply is sent and the requester times out
            if let Ok(info) = fetch_user_info(&state, request.user_id).await {
                let response = serde_json::to_vec(&info)?;
                nats.publish(reply_subject, response.into()).await?;
            }
        }
    }
    Ok(())
}
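The mechanics of request/reply can be seen in isolation with an in-process analogy: each request names the place where its answer should go, and the requester waits on that place with a timeout. The sketch below uses only standard-library channels in place of NATS subjects; `Request`, `spawn_responder`, and `request` are hypothetical names for illustration and do not correspond to any async-nats API.

```rust
use std::sync::mpsc::{self, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// A request carries its own reply channel, much like a NATS message's reply subject.
struct Request {
    user_id: i64,
    reply: Sender<String>,
}

/// Responder loop: answer every request on the channel it names.
fn spawn_responder() -> Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for req in rx {
            // Ignore send errors: the requester may have timed out and gone away.
            let _ = req.reply.send(format!("user-{}", req.user_id));
        }
    });
    tx
}

/// Send a request and wait for the reply, giving up after `timeout`.
fn request(
    service: &Sender<Request>,
    user_id: i64,
    timeout: Duration,
) -> Result<String, RecvTimeoutError> {
    let (reply_tx, reply_rx) = mpsc::channel();
    service
        .send(Request { user_id, reply: reply_tx })
        .map_err(|_| RecvTimeoutError::Disconnected)?;
    reply_rx.recv_timeout(timeout)
}
```

As in the NATS version, a responder that never answers shows up on the requesting side as a timeout, so the caller should treat a timeout as "service unavailable" rather than wait indefinitely.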
Health Checks
NATS health is automatically monitored by the /ready endpoint:
[nats]
optional = false # Service not ready if NATS is down
The readiness probe verifies NATS connectivity:
curl http://localhost:8080/ready
# Returns 200 OK if NATS is healthy
# Returns 503 Service Unavailable if NATS is down
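The relationship between the `optional` flag and the probe result can be summarized as a small decision function. This is a sketch under one assumption: that a dependency marked `optional = true` does not fail readiness when it is down. `readiness_status` is a hypothetical function for illustration, not acton-service's implementation.

```rust
/// Hypothetical model of how NATS state maps to the /ready status code.
/// Assumption: an optional dependency never fails the readiness probe.
fn readiness_status(nats_connected: bool, optional: bool) -> u16 {
    if nats_connected || optional {
        200 // OK
    } else {
        503 // Service Unavailable
    }
}
```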
Error Handling and Retry
Handle transient failures with retry logic:
use backoff::{Error as BackoffError, ExponentialBackoff};

async fn publish_with_retry(
    nats: &async_nats::Client,
    subject: &str,
    payload: Vec<u8>,
) -> Result<()> {
    let operation = || async {
        nats.publish(subject.to_string(), payload.clone().into())
            .await
            // async-nats publish errors are not `std::io::Error`s, so they cannot
            // be matched on `io::ErrorKind`. This version treats every failure as
            // transient; use `BackoffError::permanent(e)` for errors that should
            // not be retried.
            .map_err(BackoffError::transient)
    };
    // The default policy gives up once its `max_elapsed_time` is exceeded
    backoff::future::retry(ExponentialBackoff::default(), operation).await?;
    Ok(())
}
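To show what an exponential backoff policy does between attempts, here is a minimal sketch of the delay calculation. It assumes plain doubling with an upper cap and no jitter, which is a simplification chosen for illustration and not the `backoff` crate's defaults (the crate also randomizes intervals). `backoff_delay` is a hypothetical helper.

```rust
use std::time::Duration;

/// Delay before retry number `attempt` (0-based): base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    // Saturate the multiplier instead of overflowing for large attempt numbers
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    // If the multiplication itself overflows, fall back to the cap
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}
```

With a 100 ms base and a 5 s cap, the first retries wait 100 ms, 200 ms, 400 ms, and 800 ms, and every attempt from the sixth retry onward waits the full 5 s.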
Best Practices
Use Hierarchical Subjects
Organize subjects with hierarchical naming:
// ✅ Good - hierarchical subjects
"users.created"
"users.updated"
"orders.created"
"orders.shipped"
"notifications.email.sent"
// ❌ Bad - flat subjects
"user_created"
"order_made"
"email"
Set Appropriate Stream Retention
Configure stream retention policies:
js.create_stream(jetstream::stream::Config {
    name: "EVENTS".to_string(),
    subjects: vec!["events.>".to_string()],
    retention: jetstream::stream::RetentionPolicy::WorkQueue, // Or Limits, Interest
    max_age: Duration::from_secs(86400 * 7), // 7 days
    max_messages: 1_000_000,
    ..Default::default()
})
.await?;
Acknowledge Messages Explicitly
Always acknowledge JetStream messages:
// ✅ Good - explicit ack after processing
if process_message(&msg).await.is_ok() {
    msg.ack().await?;
} else {
    // Negative ack for retry
    msg.ack_with(jetstream::AckKind::Nak(None)).await?;
}
// ❌ Bad - no acknowledgment
process_message(&msg).await?;
Use Durable Consumers
Create durable consumers for reliable processing:
jetstream::consumer::pull::Config {
    durable_name: Some("my-processor".to_string()), // ✅ Survives restarts
    ack_policy: jetstream::consumer::AckPolicy::Explicit,
    ..Default::default()
}
Handle Poison Messages
Implement a dead-letter queue for messages that keep failing:
const MAX_RETRIES: i64 = 3;

async fn process_with_dlq(msg: jetstream::Message) -> Result<()> {
    let info = msg.info()?;
    // `delivered` counts delivery attempts for this message, starting at 1
    if info.delivered > MAX_RETRIES {
        // Send to dead letter queue
        let nats = get_nats_client(); // helper supplied by your application
        nats.publish("dlq.events", msg.payload.clone()).await?;
        msg.ack().await?; // Acknowledge so the message is not redelivered
    } else if let Err(e) = process_message(&msg).await {
        error!("Processing failed (attempt {}): {}", info.delivered, e);
        // Negative ack for retry
        msg.ack_with(jetstream::AckKind::Nak(None)).await?;
    } else {
        msg.ack().await?;
    }
    Ok(())
}
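The branching in the handler above reduces to a decision on two inputs: the message's delivery count and whether processing succeeded. Pulling that decision into a pure function makes it easy to unit-test without a NATS server. `Disposition` and `disposition` are hypothetical names for this sketch.

```rust
/// What to do with a JetStream message after inspecting it.
#[derive(Debug, PartialEq)]
enum Disposition {
    /// Processing succeeded: acknowledge.
    Ack,
    /// Processing failed but attempts remain: negative-acknowledge for redelivery.
    Retry,
    /// Too many deliveries: publish to the dead-letter subject, then acknowledge.
    DeadLetter,
}

/// Decide the outcome from the delivery count and the processing result.
/// `processed_ok` is ignored once the delivery limit is exceeded, because
/// such messages are routed to the dead-letter queue without being processed.
fn disposition(delivered: i64, max_deliveries: i64, processed_ok: bool) -> Disposition {
    if delivered > max_deliveries {
        Disposition::DeadLetter
    } else if processed_ok {
        Disposition::Ack
    } else {
        Disposition::Retry
    }
}
```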
Production Deployment
Environment Configuration
# Production environment
export ACTON_NATS_URL=nats://nats.prod.example.com:4222
NATS Cluster Support
For high-availability deployments, list the cluster's servers as a comma-separated URL string:
[nats]
url = "nats://nats1.prod.example.com:4222,nats://nats2.prod.example.com:4222,nats://nats3.prod.example.com:4222"
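A comma-separated value like this is a list of servers the client can try in turn. As a rough illustration of how such a string breaks down into individual addresses, here is a minimal sketch; `split_servers` is a hypothetical helper, and the actual parsing is done by the NATS client.

```rust
/// Split a comma-separated server list into individual URLs,
/// trimming whitespace and skipping empty entries.
fn split_servers(url: &str) -> Vec<&str> {
    url.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .collect()
}
```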
Kubernetes Deployment
env:
- name: ACTON_NATS_URL
value: "nats://nats-cluster.nats.svc.cluster.local:4222"
TLS/Authentication
Secure NATS connections:
[nats]
url = "nats://user:password@nats.prod.example.com:4222"
# Or use TLS certificates
tls_cert = "/path/to/client-cert.pem"
tls_key = "/path/to/client-key.pem"
tls_ca = "/path/to/ca-cert.pem"
Related Features
- Health Checks - Automatic NATS health monitoring
- Observability - Distributed tracing across event flows
- Database - Combine database writes with event publishing