Building Event-Sourced Ruby Systems: Complete Guide with PostgreSQL and Command Patterns

Building Event-Sourced Systems in Ruby

Event sourcing captures state changes as immutable records, providing an audit trail and enabling temporal analysis. I’ve found this approach particularly valuable for financial systems where transaction history matters. Let me share practical Ruby techniques I’ve implemented across multiple projects.

Event Store Foundations
A reliable event store needs atomic writes and version control. Here’s how I handle stream persistence:

class PostgresEventStore
  def append_to_stream(stream_name, events, expected_version: nil)
    connection.transaction do
      current_version = get_stream_version(stream_name)
      # Only enforce the check when a version was supplied; nil means
      # "append regardless of the stream's current position"
      if expected_version && expected_version != current_version
        raise ConcurrencyError, "expected version #{expected_version}, stream is at #{current_version}"
      end
      
      events.each_with_index do |event, index|
        insert_event(
          stream_name: stream_name,
          event_type: event.class.name,
          data: event.to_h,
          metadata: { causation_id: SecureRandom.uuid },
          version: current_version + index + 1
        )
      end
    end
  end

  private

  def insert_event(stream_name:, event_type:, data:, metadata:, version:)
    sql = <<~SQL
      INSERT INTO events 
        (stream, type, data, metadata, version) 
      VALUES ($1, $2, $3, $4, $5)
    SQL
    connection.exec_params(sql, [stream_name, event_type, data.to_json, metadata.to_json, version])
  end
end

# Usage
store = PostgresEventStore.new
events = [InvoiceCreated.new(invoice_id: "INV-001", amount: 500)]
store.append_to_stream("Invoice-INV-001", events, expected_version: 0)

This pattern ensures sequential event ordering through database transactions, while the expected_version check rejects concurrent modifications. I've used it with both PostgreSQL and Redis backends; either works well as long as you enforce strict versioning.
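The insert statement above assumes an events table; here is a minimal schema sketch I would pair with it (column names taken from insert_event, the index name is my own). The unique index is the real concurrency guard: even if two writers race past the application-level version check, only one insert of a given (stream, version) pair can commit.

```sql
-- Assumed schema matching the insert_event columns above
CREATE TABLE events (
  id         BIGSERIAL   PRIMARY KEY,          -- global position for subscribers
  stream     TEXT        NOT NULL,
  type       TEXT        NOT NULL,
  data       JSONB       NOT NULL,
  metadata   JSONB       NOT NULL,
  version    BIGINT      NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

-- Concurrent writers cannot both commit the same (stream, version) pair
CREATE UNIQUE INDEX events_stream_version ON events (stream, version);
```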

Command Validation Pipelines
Before processing commands, I validate them through transformation pipelines:

class CreateOrderCommand
  include ActiveModel::Validations
  attr_accessor :user_id, :items, :total
  
  validates :user_id, presence: true
  validates :items, length: { minimum: 1 }
  validate :total_matches_items
  
  def initialize(params)
    @user_id = params[:user_id]
    @items = params[:items]
    @total = params[:total]
  end
  
  private
  
  def total_matches_items
    calculated = items.sum(&:price)
    errors.add(:total, "doesn't match items") if total != calculated
  end
end

class CommandHandler
  def handle(command)
    return Failure(:invalid_command) unless command.valid?
    
    aggregate = load_aggregate(command.aggregate_id)
    events = aggregate.process(command)
    
    event_store.append(aggregate.id, events, aggregate.version)
    Success(:created)
  rescue ConcurrencyError
    Retry.new(delay: 0.2) # back off 200 ms before retrying
  end
end

I layer validations: basic checks at command level, business rules in aggregates. This separation keeps core domain logic clean while catching issues early.
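For readers without ActiveModel handy, the same layering can be sketched in plain Ruby (the class and field names here are illustrative, not part of the system above): the command checks shape and arithmetic, while deeper business rules stay in the aggregate.

```ruby
# Plain-Ruby command validation sketch; CreateOrder and its fields
# are hypothetical stand-ins for the ActiveModel version above.
CreateOrder = Struct.new(:user_id, :items, :total, keyword_init: true) do
  def errors
    errs = []
    errs << "user_id is required" if user_id.nil? || user_id.empty?
    errs << "at least one item required" if items.nil? || items.empty?
    errs << "total doesn't match items" if items && total != items.sum { |i| i[:price] }
    errs
  end

  def valid?
    errors.empty?
  end
end

cmd = CreateOrder.new(
  user_id: "u-1",
  items: [{ sku: "A", price: 300 }, { sku: "B", price: 200 }],
  total: 500
)
cmd.valid?  # => true

bad = CreateOrder.new(user_id: "u-1", items: [{ sku: "A", price: 300 }], total: 500)
bad.errors  # => ["total doesn't match items"]
```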

Projection Rebuilding
For fast state reconstruction, I implement memory-efficient projections:

class AccountBalanceProjection
  def initialize
    @balances = {}
  end
  
  def apply(event)
    case event
    when FundsDeposited
      @balances[event.account_id] ||= 0
      @balances[event.account_id] += event.amount
    when FundsWithdrawn
      @balances[event.account_id] ||= 0
      @balances[event.account_id] -= event.amount
    end
  end
  
  def current_balance(account_id)
    @balances.fetch(account_id, 0)
  end
end

# Rebuild from scratch
projection = AccountBalanceProjection.new
event_store.read_all_events.each { |e| projection.apply(e) }

# Partial rebuild since checkpoint
last_seq = projection.checkpoint
events = event_store.read_events_after(last_seq)
events.each { |e| projection.apply(e) }

I include checkpoint markers to resume from specific positions. For large datasets, I parallelize processing across event partitions.
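A minimal checkpoint-aware variant might look like this (event shapes and attribute names are assumed): the projection records the global position of the last event it applied, so rebuilds resume from there and re-delivered batches are harmless.

```ruby
# Checkpointed projection sketch; event hashes and position numbering
# are illustrative, not the store's actual wire format.
class CheckpointedBalanceProjection
  attr_reader :checkpoint

  def initialize
    @balances = Hash.new(0)
    @checkpoint = 0
  end

  def apply(event, position)
    return if position <= @checkpoint # already seen: skip re-delivered events

    case event[:type]
    when :funds_deposited then @balances[event[:account_id]] += event[:amount]
    when :funds_withdrawn then @balances[event[:account_id]] -= event[:amount]
    end
    @checkpoint = position
  end

  def current_balance(account_id)
    @balances[account_id]
  end
end

log = [
  { type: :funds_deposited, account_id: "A", amount: 100 },
  { type: :funds_withdrawn, account_id: "A", amount: 30 }
]

projection = CheckpointedBalanceProjection.new
log.each_with_index { |e, i| projection.apply(e, i + 1) }
projection.current_balance("A")  # => 70

# Re-delivering the same batch is a no-op thanks to the checkpoint guard
log.each_with_index { |e, i| projection.apply(e, i + 1) }
projection.current_balance("A")  # => 70
```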

Concurrency Management
Optimistic locking prevents race conditions:

class InventoryItem
  attr_reader :version, :stock

  def initialize(events)
    @version = 0
    @stock = 0
    events.each { |e| apply(e) }
  end
  
  def restock(quantity)
    raise ArgumentError if quantity <= 0
    [ItemRestocked.new(quantity: quantity)]
  end
  
  def apply(event)
    case event
    when ItemRestocked
      @stock += event.quantity
    end
    @version += 1 # every applied event advances the version
  end
end

# Handler usage
def handle_restock(cmd)
  events = event_store.load_stream(cmd.sku)
  item = InventoryItem.new(events)
  
  new_events = item.restock(cmd.quantity)
  event_store.append(cmd.sku, new_events, item.version)
end

The version counter increments with each applied event. When persisting, we verify the aggregate’s version matches the stream’s head position.
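The losing-writer scenario can be demonstrated with a toy in-memory store (its API is assumed to mirror the event_store calls above): both handlers load the stream at the same version, and only the first append succeeds.

```ruby
# Illustrative in-memory store; real backends enforce the same rule
# with a transaction plus a unique (stream, version) constraint.
class InMemoryStore
  def initialize
    @streams = Hash.new { |h, k| h[k] = [] }
  end

  def load_stream(id)
    @streams[id]
  end

  def append(id, events, expected_version)
    raise "ConcurrencyError" unless @streams[id].size == expected_version
    @streams[id].concat(events)
  end
end

store = InMemoryStore.new
store.append("sku-1", [{ type: :item_restocked, quantity: 5 }], 0)

# Two handlers each load the stream, both seeing version 1
version_a = store.load_stream("sku-1").size
version_b = store.load_stream("sku-1").size

# First writer wins...
store.append("sku-1", [{ type: :item_restocked, quantity: 3 }], version_a)

# ...second writer's expected version is now stale and the append fails
conflict = begin
  store.append("sku-1", [{ type: :item_restocked, quantity: 2 }], version_b)
  nil
rescue => e
  e.message
end
conflict  # => "ConcurrencyError"
```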

Event Version Migration
Schemas evolve - here’s how I handle legacy formats:

class AddressUpdated
  # Takes the stored v1 data hash and returns a v2-shaped hash;
  # instantiation then works the same for every schema version
  def self.upcast(data)
    {
      customer_id: data[:user_id],
      address: {
        street: data[:street],
        city: data[:city],
        # Added in v2
        country: data[:country] || "USA"
      }
    }
  end
end

class EventStore
  def load_stream(stream_id)
    raw_events = adapter.fetch_events(stream_id)

    raw_events.map do |record|
      event_class = Object.const_get(record.type)
      data = record.data

      # Reshape legacy data to the current schema before instantiation
      data = event_class.upcast(data) if record.schema_version < 2

      event_class.new(data)
    end
  end
end

I attach schema versions during persistence. When loading, outdated events pass through transformation methods before instantiation.
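When a schema goes through several revisions, I prefer chaining one small upcaster per hop rather than writing direct v1-to-v3 converters. A sketch, with all names and field changes hypothetical:

```ruby
# Hypothetical upcaster chain: each entry reshapes data one schema
# version forward, so data stored at any version can reach the current one.
UPCASTERS = {
  1 => ->(data) { data.merge(country: data[:country] || "USA") },  # v1 -> v2
  2 => ->(data) { data.merge(address_kind: "residential") }        # v2 -> v3
}
CURRENT_VERSION = 3

def upcast_to_current(data, schema_version)
  (schema_version...CURRENT_VERSION).reduce(data) do |d, v|
    UPCASTERS.fetch(v).call(d)
  end
end

v1 = { street: "1 Main St", city: "Lyon" }
upcast_to_current(v1, 1)
# => { street: "1 Main St", city: "Lyon", country: "USA", address_kind: "residential" }
upcast_to_current(v1.merge(country: "FR"), 2)
# => { street: "1 Main St", city: "Lyon", country: "FR", address_kind: "residential" }
```

Composing single-step transforms keeps each migration small and testable; adding v4 later means adding one lambda, not rewriting every converter.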

Snapshot Optimization
For aggregates with long histories, I periodically snapshot state:

class AccountSnapshot
  def self.from_aggregate(account)
    new(
      account_id: account.id,
      balance: account.balance,
      version: account.version
    )
  end
end

class AccountRepository
  def load(account_id)
    snapshot = snapshot_store.latest_snapshot(account_id)
    # A stream may predate its first snapshot, so handle nil explicitly
    from_version = snapshot ? snapshot.version + 1 : 1
    events = event_store.load_stream(account_id, from_version: from_version)

    account = Account.new(account_id)
    account.restore_snapshot(snapshot) if snapshot
    account.apply_events(events)
    account
  end
  
  def save(account)
    events = account.unpublished_events
    event_store.append(account.id, events, account.version - events.size)
    
    if account.version % 50 == 0
      snapshot = AccountSnapshot.from_aggregate(account)
      snapshot_store.save(snapshot)
    end
  end
end

Snapshots reduce replay overhead significantly. I typically trigger them after every 50 events or during low-traffic periods.
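The payoff is easy to quantify with a toy replay (the event shape and helper below are assumed, not part of the repository above): only events after the snapshot's version are re-applied.

```ruby
# Illustrative replay cost comparison; Snapshot and rebuild are
# hypothetical simplifications of the repository logic above.
Snapshot = Struct.new(:balance, :version, keyword_init: true)

def rebuild(snapshot, events)
  balance = snapshot ? snapshot.balance : 0
  start = snapshot ? snapshot.version : 0
  replayed = 0

  events.each_with_index do |event, idx|
    next if idx < start # events already folded into the snapshot
    balance += event[:amount]
    replayed += 1
  end

  [balance, replayed]
end

events = Array.new(100) { { amount: 1 } }
snapshot = Snapshot.new(balance: 90, version: 90)

rebuild(nil, events)       # => [100, 100]  full replay
rebuild(snapshot, events)  # => [100, 10]   snapshot skips 90 events
```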

Read Model Synchronization
I use pub/sub for updating query-optimized views:

class ProjectionSubscriber
  def initialize(event_store, projections)
    @event_store = event_store
    @projections = projections
  end
  
  def start
    @thread = Thread.new do
      last_position = 0
      loop do
        events = @event_store.read_after(last_position, batch_size: 100)

        if events.empty?
          sleep 0.5 # idle poll instead of exiting; new events arrive later
          next
        end

        events.each do |event|
          @projections.each { |p| p.process(event) }
          last_position = event.global_position
        end
      end
    end
  end
end

# ElasticSearch projection example
class ProductCatalogProjection
  def process(event)
    case event
    when ProductAdded
      elasticsearch.index(
        index: 'products',
        id: event.product_id,
        body: { name: event.name, price: event.price }
      )
    when ProductPriceChanged
      elasticsearch.update(
        index: 'products',
        id: event.product_id,
        body: { doc: { price: event.new_price } }
      )
    end
  end
end

Separate subscriptions allow independent update cadences. I’ve achieved sub-second read model updates using this pattern.

These techniques form a robust foundation for event-sourced systems. The immutability of events provides natural audit trails, while replay capabilities enable powerful diagnostics. I recommend starting with core business domains where history matters - financial operations or inventory management are excellent candidates. The initial complexity pays dividends in traceability and flexibility as requirements evolve.



