
Building Event-Sourced Ruby Systems: Complete Guide with PostgreSQL and Command Patterns

Discover practical Ruby techniques for building event-sourced systems with audit trails and temporal analysis. Learn event stores, concurrency, and projections. Perfect for financial apps.

Building Event-Sourced Systems in Ruby

Event sourcing captures state changes as immutable records, providing an audit trail and enabling temporal analysis. I’ve found this approach particularly valuable for financial systems where transaction history matters. Let me share practical Ruby techniques I’ve implemented across multiple projects.

Event Store Foundations
A reliable event store needs atomic writes and version control. Here’s how I handle stream persistence:

class PostgresEventStore
  def append_to_stream(stream_name, events, expected_version: nil)
    connection.transaction do
      current_version = get_stream_version(stream_name)

      # Optimistic concurrency check; skipped when the caller has no expectation
      if expected_version && expected_version != current_version
        raise ConcurrencyError, "expected version #{expected_version}, stream is at #{current_version}"
      end

      events.each_with_index do |event, index|
        insert_event(
          stream_name: stream_name,
          event_type: event.class.name,
          data: event.to_h,
          metadata: { causation_id: SecureRandom.uuid },
          version: current_version + index + 1
        )
      end
    end
  end

  private

  # Head version of the stream; 0 when the stream does not exist yet
  def get_stream_version(stream_name)
    result = connection.exec_params(
      "SELECT COALESCE(MAX(version), 0) AS version FROM events WHERE stream = $1",
      [stream_name]
    )
    result[0]["version"].to_i
  end

  def insert_event(stream_name:, event_type:, data:, metadata:, version:)
    sql = <<~SQL
      INSERT INTO events
        (stream, type, data, metadata, version)
      VALUES ($1, $2, $3, $4, $5)
    SQL
    connection.exec_params(sql, [stream_name, event_type, data.to_json, metadata.to_json, version])
  end
end

# Usage
store = PostgresEventStore.new
events = [InvoiceCreated.new(invoice_id: "INV-001", amount: 500)]
store.append_to_stream("Invoice-INV-001", events, expected_version: 0)

This pattern ensures sequential event ordering using database transactions. The expected_version check prevents concurrent modifications. I’ve used this with PostgreSQL and Redis backends - both work well when you enforce strict versioning.
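The insert above assumes an events table shaped to match those placeholders. Here is a minimal schema sketch; the column names mirror the INSERT statement, while the id and created_at columns and the index name are assumptions. The unique index is what makes the version check safe under concurrent writers.

# Minimal events table sketch for the store above (run once per database).
# The unique (stream, version) index rejects a second writer that races past
# the optimistic version check inside the same transaction window.
connection.exec(<<~SQL)
  CREATE TABLE IF NOT EXISTS events (
    id         BIGSERIAL   PRIMARY KEY,
    stream     TEXT        NOT NULL,
    type       TEXT        NOT NULL,
    data       JSONB       NOT NULL,
    metadata   JSONB       NOT NULL,
    version    INTEGER     NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now()
  );

  CREATE UNIQUE INDEX IF NOT EXISTS events_stream_version_idx
    ON events (stream, version);
SQL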

Command Validation Pipelines
Before processing commands, I validate them through transformation pipelines:

require "active_model"

class CreateOrderCommand
  include ActiveModel::Validations

  attr_accessor :user_id, :items, :total

  validates :user_id, presence: true
  validates :items, length: { minimum: 1 }
  validate :total_matches_items

  def initialize(params)
    @user_id = params[:user_id]
    @items = params[:items]
    @total = params[:total]
  end

  private

  def total_matches_items
    return if items.nil? || items.empty?

    calculated = items.sum(&:price)
    errors.add(:total, "doesn't match items") if total != calculated
  end
end

# Success/Failure here follow a dry-monads-style result convention.
class CommandHandler
  def handle(command)
    return Failure(:invalid_command) unless command.valid?

    aggregate = load_aggregate(command.aggregate_id)
    events = aggregate.process(command)

    event_store.append(aggregate.id, events, aggregate.version)
    Success(:created)
  rescue ConcurrencyError
    # Another writer got there first; signal the caller to retry shortly
    Retry.new(delay: 0.2) # delay in seconds
  end
end

I layer validations: basic checks at command level, business rules in aggregates. This separation keeps core domain logic clean while catching issues early.
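The aggregate side of that split might look like the sketch below; the Order class, its OrderCreated event, and the OrderAlreadyCreated error are illustrative rather than taken from a particular library.

# Illustrative aggregate: business rules live here, not in the command.
class Order
  attr_reader :id, :version

  def initialize(id)
    @id = id
    @version = 0
    @status = :new
  end

  # Business rule: an order may only be created once.
  def process(command)
    raise OrderAlreadyCreated if @status != :new

    [OrderCreated.new(order_id: id, user_id: command.user_id, total: command.total)]
  end

  def apply(event)
    case event
    when OrderCreated then @status = :created
    end
    @version += 1
  end
end

The handler from the previous snippet never needs to know this rule; it forwards the command and persists whatever events come back.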

Projection Rebuilding
For fast state reconstruction, I implement memory-efficient projections:

class AccountBalanceProjection
  def initialize
    @balances = Hash.new(0) # every account starts from a zero balance
  end

  def apply(event)
    case event
    when FundsDeposited
      @balances[event.account_id] += event.amount
    when FundsWithdrawn
      @balances[event.account_id] -= event.amount
    end
  end

  def current_balance(account_id)
    @balances.fetch(account_id, 0)
  end
end

# Rebuild from scratch
projection = AccountBalanceProjection.new
event_store.read_all_events.each { |e| projection.apply(e) }

# Partial rebuild since checkpoint (assumes the projection records the
# global position of the last event it applied - see the sketch below)
last_seq = projection.checkpoint
events = event_store.read_events_after(last_seq)
events.each { |e| projection.apply(e) }

I include checkpoint markers to resume from specific positions. For large datasets, I parallelize processing across event partitions.
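One way to carry that checkpoint without touching the projection logic is a small mix-in. The sketch below assumes each event exposes global_position, its sequence number across the whole store, matching the subscriber example further down.

# Sketch: record the global position of the last applied event.
module Checkpointing
  def checkpoint
    @checkpoint || 0
  end

  def apply(event)
    super
    @checkpoint = event.global_position
  end
end

# Prepending wraps the projection's own apply method.
class AccountBalanceProjection
  prepend Checkpointing
end

Persisting the checkpoint alongside the read model means a restart resumes from the last applied event rather than replaying the full stream.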

Concurrency Management
Optimistic locking prevents race conditions:

class InventoryItem
  attr_reader :version, :stock

  def initialize(events)
    @version = 0
    @stock = 0
    events.each { |e| apply(e) }
  end

  def restock(quantity)
    raise ArgumentError, "quantity must be positive" if quantity <= 0
    [ItemRestocked.new(quantity: quantity)]
  end

  def apply(event)
    case event
    when ItemRestocked
      @stock += event.quantity
    end
    @version += 1 # every applied event advances the aggregate version
  end
end

# Handler usage
def handle_restock(cmd)
  events = event_store.load_stream(cmd.sku)
  item = InventoryItem.new(events)
  
  new_events = item.restock(cmd.quantity)
  event_store.append(cmd.sku, new_events, item.version)
end

The version counter increments with each applied event. When persisting, we verify the aggregate’s version matches the stream’s head position.
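When the check fails, the simplest recovery is to reload the aggregate and retry the command. A sketch along those lines; handle_restock_with_retry and the attempt limit are illustrative:

# Sketch: retry a command a few times when the stream moved underneath us.
def handle_restock_with_retry(cmd, attempts: 3)
  attempts.times do
    events = event_store.load_stream(cmd.sku)
    item = InventoryItem.new(events)

    new_events = item.restock(cmd.quantity)
    return event_store.append(cmd.sku, new_events, item.version)
  rescue ConcurrencyError
    next # someone else appended first; reload and try again
  end

  raise ConcurrencyError, "gave up after #{attempts} attempts"
end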

Event Version Migration
Schemas evolve - here’s how I handle legacy formats:

class AddressUpdated
  # Upcasts a v1 payload (flat user_id/street/city keys) into the v2 shape
  def self.upcast(old_data)
    {
      customer_id: old_data[:user_id],
      address: {
        street: old_data[:street],
        city: old_data[:city],
        # Added in v2; legacy events fall back to a default
        country: old_data[:country] || "USA"
      }
    }
  end
end

class EventStore
  def load_stream(stream_id)
    raw_events = adapter.fetch_events(stream_id)

    raw_events.map do |record|
      event_class = Object.const_get(record.type)
      data = record.data

      # Transform legacy payloads before instantiating the event
      if record.schema_version < 2 && event_class.respond_to?(:upcast)
        data = event_class.upcast(data)
      end

      event_class.new(data)
    end
  end
end

I attach schema versions during persistence. When loading, outdated events pass through transformation methods before instantiation.
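Attaching the version at write time can go through the event metadata (or a dedicated column, which is what record.schema_version above implies). A small sketch; build_metadata and the SCHEMA_VERSION constant are illustrative, not part of the store shown earlier:

require "securerandom"

# Sketch: stamp each event with its schema version when it is persisted.
# Each event class is assumed to declare a SCHEMA_VERSION constant
# (e.g. AddressUpdated::SCHEMA_VERSION = 2); anything without one is v1.
def build_metadata(event)
  schema_version =
    if event.class.const_defined?(:SCHEMA_VERSION)
      event.class.const_get(:SCHEMA_VERSION)
    else
      1
    end

  { causation_id: SecureRandom.uuid, schema_version: schema_version }
end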

Snapshot Optimization
For aggregates with long histories, I periodically snapshot state:

class AccountSnapshot
  attr_reader :account_id, :balance, :version

  def initialize(account_id:, balance:, version:)
    @account_id = account_id
    @balance = balance
    @version = version
  end

  def self.from_aggregate(account)
    new(
      account_id: account.id,
      balance: account.balance,
      version: account.version
    )
  end
end

class AccountRepository
  def load(account_id)
    snapshot = snapshot_store.latest_snapshot(account_id)
    from_version = snapshot ? snapshot.version + 1 : 1
    events = event_store.load_stream(account_id, from_version: from_version)

    account = Account.new(account_id)
    account.restore_snapshot(snapshot) if snapshot
    account.apply_events(events)
    account
  end

  def save(account)
    events = account.unpublished_events
    # Expected version is the stream position before these new events were applied
    event_store.append(account.id, events, account.version - events.size)

    if account.version % 50 == 0
      snapshot = AccountSnapshot.from_aggregate(account)
      snapshot_store.save(snapshot)
    end
  end
end

Snapshots reduce replay overhead significantly. I typically trigger them after every 50 events or during low-traffic periods.
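The snapshot_store itself can be a single latest-wins table keyed by aggregate id. Here is a minimal sketch using the same PG connection conventions as the event store; the PostgresSnapshotStore class, table, and column names are assumptions, and the snapshots table needs a unique constraint on aggregate_id for ON CONFLICT to apply.

require "json"

# Sketch: latest-wins snapshot storage in PostgreSQL.
class PostgresSnapshotStore
  def initialize(connection)
    @connection = connection
  end

  def save(snapshot)
    data = {
      account_id: snapshot.account_id,
      balance: snapshot.balance,
      version: snapshot.version
    }.to_json

    @connection.exec_params(<<~SQL, [snapshot.account_id, data, snapshot.version])
      INSERT INTO snapshots (aggregate_id, data, version)
      VALUES ($1, $2, $3)
      ON CONFLICT (aggregate_id)
      DO UPDATE SET data = EXCLUDED.data, version = EXCLUDED.version
    SQL
  end

  def latest_snapshot(aggregate_id)
    result = @connection.exec_params(
      "SELECT data FROM snapshots WHERE aggregate_id = $1", [aggregate_id]
    )
    return nil if result.ntuples.zero?

    AccountSnapshot.new(**JSON.parse(result[0]["data"], symbolize_names: true))
  end
end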

Read Model Synchronization
I use pub/sub for updating query-optimized views:

class ProjectionSubscriber
  def initialize(event_store, projections)
    @event_store = event_store
    @projections = projections
  end

  def start
    @thread = Thread.new do
      last_position = 0 # a persisted checkpoint could seed this instead
      loop do
        events = @event_store.read_after(last_position, batch_size: 100)

        if events.empty?
          sleep 0.5 # nothing new yet; poll again shortly
          next
        end

        events.each do |event|
          @projections.each { |p| p.process(event) }
          last_position = event.global_position
        end
      end
    end
  end
end

# Elasticsearch projection example
class ProductCatalogProjection
  def process(event)
    case event
    when ProductAdded
      elasticsearch.index(
        index: 'products',
        id: event.product_id,
        body: { name: event.name, price: event.price }
      )
    when ProductPriceChanged
      elasticsearch.update(
        index: 'products',
        id: event.product_id,
        body: { doc: { price: event.new_price } }
      )
    end
  end
end

Separate subscriptions allow independent update cadences. I’ve achieved sub-second read model updates using this pattern.
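Concretely, separate subscriptions just means one ProjectionSubscriber per group of read models, each polling on its own cadence. A rough wiring sketch; assume each projection object is already constructed with whatever client it needs:

# Rough sketch: independent subscribers for independently paced read models.
search_subscriber  = ProjectionSubscriber.new(event_store, [product_catalog_projection])
balance_subscriber = ProjectionSubscriber.new(event_store, [account_balance_projection])

search_subscriber.start   # keeps the Elasticsearch catalog close to real time
balance_subscriber.start  # balances update independently on their own cadence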

These techniques form a robust foundation for event-sourced systems. The immutability of events provides natural audit trails, while replay capabilities enable powerful diagnostics. I recommend starting with core business domains where history matters - financial operations or inventory management are excellent candidates. The initial complexity pays dividends in traceability and flexibility as requirements evolve.



