When you build services in Ruby, you rarely get data in the exact shape you need. It comes from databases, APIs, user forms, or legacy systems, each with its own quirks. At first, you might write a few lines in a controller to tweak a hash. Then, another field needs adjusting. Soon, you have a tangled mess of logic that’s hard to change and terrifying to test.
I’ve found that as an application grows, this ad-hoc approach becomes a major source of bugs. The solution isn’t just writing better one-off transformations; it’s about establishing clear, repeatable patterns. These patterns are like blueprints. They give you a reliable way to structure your code, making it predictable, testable, and easy for other developers to understand.
Let’s start with a fundamental concept: the Pipeline. The idea is simple. Instead of having one massive method that does ten things to your data, you break it down into a series of small, focused steps. Each step is a single, clear transformation. You then connect them in a specific order.
Think of it like an assembly line. Raw data goes in at the start. The first worker (or step) normalizes keys. They pass it to the next worker, who converts dates. That worker passes it on to someone who calculates derived values, and so on. If you need to change how dates are parsed, you only touch that one worker. If you need to add a new step, like geocoding an address, you just slot a new worker into the line.
Here’s how that looks in code. I like to use plain lambdas or objects for the steps because they are so easy to test in isolation.
```ruby
require 'time' # for Time.parse
require 'active_support/core_ext/string/inflections' # provides String#underscore

# Define each step as a clear, single-purpose operation
convert_to_symbols = ->(data) {
  data.transform_keys { |key| key.to_s.underscore.to_sym }
}

format_dates = ->(data) {
  data.each_with_object({}) do |(key, value), new_hash|
    if key.to_s.end_with?('_at') && value.is_a?(String)
      new_hash[key] = Time.parse(value)
    else
      new_hash[key] = value
    end
  end
}

calculate_full_name = ->(data) {
  data.merge(
    full_name: "#{data[:first_name]} #{data[:last_name]}".strip
  )
}

# Assemble the pipeline
steps = [convert_to_symbols, format_dates, calculate_full_name]
pipeline = DataPipeline.new(steps)

# Process the data
raw_input = { 'FirstName' => 'Ada', 'last_name' => 'Lovelace', 'created_at' => '2023-12-25' }
clean_data = pipeline.process(raw_input)
# => {:first_name=>"Ada", :last_name=>"Lovelace", :created_at=>2023-12-25 00:00:00 (local zone), :full_name=>"Ada Lovelace"}
```
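The `DataPipeline` class itself isn't shown, because it does almost nothing. A minimal sketch of what it might look like (the shape of the class is my assumption):

```ruby
# Minimal pipeline runner: threads data through each step in order.
class DataPipeline
  def initialize(steps)
    @steps = steps
  end

  def process(data)
    # reduce passes the output of each step as the input to the next
    @steps.reduce(data) { |acc, step| step.call(acc) }
  end
end
```

Because each step is just a callable, anything that responds to `call`, whether a lambda, a method object, or a service class, can slot into the line.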
The beauty here is in the simplicity. The DataPipeline class doesn’t need to know what the steps do. Its only job is to pass data from one to the next. I can unit-test format_dates without worrying about the other steps. In a test suite, I can have a “test pipeline” with mock steps to verify the flow works. This separation is incredibly powerful for maintenance.
However, pipelines can get messy when you have complex rules about which fields to map where, or what to do when data is missing. That’s where a Schema-based pattern shines. I use this when integrating with external APIs. You define a schema, a kind of contract, that says, “This is what I want my clean data to look like, and here’s exactly where to find and how to treat each piece of the messy input.”
It moves the declaration of your intent away from the procedural code. You’re not saying how to do it step-by-step; you’re describing the outcome.
```ruby
require 'date'

# The schema is a configuration map. It's data, not logic.
user_profile_schema = {
  id: { source: ['user', 'id'], type: Integer },
  email: { source: ['user', 'contact_info', 'email'], type: String, transform: ->(e) { e.downcase.strip } },
  age: {
    source: ['user', 'birth_year'],
    type: Integer,
    transform: ->(birth_year) { Date.today.year - birth_year },
    optional: true # Maybe not all APIs provide this
  },
  signup_date: {
    source: ->(raw) { raw.dig('meta', 'dates', 'joined') }, # A proc for complex paths
    type: String,
    transform: ->(date_str) { Date.parse(date_str).iso8601 }
  }
}

transformer = SchemaTransformer.new(user_profile_schema)

# Imagine this comes from some external service
external_api_response = {
  'user' => {
    'id' => 4567,
    'contact_info' => { 'email' => ' [email protected] ' },
    'birth_year' => 1990
  },
  'meta' => {
    'dates' => { 'joined' => '2020-05-21' }
  }
}

clean_result, validation_errors = transformer.transform(external_api_response)

puts clean_result
# => { :id=>4567, :email=>"[email protected]", :age=>34, :signup_date=>"2020-05-21" }
# (the :age value depends on the current year)

puts validation_errors
# => [] (an empty array, meaning no errors)
```
The SchemaTransformer does the heavy lifting: it reads the schema, fetches data from the nested source path (which can be a simple array of keys or a custom proc), validates the type, and applies any final formatting transform. If a required field is missing or the wrong type, it collects an error. You get back your clean data and a list of any problems. This makes it fantastic for validating incoming data before it touches your core models.
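The `SchemaTransformer` isn't defined in the example, so here's one possible sketch of it, under the assumption that it follows the fetch, type-check, transform order described above (the error-message format is also my invention):

```ruby
# A sketch of a SchemaTransformer: fetch each field from its source,
# validate the type, apply the optional transform, collect errors.
class SchemaTransformer
  def initialize(schema)
    @schema = schema
  end

  def transform(raw)
    result = {}
    errors = []
    @schema.each do |field, spec|
      value = fetch(raw, spec[:source])
      if value.nil?
        errors << "missing required field: #{field}" unless spec[:optional]
        next
      end
      unless spec[:type].nil? || value.is_a?(spec[:type])
        errors << "#{field}: expected #{spec[:type]}, got #{value.class}"
        next
      end
      value = spec[:transform].call(value) if spec[:transform]
      result[field] = value
    end
    [result, errors]
  end

  private

  # A source can be an array of keys for Hash#dig, or a proc for anything else
  def fetch(raw, source)
    source.respond_to?(:call) ? source.call(raw) : raw.dig(*source)
  end
end
```

Note how the schema hash stays pure data while all the mechanics live in one reusable class; adding a field to the output is a one-line schema change.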
Sometimes, the transformation isn’t a one-way street. You often need to convert data to an external format (like for an API request) and then convert the response back to your internal format. Writing two separate methods that can get out of sync is a common bug factory. A Bidirectional Transformer keeps the rules for both directions locked together.
I think of it as a single contract with two faces: one facing inward, one facing outward.
```ruby
require 'time'

# Define the two-way mapping in one place
user_format_mapping = BidirectionalMapping.new(
  # How to go TO the external service
  to_external: {
    clientUserId: ->(internal) { internal[:id] },
    contactEmail: ->(internal) { internal[:email] },
    accountCreated: ->(internal) { internal[:created_at].iso8601(3) } # With milliseconds
  },
  # How to come BACK FROM the external service
  from_external: {
    id: ->(external) { external['clientUserId'].to_i },
    email: ->(external) { external['contactEmail'] },
    created_at: ->(external) { Time.parse(external['accountCreated']) }
  }
)

# Use it
my_internal_data = { id: 789, email: '[email protected]', created_at: Time.utc(2024, 1, 15) }

# Convert to send to the API
payload_for_api = user_format_mapping.to_external(my_internal_data)
# => { :clientUserId=>789, :contactEmail=>"[email protected]", :accountCreated=>"2024-01-15T00:00:00.000Z" }

# Later, convert the API response back
api_response = { 'clientUserId' => '789', 'contactEmail' => '[email protected]', 'accountCreated' => '2024-01-20T10:30:00.000Z' }
internal_data = user_format_mapping.from_external(api_response)
# => { :id=>789, :email=>"[email protected]", :created_at=>2024-01-20 10:30:00 UTC }
```
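The `BidirectionalMapping` class is assumed in the example above; given how it's used, a minimal sketch could look like this:

```ruby
# Holds both direction maps; each output field is built by its own lambda.
class BidirectionalMapping
  def initialize(to_external:, from_external:)
    @to_external = to_external
    @from_external = from_external
  end

  def to_external(internal)
    @to_external.transform_values { |fn| fn.call(internal) }
  end

  def from_external(external)
    @from_external.transform_values { |fn| fn.call(external) }
  end
end
```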
By collocating these rules, you ensure parity. If the external API changes the clientUserId field to userId, you change it in one mapping object, and both the sending and receiving code are updated correctly. It’s a simple pattern that prevents a whole class of integration errors.
All the patterns so far assume you have all the data in memory. But what about a CSV file with ten million rows? Loading it all at once will crash your service. This is where Streaming Transformations become essential. You process the data piece by piece, in a constant, small memory footprint.
Ruby’s Enumerator and its yielder are your best friends here. You create a lazy chain of operations that pulls data through, one chunk or row at a time.
```ruby
require 'json'

class StreamingJsonParser
  def initialize(io_stream, batch_size: 1000)
    @io = io_stream
    @batch_size = batch_size
  end

  def each_transformed_row(&block)
    # Build a lazy Enumerator; rows are only pulled through on demand
    enum = Enumerator.new do |yielder|
      buffer = []
      @io.each_line do |line|
        # Parse a single line of JSON, skipping malformed lines
        parsed = JSON.parse(line.strip) rescue nil
        next unless parsed
        buffer << transform_row(parsed)
        # Yield in batches for efficiency
        if buffer.size >= @batch_size
          yielder << buffer
          buffer = []
        end
      end
      # Yield any remaining rows in the buffer
      yielder << buffer unless buffer.empty?
    end
    # Iterate right away when a block is given; otherwise hand back the Enumerator
    block ? enum.each(&block) : enum
  end

  private

  def transform_row(row)
    # A simple, fast transformation for each row
    {
      id: row['ID'].to_i,
      value: row['Value'].to_f,
      tags: row['Tags'].to_s.split('|')
    }
  end
end

# Usage: Processing a huge file
File.open('gigantic_logs.jsonl') do |file|
  parser = StreamingJsonParser.new(file)
  parser.each_transformed_row do |batch_of_rows|
    # Insert the batch into a database, push to a queue, etc.
    # You only ever have `@batch_size` rows in memory.
    DatabaseService.insert_batch(batch_of_rows)
  end
end
```
This pattern is less about elegant code and more about survival. It allows your Ruby service to handle datasets of any size. The key is to never call .to_a or .map on the entire enumerator; always iterate with .each and process as you go.
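The same streaming discipline applies to ad-hoc chains, too. Here's a small sketch using `Enumerator::Lazy` with a `StringIO` standing in for a huge file (the data is hypothetical):

```ruby
require 'json'
require 'stringio'

# Simulate a JSONL stream; in production this would be a File handle
io = StringIO.new(%({"ID":"1","Value":"2.5"}\n{"ID":"2","Value":"3.5"}\n))

# `lazy` keeps chained maps from materializing intermediate arrays;
# each row flows through the whole chain before the next line is read.
rows = io.each_line.lazy
         .map { |line| JSON.parse(line) }
         .map { |row| { id: row['ID'].to_i, value: row['Value'].to_f } }

first_row = rows.first # forces only the first row through the chain
# => {:id=>1, :value=>2.5}
```

The moment you call `rows.to_a`, the memory guarantee is gone; keep the consumer lazy all the way to the sink.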
Business logic has a habit of becoming complicated. “If the order is over $1000 and the customer is international, apply a tax check and flag for manual review, unless they are a premium member, in which case auto-approve but apply a different fee schedule.” Embedding these if/else trees inside your transformation code is a nightmare.
A Rule Engine pattern externalizes these conditions and actions. You define a set of rules—each with a condition and one or more actions—and let the engine figure out which ones apply.
```ruby
class OrderProcessingEngine
  def initialize
    @rules = []
  end

  def add_rule(name, condition:, action:)
    @rules << { name: name, condition: condition, action: action }
  end

  def process(order)
    # Start with a base set of defaults or a copy
    result = { flags: [], adjustments: [] }.merge(order)
    # Apply all rules whose conditions are met
    @rules.each do |rule|
      rule[:action].call(result) if rule[:condition].call(result)
    end
    result
  end
end

# Configure the business rules
engine = OrderProcessingEngine.new

engine.add_rule(
  'high_value_international',
  condition: ->(o) { o[:amount] > 1000 && o[:country] != 'US' },
  action: ->(o) { o[:flags] << :requires_manual_review; o[:tax_form] = 'W-8BEN' }
)

engine.add_rule(
  'premium_customer_discount',
  condition: ->(o) { o[:customer_tier] == :premium },
  action: ->(o) { o[:adjustments] << { type: :discount, value: o[:amount] * 0.05 } }
)

engine.add_rule(
  'fast_shipping_for_small_order',
  condition: ->(o) { o[:amount] < 50 && o[:express_shipping_requested] },
  action: ->(o) { o[:flags].delete(:requires_manual_review) } # Override a previous rule!
)

# Run the engine
order = { amount: 1200, country: 'UK', customer_tier: :premium, express_shipping_requested: false }
processed_order = engine.process(order)
# => {:flags=>[:requires_manual_review], :adjustments=>[{:type=>:discount, :value=>60.0}], ...}
```
Now, when the business team says, “Let’s also give discounts on Tuesdays,” you don’t dig through a 200-line method. You add a new, independent rule. Testing is straightforward: you test each rule in isolation and test the engine with a set of sample data. The logic is declarative and centralized.
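Testing a rule in isolation really is just calling its lambdas directly, no engine required. A quick sketch, reusing the condition from the example above:

```ruby
# The 'high_value_international' condition, exercised on its own
condition = ->(o) { o[:amount] > 1000 && o[:country] != 'US' }

condition.call(amount: 1500, country: 'DE') # => true
condition.call(amount: 1500, country: 'US') # => false
```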
The final pattern addresses a reality of long-lived systems: change. The format of your data today is not the format from two years ago, nor will it be the format you need next year. Building a Versioned Transformer lets you handle multiple formats simultaneously.
This is crucial for APIs (supporting v1 and v2 clients), for data migration scripts, or for reading old database records. You write a transformer for each version and a way to route data to the correct one.
```ruby
require 'time' # for Time#iso8601

class UserDataVersioner
  TRANSFORMERS = {
    # Version 1: Flat structure (2023)
    'v1' => ->(data) {
      {
        user_id: data[:id],
        user_email: data[:email],
        signup_date: data[:created_at].strftime('%Y-%m-%d')
      }
    },
    # Version 2: Nested structure (2024)
    'v2' => ->(data) {
      {
        identity: {
          id: data[:id],
          contact: { email: data[:email] }
        },
        timeline: {
          joined: data[:created_at].iso8601
        }
      }
    }
  }.freeze

  def self.transform(data, to_version:, from_version: 'v2')
    # First, ensure we have data in a common internal format (e.g., from_version).
    # In reality, you'd also have transformers TO the internal format.
    # For simplicity, let's assume `data` is already in the internal format.
    common_data = data

    # Find the transformer for the target version.
    # This could involve a direct transform or stepping through versions.
    if TRANSFORMERS.key?(to_version)
      TRANSFORMERS[to_version].call(common_data)
    else
      raise "Unsupported version: #{to_version}"
    end
  end
end

# Your current internal data format
current_data = { id: 1, email: '[email protected]', created_at: Time.utc(2022, 6, 10) }

# Serve an old API client that expects v1
api_response_v1 = UserDataVersioner.transform(current_data, to_version: 'v1')
# => {:user_id=>1, :user_email=>"[email protected]", :signup_date=>"2022-06-10"}

# Serve a new client that expects v2
api_response_v2 = UserDataVersioner.transform(current_data, to_version: 'v2')
# => {:identity=>{:id=>1, :contact=>{:email=>"[email protected]"}}, :timeline=>{:joined=>"2022-06-10T00:00:00Z"}}
```
In a real system, you’d have more complex version management—perhaps a fallback chain where v3 can use the v2 transformer if a v3-specific one isn’t defined. The core idea is to isolate the knowledge of each format into its own box, so your main application logic doesn’t fill up with if version == 'old' statements.
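One way to sketch that fallback chain: walk a map of version aliases until a transformer exists. The class, constants, and tiny transformers below are all my assumptions, just enough to show the routing:

```ruby
# Route a requested version to the nearest transformer that exists.
class VersionRouter
  TRANSFORMERS = {
    'v1' => ->(data) { { user_id: data[:id] } },
    'v2' => ->(data) { { identity: { id: data[:id] } } }
  }.freeze

  # v3 has no transformer of its own yet, so it falls back to v2
  FALLBACKS = { 'v3' => 'v2' }.freeze

  def self.transformer_for(version)
    current = version
    while current
      return TRANSFORMERS[current] if TRANSFORMERS.key?(current)
      current = FALLBACKS[current] # step down the chain
    end
    raise "Unsupported version: #{version}"
  end
end

VersionRouter.transformer_for('v3').call(id: 1)
# => {:identity=>{:id=>1}}
```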
Choosing the right pattern depends entirely on your problem. For a simple, linear cleanup, a Pipeline is perfect. For integrating with an external service with a strict contract, use a Schema. For two-way communication, the Bidirectional pattern is a safe choice. Handle large files with Streaming. Manage complex, evolving business logic with a Rule Engine. And when you need to support the past, present, and future of your data shapes, build a Versioned Transformer.
The goal is never clever code. It’s clear, boring, and maintainable code. By applying these patterns, you move the complexity of data wrangling out of your core business logic and into structured, testable units. This leaves you free to focus on what your service actually needs to do, confident that the data flowing through it is in the right shape.