Advanced

IPC Patterns

This page covers the three main IPC communication patterns: request-response, streaming, and push notifications via subscriptions.


Pattern 1: Request-Response

Client sends a request, actor sends a single response. This is the most common pattern for RPC-style calls.

Server Side

use acton_reactive::prelude::*;

#[acton_message(ipc)]
struct AddRequest { a: i32, b: i32 }

#[acton_message(ipc)]
struct AddResult { sum: i32 }

// Register types
registry.register::<AddRequest>("AddRequest");
registry.register::<AddResult>("AddResult");

// Handler
calculator.mutate_on::<AddRequest>(|_actor, ctx| {
    let a = ctx.message().a;
    let b = ctx.message().b;
    let reply = ctx.reply_envelope();

    Reply::pending(async move {
        reply.send(AddResult { sum: a + b }).await;
    })
});

Client Side

SEND: IpcEnvelope {
    target: "calculator",
    message_type: "AddRequest",
    payload: { a: 5, b: 3 },
    expects_reply: true
}

RECEIVE: IpcResponse {
    success: true,
    payload: { sum: 8 }
}

Python Client

client = ActonClient()
client.send("calculator", "AddRequest", {"a": 5, "b": 3})
response = client.receive()
print(f"Result: {response['payload']['sum']}")  # 8

Pattern 2: Request-Stream

Client sends a request, actor sends multiple response frames. Use this for pagination, countdown timers, or real-time data feeds.

Server Side

use acton_reactive::prelude::*;

#[acton_message(ipc)]
struct ListRequest { page_size: usize }

#[acton_message(ipc)]
struct ListItem { id: u64, name: String }

// Handler sends multiple responses
actor.mutate_on::<ListRequest>(|actor, ctx| {
    let page_size = ctx.message().page_size;
    let items = actor.model.items.clone();
    let reply = ctx.reply_envelope();

    Reply::pending(async move {
        for chunk in items.chunks(page_size) {
            for item in chunk {
                reply.send(ListItem {
                    id: item.id,
                    name: item.name.clone(),
                }).await;
            }
        }
    })
});

Client Side

SEND: IpcEnvelope {
    target: "list_service",
    message_type: "ListRequest",
    payload: { page_size: 10 },
    expects_stream: true
}

RECEIVE: IpcStreamFrame { sequence: 0, payload: {...}, is_final: false }
RECEIVE: IpcStreamFrame { sequence: 1, payload: {...}, is_final: false }
RECEIVE: IpcStreamFrame { sequence: 2, payload: {...}, is_final: true }

Countdown Example

#[acton_message(ipc)]
struct CountdownRequest { from: u32, delay_ms: u64 }

#[acton_message(ipc)]
struct CountdownTick { value: u32 }

actor.mutate_on::<CountdownRequest>(|_actor, ctx| {
    let start = ctx.message().from;
    let delay = ctx.message().delay_ms;
    let reply = ctx.reply_envelope();

    Reply::pending(async move {
        for i in (1..=start).rev() {
            reply.send(CountdownTick { value: i }).await;
            tokio::time::sleep(Duration::from_millis(delay)).await;
        }
    })
});

Pattern 3: Push Notifications (Subscriptions)

Client subscribes to message types and receives pushed notifications whenever those messages are broadcast.

Server Side

use acton_reactive::prelude::*;

#[acton_message(ipc)]
struct PriceUpdate { symbol: String, price: f64 }

// Register subscribable type
registry.register::<PriceUpdate>("PriceUpdate");

// Background task broadcasts updates
let broker = runtime.broker();
tokio::spawn(async move {
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        broker.broadcast(PriceUpdate {
            symbol: "ACME".to_string(),
            price: get_current_price(),
        }).await;
    }
});

Client Side

SEND: SubscribeRequest {
    message_types: ["PriceUpdate", "TradeExecuted"]
}

// Continuous stream of push notifications
RECEIVE: IpcPushNotification { message_type: "PriceUpdate", payload: {...} }
RECEIVE: IpcPushNotification { message_type: "PriceUpdate", payload: {...} }
...

Architecture


Multiple Services

Expose multiple actors with different responsibilities:

// Register all types
registry.register::<AddRequest>("AddRequest");
registry.register::<AddResult>("AddResult");
registry.register::<SetValue>("SetValue");
registry.register::<GetValue>("GetValue");
registry.register::<ValueResponse>("ValueResponse");

// Create and expose multiple services using expose_for_ipc
let mut calculator = runtime.new_actor_with_name::<Calculator>("calculator".to_string());
calculator.expose_for_ipc();
calculator.start().await;

let mut kv_store = runtime.new_actor_with_name::<KvStore>("kv_store".to_string());
kv_store.expose_for_ipc();
kv_store.start().await;

let mut price_feed = runtime.new_actor_with_name::<PriceFeed>("price_feed".to_string());
price_feed.expose_for_ipc();
price_feed.start().await;

Clients target different services:

# Calculator service
client.send("calculator", "AddRequest", {"a": 5, "b": 3})

# Key-value store
client.send("kv_store", "SetValue", {"key": "name", "value": "Alice"})
client.send("kv_store", "GetValue", {"key": "name"})

Stateful Services

Actors maintain state across requests:

#[acton_actor]
struct KvStore {
    data: HashMap<String, String>,
}

#[acton_message(ipc)]
struct SetValue { key: String, value: String }

#[acton_message(ipc)]
struct GetValue { key: String }

#[acton_message(ipc)]
struct ValueResponse { value: Option<String> }

// Set handler
kv_store.mutate_on::<SetValue>(|actor, ctx| {
    let key = ctx.message().key.clone();
    let value = ctx.message().value.clone();
    actor.model.data.insert(key, value);
    Reply::ready()
});

// Get handler
kv_store.act_on::<GetValue>(|actor, ctx| {
    let key = &ctx.message().key;
    let value = actor.model.data.get(key).cloned();
    let reply = ctx.reply_envelope();

    Reply::pending(async move {
        reply.send(ValueResponse { value }).await;
    })
});

Error Handling in Patterns

Client-Side Error Handling

response = client.receive()

if not response['success']:
    error = response['error']
    if 'UnknownMessageType' in error:
        print("Server doesn't recognize message type")
    elif 'ActorNotFound' in error:
        print("Target service not available")
    elif 'RateLimited' in error:
        print("Too many requests, backing off...")
        time.sleep(1)
        # Retry...

Server-Side with Fallible Handlers

#[derive(Debug, Error)]
enum OrderError {
    #[error("Product not found: {0}")]
    ProductNotFound(String),
    #[error("Out of stock")]
    OutOfStock,
}

actor.try_mutate_on::<PlaceOrder>(|actor, ctx| {
    let product = &ctx.message().product;

    if !actor.model.products.contains(product) {
        return Reply::try_err(OrderError::ProductNotFound(product.clone()));
    }

    if actor.model.stock.get(product).unwrap_or(&0) == &0 {
        return Reply::try_err(OrderError::OutOfStock);
    }

    // Process order...
    Reply::try_ok(OrderConfirmed { order_id: "..." })
});

Monitoring IPC

Track listener statistics:

let listener = runtime.start_ipc_listener().await?;

// Periodic monitoring
tokio::spawn(async move {
    loop {
        let stats = listener.stats();
        println!(
            "Connections: {}, Messages: {}, Errors: {}",
            stats.active_connections,
            stats.messages_processed,
            stats.errors
        );
        tokio::time::sleep(Duration::from_secs(60)).await;
    }
});

Pattern Comparison

PatternWhen to UseResponse Count
Request-ResponseRPC calls, queries1
Request-StreamPagination, countdowns, feedsN
SubscriptionsEvents, real-time updatesContinuous

Next Steps

Previous
IPC setup