Event-driven architecture has revolutionized how we build scalable, responsive applications. In Rust, this paradigm becomes even more powerful by combining the language’s safety guarantees with efficient concurrency models. Over the years, I’ve implemented numerous event-driven systems and discovered that certain design patterns consistently lead to more maintainable and performant code.
Event Bus Pattern
The Event Bus serves as a central hub for event distribution, effectively decoupling event producers from consumers. This pattern creates a publish-subscribe infrastructure where components can interact without direct dependencies.
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
enum EventType {
    UserCreated,
    OrderPlaced,
    PaymentProcessed,
}

#[derive(Clone, Debug)]
struct Event {
    event_type: EventType,
    payload: String,
}

struct EventBus {
    subscribers: HashMap<EventType, Vec<Box<dyn Fn(&Event) + Send + Sync>>>,
}

impl EventBus {
    fn new() -> Self {
        EventBus { subscribers: HashMap::new() }
    }

    fn publish(&self, event: Event) {
        if let Some(handlers) = self.subscribers.get(&event.event_type) {
            for handler in handlers {
                handler(&event);
            }
        }
    }

    fn subscribe(&mut self, event_type: EventType, handler: Box<dyn Fn(&Event) + Send + Sync>) {
        self.subscribers.entry(event_type).or_default().push(handler);
    }
}

// Usage
fn main() {
    let mut bus = EventBus::new();
    bus.subscribe(EventType::UserCreated, Box::new(|event| {
        println!("User created: {:?}", event.payload);
    }));
    bus.publish(Event {
        event_type: EventType::UserCreated,
        payload: "user_id:1234".to_string(),
    });
}
A thread-safe implementation typically wraps the EventBus in an Arc<Mutex<>> for shared ownership across threads.
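The sharing described above can be sketched as follows. This is a minimal illustration rather than a definitive design: the `Handler` alias and the `publish_from_threads` helper are hypothetical names introduced here, and the bus is a trimmed-down copy of the one shown earlier.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
enum EventType {
    UserCreated,
}

#[derive(Clone, Debug)]
struct Event {
    event_type: EventType,
    payload: String,
}

// Hypothetical alias to keep the signatures readable.
type Handler = Box<dyn Fn(&Event) + Send + Sync>;

#[derive(Default)]
struct EventBus {
    subscribers: HashMap<EventType, Vec<Handler>>,
}

impl EventBus {
    fn subscribe(&mut self, event_type: EventType, handler: Handler) {
        self.subscribers.entry(event_type).or_default().push(handler);
    }

    fn publish(&self, event: &Event) {
        if let Some(handlers) = self.subscribers.get(&event.event_type) {
            for handler in handlers {
                handler(event);
            }
        }
    }
}

// Hypothetical helper: publishes one event from each worker thread and
// returns the payloads the single subscriber observed, sorted.
fn publish_from_threads(threads: usize) -> Vec<String> {
    let seen = Arc::new(Mutex::new(Vec::new()));
    let bus = Arc::new(Mutex::new(EventBus::default()));

    let sink = Arc::clone(&seen);
    bus.lock().unwrap().subscribe(
        EventType::UserCreated,
        Box::new(move |event: &Event| sink.lock().unwrap().push(event.payload.clone())),
    );

    let workers: Vec<_> = (0..threads)
        .map(|i| {
            let bus = Arc::clone(&bus);
            thread::spawn(move || {
                let event = Event {
                    event_type: EventType::UserCreated,
                    payload: format!("user_id:{}", i),
                };
                // The lock is held while handlers run, so a handler must not
                // call back into the bus or it would deadlock.
                bus.lock().unwrap().publish(&event);
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }

    let mut payloads = seen.lock().unwrap().clone();
    payloads.sort();
    payloads
}
```

Because `subscribe` takes `&mut self`, a plain `Arc<EventBus>` would not be enough; the `Mutex` supplies the exclusive access. If publishing is far more frequent than subscribing, an `RwLock` may be a better fit.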
Actor Model Implementation
The Actor model treats actors as the fundamental unit of computation. Each actor maintains private state and communicates exclusively through message passing, making it ideal for concurrent event processing.
use std::sync::mpsc;
use std::thread;

#[derive(Debug)]
enum Message {
    Update(String),
    Process,
    GetStatus(mpsc::Sender<String>),
}

struct ActorState {
    data: String,
    status: String,
}

struct Actor {
    mailbox: mpsc::Receiver<Message>,
    state: ActorState,
}

impl Actor {
    fn new(mailbox: mpsc::Receiver<Message>) -> Self {
        Actor {
            mailbox,
            state: ActorState {
                data: String::new(),
                status: "idle".to_string(),
            },
        }
    }

    // Runs until every sender has been dropped and the mailbox is closed
    fn run(&mut self) {
        while let Ok(msg) = self.mailbox.recv() {
            self.handle_message(msg);
        }
    }

    fn handle_message(&mut self, msg: Message) {
        match msg {
            Message::Update(data) => {
                self.state.data = data;
                self.state.status = "updated".to_string();
            },
            Message::Process => {
                self.process_data();
                self.state.status = "processed".to_string();
            },
            Message::GetStatus(sender) => {
                // Ignore the error: the requester may have dropped its receiver,
                // and that should not bring the actor down
                let _ = sender.send(self.state.status.clone());
            }
        }
    }

    fn process_data(&mut self) {
        // Process the data
        println!("Processing: {}", self.state.data);
    }
}

// Usage
fn main() {
    let (sender, receiver) = mpsc::channel();
    let mut actor = Actor::new(receiver);
    thread::spawn(move || {
        actor.run();
    });
    sender.send(Message::Update("important data".to_string())).unwrap();
    sender.send(Message::Process).unwrap();
    let (status_sender, status_receiver) = mpsc::channel();
    sender.send(Message::GetStatus(status_sender)).unwrap();
    println!("Actor status: {}", status_receiver.recv().unwrap());
}
This pattern excels at isolating concerns and creating fault-tolerant systems, as actors handle failures independently.
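To make the failure-isolation claim concrete, here is a small sketch with two independent actors, one of which panics. The `Crash` message, `spawn_counter`, and `isolation_demo` are hypothetical names used only for this illustration; a real system would more likely restart the failed actor from a supervisor.

```rust
use std::sync::mpsc;
use std::thread;

enum Message {
    Add(i64),
    Crash, // hypothetical message that makes the actor fail
    Total(mpsc::Sender<i64>),
}

// Spawns a counter actor on its own thread and returns its mailbox sender
// together with the join handle.
fn spawn_counter() -> (mpsc::Sender<Message>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        let mut total = 0;
        while let Ok(msg) = rx.recv() {
            match msg {
                Message::Add(n) => total += n,
                Message::Crash => panic!("actor failed"),
                Message::Total(reply) => {
                    let _ = reply.send(total);
                }
            }
        }
    });
    (tx, handle)
}

// Crashes one actor and shows that the other keeps its state and keeps
// answering. Returns (did the faulty actor panic, total held by the healthy one).
fn isolation_demo() -> (bool, i64) {
    let (faulty, faulty_handle) = spawn_counter();
    let (healthy, healthy_handle) = spawn_counter();

    faulty.send(Message::Crash).unwrap();
    healthy.send(Message::Add(2)).unwrap();
    healthy.send(Message::Add(3)).unwrap();

    // A panic inside the actor thread surfaces as an Err from join().
    let faulty_panicked = faulty_handle.join().is_err();

    let (reply_tx, reply_rx) = mpsc::channel();
    healthy.send(Message::Total(reply_tx)).unwrap();
    let total = reply_rx.recv().unwrap();

    // Dropping the last sender closes the mailbox and ends the actor loop.
    drop(healthy);
    healthy_handle.join().unwrap();

    (faulty_panicked, total)
}
```

The panic stays confined to the faulty actor's thread because the two actors share no state; the only thing the rest of the program sees is the failed join.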
Event Sourcing
Event sourcing persists all changes to application state as a sequence of events. This provides a complete audit trail and enables rebuilding state at any point in time.
use std::collections::HashMap;

#[derive(Clone, Debug)]
enum EventType {
    UserRegistered,
    EmailChanged,
    PasswordChanged,
}

#[derive(Clone, Debug)]
struct Event {
    event_type: EventType,
    data: HashMap<String, String>,
    timestamp: u64,
}

trait ApplyEvent {
    fn apply(&mut self, event: &Event);
}

#[derive(Default, Debug)]
struct User {
    id: String,
    email: String,
    password_hash: String,
}

impl ApplyEvent for User {
    fn apply(&mut self, event: &Event) {
        match event.event_type {
            EventType::UserRegistered => {
                self.id = event.data.get("id").unwrap().clone();
                self.email = event.data.get("email").unwrap().clone();
                self.password_hash = event.data.get("password_hash").unwrap().clone();
            },
            EventType::EmailChanged => {
                self.email = event.data.get("email").unwrap().clone();
            },
            EventType::PasswordChanged => {
                self.password_hash = event.data.get("password_hash").unwrap().clone();
            }
        }
    }
}

struct EventSourcedEntity<T: ApplyEvent + Default> {
    state: T,
    events: Vec<Event>,
}

impl<T: ApplyEvent + Default> EventSourcedEntity<T> {
    fn new() -> Self {
        Self {
            state: T::default(),
            events: vec![],
        }
    }

    fn apply_event(&mut self, event: Event) {
        self.state.apply(&event);
        self.events.push(event);
    }

    fn restore_from_events(events: Vec<Event>) -> Self {
        let mut entity = Self::new();
        for event in events {
            entity.apply_event(event);
        }
        entity
    }
}

// Usage
fn main() {
    let mut user_entity = EventSourcedEntity::<User>::new();

    // Create a user
    let mut register_data = HashMap::new();
    register_data.insert("id".to_string(), "12345".to_string());
    register_data.insert("email".to_string(), "user@example.com".to_string());
    register_data.insert("password_hash".to_string(), "hashed_password".to_string());
    user_entity.apply_event(Event {
        event_type: EventType::UserRegistered,
        data: register_data,
        timestamp: 1000,
    });

    // Change email
    let mut email_data = HashMap::new();
    email_data.insert("email".to_string(), "new.email@example.com".to_string());
    user_entity.apply_event(Event {
        event_type: EventType::EmailChanged,
        data: email_data,
        timestamp: 1001,
    });

    println!("Current user state: {:?}", user_entity.state);
    println!("Event history length: {}", user_entity.events.len());
}
This pattern provides excellent auditability and enables advanced features like temporal queries and event replay.
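A temporal query can be sketched as a replay that stops at a chosen timestamp. The version below is a simplified, assumption-laden illustration: `UserEvent`, `Recorded`, `state_at`, and `sample_log` are hypothetical names, the events carry typed fields instead of a string map, and the log is assumed to be stored in timestamp order.

```rust
#[derive(Clone, Debug)]
enum UserEvent {
    Registered { email: String },
    EmailChanged { email: String },
}

// An event together with the time it was recorded.
#[derive(Clone, Debug)]
struct Recorded {
    timestamp: u64,
    event: UserEvent,
}

#[derive(Default, Debug, PartialEq)]
struct User {
    email: String,
}

impl User {
    fn apply(&mut self, event: &UserEvent) {
        match event {
            UserEvent::Registered { email } | UserEvent::EmailChanged { email } => {
                self.email = email.clone();
            }
        }
    }
}

// Temporal query: fold only the events recorded at or before `as_of`.
// Assumes `log` is ordered by timestamp.
fn state_at(log: &[Recorded], as_of: u64) -> User {
    let mut user = User::default();
    for recorded in log.iter().filter(|r| r.timestamp <= as_of) {
        user.apply(&recorded.event);
    }
    user
}

// Hypothetical sample history: registration at 1000, email change at 1001.
fn sample_log() -> Vec<Recorded> {
    vec![
        Recorded {
            timestamp: 1000,
            event: UserEvent::Registered { email: "old@example.com".to_string() },
        },
        Recorded {
            timestamp: 1001,
            event: UserEvent::EmailChanged { email: "new@example.com".to_string() },
        },
    ]
}
```

Calling `state_at(&sample_log(), 1000)` yields the user as registered, while any `as_of` of 1001 or later includes the email change. For long histories, periodic snapshots would avoid replaying from the very first event.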
Command-Query Separation
Command-Query Separation (CQS) distinguishes operations that modify state (commands) from those that return values (queries). This separation simplifies reasoning about system behavior.
use std::collections::HashMap;

// Command side
// Clone is needed so the query side can take a copy of the data
#[derive(Clone)]
struct User {
    id: String,
    name: String,
    email: String,
}

enum UserCommand {
    Create { id: String, name: String, email: String },
    UpdateEmail { id: String, email: String },
    DeleteUser { id: String },
}

// Debug is required for calling unwrap() on the handler's Result
#[derive(Debug)]
enum CommandError {
    UserAlreadyExists,
    UserNotFound,
    InvalidData,
}

struct UserCommandHandler {
    users: HashMap<String, User>,
}

impl UserCommandHandler {
    fn new() -> Self {
        Self { users: HashMap::new() }
    }

    fn handle(&mut self, command: UserCommand) -> Result<(), CommandError> {
        match command {
            UserCommand::Create { id, name, email } => {
                if self.users.contains_key(&id) {
                    return Err(CommandError::UserAlreadyExists);
                }
                self.users.insert(id.clone(), User { id, name, email });
                Ok(())
            },
            UserCommand::UpdateEmail { id, email } => {
                let user = self.users.get_mut(&id)
                    .ok_or(CommandError::UserNotFound)?;
                user.email = email;
                Ok(())
            },
            UserCommand::DeleteUser { id } => {
                if self.users.remove(&id).is_none() {
                    return Err(CommandError::UserNotFound);
                }
                Ok(())
            }
        }
    }
}

// Query side
enum UserQuery {
    GetById(String),
    FindByEmail(String),
}

#[derive(Debug)]
enum QueryError {
    UserNotFound,
    InvalidQuery,
}

#[derive(Clone)]
struct UserDto {
    id: String,
    name: String,
    email: String,
}

struct UserQueryHandler {
    user_repository: HashMap<String, User>,
}

impl UserQueryHandler {
    fn new(user_repository: HashMap<String, User>) -> Self {
        Self { user_repository }
    }

    fn handle(&self, query: UserQuery) -> Result<Vec<UserDto>, QueryError> {
        match query {
            UserQuery::GetById(id) => {
                self.user_repository.get(&id)
                    .map(|user| vec![UserDto {
                        id: user.id.clone(),
                        name: user.name.clone(),
                        email: user.email.clone(),
                    }])
                    .ok_or(QueryError::UserNotFound)
            },
            UserQuery::FindByEmail(email) => {
                let results: Vec<UserDto> = self.user_repository.values()
                    .filter(|user| user.email == email)
                    .map(|user| UserDto {
                        id: user.id.clone(),
                        name: user.name.clone(),
                        email: user.email.clone(),
                    })
                    .collect();
                if results.is_empty() {
                    return Err(QueryError::UserNotFound);
                }
                Ok(results)
            }
        }
    }
}

// Usage
fn main() {
    let mut command_handler = UserCommandHandler::new();

    // Execute commands
    command_handler.handle(UserCommand::Create {
        id: "1".to_string(),
        name: "John Doe".to_string(),
        email: "john@example.com".to_string(),
    }).unwrap();
    command_handler.handle(UserCommand::Create {
        id: "2".to_string(),
        name: "Jane Smith".to_string(),
        email: "jane@example.com".to_string(),
    }).unwrap();

    // Query handler works with a copy of the data
    let query_handler = UserQueryHandler::new(command_handler.users.clone());

    // Execute queries
    let user = query_handler.handle(UserQuery::GetById("1".to_string())).unwrap();
    println!("Found user: {:?}", user[0].name);
}
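Splitting command and query handlers like this pays off in systems where reads and writes have different scaling requirements. Applied at the architectural level, the approach is usually called CQRS (Command Query Responsibility Segregation).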
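At the level of a single type, the same separation maps naturally onto Rust's borrow rules: commands take `&mut self` and return nothing, queries take `&self` and return a value. A minimal sketch, using a hypothetical `Inventory` type that is not part of the example above:

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Inventory {
    stock: HashMap<String, u32>,
}

impl Inventory {
    // Command: changes state and returns nothing.
    fn add_stock(&mut self, sku: &str, quantity: u32) {
        *self.stock.entry(sku.to_string()).or_insert(0) += quantity;
    }

    // Query: reads through a shared reference, so it cannot modify the map.
    fn stock_of(&self, sku: &str) -> u32 {
        self.stock.get(sku).copied().unwrap_or(0)
    }
}
```

With this split, a caller holding only `&Inventory` can run any number of queries but no commands, and the compiler enforces that.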
Event Stream Processing
Event stream processing handles continuous flows of events, applying transformations, filtering, and aggregations to derive insights.
use futures::{future, Stream, StreamExt};
use std::pin::Pin;
use std::time::{Duration, Instant};

type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

#[derive(Debug, Clone)]
struct SensorReading {
    sensor_id: String,
    temperature: f64,
    humidity: f64,
    timestamp: Instant,
}

#[derive(Debug)]
struct AggregatedReading {
    sensor_id: String,
    avg_temperature: f64,
    min_temperature: f64,
    max_temperature: f64,
    reading_count: usize,
}

// Transform events
async fn transform_readings(readings: BoxStream<SensorReading>) -> BoxStream<SensorReading> {
    readings
        .map(|reading| SensorReading {
            temperature: reading.temperature * 1.8 + 32.0, // Convert to Fahrenheit
            ..reading
        })
        .boxed()
}

// Filter events
async fn filter_anomalies(readings: BoxStream<SensorReading>) -> BoxStream<SensorReading> {
    readings
        // StreamExt::filter expects the predicate to return a future
        .filter(|reading| future::ready(reading.temperature > 80.0 || reading.temperature < 10.0))
        .boxed()
}

// Windowed aggregation
async fn window_readings(
    readings: BoxStream<SensorReading>,
    window_size: Duration,
) -> BoxStream<Vec<SensorReading>> {
    let mut current_window: Vec<SensorReading> = Vec::new();
    let mut window_start = Instant::now();
    readings
        .filter_map(move |reading| {
            // Update the window state synchronously inside the FnMut closure;
            // moving it into an async block would not compile
            let now = reading.timestamp;
            // If window duration passed, emit window and start new one
            let result = if now.saturating_duration_since(window_start) > window_size {
                window_start = now;
                Some(std::mem::replace(&mut current_window, vec![reading]))
            } else {
                current_window.push(reading);
                None
            };
            // Note: a partially filled window is not emitted when the input ends
            future::ready(result)
        })
        .boxed()
}

// Aggregate windowed events
async fn aggregate_readings(windows: BoxStream<Vec<SensorReading>>) -> BoxStream<AggregatedReading> {
    windows
        .filter_map(|window| {
            async move {
                if window.is_empty() {
                    return None;
                }
                let sensor_id = window[0].sensor_id.clone();
                let count = window.len();
                let sum_temp: f64 = window.iter().map(|r| r.temperature).sum();
                let min_temp = window.iter().map(|r| r.temperature).fold(f64::INFINITY, f64::min);
                let max_temp = window.iter().map(|r| r.temperature).fold(f64::NEG_INFINITY, f64::max);
                Some(AggregatedReading {
                    sensor_id,
                    avg_temperature: sum_temp / count as f64,
                    min_temperature: min_temp,
                    max_temperature: max_temp,
                    reading_count: count,
                })
            }
        })
        .boxed()
}

// Example usage (in real code you'd have a real stream source)
async fn process_sensor_data() {
    // Simulate a stream of sensor readings
    let readings: BoxStream<SensorReading> = futures::stream::iter(vec![
        SensorReading {
            sensor_id: "sensor1".to_string(),
            temperature: 25.0,
            humidity: 60.0,
            timestamp: Instant::now(),
        },
        // More readings...
    ]).boxed();

    // Build processing pipeline
    let transformed = transform_readings(readings).await;
    let windowed = window_readings(transformed, Duration::from_secs(60)).await;
    let aggregated = aggregate_readings(windowed).await;

    // Consume the final stream
    aggregated.for_each(|agg| async move {
        println!("Aggregated data: {:?}", agg);
    }).await;
}
Stream processing works especially well for time-series data and real-time analytics applications.
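The windowing step is easier to follow without the async machinery. The sketch below applies the same tumbling-window rule to a plain slice, using only the standard library; `tumbling_windows` is a hypothetical helper, timestamps are simplified to whole seconds, and the input is assumed to be sorted by time. It also flushes the last partial window when the input ends.

```rust
// Groups (timestamp_secs, temperature) pairs into tumbling windows.
// A window is closed when a reading arrives more than `window_secs` after
// the window's first reading; that reading then starts the next window.
fn tumbling_windows(readings: &[(u64, f64)], window_secs: u64) -> Vec<Vec<f64>> {
    let mut windows = Vec::new();
    let mut current = Vec::new();
    let mut window_start = match readings.first() {
        Some(&(ts, _)) => ts,
        None => return windows,
    };

    for &(ts, temperature) in readings {
        if ts.saturating_sub(window_start) > window_secs {
            // Emit the finished window and start a new one with this reading.
            windows.push(std::mem::replace(&mut current, vec![temperature]));
            window_start = ts;
        } else {
            current.push(temperature);
        }
    }

    // Flush the final, possibly partial, window.
    if !current.is_empty() {
        windows.push(current);
    }
    windows
}
```

With readings at 0, 30, 61, and 90 seconds and a 60-second window, the first two land in one window and the last two in the next.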
Circuit Breaker Pattern
The circuit breaker prevents system failures from cascading by temporarily stopping operations when error rates exceed thresholds.
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicU8, Ordering};
use std::sync::OnceLock;
use std::time::{Duration, Instant};

// Seconds elapsed since the first call, measured on the monotonic clock.
// Calling Instant::now().elapsed() directly is always close to zero,
// so timestamps need a fixed anchor to be comparable.
fn monotonic_secs() -> u64 {
    static START: OnceLock<Instant> = OnceLock::new();
    START.get_or_init(Instant::now).elapsed().as_secs()
}

// PartialEq is needed to compare states with ==
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CircuitState {
    Closed,   // Normal operation
    Open,     // Circuit tripped, failing fast
    HalfOpen, // Testing if system has recovered
}

#[derive(Debug)]
enum CircuitBreakerError {
    CircuitOpen,
    OperationFailed(String),
}

struct CircuitBreaker {
    state: AtomicU8,
    failure_threshold: u32,
    reset_timeout: Duration,
    failures: AtomicU32,
    last_failure_time: AtomicU64,
}

impl CircuitBreaker {
    fn new(failure_threshold: u32, reset_timeout: Duration) -> Self {
        Self {
            state: AtomicU8::new(CircuitState::Closed as u8),
            failure_threshold,
            reset_timeout,
            failures: AtomicU32::new(0),
            last_failure_time: AtomicU64::new(0),
        }
    }

    fn current_state(&self) -> CircuitState {
        match self.state.load(Ordering::Relaxed) {
            0 => CircuitState::Closed,
            1 => CircuitState::Open,
            2 => CircuitState::HalfOpen,
            _ => unreachable!(),
        }
    }

    fn record_failure(&self) {
        let current_failures = self.failures.fetch_add(1, Ordering::Relaxed) + 1;
        self.last_failure_time.store(monotonic_secs(), Ordering::Relaxed);
        if current_failures >= self.failure_threshold {
            self.trip();
        }
    }

    fn record_success(&self) {
        match self.current_state() {
            CircuitState::HalfOpen => {
                // Reset circuit on success in half-open state
                self.reset();
            },
            CircuitState::Closed => {
                // Reset failure count
                self.failures.store(0, Ordering::Relaxed);
            },
            _ => {}
        }
    }

    fn trip(&self) {
        self.state.store(CircuitState::Open as u8, Ordering::Relaxed);
    }

    fn reset(&self) {
        self.state.store(CircuitState::Closed as u8, Ordering::Relaxed);
        self.failures.store(0, Ordering::Relaxed);
    }

    // Move to half-open once the reset timeout has passed since the last failure
    fn attempt_reset(&self) {
        let last_failure = self.last_failure_time.load(Ordering::Relaxed);
        let now = monotonic_secs();
        if now.saturating_sub(last_failure) >= self.reset_timeout.as_secs() {
            self.state.store(CircuitState::HalfOpen as u8, Ordering::Relaxed);
        }
    }

    fn execute<F, T>(&self, operation: F) -> Result<T, CircuitBreakerError>
    where
        F: FnOnce() -> Result<T, String>,
    {
        if self.current_state() == CircuitState::Open {
            self.attempt_reset();
            if self.current_state() == CircuitState::Open {
                return Err(CircuitBreakerError::CircuitOpen);
            }
        }
        match operation() {
            Ok(result) => {
                self.record_success();
                Ok(result)
            },
            Err(err) => {
                self.record_failure();
                Err(CircuitBreakerError::OperationFailed(err))
            }
        }
    }
}

// Usage
fn main() {
    let circuit = CircuitBreaker::new(3, Duration::from_secs(30));

    // Simulate some operations
    for i in 0..10 {
        let result = circuit.execute(|| {
            // Simulate an external service call
            if i % 4 == 0 {
                Ok("Success")
            } else {
                Err("Service unavailable".to_string())
            }
        });
        match result {
            Ok(msg) => println!("Operation succeeded: {}", msg),
            Err(CircuitBreakerError::CircuitOpen) => {
                println!("Circuit open, failing fast without making the call");
            },
            Err(CircuitBreakerError::OperationFailed(err)) => {
                println!("Operation failed: {}", err);
            }
        }
    }
}
This pattern is crucial for resilient microservices architectures, where dependencies might fail.
Backpressure Handling
Backpressure mechanisms protect systems from being overwhelmed by managing resource consumption when event production outpaces consumption.
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug)]
enum SendError<T> {
    Backpressure(T),
    ChannelClosed,
}

// Clones share the same queue through the Arcs
#[derive(Clone)]
struct BackpressureChannel<T> {
    queue: Arc<Mutex<VecDeque<T>>>,
    condvar: Arc<Condvar>,
    high_watermark: usize,
    low_watermark: usize,
    closed: Arc<Mutex<bool>>,
}

impl<T> BackpressureChannel<T> {
    fn new(high_watermark: usize, low_watermark: usize) -> Self {
        Self {
            queue: Arc::new(Mutex::new(VecDeque::new())),
            condvar: Arc::new(Condvar::new()),
            high_watermark,
            low_watermark,
            closed: Arc::new(Mutex::new(false)),
        }
    }

    // Non-blocking in this sketch: a full queue is reported to the caller
    // instead of parking the sender, so send simply delegates to try_send
    fn send(&self, item: T) -> Result<(), SendError<T>> {
        self.try_send(item)
    }

    fn try_send(&self, item: T) -> Result<(), SendError<T>> {
        let mut queue = self.queue.lock().unwrap();
        if *self.closed.lock().unwrap() {
            return Err(SendError::ChannelClosed);
        }
        if queue.len() >= self.high_watermark {
            // Apply backpressure
            return Err(SendError::Backpressure(item));
        }
        queue.push_back(item);
        // Notify waiting receivers
        self.condvar.notify_one();
        Ok(())
    }

    // Blocks until an item is available; returns None once the channel is
    // closed and drained
    fn receive(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        while queue.is_empty() && !*self.closed.lock().unwrap() {
            queue = self.condvar.wait(queue).unwrap();
        }
        let item = queue.pop_front();
        // If queue length drops below low watermark, we've reduced pressure
        if queue.len() <= self.low_watermark {
            // In a real implementation, you might notify senders here
            // that it's safe to send more
        }
        item
    }

    fn try_receive(&self) -> Option<T> {
        let mut queue = self.queue.lock().unwrap();
        queue.pop_front()
    }

    fn close(&self) {
        // Hold the queue lock while setting the flag so a receiver cannot
        // check `closed` and then miss the wake-up
        let _queue = self.queue.lock().unwrap();
        *self.closed.lock().unwrap() = true;
        self.condvar.notify_all();
    }

    fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }

    fn is_empty(&self) -> bool {
        self.queue.lock().unwrap().is_empty()
    }
}

// Usage
fn main() {
    let channel = BackpressureChannel::new(100, 20);

    // Producer thread
    let channel_clone = channel.clone();
    let producer = std::thread::spawn(move || {
        for i in 0..1000 {
            // Retry until the queue has room; a real system might instead
            // signal upstream
            loop {
                match channel_clone.send(i) {
                    Ok(_) => {
                        println!("Sent: {}", i);
                        break;
                    },
                    Err(SendError::Backpressure(item)) => {
                        println!("Backpressure applied, waiting to send: {}", item);
                        std::thread::sleep(std::time::Duration::from_millis(100));
                    },
                    Err(SendError::ChannelClosed) => {
                        println!("Channel closed");
                        return;
                    }
                }
            }
        }
        // Closing lets the consumer finish once the queue is drained
        channel_clone.close();
    });

    // Consumer: runs until the channel is closed and empty
    while let Some(item) = channel.receive() {
        println!("Received: {}", item);
        // Simulate slow consumer
        std::thread::sleep(std::time::Duration::from_millis(10));
    }
    producer.join().unwrap();
}
Backpressure techniques are essential for maintaining system stability under variable load conditions.
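For simple cases the standard library already provides a bounded channel with these semantics. The sketch below uses `std::sync::mpsc::sync_channel`, where `send` blocks while the buffer is full and `try_send` reports a full buffer immediately; `fill_until_full` and `blocking_pipeline` are hypothetical helper names chosen for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

// Counts how many items a bounded channel accepts before try_send reports
// that the buffer is full. Nothing is consumed while filling.
fn fill_until_full(capacity: usize) -> usize {
    // Keep the receiver alive; otherwise try_send would report Disconnected.
    let (tx, _rx) = sync_channel::<u32>(capacity);
    let mut accepted = 0;
    loop {
        match tx.try_send(accepted as u32) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

// Producer and consumer joined by a bounded channel: the producer is slowed
// down to the consumer's pace because send blocks when the buffer is full.
fn blocking_pipeline(items: u32, capacity: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel(capacity);
    let producer = thread::spawn(move || {
        for i in 0..items {
            if tx.send(i).is_err() {
                break; // receiver went away
            }
        }
        // tx is dropped here, which ends the receiver's iterator
    });
    let received: Vec<u32> = rx.iter().collect();
    producer.join().unwrap();
    received
}
```

The blocking variant trades producer throughput for bounded memory, while the `try_send` variant leaves the decision (drop, retry, or signal upstream) to the caller.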
Reactive Event Processing
Reactive programming combines functional techniques with event streams to create composable, declarative event processing pipelines.
use futures::{stream, Stream, StreamExt};
use std::pin::Pin;
use std::time::Duration;

type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;

#[derive(Debug, Clone)]
enum Event {
    UserSignedUp { user_id: String, email: String },
    OrderPlaced { order_id: String, user_id: String, amount: f64 },
    PaymentReceived { payment_id: String, order_id: String, amount: f64 },
}

#[derive(Debug, Clone)]
enum Priority {
    Low,
    Medium,
    High,
}

#[derive(Debug)]
struct PrioritizedEvent {
    event: Event,
    priority: Priority,
}

#[derive(Debug)]
struct ProcessedEvent {
    event_type: String,
    user_id: Option<String>,
    details: String,
}

fn determine_priority(event: &Event) -> Priority {
    match event {
        Event::PaymentReceived { amount, .. } if *amount > 1000.0 => Priority::High,
        Event::OrderPlaced { .. } => Priority::Medium,
        _ => Priority::Low,
    }
}

fn process_event(event: PrioritizedEvent) -> Result<ProcessedEvent, String> {
    match event.event {
        Event::UserSignedUp { user_id, email } => {
            Ok(ProcessedEvent {
                event_type: "signup".to_string(),
                user_id: Some(user_id),
                details: format!("New user with email: {}", email),
            })
        },
        Event::OrderPlaced { order_id, user_id, amount } => {
            Ok(ProcessedEvent {
                event_type: "order".to_string(),
                user_id: Some(user_id),
                details: format!("Order {} placed for ${:.2}", order_id, amount),
            })
        },
        Event::PaymentReceived { payment_id, order_id, amount } => {
            Ok(ProcessedEvent {
                event_type: "payment".to_string(),
                user_id: None,
                details: format!("Payment {} for order {} of ${:.2}",
                    payment_id, order_id, amount),
            })
        }
    }
}

fn process_batch(events: Vec<ProcessedEvent>) -> Vec<String> {
    events.iter().map(|e| format!("{}: {}", e.event_type, e.details)).collect()
}

async fn react_to_events(events: BoxStream<Event>) -> BoxStream<Vec<String>> {
    events
        // Prioritize events
        .map(|event| {
            PrioritizedEvent {
                priority: determine_priority(&event),
                event,
            }
        })
        // Filter by priority
        .filter(|prioritized| {
            let is_high_priority = matches!(prioritized.priority, Priority::High);
            async move { is_high_priority }
        })
        // Process events
        .map(|event| process_event(event))
        // Keep only successful results
        .filter_map(|result| async move { result.ok() })
        // Buffer events into batches
        .chunks(5)
        // Process batches
        .map(process_batch)
        .boxed()
}

// Illustrative example
#[tokio::main]
async fn main() {
    // Create a sample event stream
    let events = stream::iter(vec![
        Event::UserSignedUp {
            user_id: "user123".to_string(),
            email: "user@example.com".to_string()
        },
        Event::OrderPlaced {
            order_id: "order456".to_string(),
            user_id: "user123".to_string(),
            amount: 99.95,
        },
        Event::PaymentReceived {
            payment_id: "pmt789".to_string(),
            order_id: "order456".to_string(),
            amount: 99.95,
        },
        Event::OrderPlaced {
            order_id: "order789".to_string(),
            user_id: "user123".to_string(),
            amount: 1299.95,
        },
        Event::PaymentReceived {
            payment_id: "pmt999".to_string(),
            order_id: "order789".to_string(),
            amount: 1299.95,
        },
    ]).boxed();

    // Process events reactively
    let processed_events = react_to_events(events).await;

    // Consume and print the processed events
    processed_events.for_each(|batch| async move {
        println!("Processed batch: {:?}", batch);
    }).await;
}
The reactive approach excels when dealing with complex event processing requirements that involve multiple transformations and variable event arrival patterns.
By incorporating these patterns into your Rust event-driven systems, you can achieve the ideal balance of performance, maintainability, and reliability. Each pattern addresses specific concerns, from loosely coupling components with the Event Bus to ensuring resilience with Circuit Breakers. The beauty of Rust is that these patterns gain additional