Building Event-Sourced Systems in Ruby
Event sourcing captures state changes as immutable records, providing an audit trail and enabling temporal analysis. I’ve found this approach particularly valuable for financial systems where transaction history matters. Let me share practical Ruby techniques I’ve implemented across multiple projects.
Event Store Foundations
A reliable event store needs atomic writes and version control. Here’s how I handle stream persistence:
require 'json'
require 'pg'
require 'securerandom'

ConcurrencyError = Class.new(StandardError)

class PostgresEventStore
  def initialize(connection)
    @connection = connection # a PG::Connection
  end

  def append_to_stream(stream_name, events, expected_version: nil)
    connection.transaction do
      current_version = get_stream_version(stream_name)

      # Optimistic concurrency check, enforced only when the caller supplies a version
      if expected_version && expected_version != current_version
        raise ConcurrencyError, "expected version #{expected_version}, stream is at #{current_version}"
      end

      events.each_with_index do |event, index|
        insert_event(
          stream_name: stream_name,
          event_type: event.class.name,
          data: event.to_h,
          metadata: { causation_id: SecureRandom.uuid },
          version: current_version + index + 1
        )
      end
    end
  end

  private

  attr_reader :connection

  def get_stream_version(stream_name)
    sql = "SELECT COALESCE(MAX(version), 0) FROM events WHERE stream = $1"
    connection.exec_params(sql, [stream_name]).getvalue(0, 0).to_i
  end

  def insert_event(stream_name:, event_type:, data:, metadata:, version:)
    sql = <<~SQL
      INSERT INTO events (stream, type, data, metadata, version)
      VALUES ($1, $2, $3, $4, $5)
    SQL
    connection.exec_params(sql, [stream_name, event_type, data.to_json, metadata.to_json, version])
  end
end
# Usage
store = PostgresEventStore.new(PG.connect(dbname: "events")) # connection details are illustrative
events = [InvoiceCreated.new(invoice_id: "INV-001", amount: 500)]
store.append_to_stream("Invoice-INV-001", events, expected_version: 0)
This pattern ensures sequential event ordering by wrapping the writes in a single database transaction, and the expected_version check prevents concurrent modifications. I've used this with PostgreSQL and Redis backends; both work well when you enforce strict versioning.
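For reference, the INSERT above assumes an events table roughly like the one below. This layout is my assumption rather than a fixed requirement; the unique constraint on (stream, version) gives you ordering enforcement at the database level, and connection is the same PG connection handed to the store.

# Assumed minimal schema backing PostgresEventStore
connection.exec(<<~SQL)
  CREATE TABLE IF NOT EXISTS events (
    id       BIGSERIAL PRIMARY KEY,
    stream   TEXT    NOT NULL,
    type     TEXT    NOT NULL,
    data     JSONB   NOT NULL,
    metadata JSONB   NOT NULL,
    version  INTEGER NOT NULL,
    UNIQUE (stream, version)
  );
SQL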
Command Validation Pipelines
Before processing commands, I validate them through transformation pipelines:
require 'active_model'

class CreateOrderCommand
  include ActiveModel::Validations

  attr_accessor :user_id, :items, :total

  validates :user_id, presence: true
  validates :items, length: { minimum: 1 }
  validate :total_matches_items

  def initialize(params)
    @user_id = params[:user_id]
    @items = params[:items]
    @total = params[:total]
  end

  private

  # Each item is expected to respond to #price
  def total_matches_items
    calculated = items.sum(&:price)
    errors.add(:total, "doesn't match items") if total != calculated
  end
end
class CommandHandler
  include Dry::Monads[:result] # Success/Failure come from dry-monads

  def handle(command)
    return Failure(:invalid_command) unless command.valid?

    aggregate = load_aggregate(command.aggregate_id)
    events = aggregate.process(command)
    event_store.append(aggregate.id, events, aggregate.version)
    Success(:created)
  rescue ConcurrencyError
    # Another writer got there first; ask the caller to retry after a short delay (in seconds)
    Retry.new(delay: 0.2)
  end
end
I layer validations: basic checks at command level, business rules in aggregates. This separation keeps core domain logic clean while catching issues early.
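As a sketch of that layering, a hypothetical Order aggregate keeps its invariants to itself and only emits events when they hold. The item limit, TooManyItemsError, and OrderCreated event are illustrative names, not fixed parts of the pattern.

class Order
  MAX_ITEMS = 100 # hypothetical business rule

  def initialize(id)
    @id = id
    @version = 0
  end

  def process(command)
    # Command-level validation already ran; domain invariants are checked here
    raise TooManyItemsError, "order exceeds #{MAX_ITEMS} items" if command.items.size > MAX_ITEMS

    [OrderCreated.new(order_id: @id, items: command.items, total: command.total)]
  end
end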
Projection Rebuilding
For fast state reconstruction, I implement memory-efficient projections:
class AccountBalanceProjection
  attr_reader :checkpoint

  def initialize
    @balances = {}
    @checkpoint = 0
  end

  def apply(event)
    case event
    when FundsDeposited
      @balances[event.account_id] ||= 0
      @balances[event.account_id] += event.amount
    when FundsWithdrawn
      @balances[event.account_id] ||= 0
      @balances[event.account_id] -= event.amount
    end

    # Remember the last processed position so rebuilds can resume from here
    @checkpoint = event.global_position if event.respond_to?(:global_position)
  end

  def current_balance(account_id)
    @balances.fetch(account_id, 0)
  end
end
# Rebuild from scratch
projection = AccountBalanceProjection.new
event_store.read_all_events.each { |e| projection.apply(e) }
# Partial rebuild since checkpoint
last_seq = projection.checkpoint
events = event_store.read_events_after(last_seq)
events.each { |e| projection.apply(e) }
I include checkpoint markers to resume from specific positions. For large datasets, I parallelize processing across event partitions.
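Here's a rough sketch of the parallel rebuild, assuming the store can serve a disjoint partition of streams per worker; read_partition is a hypothetical API, not something the earlier store exposes.

PARTITIONS = 4

# Each worker rebuilds its own projection over a disjoint slice of streams
threads = PARTITIONS.times.map do |partition|
  Thread.new do
    projection = AccountBalanceProjection.new
    event_store.read_partition(partition, of: PARTITIONS).each { |e| projection.apply(e) }
    projection
  end
end

partial_projections = threads.map(&:value)

Partitioning by a hash of the stream id keeps each account's events in a single worker, so per-stream ordering is preserved.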
Concurrency Management
Optimistic locking prevents race conditions:
class InventoryItem
  attr_reader :version, :stock

  def initialize(events)
    @version = 0
    @stock = 0
    events.each { |e| apply(e) }
  end

  def restock(quantity)
    raise ArgumentError, "quantity must be positive" if quantity <= 0

    # Commands return new events; state only changes through apply
    [ItemRestocked.new(quantity: quantity)]
  end

  def apply(event)
    case event
    when ItemRestocked
      @stock += event.quantity
      @version += 1
    end
  end
end
# Handler usage
def handle_restock(cmd)
  events = event_store.load_stream(cmd.sku)
  item = InventoryItem.new(events)
  new_events = item.restock(cmd.quantity)
  event_store.append(cmd.sku, new_events, item.version)
end
The version counter increments with each applied event. When persisting, we verify the aggregate’s version matches the stream’s head position.
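When the check fails, the usual recovery is to reload the aggregate and try again. A minimal retry wrapper, assuming the store raises ConcurrencyError on a version mismatch, might look like this:

def with_retry(attempts: 3)
  tries = 0
  begin
    yield
  rescue ConcurrencyError
    tries += 1
    raise if tries >= attempts
    sleep(0.05 * tries) # brief backoff before reloading and retrying
    retry
  end
end

with_retry { handle_restock(cmd) }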
Event Version Migration
Schemas evolve - here’s how I handle legacy formats:
class AddressUpdated
  # Upcast a v1 payload (flat fields, no country) into the v2 shape
  def self.upcast(old_data)
    {
      customer_id: old_data[:user_id],
      address: {
        street: old_data[:street],
        city: old_data[:city],
        # Added in v2; legacy events fall back to a default
        country: old_data[:country] || "USA"
      }
    }
  end
end
class EventStore
  def load_stream(stream_id)
    raw_events = adapter.fetch_events(stream_id)
    raw_events.map do |record|
      event_class = Object.const_get(record.type)
      data = record.data # assumed to be a symbol-keyed hash

      # Transform legacy payloads before instantiation
      data = event_class.upcast(data) if record.schema_version < 2

      event_class.new(**data)
    end
  end
end
I attach schema versions during persistence. When loading, outdated events pass through transformation methods before instantiation.
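On the write side, one way to stamp the version is shown below. This is a sketch under two assumptions: the events table gains a schema_version column, and each event class declares a SCHEMA_VERSION constant for its current shape.

# Hypothetical write-side helper: stamp each row with its event class's schema version
def insert_versioned_event(stream_name:, event:, version:)
  schema_version = event.class.const_defined?(:SCHEMA_VERSION) ? event.class::SCHEMA_VERSION : 1
  sql = <<~SQL
    INSERT INTO events (stream, type, data, metadata, version, schema_version)
    VALUES ($1, $2, $3, $4, $5, $6)
  SQL
  connection.exec_params(
    sql,
    [stream_name, event.class.name, event.to_h.to_json, {}.to_json, version, schema_version]
  )
end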
Snapshot Optimization
For aggregates with long histories, I periodically snapshot state:
AccountSnapshot = Struct.new(:account_id, :balance, :version, keyword_init: true) do
  def self.from_aggregate(account)
    new(
      account_id: account.id,
      balance: account.balance,
      version: account.version
    )
  end
end
class AccountRepository
  SNAPSHOT_INTERVAL = 50

  def load(account_id)
    snapshot = snapshot_store.latest_snapshot(account_id)
    from_version = snapshot ? snapshot.version + 1 : 1

    events = event_store.load_stream(account_id, from_version: from_version)
    account = Account.new(account_id)
    account.restore_snapshot(snapshot) if snapshot
    account.apply_events(events)
    account
  end

  def save(account)
    events = account.unpublished_events
    # Expected version is the stream position before these new events
    event_store.append(account.id, events, account.version - events.size)

    if (account.version % SNAPSHOT_INTERVAL).zero?
      snapshot_store.save(AccountSnapshot.from_aggregate(account))
    end
  end
end
Snapshots reduce replay overhead significantly. I typically trigger them after every 50 events or during low-traffic periods.
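The snapshot store itself can start very small. Here's an in-memory sketch I'd use in tests; a production version would persist snapshots alongside the events.

class InMemorySnapshotStore
  def initialize
    @snapshots = Hash.new { |h, k| h[k] = [] }
  end

  def save(snapshot)
    @snapshots[snapshot.account_id] << snapshot
  end

  def latest_snapshot(account_id)
    # Returns nil when no snapshot exists yet, which the repository handles
    @snapshots[account_id].max_by(&:version)
  end
end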
Read Model Synchronization
I use pub/sub for updating query-optimized views:
class ProjectionSubscriber
  def initialize(event_store, projections)
    @event_store = event_store
    @projections = projections
  end

  def start
    @thread = Thread.new do
      last_position = 0
      loop do
        events = @event_store.read_after(last_position, batch_size: 100)

        if events.empty?
          # Caught up; wait briefly and poll again rather than exiting
          sleep 0.5
          next
        end

        events.each do |event|
          @projections.each { |p| p.process(event) }
          last_position = event.global_position
        end
      end
    end
  end
end
# Elasticsearch projection example
class ProductCatalogProjection
  def initialize(elasticsearch)
    @elasticsearch = elasticsearch # an Elasticsearch::Client
  end

  def process(event)
    case event
    when ProductAdded
      elasticsearch.index(
        index: 'products',
        id: event.product_id,
        body: { name: event.name, price: event.price }
      )
    when ProductPriceChanged
      elasticsearch.update(
        index: 'products',
        id: event.product_id,
        body: { doc: { price: event.new_price } }
      )
    end
  end

  private

  attr_reader :elasticsearch
end
Separate subscriptions allow independent update cadences. I’ve achieved sub-second read model updates using this pattern.
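Wiring this up is straightforward: each subscriber owns its projections and polls on its own schedule. In this sketch, store and es_client stand in for an event store and Elasticsearch client configured elsewhere.

search_subscriber = ProjectionSubscriber.new(store, [ProductCatalogProjection.new(es_client)])
reporting_subscriber = ProjectionSubscriber.new(store, [AccountBalanceProjection.new])

search_subscriber.start
reporting_subscriber.start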
These techniques form a robust foundation for event-sourced systems. The immutability of events provides natural audit trails, while replay capabilities enable powerful diagnostics. I recommend starting with core business domains where history matters - financial operations or inventory management are excellent candidates. The initial complexity pays dividends in traceability and flexibility as requirements evolve.