Let me walk you through a way to build Rails applications that can remember every single thing that ever happened to your data. It might sound complex, but the core idea is simple: instead of just saving the current state of a record, like an order total, you save every individual change that led to that total as a permanent, unchangeable fact.
Think of it like a bank statement. You don’t just see your current balance; you see a list of every deposit and withdrawal. This list is the source of truth. Your current balance is just a convenient view calculated from that list. This approach is called Event Sourcing. Combined with a related idea called Command Query Responsibility Segregation (CQRS), it helps you build systems that are robust, scalable, and give you a complete history of every action.
Let’s start with where we store these immutable facts, which we call events. We need an event store. I often use Redis for this because it’s fast and good at handling lists, but the concept works with any durable storage.
# A basic event store using Redis
class EventStore
# Shared instance, referenced in the rest of the article as EventStore.current
def self.current
@current ||= new
end
def initialize(redis: Redis.new, namespace: 'es')
@redis = redis
@namespace = namespace
end
def append(stream_id, events, expected_version: nil)
stream_key = "#{@namespace}:stream:#{stream_id}"
# Read the current version before opening the transaction. MULTI only
# queues commands and returns their results after EXEC, so we cannot
# inspect a value inside the transaction block itself.
last_json = @redis.lindex(stream_key, -1)
current_version = last_json ? JSON.parse(last_json)['version'] : 0
# This prevents two people from changing the same thing at the same time.
# (Under heavy concurrency, pair this check with WATCH or a Lua script.)
if expected_version && current_version != expected_version
raise "Version conflict. Expected #{expected_version}, found #{current_version}"
end
@redis.multi do |transaction|
events.each_with_index do |event, index|
version = current_version + index + 1
event_data = event.merge(
event_id: SecureRandom.uuid,
stream_id: stream_id,
version: version,
timestamp: Time.current.iso8601
)
transaction.rpush(stream_key, event_data.to_json)
end
end
end
def read_stream(stream_id, from_version: 1)
stream_key = "#{@namespace}:stream:#{stream_id}"
# from_version lets callers (such as snapshot loading, later) skip events
# they have already applied; versions are 1-based positions in the list.
events_json = @redis.lrange(stream_key, from_version - 1, -1)
events_json.map do |json|
JSON.parse(json, symbolize_names: true)
end
end
end
This store saves events to a stream (a Redis list keyed by an ID, so order 123 lives at es:stream:order-123). The expected_version check is crucial. It ensures that if I load an order at version 5 and try to save new events, no one else has saved version 6 in the meantime. It’s a simple form of optimistic concurrency control.
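To see the optimistic-concurrency check in action without a Redis server, here is the same append semantics on a minimal in-memory stand-in (InMemoryEventStore is a throwaway class for this sketch, not part of the article’s design):

```ruby
# Minimal in-memory illustration of the optimistic-concurrency check.
class InMemoryEventStore
  def initialize
    @streams = Hash.new { |h, k| h[k] = [] }
  end

  def append(stream_id, events, expected_version: nil)
    stream = @streams[stream_id]
    current_version = stream.last ? stream.last[:version] : 0
    if expected_version && current_version != expected_version
      raise "Version conflict. Expected #{expected_version}, found #{current_version}"
    end
    events.each_with_index do |event, index|
      stream << event.merge(version: current_version + index + 1)
    end
  end

  def read_stream(stream_id)
    @streams[stream_id]
  end
end

store = InMemoryEventStore.new
store.append('order-123', [{ type: 'order_created' }], expected_version: 0)
store.append('order-123', [{ type: 'item_added' }], expected_version: 1)

# A stale writer who last saw version 1 now fails:
begin
  store.append('order-123', [{ type: 'item_added' }], expected_version: 1)
rescue => e
  puts e.message # => "Version conflict. Expected 1, found 2"
end
```

The stale writer would reload the stream, re-check its business rules, and retry with the fresh version.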
Now, where do these events come from? They come from your core business objects, which we call Aggregates. An Aggregate is a cluster of objects (like an Order and its LineItems) that we treat as a single unit for changes. It protects its own business rules.
class Order
attr_reader :id, :version, :uncommitted_events
def initialize(id)
@id = id
@state = :draft
@items = []
@total = 0.0
@version = 0
@uncommitted_events = []
end
def create(customer_id, items)
# Business rule: Items are required
raise "Must have items" if items.empty?
event = {
type: 'order_created',
order_id: @id,
customer_id: customer_id,
items: items,
total: items.sum { |i| i[:price] * i[:quantity] }
}
apply_event(event)
end
def add_item(product_id, quantity, price)
# Business rule: Can't add items to a submitted order
raise "Order is already submitted" unless @state == :draft
event = {
type: 'item_added',
order_id: @id,
product_id: product_id,
quantity: quantity,
price: price
}
apply_event(event)
end
def submit
raise "Order is already submitted" unless @state == :draft
apply_event(type: 'order_submitted', order_id: @id)
end
# This is the magic of Event Sourcing. We can rebuild state from scratch.
# Replayed events must NOT be re-recorded as uncommitted (they are already
# in the store), so we bypass apply_event here and call apply directly.
def load_from_history(events)
events.each do |event|
apply(event)
# Trust the version recorded with the stored event
@version = event[:version] if event[:version]
end
end
private
# For brand-new events: update state, then remember the event for saving
def apply_event(event)
apply(event)
@uncommitted_events << event
@version += 1
end
# Pure state transition, shared by new events and replayed history
def apply(event)
case event[:type]
when 'order_created'
@state = :draft
@customer_id = event[:customer_id]
@items = event[:items]
@total = event[:total]
when 'item_added'
@items << { product_id: event[:product_id], quantity: event[:quantity], price: event[:price] }
@total += event[:quantity] * event[:price]
when 'order_submitted'
@state = :submitted
end
end
end
The Aggregate is the boss. It decides if a command (like “add item”) is allowed. If it is, it produces an event (like item_added). The event is a fact: “Item X was added at this time.” The Aggregate’s state (its instance variables) is just a cache derived from applying all past events.
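To see the “state is a cache” idea in isolation, here is it in miniature: the current state is nothing more than a fold (reduce) over the event list. This is a stripped-down sketch, not the Order class itself:

```ruby
# Current state as a fold over past events, in miniature.
events = [
  { type: 'order_created', total: 10.0 },
  { type: 'item_added', quantity: 2, price: 5.0 },
  { type: 'order_submitted' }
]

state = events.reduce({ status: :none, total: 0.0 }) do |acc, event|
  case event[:type]
  when 'order_created'   then { status: :draft, total: event[:total] }
  when 'item_added'      then acc.merge(total: acc[:total] + event[:quantity] * event[:price])
  when 'order_submitted' then acc.merge(status: :submitted)
  end
end

state # => { status: :submitted, total: 20.0 }
```

Replaying the same events always produces the same state, which is exactly why the event log can be the source of truth.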
We need something to connect the command from a user to the Aggregate and then to the Event Store. That’s a Command Handler.
class CreateOrderHandler
def initialize(event_store)
@event_store = event_store
end
def handle(command)
# 1. Create a new aggregate instance
order = Order.new(command.order_id)
# 2. Let the aggregate process the command. It will produce events.
order.create(command.customer_id, command.items)
# 3. Save the new events to the stream.
@event_store.append(
command.order_id,
order.uncommitted_events,
expected_version: 0 # It's new, so we expect no prior version.
)
# 4. Return a result
{ success: true, order_id: command.order_id }
rescue => e
{ success: false, error: e.message }
end
end
# A simple command object
CreateOrder = Struct.new(:order_id, :customer_id, :items, keyword_init: true)
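The handler above only covers creating a new order. Updating an existing aggregate follows the same shape, except we first rebuild it from its history. A sketch, with stripped-down stand-ins for the article’s Order and event store so it runs on its own (AddItem, MemoryEventStore, and this cut-down Order are illustrative, not the full classes):

```ruby
# Minimal stand-ins, just enough for the handler below to run:
class Order
  attr_reader :id, :uncommitted_events

  def initialize(id)
    @id = id
    @state = :draft
    @uncommitted_events = []
  end

  def load_from_history(events)
    events.each { |e| @state = :submitted if e[:type] == 'order_submitted' }
  end

  def add_item(product_id, quantity, price)
    raise "Order is already submitted" unless @state == :draft
    @uncommitted_events << { type: 'item_added', order_id: @id,
                             product_id: product_id, quantity: quantity, price: price }
  end
end

class MemoryEventStore
  def initialize
    @streams = Hash.new { |h, k| h[k] = [] }
  end

  def read_stream(id)
    @streams[id]
  end

  def append(id, events, expected_version: nil)
    raise "Version conflict" if expected_version && @streams[id].size != expected_version
    @streams[id].concat(events)
  end
end

AddItem = Struct.new(:order_id, :product_id, :quantity, :price, keyword_init: true)

class AddItemHandler
  def initialize(event_store)
    @event_store = event_store
  end

  def handle(command)
    # 1. Rebuild the aggregate from its past events
    history = @event_store.read_stream(command.order_id)
    order = Order.new(command.order_id)
    order.load_from_history(history)
    # 2. Let the aggregate enforce its rules and produce new events
    order.add_item(command.product_id, command.quantity, command.price)
    # 3. Save, expecting the stream to still be where we loaded it
    @event_store.append(command.order_id, order.uncommitted_events,
                        expected_version: history.size)
    { success: true, order_id: command.order_id }
  rescue => e
    { success: false, error: e.message }
  end
end

store = MemoryEventStore.new
store.append('order-1', [{ type: 'order_created' }])
result = AddItemHandler.new(store).handle(
  AddItem.new(order_id: 'order-1', product_id: 'sku-42', quantity: 1, price: 9.99)
)
result[:success] # => true
```

The load-decide-append cycle is the same for every update command; only the business rule inside the aggregate changes.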
So far, we’ve covered the “write side.” We’ve captured intent (Commands), enforced rules (Aggregates), and stored facts (Events). This gives us a perfect audit log. But how do we show data to users? Loading an aggregate from hundreds of events every time is slow. This is where CQRS comes in.
CQRS says: use one model for writing (Commands) and a completely separate, optimized model for reading (Queries). They don’t have to share a database schema. The read model is built by listening to the events and updating its own specialized tables.
We build these read models with Projections. A projection is a listener that says, “When event X happens, I will update my specific table or view.”
# A projection that maintains a quick-to-query summary of orders
class OrderSummaryProjection
def handle_event(event)
case event[:type]
when 'order_created'
OrderSummary.create!(
id: event[:order_id],
customer_id: event[:customer_id],
status: 'draft',
total_amount: event[:total],
item_count: event[:items].size,
version: event[:version]
)
# We could also populate a separate items table
event[:items].each do |item|
OrderItem.create!(
order_id: event[:order_id],
product_id: item[:product_id],
quantity: item[:quantity],
price: item[:price]
)
end
when 'item_added'
summary = OrderSummary.find(event[:order_id])
summary.update!(
total_amount: summary.total_amount + (event[:quantity] * event[:price]),
item_count: summary.item_count + 1,
version: event[:version]
)
OrderItem.create!(
order_id: event[:order_id],
product_id: event[:product_id],
quantity: event[:quantity],
price: event[:price]
)
when 'order_submitted'
OrderSummary.where(id: event[:order_id]).update_all(status: 'submitted', version: event[:version])
end
end
end
# The ActiveRecord models for the read side
class OrderSummary < ApplicationRecord
# Has columns: id, customer_id, status, total_amount, item_count, version
end
class OrderItem < ApplicationRecord
# Has columns: order_id, product_id, quantity, price
end
Now, our read side is just simple, fast SQL queries with no complex business logic.
class OrdersController < ApplicationController
# Write side: Handle a command
def create
command = CreateOrder.new(
order_id: SecureRandom.uuid,
customer_id: current_user.id,
items: params[:items]
)
handler = CreateOrderHandler.new(EventStore.current)
result = handler.handle(command)
if result[:success]
render json: { id: result[:order_id] }, status: :created
else
render json: { error: result[:error] }, status: :unprocessable_entity
end
end
# Read side: Query the projection
def show
# This is now a simple, fast lookup on a denormalized table.
summary = OrderSummary.find(params[:id])
items = OrderItem.where(order_id: params[:id])
render json: {
summary: summary,
items: items
}
end
def index
# We can add any index we want for fast searching!
summaries = OrderSummary.where(customer_id: current_user.id).order(created_at: :desc)
render json: summaries
end
end
See the separation? The create action deals with commands, aggregates, and events. The show and index actions just query regular Rails models that are kept up-to-date by projections. The write side ensures correctness. The read side ensures speed.
How do we keep the projections updated? We need a publisher-subscriber system. When an event is saved, we notify all interested projections.
class EventPublisher
@@subscribers = []
def self.subscribe(subscriber, event_types: nil)
@@subscribers << { subscriber: subscriber, event_types: event_types }
end
def self.publish(event)
@@subscribers.each do |sub|
next if sub[:event_types] && !sub[:event_types].include?(event[:type])
# In a real app, do this in a background job!
sub[:subscriber].handle_event(event)
end
end
end
# We modify our EventStore to publish after appending
class EventStore
def append(stream_id, events, expected_version: nil)
enriched = [] # events with event_id, version and timestamp merged in
# ... (same version check and transaction logic as before,
#      collecting each event_data hash into enriched) ...
# After saving, tell everyone. Publish the enriched events, not the raw
# input hashes: projections rely on event[:version] being present.
enriched.each { |event| EventPublisher.publish(event) }
end
end
# During application startup, we register our projections
EventPublisher.subscribe(OrderSummaryProjection.new)
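The event_types parameter lets a subscriber opt into only the events it cares about. Here it is in action; the publisher is restated verbatim so the snippet runs on its own, and SubmittedOrdersListener is a made-up subscriber:

```ruby
class EventPublisher
  @@subscribers = []

  def self.subscribe(subscriber, event_types: nil)
    @@subscribers << { subscriber: subscriber, event_types: event_types }
  end

  def self.publish(event)
    @@subscribers.each do |sub|
      next if sub[:event_types] && !sub[:event_types].include?(event[:type])
      sub[:subscriber].handle_event(event)
    end
  end
end

# A subscriber that only cares about submissions
class SubmittedOrdersListener
  attr_reader :seen

  def initialize
    @seen = []
  end

  def handle_event(event)
    @seen << event[:type]
  end
end

listener = SubmittedOrdersListener.new
EventPublisher.subscribe(listener, event_types: ['order_submitted'])

EventPublisher.publish({ type: 'order_created' })
EventPublisher.publish({ type: 'item_added' })
EventPublisher.publish({ type: 'order_submitted' })

listener.seen # => ["order_submitted"]
```

Without the filter, every subscriber would be invoked for every event and would have to ignore the irrelevant ones itself.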
A powerful feature of this setup is event replay. If you discover a bug in your OrderSummaryProjection, you can fix the code, clear the order_summaries table, and replay every order_created and item_added event from the beginning of time to rebuild a correct read model. Your source of truth (the events) is immutable, so this is always possible.
class ProjectionRebuilder
def rebuild(projection_class)
projection = projection_class.new
# Assumes the projection exposes clear_data (e.g. truncating its tables)
projection.clear_data
# Assumes the store exposes all_streams (e.g. via SCAN over "es:stream:*").
# If projections depend on cross-stream ordering, replay in global event order.
all_stream_ids = EventStore.current.all_streams
all_stream_ids.each do |stream_id|
events = EventStore.current.read_stream(stream_id)
events.each { |event| projection.handle_event(event) }
end
end
end
As your system grows, loading an aggregate from thousands of events becomes slow. We can optimize this with Snapshots. Periodically, we save the aggregate’s current state (its version and all its instance variables). Next time we load it, we start from the latest snapshot and only replay the events that happened after it.
class Snapshot
attr_reader :aggregate_id, :aggregate_type, :version, :state
def initialize(aggregate_id:, aggregate_type:, version:, state:)
@aggregate_id = aggregate_id
@aggregate_type = aggregate_type
@version = version
@state = state
end
def self.take(aggregate)
new(
aggregate_id: aggregate.id,
aggregate_type: aggregate.class.name,
version: aggregate.version,
state: Marshal.dump(aggregate) # A simple (if fragile) way to save object state
)
end
end
class SnapshotRepository
SNAPSHOT_INTERVAL = 50 # Save a snapshot every 50 events
def load(aggregate_id, aggregate_class)
# SnapshotStore is any keyed storage for snapshots (Redis, SQL, ...)
snapshot = SnapshotStore.load(aggregate_id)
if snapshot
# Start from the snapshotted state
aggregate = Marshal.load(snapshot.state)
# Load only events that happened after the snapshot
events = EventStore.current.read_stream(aggregate_id, from_version: snapshot.version + 1)
aggregate.load_from_history(events)
else
# No snapshot, load all events
aggregate = aggregate_class.new(aggregate_id)
events = EventStore.current.read_stream(aggregate_id)
aggregate.load_from_history(events)
end
# Take a fresh snapshot once we cross the interval
if aggregate.version > 0 && aggregate.version % SNAPSHOT_INTERVAL == 0
SnapshotStore.save(Snapshot.take(aggregate))
end
aggregate
end
end
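The SnapshotStore referenced above is any keyed storage. A minimal in-memory version, useful for tests and for getting a feel for the flow (swap in Redis or a SQL table in production):

```ruby
# Minimal in-memory snapshot storage, keyed by aggregate ID.
class SnapshotStore
  @@snapshots = {}

  def self.save(snapshot)
    # Last write wins; we only ever need the most recent snapshot
    @@snapshots[snapshot.aggregate_id] = snapshot
  end

  def self.load(aggregate_id)
    @@snapshots[aggregate_id] # nil when no snapshot exists yet
  end
end
```

Snapshots are disposable: because the events remain the source of truth, you can delete every snapshot at any time and rebuild them on demand.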
This is a lot to take in. Let me summarize the flow in plain steps:
- A user action (like “Add Item to Cart”) becomes a Command object.
- A Command Handler receives this command. It fetches the relevant Aggregate (e.g., the Order) from the Repository. The repository loads the Aggregate by reading its past Events from the Event Store.
- The Handler passes the command to the Aggregate. The Aggregate checks business rules (“Is the order still open?”). If the command is valid, the Aggregate produces a new Event (e.g., item_added) and updates its own internal state. The event is added to a list of new, uncommitted events.
- The Handler saves these new events to the Event Store.
- After saving, an Event Publisher announces the new event.
- Various Projections are listening. They update their own specialized database tables (the Read Model) based on the event. One projection might update an order_summaries table. Another might update a table for customer analytics.
- When a user wants to view data, a Query simply fetches it from the optimized Read Model tables. No business logic runs here.
What do you gain from this complexity?
- Complete History: You have a log of every single change.
- Auditability: You know who did what and when.
- Temporal Queries: You can ask, “What did this order look like last Tuesday?”
- Debugging: You can replay events to reproduce bugs.
- Read/Write Scalability: You can scale your read database separately from your write event store. The read model can be a completely different technology (like Elasticsearch for search).
- Business Intelligence: Your events are a rich source of data for analytics.
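The temporal-query benefit deserves a concrete sketch. “What did this order look like last Tuesday?” boils down to replaying only the events recorded up to that moment (self-contained; the event shapes mirror the ones used throughout the article):

```ruby
require 'time'

events = [
  { type: 'order_created', total: 10.0, timestamp: '2024-06-03T09:00:00Z' },
  { type: 'item_added', quantity: 1, price: 5.0, timestamp: '2024-06-04T12:00:00Z' },
  { type: 'order_submitted', timestamp: '2024-06-05T08:00:00Z' }
]

# Rebuild state using only events recorded at or before the given time
def state_as_of(events, time)
  events.take_while { |e| Time.parse(e[:timestamp]) <= time }
        .reduce({ status: :none, total: 0.0 }) do |state, event|
    case event[:type]
    when 'order_created'   then { status: :draft, total: event[:total] }
    when 'item_added'      then state.merge(total: state[:total] + event[:quantity] * event[:price])
    when 'order_submitted' then state.merge(status: :submitted)
    end
  end
end

state_as_of(events, Time.parse('2024-06-04T23:59:59Z'))
# => { status: :draft, total: 15.0 }
```

A state-only model simply cannot answer this question; once a row is updated, its previous value is gone.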
It’s not without costs. You introduce eventual consistency between the write and read models. A user might submit an order and not see it in their list for a few hundred milliseconds until the projection updates. You must handle that in your user interface. You also have more moving parts: event stores, projections, publishers.
I find this pattern is not for every part of every application. Use it for the core, complex domains where audit trails, history, and complex business rules are critical. For simple CRUD screens, traditional Active Record is much simpler and perfectly adequate.
Start small. Try modeling a single bounded context, like Ordering or Inventory, with event sourcing. Use a simple in-memory event publisher and a single projection. Get a feel for the flow. You’ll discover a powerful new way to think about the lifetime of your application’s data, where every change tells a story.