Data pipelines are the quiet engines that move and shape information in modern applications. If you’ve ever wondered how data gets from a raw database table into a dashboard chart, or how user clicks become analytics, you’re thinking about pipelines. In my work with Ruby, I’ve found that a good pipeline isn’t just about moving data from point A to point B. It’s about doing so reliably, understandably, and in a way that can handle both today’s ten records and tomorrow’s ten million.
Let’s look at some ways to build these systems. I’ll share patterns I use, with code to show how they work in practice. Think of these as blueprints you can adapt.
A common starting point is the stage-based pipeline. This approach breaks the work into clear, separate steps. It makes the whole process easier to think about, test, and fix. You extract data, then validate it, then transform it, and finally load it somewhere new. Each step is a focused unit.
# A simple stage-based pipeline for cleaning user data
require 'date' # Date.parse in DateFormatStage needs the date library

class CleanUserPipeline
  def initialize(source, destination)
    @source = source
    @destination = destination
    @stages = []
  end
  def add_stage(stage)
    @stages << stage
  end
  def run
    @source.each_batch do |batch|
      current_data = batch
      @stages.each do |stage|
        current_data = stage.process(current_data)
      end
      @destination.write(current_data)
    end
    puts "Pipeline finished."
  end
end

# A stage that removes invalid emails
class EmailValidationStage
  def process(records)
    records.select do |user|
      user[:email]&.include?('@')
    end
  end
end

# A stage that standardizes date formats
class DateFormatStage
  def process(records)
    records.map do |user|
      if user[:signup_date].is_a?(String)
        begin
          user[:signup_date] = Date.parse(user[:signup_date]).iso8601
        rescue ArgumentError
          user[:signup_date] = nil
        end
      end
      user
    end
  end
end

# Using the pipeline
pipeline = CleanUserPipeline.new(DatabaseSource.new('raw_users'), DataWarehouse.new('clean_users'))
pipeline.add_stage(EmailValidationStage.new)
pipeline.add_stage(DateFormatStage.new)
pipeline.run
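DatabaseSource and DataWarehouse above are stand-ins for whatever storage you actually read from and write to. If you want to run the example end to end, a throwaway in-memory pair like the sketch below will do; define them before building the pipeline.
# Hypothetical in-memory stand-ins, purely so the example above can run end to end.
class DatabaseSource
  BATCH_SIZE = 500
  def initialize(_table_name)
    # A real source would hold a database connection; here we fake two raw rows.
    @rows = [
      { email: 'good@example.com', signup_date: '2024-03-01' },
      { email: 'no-at-sign', signup_date: 'not a real date' }
    ]
  end
  def each_batch
    @rows.each_slice(BATCH_SIZE) { |batch| yield batch }
  end
end

class DataWarehouse
  def initialize(_table_name)
    @written = []
  end
  def write(records)
    @written.concat(records)
    puts "Wrote #{records.count} clean records."
  end
end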
This structure is powerful because you can change one part without breaking another. Need a new cleaning rule? Add a stage. The batch processing loop also keeps memory under control with large datasets, because it only ever holds one chunk of records at a time.
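For example, a rule that drops repeated email addresses is just one more small class; nothing about the pipeline itself has to change. (This stage is a sketch I’m adding for illustration, not part of the pipeline above.)
# A hypothetical extra stage: drop records whose email we've already seen
class DeduplicateEmailStage
  def initialize
    @seen_emails = {}
  end
  def process(records)
    records.reject do |user|
      already_seen = @seen_emails.key?(user[:email])
      @seen_emails[user[:email]] = true
      already_seen
    end
  end
end
# Plug it in like any other stage:
# pipeline.add_stage(DeduplicateEmailStage.new)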
Sometimes, data doesn’t come in scheduled batches. It arrives as things happen—a user signs up, a payment is made, a sensor sends a reading. For this, an event-driven pattern works well. Instead of asking for data, you set up listeners that react when new data appears.
# A pipeline that reacts to new order events
class OrderEventPipeline
  def initialize(message_queue)
    @queue = message_queue
    @handlers = {
      'order.created' => [ChargeCardHandler.new, SendReceiptHandler.new],
      'order.updated' => [UpdateInventoryHandler.new],
      'order.cancelled' => [RefundHandler.new, RestockItemHandler.new]
    }
  end
  def start_listening
    Thread.new do
      @queue.subscribe do |event|
        puts "Processing event: #{event[:type]}"
        handlers = @handlers[event[:type]] || []
        handlers.each { |handler| handler.call(event) }
      end
    end
    puts "Pipeline listener started."
  end
end

# A sample handler
class ChargeCardHandler
  def call(event)
    order = event[:data]
    # ... logic to charge a payment method ...
    puts "Charged card for order #{order[:id]}"
  end
end

# Simulate an event coming in
queue = MessageQueue.new('orders')
pipeline = OrderEventPipeline.new(queue)
pipeline.start_listening
# Later, something publishes an event
queue.publish(type: 'order.created', data: { id: 12345, amount: 99.99 })
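MessageQueue is likewise a stand-in for a real broker client (RabbitMQ, SQS, Kafka, and so on). For running the example, a minimal in-memory version might look like this:
# Hypothetical in-memory queue, purely for demonstration. Only publish and
# subscribe matter here; a real system would use a broker client instead.
class MessageQueue
  def initialize(name)
    @name = name
    @events = Queue.new # Ruby's built-in thread-safe queue
  end
  def publish(event)
    @events << event
  end
  def subscribe
    loop do
      yield @events.pop # blocks until an event arrives
    end
  end
end
If you run the simulation as a script, add a short sleep after the publish call so the main thread gives the listener thread a moment to process the event before the program exits.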
The beauty here is loose coupling. The service that creates an order doesn’t need to know about inventory or receipts. It just says “an order happened.” The pipeline figures out what needs to happen next. This makes systems more resilient. If the receipt service is down, the order can still be created and charged; the receipt can be retried later.
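That “retried later” can be as simple as putting the failed event back on the queue. The helper below is a sketch of that idea, assuming handlers raise on failure and reusing the publish method from above; a real system would likely add a separate dead-letter queue.
# Hypothetical wrapper: if a handler fails, re-publish the event so it is retried later.
def handle_with_requeue(handler, event, queue)
  handler.call(event)
rescue StandardError => e
  retries = event.fetch(:retries, 0)
  if retries < 5
    puts "Handler failed (#{e.message}); putting event back on the queue for later."
    queue.publish(event.merge(retries: retries + 1))
  else
    puts "Event #{event[:type]} failed too many times; park it somewhere for a human to inspect."
  end
end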
When you need to analyze a continuous flow of data, like website clicks or server logs, stream processing is the answer. You look at data in windows of time—like the last five minutes—to calculate things like averages or counts.
# A processor that counts website clicks per minute
class ClickstreamProcessor
  def initialize
    @window_duration = 60 # seconds
    @window_start = nil # opened when the first event arrives
    @click_counts = Hash.new(0) # URL => count
  end
  def process_event(event)
    url = event[:url]
    event_time = event[:timestamp]
    @window_start ||= event_time
    # Check if the event is in the current 1-minute window
    if event_time >= @window_start && event_time < @window_start + @window_duration
      @click_counts[url] += 1
    elsif event_time >= @window_start + @window_duration
      # Time to finalize the old window and start a new one at this event's time
      emit_results
      @window_start = event_time
      @click_counts.clear
      @click_counts[url] = 1 # Count this event in the new window
    end
    # Events stamped before the current window (late arrivals) are ignored here;
    # a production processor would need an explicit policy for late data.
  end
  def emit_results
    puts "Window results for #{Time.at(@window_start)}:"
    @click_counts.each do |url, count|
      puts " #{url}: #{count} clicks"
    end
    # In reality, you'd write this to a database or monitoring system
  end
end

processor = ClickstreamProcessor.new
# Simulate incoming clicks
simulated_clicks = [
  { url: '/home', timestamp: Time.now.to_i - 30 },
  { url: '/home', timestamp: Time.now.to_i - 25 },
  { url: '/about', timestamp: Time.now.to_i - 10 },
  { url: '/home', timestamp: Time.now.to_i + 65 } # This will trigger a new window
]
simulated_clicks.each { |click| processor.process_event(click) }
This pattern lets you understand what’s happening right now. It’s useful for monitoring, alerting, or real-time dashboards. The code manages a tumbling time window: a fixed, non-overlapping one-minute bucket that is closed out and replaced once events move past its end, so the counts it reports are always current.
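If you want a genuinely sliding view, say clicks over the trailing five minutes updated continuously, one common approach is to keep small per-minute buckets and sum the recent ones. Here is a rough sketch of that idea; the class name and bucket size are my own choices, not part of the processor above.
# A rough sketch of a sliding count over the trailing N minutes,
# built from one-minute buckets.
class SlidingClickCounter
  def initialize(window_minutes: 5)
    @window_minutes = window_minutes
    @buckets = Hash.new { |hash, minute| hash[minute] = Hash.new(0) } # minute => { url => count }
  end
  def record(url, timestamp)
    minute = timestamp - (timestamp % 60)
    @buckets[minute][url] += 1
    evict_old_buckets(timestamp)
  end
  def counts_for_last_window(now = Time.now.to_i)
    totals = Hash.new(0)
    @buckets.each do |minute, urls|
      next if minute < now - (@window_minutes * 60)
      urls.each { |url, count| totals[url] += count }
    end
    totals
  end
  private
  def evict_old_buckets(now)
    cutoff = now - (@window_minutes * 60)
    @buckets.delete_if { |minute, _| minute < cutoff }
  end
end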
You can’t trust your data if you don’t check its quality. A data quality pipeline runs a set of checks against your information. Is every required field filled in? Do the dates make sense? Are the numbers in a reasonable range?
class DataQualityChecker
  def initialize(rules)
    @rules = rules
    @failures = []
    @record_count = 0
  end
  def check_dataset(data)
    @failures = []
    @record_count = data.count
    data.each_with_index do |record, index|
      @rules.each do |rule|
        unless rule.satisfied_by?(record)
          @failures << { row: index, rule: rule.name, value: record[rule.field] }
        end
      end
    end
    generate_report
  end
  def generate_report
    if @failures.empty?
      puts "All checks passed."
      return { score: 1.0, failures: [] }
    end
    total_checks = @rules.count * @record_count # every rule runs against every record
    score = 1.0 - (@failures.count.to_f / total_checks)
    puts "Quality Score: #{(score * 100).round(2)}%"
    puts "Failures:"
    @failures.first(5).each do |fail| # Show first 5 for brevity
      puts " Row #{fail[:row]}: #{fail[:rule]} failed for value '#{fail[:value]}'"
    end
    { score: score, failures: @failures }
  end
end
# Define some rules
class NotNullRule
  attr_reader :name, :field
  def initialize(field)
    @field = field
    @name = "#{field} not null"
  end
  def satisfied_by?(record)
    !record[field].nil?
  end
end

class EmailFormatRule
  attr_reader :name, :field
  def initialize(field)
    @field = field
    @name = "#{field} valid email"
  end
  def satisfied_by?(record)
    record[field].to_s.match?(/\A[^@\s]+@[^@\s]+\z/)
  end
end

# Use the checker
user_data = [
  { id: 1, name: 'Alice', email: 'alice@example.com' },
  { id: 2, name: 'Bob', email: 'bob' }, # Bad email
  { id: 3, name: nil, email: 'carol@example.com' } # Missing name
]
rules = [NotNullRule.new(:name), EmailFormatRule.new(:email)]
checker = DataQualityChecker.new(rules)
report = checker.check_dataset(user_data)
Running these checks regularly helps you catch problems before they mess up reports or machine learning models. You can set it to alert you if the quality score drops below a certain point.
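Wiring up that alert can be a few lines on top of the report returned above. A sketch, where notify_team stands in for whatever alerting call you already have (Slack, email, PagerDuty):
MIN_ACCEPTABLE_SCORE = 0.95 # example threshold; tune it to your data

if report[:score] < MIN_ACCEPTABLE_SCORE
  # notify_team is hypothetical; swap in your own alerting integration
  notify_team("Data quality dropped to #{(report[:score] * 100).round(2)}% " \
              "(#{report[:failures].count} failing checks)")
end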
Loading all your data every time is often wasteful. Incremental loading only fetches what has changed since the last run. You need a way to track what’s new or different, usually with timestamps or checksums.
require 'time' # for Time.parse and Time#iso8601

class IncrementalLoader
  def initialize(source_table, target_table)
    @source = source_table
    @target = target_table
    @last_run_key = "last_run:#{source_table}" # used as a file name in this example
  end
  def load
    last_run_time = get_last_run_time
    puts "Loading changes since #{last_run_time}"
    # Find records updated after the last run
    new_or_updated = fetch_records_updated_since(last_run_time)
    if new_or_updated.any?
      puts "Found #{new_or_updated.count} records to sync."
      save_records(new_or_updated)
      update_last_run_time
    else
      puts "No new changes."
    end
  end
  private
  def get_last_run_time
    # Store this in Redis, a database, or a file
    # For this example, we'll use a simple file
    if File.exist?(@last_run_key)
      Time.parse(File.read(@last_run_key))
    else
      Time.new(2000, 1, 1) # A date far in the past to get everything the first time
    end
  end
  def fetch_records_updated_since(time)
    # This is a placeholder. In reality, you'd query your database:
    # SELECT * FROM @source WHERE updated_at > time
    []
  end
  def save_records(records)
    records.each do |record|
      # Update if exists, insert if new
      # This logic depends on your storage layer
    end
  end
  def update_last_run_time
    File.write(@last_run_key, Time.now.iso8601)
  end
end

loader = IncrementalLoader.new('production.orders', 'warehouse.orders')
loader.load
This pattern saves time and computing resources. It’s crucial for syncing large tables where only a small percentage changes each hour or day. The key is reliably remembering when you last looked.
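Timestamps are the easy case. When a source table has no trustworthy updated_at column, a per-row checksum is a common fallback: hash each row and compare it with the hash you stored last run. A rough sketch of the idea (the id column and the previous_checksums lookup are assumptions about your schema and storage):
require 'digest'

# Hypothetical checksum-based change detection: hash each row's contents and
# compare against the checksum recorded on the previous run.
def changed_records(rows, previous_checksums)
  rows.select do |row|
    checksum = Digest::SHA256.hexdigest(row.sort.to_s) # stable fingerprint of the row
    previous_checksums[row[:id]] != checksum
  end
end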
As pipelines get complex, you need to track where data came from and what happened to it. This is called data lineage. It’s like a version history for your data, important for debugging and compliance.
class LineageTracker
  def initialize
    @graph = {} # A simple hash to store relationships
  end
  def log_step(input_id, process, output_id)
    @graph[output_id] = { source: input_id, process: process, time: Time.now }
    puts "Logged: #{input_id} -> [#{process}] -> #{output_id}"
  end
  def trace_origin(data_id)
    history = []
    current_id = data_id
    while @graph[current_id]
      step = @graph[current_id]
      history << step
      current_id = step[:source]
    end
    puts "Origin trace for #{data_id}:"
    history.reverse.each_with_index do |step, i|
      puts " Step #{i+1}: #{step[:source]} transformed by '#{step[:process]}' at #{step[:time]}"
    end
    history
  end
end

tracker = LineageTracker.new
# Simulate a pipeline run
tracker.log_step('raw_users.csv', 'clean_and_validate', 'clean_users_v1')
tracker.log_step('clean_users_v1', 'aggregate_by_region', 'users_by_region_report')
tracker.log_step('users_by_region_report', 'generate_chart', 'chart_123.png')
# Now find out how chart_123.png was made
tracker.trace_origin('chart_123.png')
When a number looks wrong, lineage helps you walk back through each step to find where the error was introduced. In regulated industries, this audit trail isn’t just helpful—it’s required.
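One caveat worth making concrete: the tracker above keeps its graph in memory, so the history disappears when the process exits. For an audit trail you would persist each step as it is logged. A sketch using an append-only JSON-lines file (the file path and format are arbitrary choices of mine):
require 'json'
require 'time'

# Hypothetical persistence on top of LineageTracker: append one JSON line per step
# so the history survives restarts and can be loaded by other tools.
class PersistentLineageTracker < LineageTracker
  def initialize(log_path = 'lineage_log.jsonl')
    super()
    @log_path = log_path
  end
  def log_step(input_id, process, output_id)
    super
    entry = { source: input_id, process: process, output: output_id, time: Time.now.iso8601 }
    File.open(@log_path, 'a') { |f| f.puts(entry.to_json) }
  end
end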
Finally, when you have many pipelines that depend on each other, you need orchestration. Think of it as the conductor of the orchestra, making sure each pipeline runs in the right order and at the right time.
class SimpleOrchestrator
  def initialize
    @tasks = []
    @dependencies = {}
  end
  def add_task(name, depends_on: [], &block)
    @tasks << { name: name, job: block }
    @dependencies[name] = depends_on
  end
  def run
    # Determine execution order
    executed = []
    remaining = @tasks.dup
    while remaining.any?
      # Find a task whose dependencies are already met
      task_to_run = remaining.find do |task|
        (@dependencies[task[:name]] - executed.map { |t| t[:name] }).empty?
      end
      if task_to_run
        puts "Starting task: #{task_to_run[:name]}"
        task_to_run[:job].call
        executed << task_to_run
        remaining.delete(task_to_run)
        puts "Finished task: #{task_to_run[:name]}"
      else
        puts "Error: Circular dependency detected!"
        break
      end
    end
  end
end

orchestrator = SimpleOrchestrator.new
orchestrator.add_task('extract_sales') do
  puts " Extracting sales data..."
  # ... extraction code ...
end
orchestrator.add_task('transform_sales', depends_on: ['extract_sales']) do
  puts " Transforming sales data..."
  # ... transformation code ...
end
orchestrator.add_task('load_sales', depends_on: ['transform_sales']) do
  puts " Loading sales to warehouse..."
  # ... load code ...
end
orchestrator.add_task('send_daily_report', depends_on: ['load_sales']) do
  puts " Sending report email..."
  # ... email code ...
end
puts "Running workflow..."
orchestrator.run
The orchestrator understands that you can’t transform data before extracting it, and you can’t load it before transforming it. It manages the workflow so you don’t have to remember the order manually. It can also handle retries, logging, and alerting if a step fails.
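A minimal version of that retry behaviour can be layered on without touching the run loop, by wrapping each task’s block as it is registered. The subclass below is a sketch of the idea; the attempt limit and messages are my own choices, not features of the orchestrator above.
# Hypothetical subclass: wrap every task's block with retry and failure logging.
class ResilientOrchestrator < SimpleOrchestrator
  MAX_ATTEMPTS = 3
  def add_task(name, depends_on: [], &block)
    super(name, depends_on: depends_on) do
      attempts = 0
      begin
        attempts += 1
        block.call
      rescue StandardError => e
        if attempts < MAX_ATTEMPTS
          puts "#{name} failed (#{e.message}); retrying..."
          retry
        else
          puts "#{name} failed #{attempts} times; time to alert a human."
          raise
        end
      end
    end
  end
end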
Each of these patterns solves a different part of the data pipeline problem. In my projects, I often combine them. A main orchestrated workflow might run a stage-based ETL pipeline, which uses incremental loading and has a data quality check built into one of its stages. All the while, a lineage tracker logs each step.
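To make that combination concrete, here is a sketch of the quality checker from earlier dropped into the stage-based pipeline as just another stage; the threshold and the decision to raise instead of continuing are my own choices.
# Hypothetical stage that reuses DataQualityChecker inside the stage-based pipeline.
# If a batch's quality falls below the threshold, it stops the run rather than loading bad data.
class QualityGateStage
  def initialize(rules, min_score: 0.9)
    @checker = DataQualityChecker.new(rules)
    @min_score = min_score
  end
  def process(records)
    report = @checker.check_dataset(records)
    if report[:score] < @min_score
      raise "Data quality too low: #{(report[:score] * 100).round(2)}%"
    end
    records
  end
end
# e.g. pipeline.add_stage(QualityGateStage.new([NotNullRule.new(:name), EmailFormatRule.new(:email)]))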
The right choice depends on your specific needs. Ask yourself: How much data arrives, and how fast? How fresh does the processed data need to be? How important is absolute correctness versus speed? The answers will guide you toward the patterns that fit.
Start simple. You might begin with a basic stage-based script. As needs grow—maybe data volume increases, or you need faster results—you can evolve it. Perhaps you add a message queue to make it event-driven. The patterns are tools, not mandates. Use them to build pipelines that are clear, reliable, and ready for what comes next.