Pub/Sub Broadcasting
Sometimes you need to notify multiple actors about an event. Instead of sending messages to each one individually, use the broker to broadcast to all subscribers.
The Broker
Every Acton runtime has a broker. Think of it as a bulletin board where actors post announcements and subscribe to the topics they care about.
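To make the bulletin-board idea concrete, here is a minimal std-only sketch of a broker that routes by message type. It is a conceptual model, not Acton's implementation: `ToyBroker`, its `subscribe` and `broadcast` methods, and the synchronous callbacks are all hypothetical names chosen for illustration.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// A subscriber callback that receives a type-erased message.
type Callback = Box<dyn Fn(&dyn Any)>;

/// Hypothetical toy broker: maps each message type to its subscribers.
#[derive(Default)]
struct ToyBroker {
    subscribers: HashMap<TypeId, Vec<Callback>>,
}

impl ToyBroker {
    /// Register `callback` for every future broadcast of type `M`.
    fn subscribe<M: Any, F: Fn(&M) + 'static>(&mut self, callback: F) {
        let erased: Callback = Box::new(move |msg: &dyn Any| {
            // Only messages of type `M` reach the typed callback.
            if let Some(typed) = msg.downcast_ref::<M>() {
                callback(typed);
            }
        });
        self.subscribers
            .entry(TypeId::of::<M>())
            .or_default()
            .push(erased);
    }

    /// Deliver `msg` to every subscriber of `M`; returns how many received it.
    fn broadcast<M: Any>(&self, msg: M) -> usize {
        let erased: &dyn Any = &msg;
        match self.subscribers.get(&TypeId::of::<M>()) {
            Some(callbacks) => {
                for callback in callbacks {
                    callback(erased);
                }
                callbacks.len()
            }
            None => 0,
        }
    }
}

struct PriceUpdate {
    symbol: String,
    price: f64,
}

struct SystemAlert;

fn main() {
    let mut broker = ToyBroker::default();
    broker.subscribe(|u: &PriceUpdate| println!("display: {} {:.2}", u.symbol, u.price));
    broker.subscribe(|u: &PriceUpdate| println!("logger: {}", u.symbol));

    let reached = broker.broadcast(PriceUpdate { symbol: "ACME".into(), price: 123.45 });
    println!("delivered to {reached} subscribers");

    // Nobody subscribed to SystemAlert, so nothing is delivered.
    assert_eq!(broker.broadcast(SystemAlert), 0);
}
```

The key design point is that the message type itself is the topic: publishers never name their recipients, and subscribers never name their publishers.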
let mut runtime = ActonApp::launch_async().await;
// Get the broker
let broker = runtime.broker();
// Actors can also reach it from inside a handler
actor.mutate_on::<SomeMessage>(|actor, _ctx| {
    let broker = actor.broker();
    // ...
    Reply::ready()
});
Subscribing to Messages
Actors subscribe to receive specific message types:
// Get the actor's handle before starting
let handle = actor.handle().clone();
// Subscribe to message types
handle.subscribe::<PriceUpdate>().await;
handle.subscribe::<SystemAlert>().await;
// Now start the actor
let handle = actor.start().await;
Subscribe before starting
Subscribe before calling start() to ensure you don't miss any broadcasts sent immediately after startup.
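The reason for this ordering can be sketched with plain standard-library channels. The `Broadcaster` type below is a hypothetical stand-in, not Acton's broker; it only assumes that a broadcast reaches whoever is subscribed at the moment it is sent.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical toy broadcaster: keeps one channel sender per subscriber.
struct Broadcaster {
    subscribers: Vec<Sender<String>>,
}

impl Broadcaster {
    fn new() -> Self {
        Self { subscribers: Vec::new() }
    }

    /// Register a new subscriber and hand back its inbox.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send a copy of `msg` to everyone subscribed right now.
    fn broadcast(&self, msg: &str) {
        for tx in &self.subscribers {
            // A send error only means that subscriber has gone away.
            let _ = tx.send(msg.to_string());
        }
    }
}

fn main() {
    let mut broadcaster = Broadcaster::new();

    let early = broadcaster.subscribe();
    broadcaster.broadcast("startup");

    // This subscriber registers after the first broadcast went out.
    let late = broadcaster.subscribe();
    broadcaster.broadcast("tick");

    let early_msgs: Vec<String> = early.try_iter().collect();
    let late_msgs: Vec<String> = late.try_iter().collect();
    println!("early: {early_msgs:?}"); // prints: early: ["startup", "tick"]
    println!("late: {late_msgs:?}"); // prints: late: ["tick"]
}
```

The late subscriber never sees `"startup"`, which is the situation that subscribing before `start()` avoids.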
Broadcasting Messages
Anyone with access to the broker can broadcast:
let broker = runtime.broker();
// Broadcast to all subscribers
broker.broadcast(PriceUpdate {
    symbol: "ACME".into(),
    price: 123.45,
}).await;
From within a handler:
actor.mutate_on::<PriceChanged>(|actor, ctx| {
    let broker = actor.broker().clone();
    let update = PriceUpdate {
        symbol: ctx.message().symbol.clone(),
        price: ctx.message().new_price,
    };
    Reply::pending(async move {
        broker.broadcast(update).await;
    })
});
Example: Price Feed
#[acton_message]
struct PriceUpdate {
    symbol: String,
    price: f64,
}
// Price display actor
let mut display = runtime.new_actor::<PriceDisplay>();
display.mutate_on::<PriceUpdate>(|actor, ctx| {
    let update = ctx.message();
    println!("{}: ${:.2}", update.symbol, update.price);
    Reply::ready()
});
display.handle().subscribe::<PriceUpdate>().await;
let _display = display.start().await;
// Price logger actor
let mut logger = runtime.new_actor::<PriceLogger>();
logger.mutate_on::<PriceUpdate>(|actor, ctx| {
    actor.model.history.push(ctx.message().clone());
    Reply::ready()
});
logger.handle().subscribe::<PriceUpdate>().await;
let _logger = logger.start().await;
// Broadcast reaches both
let broker = runtime.broker();
broker.broadcast(PriceUpdate {
    symbol: "ACME".into(),
    price: 150.0,
}).await;
Architecture
Subscription Lifecycle
Unsubscribing
Actors can unsubscribe from message types:
handle.unsubscribe::<PriceUpdate>().await;
Actors automatically unsubscribe from everything when they stop.
Multiple Message Types
Subscribe to as many types as needed:
let handle = actor.handle().clone();
handle.subscribe::<PriceUpdate>().await;
handle.subscribe::<VolumeUpdate>().await;
handle.subscribe::<TradeExecuted>().await;
handle.subscribe::<MarketOpen>().await;
handle.subscribe::<MarketClose>().await;
let handle = actor.start().await;
Filtering Broadcasts
The broker delivers all broadcasts of a type. Filter in your handler:
actor.mutate_on::<PriceUpdate>(|actor, ctx| {
    let update = ctx.message();
    // Only care about specific symbols
    if !actor.model.watched_symbols.contains(&update.symbol) {
        return Reply::ready();
    }
    // Process the update
    actor.model.prices.insert(update.symbol.clone(), update.price);
    Reply::ready()
});
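If you want to unit-test the filtering rule without starting a runtime, one option is to move it into a plain method on the model. The sketch below assumes a hypothetical `Watchlist` model with the same `watched_symbols` and `prices` fields used in the handler above; the `apply` method is an illustrative name, not part of Acton.

```rust
use std::collections::{HashMap, HashSet};

struct PriceUpdate {
    symbol: String,
    price: f64,
}

/// Hypothetical actor model holding the filter and the latest prices.
#[derive(Default)]
struct Watchlist {
    watched_symbols: HashSet<String>,
    prices: HashMap<String, f64>,
}

impl Watchlist {
    /// Apply an update; returns true if it was recorded, false if filtered out.
    fn apply(&mut self, update: &PriceUpdate) -> bool {
        if !self.watched_symbols.contains(&update.symbol) {
            return false;
        }
        self.prices.insert(update.symbol.clone(), update.price);
        true
    }
}

fn main() {
    let mut model = Watchlist::default();
    model.watched_symbols.insert("ACME".to_string());

    let kept = model.apply(&PriceUpdate { symbol: "ACME".into(), price: 150.0 });
    let dropped = model.apply(&PriceUpdate { symbol: "OTHER".into(), price: 9.99 });

    println!("ACME recorded: {kept}, OTHER recorded: {dropped}");
    println!("tracked symbols: {}", model.prices.len());
}
```

A handler would then only need to call `actor.model.apply(ctx.message())` and return `Reply::ready()`.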
Patterns
Event Bus
Use the broker as a central event bus:
// Define events
#[acton_message]
struct UserLoggedIn { user_id: String }
#[acton_message]
struct OrderPlaced { order_id: String, user_id: String }
#[acton_message]
struct PaymentReceived { order_id: String, amount: f64 }
// Analytics subscribes to everything
analytics.handle().subscribe::<UserLoggedIn>().await;
analytics.handle().subscribe::<OrderPlaced>().await;
analytics.handle().subscribe::<PaymentReceived>().await;
// Notification service only cares about orders
notifications.handle().subscribe::<OrderPlaced>().await;
// Publishers broadcast events
broker.broadcast(UserLoggedIn { user_id: "123".into() }).await;
broker.broadcast(OrderPlaced {
    order_id: "ORD-456".into(),
    user_id: "123".into(),
}).await;
System Alerts
Broadcast system-wide alerts:
#[acton_message]
enum SystemAlert {
    Shutdown { in_seconds: u32 },
    MaintenanceMode,
    NormalOperation,
}
// All actors that need to respond to alerts subscribe
handle.subscribe::<SystemAlert>().await;
// Handler
actor.mutate_on::<SystemAlert>(|actor, ctx| {
    match ctx.message() {
        SystemAlert::Shutdown { in_seconds } => {
            actor.model.accepting_new_work = false;
            // Finish current work...
        }
        SystemAlert::MaintenanceMode => {
            actor.model.in_maintenance = true;
        }
        SystemAlert::NormalOperation => {
            actor.model.in_maintenance = false;
            actor.model.accepting_new_work = true;
        }
    }
    Reply::ready()
});
Config Reload
Broadcast configuration changes:
#[acton_message]
struct ConfigReload {
    config: AppConfig,
}
// Actors that use config subscribe
worker.handle().subscribe::<ConfigReload>().await;
cache.handle().subscribe::<ConfigReload>().await;
rate_limiter.handle().subscribe::<ConfigReload>().await;
// When config changes
broker.broadcast(ConfigReload { config: new_config }).await;
Health Checks
Periodic health broadcasts:
use std::time::Duration;
use uuid::Uuid; // Uuid::new_v4() requires the uuid crate's `v4` feature
#[acton_message]
struct HealthCheck { request_id: Uuid }
#[acton_message]
struct HealthResponse {
    request_id: Uuid,
    actor_id: String,
    healthy: bool,
}
// Health monitor broadcasts checks
tokio::spawn(async move {
    let mut interval = tokio::time::interval(Duration::from_secs(30));
    loop {
        interval.tick().await;
        broker.broadcast(HealthCheck {
            request_id: Uuid::new_v4(),
        }).await;
    }
});
// Actors respond
actor.mutate_on::<HealthCheck>(|actor, ctx| {
    // Copy everything the async block needs before moving into it
    let request_id = ctx.message().request_id;
    let actor_id = actor.id().to_string();
    let healthy = actor.model.is_healthy();
    let reply = ctx.reply_envelope();
    Reply::pending(async move {
        reply.send(HealthResponse {
            request_id,
            actor_id,
            healthy,
        }).await;
    })
});
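On the monitoring side, responses need to be matched back to the check that triggered them. A minimal std-only sketch of that bookkeeping follows. `HealthTracker`, `record`, and `failing` are hypothetical names, and `request_id` is a `u64` here in place of `Uuid` to keep the example dependency-free.

```rust
use std::collections::{HashMap, HashSet};

struct HealthResponse {
    request_id: u64, // stand-in for Uuid
    actor_id: String,
    healthy: bool,
}

/// Hypothetical tracker: which expected actors reported healthy per check.
struct HealthTracker {
    expected: HashSet<String>,
    healthy_by_request: HashMap<u64, HashSet<String>>,
}

impl HealthTracker {
    fn new(expected: &[&str]) -> Self {
        Self {
            expected: expected.iter().map(|s| s.to_string()).collect(),
            healthy_by_request: HashMap::new(),
        }
    }

    /// Record one response under the check it answers.
    fn record(&mut self, response: HealthResponse) {
        let seen = self.healthy_by_request.entry(response.request_id).or_default();
        if response.healthy {
            seen.insert(response.actor_id);
        }
    }

    /// Actors that were unhealthy or silent for `request_id`, sorted by name.
    fn failing(&self, request_id: u64) -> Vec<String> {
        let mut failing: Vec<String> = match self.healthy_by_request.get(&request_id) {
            Some(ok) => self.expected.difference(ok).cloned().collect(),
            None => self.expected.iter().cloned().collect(),
        };
        failing.sort();
        failing
    }
}

fn main() {
    let mut tracker = HealthTracker::new(&["cache", "worker"]);
    tracker.record(HealthResponse { request_id: 7, actor_id: "worker".into(), healthy: true });
    tracker.record(HealthResponse { request_id: 7, actor_id: "cache".into(), healthy: false });
    println!("failing for check 7: {:?}", tracker.failing(7)); // prints: failing for check 7: ["cache"]
}
```

Keying on `request_id` means a slow response to an old check cannot be mistaken for an answer to the current one.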
Best Practices
Use Specific Message Types
// Good: specific events
#[acton_message]
struct OrderCreated { order_id: String }
#[acton_message]
struct OrderShipped { order_id: String, tracking: String }
// Avoid: generic events
#[acton_message]
struct Event { event_type: String, data: Value }
Don't Rely on Ordering
Broadcasts are delivered asynchronously, so don't assume subscribers receive them in the order you sent them:
// Don't do this
broker.broadcast(Step1).await;
broker.broadcast(Step2).await;
broker.broadcast(Step3).await;
// Different actors might receive these in different orders!
// Instead, carry an explicit sequence number in the message
broker.broadcast(Step { number: 1 }).await;
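On the receiving side, a sequence number lets a subscriber restore order itself. The following is a rough std-only sketch of one way to do that, a small reorder buffer. `Resequencer` and `accept` are hypothetical names, and the `Step` struct mirrors the one in the snippet above with `number` assumed to be a `u64`.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
struct Step {
    number: u64,
}

/// Hypothetical reorder buffer: releases steps strictly in `number` order.
struct Resequencer {
    next: u64,
    pending: BTreeMap<u64, Step>,
}

impl Resequencer {
    fn new(first: u64) -> Self {
        Self { next: first, pending: BTreeMap::new() }
    }

    /// Accept a step in any order; return the steps that are now ready.
    fn accept(&mut self, step: Step) -> Vec<Step> {
        // Ignore steps that were already released; keep the first copy of duplicates.
        if step.number >= self.next {
            self.pending.entry(step.number).or_insert(step);
        }
        // Drain every consecutive step starting at `next`.
        let mut ready = Vec::new();
        while let Some(step) = self.pending.remove(&self.next) {
            ready.push(step);
            self.next += 1;
        }
        ready
    }
}

fn main() {
    let mut resequencer = Resequencer::new(1);
    // Steps arrive out of order: 2, 3, then 1.
    for number in [2, 3, 1] {
        let ready = resequencer.accept(Step { number });
        println!("received {number}, released {ready:?}");
    }
}
```

A handler could keep a `Resequencer` in its model and process only the steps that `accept` returns. Note that a step that never arrives stalls everything behind it, so real code would likely also need a timeout or gap policy.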
Keep Broadcasts Lightweight
// Good: small messages
#[acton_message]
struct PriceChanged {
    symbol: String,
    price: f64,
}
// Avoid: large payloads
#[acton_message]
struct DataDump {
    all_prices: HashMap<String, f64>, // Could be huge!
    full_history: Vec<Trade>,
}
Next Steps
- Supervision: parent-child actor hierarchies
- Request-Response: coordinating between actors
- IPC Communication: pub/sub across process boundaries