Ruby on Rails applications frequently need to process data in real-time or near real-time. ActiveJob provides a solid foundation for background processing, but when dealing with high-volume data streams or complex processing requirements, additional tools can significantly enhance your application’s capabilities. I’ve worked with numerous Ruby gems that extend ActiveJob’s functionality, making it more suitable for real-time data processing tasks.
The Challenge of Real-Time Data Processing
Real-time data processing presents unique challenges that basic ActiveJob implementations may struggle with. These include handling high throughput, ensuring job reliability, managing resource consumption, and providing visibility into processing status. The gems I’ll discuss address these challenges directly.
When building systems that process data streams in real-time, we need tools that help us manage concurrency, provide back-pressure mechanisms, and handle failures gracefully. Let’s explore six gems that I’ve found particularly valuable for these scenarios.
1. Sidekiq Pro for Enhanced Job Processing
Sidekiq serves as the most popular ActiveJob adapter, but Sidekiq Pro takes real-time processing capabilities to the next level. Its reliability features make it particularly well-suited for data processing workloads.
The batch processing API allows you to group related jobs and track their collective progress:
batch = Sidekiq::Batch.new
batch.description = "Processing data stream #{stream_id}"
batch.on(:complete, StreamCallbacks, stream_id: stream_id)

batch.jobs do
  data_chunks.each do |chunk|
    StreamProcessorJob.perform_async(chunk.id)
  end
end
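The StreamCallbacks class passed to batch.on defines the handler that fires when the batch finishes. A minimal sketch, assuming a hypothetical DataStream model that tracks completion:

class StreamCallbacks
  # Called by Sidekiq Pro once every job in the batch has run
  def on_complete(status, options)
    stream = DataStream.find(options['stream_id'])
    stream.update(
      batch_finished_at: Time.current,
      failed_job_count: status.failures
    )
  end
end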
For rate-limited data sources, job throttling is invaluable. Note that rate limiting ships with Sidekiq Enterprise rather than Pro; the throttle options below assume a throttling middleware (for example, the sidekiq-throttler gem) is installed:
class ApiConsumerJob
  # Throttling middleware reads sidekiq_options, so this is a native Sidekiq worker
  include Sidekiq::Worker
  sidekiq_options throttle: { threshold: 100, period: 1.minute }

  def perform(record_id)
    record = Record.find(record_id)
    result = ApiClient.process(record.data)
    record.update(processed_at: Time.current, result: result)
  end
end
The reliability guarantees of Sidekiq Pro, particularly its super_fetch job recovery mechanism, make it a cornerstone for any serious real-time processing system.
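The reliable fetch is opt-in; if you run Sidekiq Pro, enabling it is a single line of server configuration (a minimal sketch):

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  # super_fetch keeps in-progress jobs in a private queue so they are
  # recovered if the worker process crashes (Sidekiq Pro only)
  config.super_fetch!
end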
2. Shoryuken for AWS SQS Integration
When processing data from AWS services, Shoryuken provides tight integration with SQS queues. It is particularly useful when building systems whose data arrives via SQS, whether pushed there directly or fanned out from other AWS services such as SNS.
Configuring Shoryuken for ActiveJob is straightforward:
# config/initializers/active_job.rb
Rails.application.config.active_job.queue_adapter = :shoryuken

# config/initializers/shoryuken.rb
Shoryuken.configure_server do |config|
  config.sqs_client = Aws::SQS::Client.new(
    region: 'us-east-1',
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']
  )

  config.options[:concurrency] = 25
  config.options[:active_job] = true
  config.options[:auto_visibility_timeout] = true
end
For more controlled processing, you can create worker classes:
class StreamProcessor
  include Shoryuken::Worker

  shoryuken_options queue: 'data_stream', auto_delete: true

  def perform(sqs_msg, body)
    data = JSON.parse(body)

    # Process the data
    DataProcessingService.new(data).process
  rescue => e
    # Handle errors
    ErrorTracker.capture(e, extra: { body: body })
    raise # Re-raise to trigger Shoryuken's retry mechanism
  end
end
The auto_visibility_timeout feature is particularly valuable as it automatically extends the visibility timeout of a message while it’s being processed, preventing duplicate processing in case of long-running jobs.
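When jobs flow through the ActiveJob adapter rather than a dedicated worker class, the queue name on the job must correspond to an existing SQS queue. A minimal sketch (the job, model, and service names here are hypothetical):

class RecordIngestJob < ApplicationJob
  # Shoryuken maps this queue name to the SQS queue called "data_stream"
  queue_as :data_stream

  def perform(record_id)
    record = Record.find(record_id)
    DataProcessingService.new(record.attributes).process
  end
end

# Enqueuing pushes the serialized job to SQS, where Shoryuken picks it up
RecordIngestJob.perform_later(42)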
3. Karafka for Kafka-Based Processing
When dealing with Kafka streams, Karafka is an excellent choice. It provides a comprehensive framework for consuming and processing Kafka topics with ActiveJob integration.
Setting up Karafka in a Rails application (the example below uses the Karafka 1.x configuration style):
# karafka.rb
class KarafkaApp < Karafka::App
  setup do |config|
    config.client_id = 'my_app'
    config.backend = :inline
    config.batch_fetching = true
    config.batch_consuming = true
    config.kafka.seed_brokers = ['kafka://127.0.0.1:9092']
  end
end

# Define a consumer for a topic
class DataStreamConsumer < Karafka::BaseConsumer
  def consume
    # Process messages in batches
    messages = params_batch.map do |message|
      {
        id: message['id'],
        payload: message['payload'],
        timestamp: message['timestamp']
      }
    end

    # Enqueue an ActiveJob to process this batch
    DataProcessingJob.perform_later(messages)
  end
end

# Register the consumer by routing it to a topic
KarafkaApp.consumer_groups.draw do
  consumer_group :stream_processors do
    topic :data_stream do
      consumer DataStreamConsumer
    end
  end
end
The ActiveJob integration:
class DataProcessingJob < ApplicationJob
  queue_as :stream_processing

  def perform(messages)
    messages.each do |message|
      process_message(message)
    end
  end

  private

  def process_message(message)
    # Business logic for processing a single message
    data = message[:payload]

    # Transform and store the data
    ProcessedRecord.create!(
      external_id: message[:id],
      processed_at: Time.current,
      data: data
    )
  end
end
Karafka’s batching capabilities and offset management make it particularly suitable for high-volume Kafka streams. The consumers run in a dedicated process started with bundle exec karafka server, so they can be scaled independently of your web and ActiveJob workers.
4. Racecar for Simplified Kafka Consumer Management
Racecar offers a simpler alternative to Karafka for managing Kafka consumers. It integrates cleanly with ActiveJob and simplifies the deployment and monitoring of consumers.
Configuring Racecar is straightforward:
# config/racecar.yml
development:
  brokers:
    - localhost:9092
  client_id: my_application
  offset_commit_interval: 5
  max_wait_time: 1.0
  group_id_prefix: my_app

production:
  brokers: <%= ENV['KAFKA_BROKERS'].split(',') %>
  client_id: <%= ENV['KAFKA_CLIENT_ID'] %>
  group_id_prefix: <%= ENV['KAFKA_CONSUMER_GROUP_PREFIX'] %>
  ssl_ca_cert: <%= ENV['KAFKA_SSL_CA_CERT'] %>
  ssl_client_cert: <%= ENV['KAFKA_SSL_CLIENT_CERT'] %>
  ssl_client_cert_key: <%= ENV['KAFKA_SSL_CLIENT_CERT_KEY'] %>
Creating a consumer for real-time processing:
class DataStreamConsumer < Racecar::Consumer
  subscribes_to "data_stream", start_from_beginning: false

  def process(message)
    payload = JSON.parse(message.value)

    # Enqueue an ActiveJob for processing
    DataProcessingJob.perform_later(
      id: payload["id"],
      data: payload["data"],
      metadata: {
        partition: message.partition,
        offset: message.offset,
        key: message.key
      }
    )
  rescue JSON::ParserError => e
    # Handle message parsing errors
    Rails.logger.error "Invalid message format: #{e.message}"
  end
end
For batch processing:
class BatchDataConsumer < Racecar::Consumer
  subscribes_to "high_volume_stream"

  # Racecar hands each fetched batch of messages to process_batch
  def process_batch(batch)
    payloads = batch.map do |message|
      begin
        JSON.parse(message.value)
      rescue JSON::ParserError
        nil
      end
    end.compact

    # Process the batch with a single job
    BatchProcessingJob.perform_later(payloads)
  end
end
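BatchProcessingJob isn’t shown above; a minimal sketch of what it might look like, assuming a hypothetical ProcessedRecord model with external_id, data, and processed_at columns:

class BatchProcessingJob < ApplicationJob
  queue_as :stream_processing

  def perform(payloads)
    rows = payloads.map do |payload|
      {
        external_id: payload["id"],
        data: payload,
        processed_at: Time.current
      }
    end

    # A single multi-row insert keeps up with high-volume batches
    ProcessedRecord.insert_all(rows) if rows.any?
  end
end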
Racecar’s process-per-consumer model, where each consumer class runs under the racecar command-line runner, makes it easier to deploy and monitor Kafka consumers in production environments.
5. GoodJob for Postgres-Based Processing
When building real-time processing systems with Rails and PostgreSQL, GoodJob offers a compelling alternative to Redis-based solutions like Sidekiq. It leverages your existing PostgreSQL database for job queuing and execution.
Setting up GoodJob:
# Gemfile
gem 'good_job'

# config/application.rb
config.active_job.queue_adapter = :good_job

# config/initializers/good_job.rb
Rails.application.configure do
  config.good_job.execution_mode = :async
  config.good_job.max_threads = 5
  config.good_job.poll_interval = 5
  config.good_job.shutdown_timeout = 25

  # Prune finished job records after a day instead of a custom cleanup cron job
  config.good_job.cleanup_preserved_jobs_before_seconds_ago = 1.day.to_i

  config.good_job.enable_cron = true
  config.good_job.cron = {
    stream_processor: {
      cron: '* * * * *', # Every minute
      class: 'StreamProcessorInitiatorJob'
    }
  }
end
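StreamProcessorInitiatorJob is referenced in the cron schedule but is not part of GoodJob itself; one possible sketch, assuming a DataStream model with an active flag, which fans out to the per-stream job defined below:

class StreamProcessorInitiatorJob < ApplicationJob
  queue_as :stream_processing

  # Runs every minute via GoodJob cron and enqueues one job per active stream
  def perform
    DataStream.where(active: true).find_each do |stream|
      StreamProcessorJob.perform_later(stream.id)
    end
  end
end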
Creating a job to handle stream processing:
class StreamProcessorJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  queue_as :stream_processing

  retry_on Timeout::Error, wait: :exponentially_longer, attempts: 5
  discard_on ActiveRecord::RecordNotFound

  # Use GoodJob specific features
  good_job_control_concurrency_with(
    key: -> { "stream_process_#{arguments.first}" },
    total_limit: 2 # Only allow 2 jobs for the same stream to run concurrently
  )

  def perform(stream_id, batch_size = 100)
    stream = DataStream.find(stream_id)
    records = stream.fetch_records(limit: batch_size)

    unless records.empty?
      process_records(records)

      # Schedule the next batch
      self.class.set(wait: 1.second).perform_later(stream_id, batch_size)
    end
  end

  private

  def process_records(records)
    records.each do |record|
      # Process each record
      ProcessedData.create!(
        source_id: record.id,
        processed_at: Time.current,
        content: transform_data(record.data)
      )
    end
  end

  def transform_data(data)
    # Apply transformations
    data.transform_keys(&:to_sym)
  end
end
The concurrency control features in GoodJob are particularly useful for real-time processing scenarios where you need to manage resource consumption carefully.
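Beyond total_limit, GoodJob’s concurrency extension can also cap queued and executing jobs separately; a brief sketch with a hypothetical job:

class StreamIngestJob < ApplicationJob
  include GoodJob::ActiveJobExtensions::Concurrency

  good_job_control_concurrency_with(
    enqueue_limit: 10,                       # at most 10 waiting in the queue
    perform_limit: 2,                        # at most 2 executing at once
    key: -> { "ingest_#{arguments.first}" }
  )

  def perform(stream_id)
    # ingestion logic
  end
end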
6. Databand for Processing Observability
Monitoring real-time data processing jobs is crucial. Databand provides detailed visibility into your ActiveJob processing pipelines.
Setting up Databand:
# Gemfile
gem 'databand'

# config/initializers/databand.rb
Databand.configure do |config|
  config.api_key = ENV['DATABAND_API_KEY']
  config.app_name = 'MyDataProcessingApp'
  config.environment = Rails.env
  config.enabled = Rails.env.production?

  # Track ActiveJob processing
  config.instrument_active_job = true

  # Add custom metadata to all jobs
  config.job_metadata = ->(job) {
    {
      queue: job.queue_name,
      custom_data: job.arguments.first.is_a?(Hash) ? job.arguments.first[:metadata] : nil
    }
  }
end
Enhancing a job with detailed metrics:
class StreamProcessingJob < ApplicationJob
  queue_as :priority_stream

  def perform(stream_id)
    stream = Stream.find(stream_id)

    Databand.run_task(name: "process_stream_#{stream_id}") do |task|
      # Record the stream size as a metric
      task.log_metric("stream_size", stream.record_count)

      records = stream.fetch_unprocessed_records
      task.log_metric("unprocessed_records", records.size)

      start_time = Time.current
      processed_count = 0
      error_count = 0

      records.each do |record|
        begin
          process_record(record)
          processed_count += 1
        rescue => e
          error_count += 1
          task.log_exception(e, context: { record_id: record.id })
        end
      end

      # Log performance metrics
      processing_time = Time.current - start_time
      task.log_metric("processing_time_seconds", processing_time)
      task.log_metric("records_per_second", processed_count / processing_time)
      task.log_metric("error_rate", error_count.to_f / records.size)

      # Add a summary
      task.log_param("summary", {
        processed: processed_count,
        errors: error_count,
        total_time: processing_time
      })
    end
  end

  private

  def process_record(record)
    # Processing logic
  end
end
Databand provides not just error tracking but comprehensive observability into the performance and behavior of your data processing pipelines.
Building a Complete Real-Time Processing System
Combining these gems allows us to build sophisticated real-time data processing systems. Here’s an example of how these tools can work together:
# A complete real-time processing pipeline

# 1. Define a Kafka consumer with Racecar
class EventStreamConsumer < Racecar::Consumer
  subscribes_to "event_stream"

  def process_batch(batch)
    events = batch.map do |message|
      begin
        payload = JSON.parse(message.value)
        {
          id: payload["id"],
          type: payload["type"],
          data: payload["data"],
          metadata: {
            partition: message.partition,
            offset: message.offset,
            timestamp: message.create_time
          }
        }
      rescue => e
        Rails.logger.error "Failed to parse message: #{e.message}"
        nil
      end
    end.compact

    # Group events by type for efficient processing
    events_by_type = events.group_by { |e| e[:type] }

    # Process each type with appropriate priority
    events_by_type.each do |type, type_events|
      EventProcessorJob.set(
        queue: event_type_to_queue(type),
        priority: event_type_priority(type)
      ).perform_later(type_events)
    end
  end

  private

  def event_type_to_queue(type)
    case type
    when "high_priority" then "critical"
    when "notification" then "notifications"
    else "default"
    end
  end

  def event_type_priority(type)
    case type
    when "high_priority" then 1
    when "notification" then 10
    else 5
    end
  end
end
# 2. Define the processing job (a standard ActiveJob; Sidekiq Pro's
#    reliability features apply when Sidekiq is the queue adapter)
class EventProcessorJob < ApplicationJob
  retry_on StandardError, attempts: 3

  # GoodJob concurrency control (takes effect when GoodJob is the adapter)
  include GoodJob::ActiveJobExtensions::Concurrency
  good_job_control_concurrency_with(
    key: -> { "event_type_#{arguments.first.first[:type]}" },
    total_limit: 5
  )

  def perform(events)
    return if events.empty?

    # Track processing with Databand
    Databand.run_task(name: "process_events_#{events.first[:type]}") do |task|
      task.log_metric("batch_size", events.size)

      processor = EventProcessor.for_type(events.first[:type])

      # Process in smaller chunks to avoid timeout issues
      events.in_groups_of(50, false).each do |chunk|
        processor.process_batch(chunk)
      end

      task.log_metric("processed_count", events.size)
    end
  end
end
# 3. Implement the processor
class EventProcessor
  def self.for_type(type)
    case type
    when "user_event" then UserEventProcessor.new
    when "system_event" then SystemEventProcessor.new
    else DefaultEventProcessor.new
    end
  end

  class BaseProcessor
    def process_batch(events)
      events.each do |event|
        process_single_event(event)
      end
    end

    def process_single_event(event)
      raise NotImplementedError
    end
  end

  class UserEventProcessor < BaseProcessor
    def process_single_event(event)
      # User event processing logic
      user = User.find_by(external_id: event[:data]["user_id"])
      return unless user

      # Update user stats
      user.register_activity(
        type: event[:data]["activity_type"],
        timestamp: Time.zone.at(event[:metadata][:timestamp] / 1000.0),
        data: event[:data]["activity_data"]
      )
    end
  end

  # Additional processor implementations...
end
This example demonstrates how to build a real-time processing system that handles high-volume event streams while maintaining reliability and observability.
Conclusion
Real-time data processing with ActiveJob becomes significantly more powerful when enhanced with specialized gems. Sidekiq Pro provides the backbone with its reliability features, while Shoryuken and Karafka/Racecar offer excellent integration with AWS services and Kafka respectively. GoodJob leverages PostgreSQL for those preferring to avoid Redis dependencies, and Databand ensures you have proper visibility into your processing pipelines.
By combining these tools thoughtfully, you can build robust, scalable real-time data processing systems that handle high volumes reliably while maintaining good observability. The key is understanding the specific requirements of your data processing workflows and selecting the right gems to address your particular challenges.
The sample code I’ve provided serves as a starting point, but each real-world implementation will need customization based on specific requirements. With these tools in your arsenal, you’re well-equipped to tackle complex real-time data processing challenges in your Ruby on Rails applications.