When building microservices with Ruby on Rails, I’ve found that event-driven architecture offers tremendous advantages for creating scalable, loosely coupled systems. After working on several large-scale implementations, I’ve identified key techniques that significantly improve reliability and performance.
Event-driven microservices rely on asynchronous communication patterns where services interact by publishing and consuming events rather than making direct requests. This approach creates systems that are more resilient, independently scalable, and maintainable.
Event Schema Design and Versioning
Effective event schema design forms the foundation of any event-driven system. Events should be self-contained, carrying all necessary information without requiring the consumer to fetch additional data.
I recommend using a schema registry to enforce consistency across events. In a Rails application, a small in-process registry class is enough to start with:
# app/models/event_schema.rb
class EventSchema
include ActiveModel::Validations
class << self
def registry
@registry ||= {}
end
def register(event_name, version, schema)
registry[event_name] ||= {}
registry[event_name][version] = schema
end
def validate(event_name, version, payload)
schema = registry.dig(event_name, version)
raise UnknownSchemaError, "No schema for #{event_name}:#{version}" unless schema
schema.validate(payload)
end
end
end
# Registration example
EventSchema.register('user.created', '1.0', UserCreatedSchema)
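The registration above refers to a `UserCreatedSchema` that is never shown. The registry only needs an object that responds to `validate(payload)`, so a minimal sketch could look like the following. The `RequiredFieldsSchema` class, the `SchemaValidationError` name, and the field list are assumptions made for illustration, not part of any gem.

```ruby
# Hypothetical schema object: anything that responds to #validate(payload)
# can be registered with EventSchema.
class SchemaValidationError < StandardError; end

class RequiredFieldsSchema
  def initialize(required)
    @required = required # { field_name => expected_class }
  end

  # Returns true for a valid payload, raises with every problem listed otherwise
  def validate(payload)
    errors = @required.filter_map do |field, type|
      value = payload[field]
      if value.nil?
        "#{field} is missing"
      elsif !value.is_a?(type)
        "#{field} must be a #{type}"
      end
    end
    raise SchemaValidationError, errors.join(', ') unless errors.empty?

    true
  end
end

# Assumed field list for the user.created event
UserCreatedSchema = RequiredFieldsSchema.new(user_id: String, email: String)
```

Reporting every failing field at once, rather than stopping at the first, tends to make rejected events easier to diagnose from a log line.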
For versioning, I follow semantic versioning principles and include the version in the event metadata. This allows services to handle multiple versions simultaneously during transitions:
# app/services/event_versioner.rb
class EventVersioner
def self.transform(event, target_version)
return event if event.version == target_version
migration_path = find_migration_path(event.name, event.version, target_version)
current_payload = event.payload.dup
migration_path.each do |migration|
current_payload = migration.transform(current_payload)
end
event.dup.tap do |e|
e.payload = current_payload
e.version = target_version
end
end
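def self.find_migration_path(event_name, from_version, to_version)
# Look up the ordered list of migrations between the two versions
EventMigrationRegistry.migrations_for(event_name, from_version, to_version)
end
# `private` has no effect on methods defined with `def self.`, so hide the helper explicitly
private_class_method :find_migration_path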
end
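The versioner depends on an `EventMigrationRegistry` that is not shown. One way it might work, sketched under the assumption that each migration covers a single hop between adjacent versions and that hops are chained until the target is reached. The `register` signature and the example payload fields are hypothetical.

```ruby
# Hypothetical registry: stores one migration per (event, from_version) hop
# and chains hops until the target version is reached.
class EventMigrationRegistry
  Migration = Struct.new(:from, :to, :block) do
    def transform(payload)
      block.call(payload)
    end
  end

  class << self
    def register(event_name, from:, to:, &block)
      hops[event_name][from] = Migration.new(from, to, block)
    end

    # Returns the ordered list of migrations leading from one version to another
    def migrations_for(event_name, from_version, to_version)
      path = []
      current = from_version
      until current == to_version
        hop = hops[event_name][current]
        raise ArgumentError, "No migration from #{event_name}:#{current}" unless hop

        path << hop
        current = hop.to
      end
      path
    end

    private

    def hops
      @hops ||= Hash.new { |hash, key| hash[key] = {} }
    end
  end
end

EventMigrationRegistry.register('user.created', from: '1.0', to: '1.1') do |payload|
  payload.merge(locale: 'en')
end
EventMigrationRegistry.register('user.created', from: '1.1', to: '2.0') do |payload|
  first, last = payload[:name].split(' ', 2)
  payload.reject { |key, _| key == :name }.merge(first_name: first, last_name: last)
end
```

Each migration returns a new hash instead of mutating its input, so the original event payload stays intact while consumers on the old version are still running.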
Message Broker Integration
Selecting the right message broker is crucial. I’ve had success with RabbitMQ for complex routing needs and Kafka for high-throughput scenarios.
For Rails integration with RabbitMQ, the Bunny gem works well:
# config/initializers/rabbitmq.rb
require 'bunny'
module RabbitMQ
class << self
def connection
@connection ||= begin
conn = Bunny.new(
hosts: Rails.application.config.rabbitmq[:hosts],
vhost: Rails.application.config.rabbitmq[:vhost],
username: Rails.application.config.rabbitmq[:username],
password: Rails.application.config.rabbitmq[:password],
automatically_recover: true
)
conn.start
conn
end
end
def channel
Thread.current[:rabbitmq_channel] ||= connection.create_channel
end
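def exchange
# Channels are per thread, so the exchange bound to a channel is cached per thread as well
Thread.current[:rabbitmq_exchange] ||= channel.topic('events', durable: true)
end
def close
@connection.close if @connection&.open?
@connection = nil
end
end
end
# Graceful shutdown; does not open a connection just to close it
at_exit do
RabbitMQ.close
end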
When working with Kafka, the ruby-kafka gem provides a solid foundation:
# config/initializers/kafka.rb
require 'kafka'
module KafkaClient
class << self
def producer
@producer ||= client.producer(
required_acks: :all,
compression_codec: :snappy,
max_retries: 3,
retry_backoff: 1
)
end
def consumer(group_id:)
client.consumer(group_id: group_id)
end
private
def client
@client ||= Kafka.new(
seed_brokers: Rails.application.config.kafka[:brokers],
client_id: Rails.application.config.service_name
)
end
end
end
Event Sourcing Patterns
Event sourcing maintains a complete history of domain changes as a sequence of events. This approach provides auditability and enables replaying events to reconstruct state.
Here’s a basic implementation of an event-sourced aggregate:
# app/models/concerns/event_sourced.rb
module EventSourced
extend ActiveSupport::Concern
included do
class_attribute :event_handlers
self.event_handlers = {}
end
class_methods do
def apply_event(event_name, &handler)
self.event_handlers[event_name.to_s] = handler
end
end
attr_reader :id, :version
def initialize(id = nil)
@id = id || SecureRandom.uuid
@version = 0
@changes = []
end
def apply(event)
event_handler = self.class.event_handlers[event.name]
raise UnhandledEventError, "No handler for #{event.name}" unless event_handler
instance_exec(event.payload, &event_handler)
@version += 1
@changes << event
self
end
def commit_changes
return if @changes.empty?
EventRepository.store_events(@changes)
EventPublisher.publish_events(@changes)
@changes = []
end
def load_from_history(events)
events.each { |event| apply(event) }
@changes = []
self
end
end
And an example aggregate using this pattern:
# app/models/order.rb
class Order
include EventSourced
attr_reader :status, :items, :total_amount
apply_event 'order.created' do |data|
@status = 'pending'
@items = data[:items].dup # copy so later item_added events do not mutate this event's payload
@total_amount = calculate_total(data[:items])
end
apply_event 'order.item_added' do |data|
@items << data[:item]
@total_amount = calculate_total(@items)
end
apply_event 'order.payment_processed' do |data|
@status = 'paid'
@payment_id = data[:payment_id]
end
def initialize(id = nil)
super
@items = []
@total_amount = 0
end
def create(items)
raise InvalidStateError, "Order already exists" if @version > 0
apply(Event.new(
name: 'order.created',
payload: { items: items }
))
end
def add_item(item)
raise InvalidStateError, "Cannot modify a paid order" if @status == 'paid'
apply(Event.new(
name: 'order.item_added',
payload: { item: item }
))
end
def process_payment(payment_id)
raise InvalidStateError, "Order already paid" if @status == 'paid'
apply(Event.new(
name: 'order.payment_processed',
payload: { payment_id: payment_id }
))
end
private
def calculate_total(items)
items.sum { |item| item[:price] * item[:quantity] }
end
end
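The aggregate relies on an `Event` class that the article does not define. A minimal sketch of such a value object follows, together with a replay written as a plain fold so the idea of rebuilding state from history is visible without the mixin. The field list mirrors the keyword arguments used in this article; the defaults and the `replay_order` helper are assumptions.

```ruby
require 'securerandom'

# Hypothetical Event value object; fields mirror the keyword arguments used
# by the snippets in this article.
Event = Struct.new(:id, :name, :payload, :version, :trace_id, :span_id, :timestamp,
                   keyword_init: true) do
  def initialize(**attrs)
    super
    self.id ||= SecureRandom.uuid
    self.version ||= '1.0'
    self.timestamp ||= Time.now.utc
  end
end

# Rebuilding state is a left fold over the stored history.
def replay_order(events)
  events.reduce({ status: nil, items: [] }) do |state, event|
    case event.name
    when 'order.created'
      state.merge(status: 'pending', items: event.payload[:items].dup)
    when 'order.item_added'
      state.merge(items: state[:items] + [event.payload[:item]])
    when 'order.payment_processed'
      state.merge(status: 'paid')
    else
      state # unknown events are ignored in this sketch
    end
  end
end

history = [
  Event.new(name: 'order.created', payload: { items: [{ price: 10, quantity: 2 }] }),
  Event.new(name: 'order.item_added', payload: { item: { price: 5, quantity: 1 } }),
  Event.new(name: 'order.payment_processed', payload: { payment_id: 'pay-1' })
]
state = replay_order(history)
```

Because the fold never mutates an event payload, replaying the same history twice yields the same state, which is the property event sourcing depends on.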
Command-Query Responsibility Segregation
CQRS separates operations that modify state (commands) from operations that read state (queries). This pattern works particularly well with event-driven architectures.
In a Rails application, we can implement CQRS by separating our controllers into command and query controllers:
# app/controllers/orders/commands_controller.rb
module Orders
class CommandsController < ApplicationController
def create
command = CreateOrderCommand.new(order_params)
CommandBus.execute(command)
head :accepted
end
def add_item
command = AddOrderItemCommand.new(
order_id: params[:id],
item: item_params
)
CommandBus.execute(command)
head :accepted
end
private
def order_params
params.require(:order).permit(items: [:product_id, :quantity, :price])
end
def item_params
params.require(:item).permit(:product_id, :quantity, :price)
end
end
end
# app/controllers/orders/queries_controller.rb
module Orders
class QueriesController < ApplicationController
def show
@order = OrderReadModel.find(params[:id])
render json: @order
end
def index
@orders = OrderReadModel.where(customer_id: current_customer.id)
.order(created_at: :desc)
.page(params[:page])
render json: @orders
end
end
end
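The query controller reads from `OrderReadModel`, which has to be kept current by something that consumes events. A sketch of such a projector, using an in-memory hash in place of a database table. The `OrderProjector` name, the row shape, and the event hash layout are assumptions for illustration.

```ruby
# Hypothetical projector: consumes order events and maintains a denormalized
# row per order. In Rails the rows would typically live in a table that only
# the projector writes to.
class OrderProjector
  attr_reader :rows

  def initialize
    @rows = {} # order_id => denormalized row
  end

  # event is a plain hash here: { name:, order_id:, payload: }
  def handle(event)
    row = (@rows[event[:order_id]] ||= { status: nil, item_count: 0, total: 0 })
    case event[:name]
    when 'order.created'
      row[:status] = 'pending'
      event[:payload][:items].each { |item| add_item(row, item) }
    when 'order.item_added'
      add_item(row, event[:payload][:item])
    when 'order.payment_processed'
      row[:status] = 'paid'
    end
    row
  end

  private

  def add_item(row, item)
    row[:item_count] += item[:quantity]
    row[:total] += item[:price] * item[:quantity]
  end
end
```

The read side is eventually consistent with the write side, which is why the command controller answers with 202 Accepted rather than returning the updated order.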
The command side uses a command bus to handle command execution:
# app/services/command_bus.rb
class CommandBus
class << self
def register(command_class, handler)
handlers[command_class.name] = handler
end
def execute(command)
handler = handlers[command.class.name]
raise UnregisteredCommandError, "No handler for #{command.class.name}" unless handler
handler.execute(command)
end
private
def handlers
@handlers ||= {}
end
end
end
# Registration example
CommandBus.register(CreateOrderCommand, CreateOrderHandler.new)
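`CreateOrderCommand` and `CreateOrderHandler` are registered above but not shown. A rough sketch of what they might look like, assuming the command is a validated data object and the handler is the only place that acts on it. The constructor arguments and the array used as a store are hypothetical.

```ruby
# Hypothetical command: a data object that validates its own input.
class CreateOrderCommand
  attr_reader :items

  def initialize(params)
    items = params[:items]
    raise ArgumentError, 'items must not be empty' if items.nil? || items.empty?

    @items = items.map(&:dup).freeze
  end
end

# Hypothetical handler: the single place that knows how to carry out the command.
class CreateOrderHandler
  def initialize(store = [])
    @store = store # stand-in for an event store or repository
  end

  def execute(command)
    event = { name: 'order.created', payload: { items: command.items } }
    @store << event
    event
  end
end
```

Validating in the command constructor means a handler never sees malformed input, so the bus can stay a simple lookup table.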
Dead Letter Queues
Processing failures are inevitable in distributed systems. Dead letter queues (DLQs) provide a mechanism to handle failed event processing.
Here’s how I implement DLQ handling in Rails:
# app/services/event_consumer.rb
class EventConsumer
def initialize(queue_name, options = {})
@queue_name = queue_name
@max_retries = options[:max_retries] || 3
@handler = options[:handler]
@dlq_name = "#{queue_name}.dlq"
end
def start
channel = RabbitMQ.connection.create_channel
channel.prefetch(10)
queue = channel.queue(@queue_name, durable: true)
dlq = channel.queue(@dlq_name, durable: true)
queue.subscribe(manual_ack: true) do |delivery_info, properties, payload|
process_message(channel, delivery_info, properties, payload, dlq)
end
end
private
def process_message(channel, delivery_info, properties, payload, dlq)
event = EventSerializer.deserialize(payload)
@handler.process(event)
channel.acknowledge(delivery_info.delivery_tag, false)
rescue StandardError => e
retry_count = properties.headers&.fetch('x-retry-count', 0).to_i
if retry_count < @max_retries
requeue_with_backoff(channel, delivery_info, properties, payload, retry_count)
else
move_to_dlq(channel, delivery_info, properties, payload, dlq, e)
end
end
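def requeue_with_backoff(channel, delivery_info, properties, payload, retry_count)
headers = (properties.headers || {}).merge('x-retry-count' => retry_count + 1)
# Exponential backoff: 1s, 2s, 4s, ...
backoff = (2**retry_count) * 1000 # milliseconds
# A per-message TTL only delays redelivery if the message waits in a queue that
# nobody consumes from. When the TTL expires, RabbitMQ dead-letters the message
# back to the work queue through the default exchange.
# Note: expiry is checked at the head of the queue, so a long delay can hold up shorter ones behind it.
retry_queue = channel.queue(
"#{@queue_name}.retry",
durable: true,
arguments: {
'x-dead-letter-exchange' => '',
'x-dead-letter-routing-key' => @queue_name
}
)
retry_queue.publish(payload, headers: headers, expiration: backoff.to_s, persistent: true)
# Acknowledge the original only after the retry copy has been published
channel.acknowledge(delivery_info.delivery_tag, false)
end
def move_to_dlq(channel, delivery_info, properties, payload, dlq, error)
headers = (properties.headers || {}).merge(
'x-exception' => error.class.name,
'x-exception-message' => error.message,
'x-failed-at' => Time.current.iso8601
)
dlq.publish(payload, headers: headers, persistent: true)
# Acknowledge the original only after the DLQ copy has been published
channel.acknowledge(delivery_info.delivery_tag, false)
Rails.logger.error("Message moved to DLQ: #{error.message}")
end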
end
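The consumer computes its delay as `(2**retry_count) * 1000` milliseconds. A small helper makes the resulting schedule easy to inspect. The cap and the optional jitter are additions for illustration and are not part of the consumer above.

```ruby
# Delay in milliseconds before retry number `retry_count` (0-based).
# The cap keeps a long outage from producing very long waits, and jitter
# spreads out retries from many consumers. Both are assumptions.
def backoff_ms(retry_count, base_ms: 1000, cap_ms: 60_000, jitter: 0.0, rng: Random.new)
  delay = [base_ms * (2**retry_count), cap_ms].min
  spread = (delay * jitter).to_i
  return delay if spread.zero?

  # Pick uniformly from [delay - spread, delay + spread]
  delay - spread + rng.rand(0..(2 * spread))
end

(0..3).map { |n| backoff_ms(n) } # → [1000, 2000, 4000, 8000]
```

With the default `max_retries` of 3, a failing message waits roughly 1, 2, and 4 seconds before it is moved to the dead letter queue.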
Idempotent Event Processing
Idempotent processing ensures that events can be processed multiple times without causing unintended side effects, which is essential in distributed systems where message delivery can’t be guaranteed exactly once.
I implement idempotency using a processed events tracker:
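# app/services/processed_events_tracker.rb
class ProcessedEventsTracker
  class << self
    # Assign a client at boot, for example in an initializer. Redis.current
    # was deprecated in redis-rb 4.6 and removed in 5.0.
    attr_writer :redis

    def redis
      @redis ||= Redis.new
    end

    # Atomically claims the event with SET NX EX. Returns true for the first
    # caller only, so two consumers racing on the same event cannot both win.
    def claim(event_id, ttl: 7.days)
      redis.set(key(event_id), '1', nx: true, ex: ttl.to_i)
    end

    # Releases a claim so a failed event can be redelivered and retried
    def release(event_id)
      redis.del(key(event_id))
    end

    private

    def key(event_id)
      "processed_event:#{event_id}"
    end
  end
end

# Usage in event handler
class OrderCreatedHandler
  def handle(event)
    return unless ProcessedEventsTracker.claim(event.id)

    begin
      ActiveRecord::Base.transaction do
        # Process the event...
      end
    rescue StandardError
      # Processing failed, so give up the claim and let the broker redeliver
      ProcessedEventsTracker.release(event.id)
      raise
    end
  end
end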
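To see what idempotent handling buys when a broker redelivers a message, here is a self-contained sketch with an in-memory tracker standing in for Redis. `InMemoryTracker`, `BalanceHandler`, and the event shape are hypothetical names used only for this illustration.

```ruby
require 'set'

# In-memory stand-in for a Redis-backed tracker, for illustration only.
class InMemoryTracker
  def initialize
    @seen = Set.new
    @lock = Mutex.new
  end

  # Set#add? returns nil when the id was already present, so only the first
  # caller for a given id gets true.
  def claim(event_id)
    @lock.synchronize { !@seen.add?(event_id).nil? }
  end
end

class BalanceHandler
  attr_reader :balance

  def initialize(tracker)
    @tracker = tracker
    @balance = 0
  end

  def handle(event)
    return :skipped unless @tracker.claim(event[:id])

    @balance += event[:amount]
    :applied
  end
end

handler = BalanceHandler.new(InMemoryTracker.new)
deposit = { id: 'evt-1', amount: 50 }
outcomes = [handler.handle(deposit), handler.handle(deposit)] # second call simulates a redelivery
```

The duplicate delivery is acknowledged and dropped, so the balance reflects the deposit once.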
Circuit Breaker Implementation
Circuit breakers prevent cascading failures by stopping calls to failing services. This pattern is vital in microservices architectures.
Here’s a simple implementation for Rails:
# app/services/circuit_breaker.rb
class CircuitBreaker
class OpenError < StandardError; end
STATES = [:closed, :open, :half_open].freeze
attr_reader :name, :state
def initialize(name, options = {})
@name = name
@state = :closed
@failure_threshold = options[:failure_threshold] || 5
@reset_timeout = options[:reset_timeout] || 30.seconds
@half_open_max_calls = options[:half_open_max_calls] || 3
@failure_count = 0
@opened_at = nil
@half_open_successful_calls = 0
@mutex = Mutex.new
end
def run
case state
when :closed
execute_closed { yield }
when :open
execute_open { yield }
when :half_open
execute_half_open { yield }
end
end
REGISTRY_LOCK = Mutex.new
def self.circuits
@circuits ||= {}
end
# Options are applied only when the named circuit is first created
def self.get(name, options = {})
REGISTRY_LOCK.synchronize { circuits[name] ||= new(name, options) }
end
def self.run(name, options = {}, &block)
get(name, options).run(&block)
end
private
def execute_closed
result = yield
reset_failure_count
result
rescue => error
handle_failure(error)
end
def execute_open
if ready_to_half_open?
transition_to(:half_open)
execute_half_open { yield }
else
raise OpenError, "Circuit #{name} is open"
end
end
def execute_half_open
result = yield
record_success
result
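rescue => error
# Take the same lock as the other state changes before reopening the circuit
@mutex.synchronize { transition_to(:open) }
raise error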
end
def handle_failure(error)
@mutex.synchronize do
@failure_count += 1
transition_to(:open) if @failure_count >= @failure_threshold
end
raise error
end
def reset_failure_count
@mutex.synchronize do
@failure_count = 0
end
end
def record_success
@mutex.synchronize do
@half_open_successful_calls += 1
transition_to(:closed) if @half_open_successful_calls >= @half_open_max_calls
end
end
def transition_to(new_state)
@state = new_state
@opened_at = Time.current if new_state == :open
@half_open_successful_calls = 0 if new_state != :half_open
@failure_count = 0 if new_state == :closed
end
def ready_to_half_open?
@opened_at + @reset_timeout < Time.current
end
end
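The breaker is easier to reason about as a small state machine. The table below lists the transitions the class performs, stripped of timing and failure counting. The signal names are labels invented for this sketch.

```ruby
# Pure transition table for the three breaker states; an illustration of the
# state machine only, with no timing or failure counting.
TRANSITIONS = {
  %i[closed threshold_reached] => :open,      # too many consecutive failures
  %i[open timeout_elapsed] => :half_open,     # reset timeout has passed
  %i[half_open call_failed] => :open,         # a single trial call failed
  %i[half_open enough_successes] => :closed   # trial calls succeeded
}.freeze

# Any signal that is not listed for the current state leaves it unchanged
def next_state(state, signal)
  TRANSITIONS.fetch([state, signal], state)
end

signals = %i[threshold_reached timeout_elapsed call_failed timeout_elapsed enough_successes]
visited = signals.each_with_object([:closed]) do |signal, states|
  states << next_state(states.last, signal)
end
```

Walking the signals from `:closed` visits open, half-open, open again after a failed trial call, half-open, and finally closed.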
Event Tracing and Observability
Tracing events across services is essential for debugging and performance optimization in distributed systems.
A Rails implementation combining OpenTelemetry with custom tracers works well:
# app/services/event_tracer.rb
class EventTracer
  # Starts a fresh trace for an event that is not caused by another event
  def self.with_new_context
    context = {
      trace_id: SecureRandom.hex(16),
      span_id: SecureRandom.hex(8),
      parent_span_id: nil
    }
    yield context
  end

  # Continues the trace carried by an incoming event
  def self.with_existing_context(trace_id, parent_span_id)
    context = {
      trace_id: trace_id,
      span_id: SecureRandom.hex(8),
      parent_span_id: parent_span_id
    }
    yield context
  end

  def self.add_event_span(event_name, context, metadata = {})
    tracer = OpenTelemetry.tracer_provider.tracer('events')
    # OpenTelemetry attribute keys must be strings
    attributes = {
      'messaging.system' => 'rabbitmq',
      'messaging.destination' => event_name,
      'messaging.destination_kind' => 'topic',
      'trace.id' => context[:trace_id]
    }.merge(metadata.transform_keys(&:to_s))
    attributes['parent.id'] = context[:parent_span_id] if context[:parent_span_id]
    # in_span yields the span and finishes it when the block returns;
    # start_span does not take a block, so the wrapped code would never run.
    tracer.in_span(event_name, attributes: attributes, kind: :producer) do |span|
      yield span if block_given?
    end
  end
end
To tie everything together, I implement a comprehensive event publisher:
# app/services/event_publisher.rb
class EventPublisher
class << self
def publish(event_name, payload, options = {})
EventTracer.with_new_context do |context|
event = build_event(event_name, payload, context, options)
# Track event publishing attempt
EventTracer.add_event_span(event_name, context, { version: event.version }) do |span|
begin
CircuitBreaker.run('event_publishing') do
publish_to_broker(event)
end
span.add_event('event.published')
rescue => e
span.record_exception(e)
span.status = OpenTelemetry::Trace::Status.error(e.message)
# Store for retry
StoreAndForwardRepository.store(event)
raise EventPublishingError, "Failed to publish: #{e.message}"
end
end
event.id
end
end
private
def build_event(name, payload, context, options)
Event.new(
id: SecureRandom.uuid,
name: name,
payload: payload,
version: options[:version] || '1.0',
trace_id: context[:trace_id],
span_id: context[:span_id],
timestamp: Time.current
)
end
def publish_to_broker(event)
serialized_event = EventSerializer.serialize(event)
RabbitMQ.exchange.publish(
serialized_event,
routing_key: event.name,
headers: {
'x-version': event.version,
'x-trace-id': event.trace_id,
'x-span-id': event.span_id,
'x-source': Rails.application.config.service_name
},
persistent: true
)
end
end
end
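On the consuming side, the `x-trace-id` and `x-span-id` headers written by the publisher can be turned back into a context for `with_existing_context`. A minimal sketch of that extraction follows. The helper name and the fallback for messages without headers are assumptions.

```ruby
require 'securerandom'

# Builds the context for a consumer span from the headers written by the
# publisher ('x-trace-id', 'x-span-id'). Starting a new trace when the
# headers are missing is an assumption of this sketch.
def child_context_from_headers(headers)
  headers ||= {}
  {
    trace_id: headers['x-trace-id'] || SecureRandom.hex(16),
    span_id: SecureRandom.hex(8),          # this consumer's own span
    parent_span_id: headers['x-span-id']   # the publisher's span, if any
  }
end

incoming = { 'x-trace-id' => 'a' * 32, 'x-span-id' => 'b' * 16 }
child = child_context_from_headers(incoming)
```

Keeping the trace id and recording the publisher's span as the parent is what lets a tracing backend stitch the producer and consumer into one timeline.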
Saga Pattern for Distributed Transactions
When working across multiple services, traditional ACID transactions aren’t available. The Saga pattern provides an alternative approach.
Here’s a straightforward implementation:
# app/services/saga.rb
class Saga
attr_reader :name, :steps
def initialize(name)
@name = name
@steps = []
@current_step = 0
end
def add_step(action, compensation = nil)
@steps << { action: action, compensation: compensation }
self
end
def execute(context = {})
result = context.dup
@steps.each_with_index do |step, index|
@current_step = index
begin
result = step[:action].call(result)
rescue => e
rollback(result, index)
raise SagaFailedError.new("Failed at step #{index}: #{e.message}", index, e)
end
end
result
end
private
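def rollback(context, failed_step_index)
# Compensate only the steps that completed, most recent first; the failed step is assumed to have made no durable change
@steps[0...failed_step_index].reverse_each do |step|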
next unless step[:compensation]
begin
step[:compensation].call(context)
rescue => e
Rails.logger.error("Compensation failed: #{e.message}")
# Continue with other compensations despite failure
end
end
end
end
# Usage example
class OrderProcessingSaga
def execute(order_id)
saga = Saga.new("process_order_#{order_id}")
saga.add_step(
->(ctx) { validate_inventory(ctx) },
->(ctx) { release_inventory_reservation(ctx) }
).add_step(
->(ctx) { process_payment(ctx) },
->(ctx) { refund_payment(ctx) }
).add_step(
->(ctx) { update_order_status(ctx) }
)
saga.execute({ order_id: order_id })
end
private
def validate_inventory(context)
# Implementation
context.merge(inventory_validated: true)
end
# Other methods for saga steps...
end
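To make the order of compensations concrete, here is a condensed runner that only logs what it does. It is a simplified stand-in for the `Saga` class, written to trace one failing run, and the step names are invented for the example.

```ruby
# Condensed saga runner used only to trace the order of calls.
# Each step is [name, action, compensation]; compensation may be nil.
def run_steps(steps, log)
  done = []
  steps.each do |name, action, compensation|
    action.call
    log << "did #{name}"
    done << [name, compensation]
  end
  :completed
rescue StandardError => e
  # Undo completed steps in reverse order
  done.reverse_each do |name, compensation|
    next unless compensation

    compensation.call
    log << "undid #{name}"
  end
  log << "failed: #{e.message}"
  :rolled_back
end

log = []
steps = [
  ['reserve_inventory', -> {}, -> {}],
  ['charge_payment', -> {}, -> {}],
  ['update_status', -> { raise 'status service unavailable' }, nil]
]
outcome = run_steps(steps, log)
```

The payment is refunded before the inventory reservation is released, the reverse of the order in which the steps ran.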
Event-Driven Testing Strategies
Testing event-driven systems requires specialized approaches. I’ve found these techniques helpful:
# spec/support/event_testing.rb
module EventTesting
def capture_published_events
original_publisher = EventPublisher.method(:publish)
published_events = []
EventPublisher.define_singleton_method(:publish) do |event_name, payload, options = {}|
event = Event.new(
id: SecureRandom.uuid,
name: event_name,
payload: payload,
version: options[:version] || '1.0',
trace_id: SecureRandom.uuid,
span_id: SecureRandom.uuid,
timestamp: Time.current
)
published_events << event
event.id
end
yield
published_events
ensure
EventPublisher.define_singleton_method(:publish, original_publisher)
end
def expect_event_published(event_name, predicate = nil)
published_events = capture_published_events { yield }
matching_events = published_events.select { |e| e.name == event_name }
if predicate
matching_events = matching_events.select { |e| predicate.call(e.payload) }
end
expect(matching_events).not_to be_empty, "Expected event '#{event_name}' to be published, but it wasn't"
matching_events.first
end
end
RSpec.configure do |config|
config.include EventTesting
end
# Sample test
RSpec.describe Orders::CommandsController, type: :controller do
describe 'POST #create' do
it 'publishes an order.created event' do
params = { order: { items: [{ product_id: 1, quantity: 2 }] } }
expect_event_published('order.created', ->(payload) { payload[:items].size == 1 }) do
post :create, params: params
end
expect(response).to have_http_status(:accepted)
end
end
end
Scaling Up: Performance Optimization
As systems grow, performance optimization becomes critical. Here are techniques I’ve implemented for scaling Rails microservices:
# app/services/batch_event_consumer.rb
class BatchEventConsumer
def initialize(queue_name, batch_size: 100, wait_timeout: 5)
@queue_name = queue_name
@batch_size = batch_size
@wait_timeout = wait_timeout
@handlers = {}
end
def register_handler(event_name, handler)
@handlers[event_name] = handler
end
def start
channel = RabbitMQ.connection.create_channel
queue = channel.queue(@queue_name, durable: true)
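loop do
batch = collect_batch(channel, queue)
# collect_batch returns [messages, delivery_tags, channel]; check the messages, not the wrapper array
if batch.first.empty?
sleep(1) # the queue is empty, so avoid a busy loop
else
process_batch(batch)
end
end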
end
private
def collect_batch(channel, queue)
messages = []
delivery_tags = []
deadline = Time.current + @wait_timeout
while messages.size < @batch_size && Time.current < deadline
delivery_info, properties, payload = queue.pop(manual_ack: true)
break unless delivery_info
event = EventSerializer.deserialize(payload)
messages << event
delivery_tags << delivery_info.delivery_tag
end
[messages, delivery_tags, channel]
end
def process_batch(batch_data)
messages, delivery_tags, channel = batch_data
# Group messages by event type for batch processing
messages_by_type = messages.group_by(&:name)
begin
ActiveRecord::Base.transaction do
messages_by_type.each do |event_name, events|
handler = @handlers[event_name]
next unless handler
handler.process_batch(events)
end
end
# Acknowledge all messages in the batch
delivery_tags.each { |tag| channel.acknowledge(tag, false) }
rescue => e
# Requeue the whole batch; a message that always fails will loop until it is routed to a DLQ
delivery_tags.each { |tag| channel.reject(tag, true) }
Rails.logger.error("Batch processing failed: #{e.message}")
end
end
end
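The gain from batching comes from the grouping step: each handler is called once per event type instead of once per message. The sketch below isolates that step. `Msg` and `CountingHandler` are hypothetical stand-ins for deserialized events and real batch handlers.

```ruby
Msg = Struct.new(:name, :payload)

# Hypothetical batch handler: records how many events each call received,
# standing in for a single bulk insert or update.
class CountingHandler
  attr_reader :calls

  def initialize
    @calls = []
  end

  def process_batch(events)
    @calls << events.size
  end
end

handlers = { 'order.created' => CountingHandler.new, 'order.paid' => CountingHandler.new }
batch = [
  Msg.new('order.created', { id: 1 }),
  Msg.new('order.paid', { id: 1 }),
  Msg.new('order.created', { id: 2 }),
  Msg.new('order.created', { id: 3 }),
  Msg.new('audit.logged', { id: 9 }) # no handler registered, so it is skipped
]

# Same grouping the consumer performs before calling handlers
batch.group_by(&:name).each do |event_name, events|
  handlers[event_name]&.process_batch(events)
end
```

Five messages result in two handler calls here. Note that grouping by type gives up ordering between different event types within a batch, which matters if handlers depend on each other.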
I’ve found that implementing these techniques significantly improves the scalability and reliability of Ruby on Rails microservices. The event-driven approach provides natural resilience and flexibility as systems grow, while patterns like circuit breakers and dead letter queues handle inevitable failures gracefully.
The key is thinking in events rather than direct API calls. This mental model shift leads to better system design and easier scaling. By combining these techniques with Rails’ productivity benefits, we get the best of both worlds: rapid development and robust, scalable architecture.