In my work with Ruby on Rails applications, I’ve seen how critical background job processing becomes as systems scale. Sidekiq has been my go-to solution for handling asynchronous tasks, but moving beyond basic implementations requires thoughtful patterns. Over time, I’ve cultivated several strategies that ensure jobs run reliably, efficiently, and transparently in production environments.
Let me start with idempotent job execution. This concept ensures that running a job multiple times doesn’t cause unintended side effects. I once dealt with a payment system where duplicate transactions created serious accounting issues. The solution involved implementing distributed locks using Redis.
class PaymentProcessingJob
  include Sidekiq::Worker
  sidekiq_options retry: 5, queue: 'payments'

  def perform(payment_id)
    return if processed?(payment_id)

    with_lock("payment:#{payment_id}") do
      payment = Payment.find(payment_id)
      process_payment(payment)
      mark_processed(payment_id)
    end
  end

  private

  def with_lock(key, timeout: 300)
    lock_key = "lock:#{key}"

    if Sidekiq.redis { |conn| conn.set(lock_key, 1, nx: true, ex: timeout) }
      begin
        yield
      ensure
        Sidekiq.redis { |conn| conn.del(lock_key) }
      end
    else
      raise "Lock acquisition failed for #{key}"
    end
  end

  def processed?(payment_id)
    Sidekiq.redis { |conn| conn.exists?("processed:payment:#{payment_id}") }
  end

  def mark_processed(payment_id)
    Sidekiq.redis { |conn| conn.setex("processed:payment:#{payment_id}", 86400, 1) }
  end
end
The lock prevents concurrent processing of the same payment. The processed marker persists for 24 hours to handle retry scenarios. This approach eliminated our duplicate payment problems completely.
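One refinement worth calling out: because the ensure block deletes the lock key unconditionally, a job that outlives the lock timeout can release a lock that another worker now holds. A token-based release closes that gap. Here is a minimal sketch of an alternative with_lock helper; the Lua compare-and-delete is the standard technique, and the helper shape simply mirrors the method above:

require 'securerandom'

def with_lock(key, timeout: 300)
  lock_key = "lock:#{key}"
  token = SecureRandom.uuid

  acquired = Sidekiq.redis { |conn| conn.set(lock_key, token, nx: true, ex: timeout) }
  raise "Lock acquisition failed for #{key}" unless acquired

  begin
    yield
  ensure
    # Delete the key only if it still holds our token, atomically via Lua
    release_script = <<~LUA
      if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
      end
      return 0
    LUA
    Sidekiq.redis { |conn| conn.eval(release_script, keys: [lock_key], argv: [token]) }
  end
end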
Batch processing represents another essential pattern. When dealing with large datasets, splitting work into manageable chunks prevents memory issues and provides better visibility. I implemented this for a bulk email system that needed to handle millions of users.
class BulkEmailJob
  include Sidekiq::Worker
  sidekiq_options queue: 'bulk'

  # batch_size is a plain positional argument because Sidekiq serializes
  # job arguments to JSON, which does not round-trip Ruby keyword arguments
  def perform(user_ids, batch_size = 1000)
    total = user_ids.size
    processed = 0

    user_ids.each_slice(batch_size) do |batch|
      batch.each do |user_id|
        send_email_to_user(user_id)
        processed += 1
        update_progress(processed, total)
      end

      # Small delay between batches to prevent overwhelming systems
      sleep(0.1) if batch_size > 100
    end
  end

  private

  def update_progress(current, total)
    percentage = (current.to_f / total * 100).round(2)

    Sidekiq.redis do |conn|
      conn.setex("job:progress:#{jid}", 3600, percentage.to_s)
    end
  end

  def send_email_to_user(user_id)
    user = User.find(user_id)
    UserMailer.weekly_newsletter(user).deliver_later
  end
end
class JobProgressService
  def self.get_progress(job_id)
    progress = Sidekiq.redis { |conn| conn.get("job:progress:#{job_id}") }&.to_f
    progress || 0.0
  end

  def self.cleanup_progress(job_id)
    Sidekiq.redis { |conn| conn.del("job:progress:#{job_id}") }
  end
end
The progress tracking allowed our customer support team to provide accurate estimates to users. The small delays between batches prevented our email service from being overwhelmed during peak loads.
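Since perform_async returns the jid, callers can hold on to it and poll for progress. A small sketch of the endpoint we put in front of the service for support tooling; the controller and route names are illustrative:

# app/controllers/job_progress_controller.rb (illustrative)
class JobProgressController < ApplicationController
  # GET /job_progress/:id — :id is the jid returned by perform_async
  def show
    render json: {
      job_id: params[:id],
      progress: JobProgressService.get_progress(params[:id])
    }
  end
end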
External service integration requires careful handling. I learned this the hard way when an API outage caused our entire job system to back up. The circuit breaker pattern became our salvation.
class ExternalServiceJob
  include Sidekiq::Worker
  sidekiq_options retry: 3

  def perform(data)
    return if circuit_open?('external_api')

    begin
      response = ExternalApi.call(data)
      record_success('external_api')
      process_response(response)
    rescue ExternalApi::Error => e
      record_failure('external_api')
      raise e
    end
  end

  private

  def circuit_open?(service_name)
    failures = Sidekiq.redis { |conn| conn.get("circuit:#{service_name}:failures") }&.to_i || 0
    failures >= 5
  end

  def record_failure(service_name)
    key = "circuit:#{service_name}:failures"

    Sidekiq.redis do |conn|
      conn.incr(key)
      conn.expire(key, 300) # Reset after 5 minutes
    end
  end

  def record_success(service_name)
    # Reset failure count on successful calls
    Sidekiq.redis { |conn| conn.del("circuit:#{service_name}:failures") }
  end

  def process_response(response)
    # Custom processing logic here
    Rails.logger.info "Processed response: #{response}"
  end
end
This pattern stopped the cascade of failures during external service degradation. The five-minute reset window gives enough time for services to recover while protecting our system.
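One refinement we found useful: instead of silently dropping work while the circuit is open, push it back onto the schedule so nothing is lost. A minimal sketch as a subclass of the job above; the class name and the 60-second delay are illustrative:

# Variant that re-enqueues instead of dropping work while the circuit is open
class BackpressuredExternalServiceJob < ExternalServiceJob
  def perform(data)
    if circuit_open?('external_api')
      # Try again once the circuit has had a chance to close
      self.class.perform_in(60, data)
      return
    end

    super
  end
end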
Job priority management became crucial when we noticed critical jobs getting delayed behind less important tasks. Implementing multiple queues with different characteristics solved this.
class HighPriorityJob
  include Sidekiq::Worker
  sidekiq_options queue: 'critical', retry: 3

  def perform(urgent_data)
    # Immediate processing for time-sensitive operations
    process_urgent_task(urgent_data)
  end
end

class MediumPriorityJob
  include Sidekiq::Worker
  sidekiq_options queue: 'default', retry: 2

  def perform(standard_data)
    # Standard business logic processing
    process_standard_task(standard_data)
  end
end

class LowPriorityJob
  include Sidekiq::Worker
  sidekiq_options queue: 'low', retry: 1

  def perform(background_data)
    # Non-urgent background processing
    process_background_task(background_data)
  end
end
# Additional configuration in config/sidekiq.yml
# Weighted queues: higher weights are checked more often, so critical
# work is picked up first without completely starving the other queues
:queues:
  - [critical, 6]
  - [default, 4]
  - [low, 2]
  - [mailers, 1]
We allocated more workers to critical queues and fewer to low-priority ones. This ensured that payment processing and real-time notifications always received immediate attention.
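For stricter isolation, we ran dedicated Sidekiq processes per queue group so a backlog of low-priority work could never starve critical jobs. A Procfile sketch; the process names and concurrency values are examples, not production numbers:

# Procfile — one Sidekiq process per queue group
sidekiq_critical: bundle exec sidekiq -q critical -c 10
sidekiq_default:  bundle exec sidekiq -q default -c 5
sidekiq_low:      bundle exec sidekiq -q low -q mailers -c 2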
Instrumentation transformed how we understood our job system’s behavior. Before adding comprehensive metrics, we were flying blind when performance issues arose.
class InstrumentedJob
  include Sidekiq::Worker

  def perform(*args)
    start_time = Time.current
    tags = { job_class: self.class.name, queue: self.class.get_sidekiq_options['queue'] }

    Metrics.increment('sidekiq.job.started', tags: tags)

    begin
      # Subclasses implement #process with their own logic
      result = execute_with_retry { process(*args) }
      duration = Time.current - start_time

      Metrics.timing('sidekiq.job.duration', duration, tags: tags)
      Metrics.increment('sidekiq.job.success', tags: tags)
      result
    rescue => error
      error_tags = tags.merge(error: error.class.name)
      Metrics.increment('sidekiq.job.failure', tags: error_tags)
      raise error
    end
  end

  private

  def execute_with_retry
    yield
  rescue Net::ReadTimeout, Net::OpenTimeout => error
    if @retry_count.to_i < 3
      @retry_count = @retry_count.to_i + 1
      sleep(2**@retry_count)
      retry
    else
      raise error
    end
  end
end

# Custom metrics service implementation
class Metrics
  def self.increment(metric, tags: {})
    # Implementation using your preferred metrics system
    StatsD.increment(metric, tags: tags)
  end

  def self.timing(metric, duration, tags: {})
    StatsD.timing(metric, duration, tags: tags)
  end
end
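Concrete jobs then inherit the instrumentation and implement only their own processing step. A hypothetical example; SyncCatalogJob and Catalog#sync! are placeholders for your own domain code:

class SyncCatalogJob < InstrumentedJob
  sidekiq_options queue: 'default'

  private

  # Invoked by InstrumentedJob#perform through execute_with_retry
  def process(catalog_id)
    Catalog.find(catalog_id).sync! # placeholder domain call
  end
end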
The custom retry logic for network timeouts significantly reduced failures from transient issues. The detailed tagging helped us identify exactly which job types and queues were causing performance bottlenecks.
Workflow management patterns emerged when we started dealing with complex multi-step processes. Order processing involved payments, inventory checks, and fulfillment in specific sequences.
class OrderWorkflow
  def self.process_order(order_id)
    # Initial job to coordinate the workflow
    OrderProcessingJob.perform_async(order_id)
  end
end

class OrderProcessingJob
  include Sidekiq::Worker

  def perform(order_id)
    order = Order.find(order_id)

    # Update workflow state immediately
    order.update!(processing_started_at: Time.current)

    # Execute dependent jobs based on order state; the final job in the
    # chain records processing_completed_at so stalled workflows stay visible
    if order.requires_payment?
      PaymentProcessingJob.perform_async(order_id)
    elsif order.requires_fulfillment?
      FulfillmentJob.perform_async(order_id)
    else
      order.update!(processing_completed_at: Time.current)
    end
  end
end

class PaymentProcessingJob
  include Sidekiq::Worker

  def perform(order_id)
    order = Order.find(order_id)

    # Process payment logic
    process_payment(order)

    # Trigger next step in workflow
    if payment_successful?(order)
      FulfillmentJob.perform_async(order_id)
    else
      PaymentFailedJob.perform_async(order_id)
    end
  end
end

class WorkflowMonitor
  def self.stalled_workflows
    # Find orders stuck in processing for too long
    Order.where('processing_started_at < ? AND processing_completed_at IS NULL', 1.hour.ago)
  end

  def self.cleanup_stalled_workflows
    stalled_workflows.find_each do |order|
      OrderRecoveryJob.perform_async(order.id)
    end
  end
end
The state tracking allowed us to monitor workflow progress and identify stuck processes. The recovery system automatically handled cases where jobs failed without completing the workflow.
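The recovery job itself can stay small. A minimal sketch, assuming the same state columns the monitor reads; the reset-and-restart strategy shown here is one option among several:

class OrderRecoveryJob
  include Sidekiq::Worker
  sidekiq_options queue: 'default', retry: 2

  def perform(order_id)
    order = Order.find(order_id)

    # Ignore workflows that completed between detection and recovery
    return if order.processing_completed_at.present?

    # Reset the workflow state and restart the coordinator job
    order.update!(processing_started_at: nil)
    OrderProcessingJob.perform_async(order.id)
  end
end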
Scheduled job patterns automate routine maintenance and reporting. I found that mixing cron-based scheduling with conditional execution within jobs provides flexibility.
class DailyReportJob
  include Sidekiq::Worker
  sidekiq_options queue: 'reports'

  # Scheduled by sidekiq-cron at 2 AM daily via config/schedule.yml (shown below)
  def perform
    generate_daily_reports
    cleanup_old_data
    send_report_notifications
  end

  private

  def generate_daily_reports
    ReportGenerator.new(Date.yesterday).generate_all
  end

  def cleanup_old_data
    # Keep reports for 30 days only
    OldReport.older_than(30.days.ago).delete_all
  end

  def send_report_notifications
    # Notification delivery is app-specific (mailer, Slack, etc.)
  end
end
class RecurringMaintenanceJob
  include Sidekiq::Worker

  def perform
    # Handle different maintenance tasks based on schedule
    case schedule_type
    when :daily
      run_daily_maintenance
    when :weekly
      run_weekly_maintenance
    when :monthly
      run_monthly_maintenance
    end
  end

  private

  def schedule_type
    today = Date.current
    return :monthly if today.day == 1
    return :weekly if today.monday?
    :daily
  end

  def run_daily_maintenance
    Database.cleanup_temp_tables
    Cache.clear_expired
  end

  def run_weekly_maintenance
    run_daily_maintenance
    Database.optimize_tables
    generate_weekly_analytics
  end

  def run_monthly_maintenance
    run_weekly_maintenance
    Archive.old_data
    generate_monthly_reports
  end
end
# Configuration in config/schedule.yml
daily_report:
  cron: "0 2 * * *"
  class: "DailyReportJob"
  queue: reports

maintenance:
  cron: "0 3 * * *" # 3 AM daily
  class: "RecurringMaintenanceJob"
  queue: maintenance
The conditional execution within the maintenance job eliminated the need for multiple separate scheduled jobs. The cleanup operations kept our system running smoothly without manual intervention.
Error handling and retry strategies deserve special attention. I developed a pattern that combines exponential backoff with conditional retry logic.
class RobustJob
  include Sidekiq::Worker
  sidekiq_options retry: 5, queue: 'default'

  def perform(resource_id)
    resource = Resource.find(resource_id)

    with_retry_strategy do
      process_resource(resource)
    end
  end

  private

  def with_retry_strategy
    retries = 0
    max_retries = 3

    begin
      yield
    rescue NetworkError => e
      if retries < max_retries
        retries += 1
        sleep(2**retries) # Exponential backoff
        retry
      else
        handle_permanent_failure(e)
      end
    rescue BusinessLogicError => e
      # Don't retry business logic errors
      handle_business_error(e)
    end
  end

  def handle_permanent_failure(error)
    Rails.logger.error "Permanent failure: #{error.message}"
    Metrics.increment('job.permanent_failure')
  end

  def handle_business_error(error)
    Rails.logger.warn "Business rule violation, not retrying: #{error.message}"
    Metrics.increment('job.business_error')
  end
end
This approach differentiates between transient network issues and permanent business logic errors. The exponential backoff prevents overwhelming systems during outages.
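Much of this can also be expressed declaratively with Sidekiq's own retry hooks. A sketch using sidekiq_retry_in; the class name is illustrative, the error classes mirror the ones above, and returning :discard requires a reasonably recent Sidekiq release:

class DeclarativeRetryJob
  include Sidekiq::Worker
  sidekiq_options retry: 5, queue: 'default'

  # Exponential backoff for transient network errors; business rule
  # violations are discarded rather than retried
  sidekiq_retry_in do |count, exception|
    case exception
    when NetworkError then 2**count
    when BusinessLogicError then :discard
    end
  end

  def perform(resource_id)
    process_resource(Resource.find(resource_id))
  end
end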
Dead letter queues provide safety nets for problematic jobs. I implemented this after noticing some jobs would repeatedly fail and clog the retry queues.
class DeadLetterJob
  include Sidekiq::Worker

  def perform(failed_job_data)
    # Store failed job for manual inspection
    FailedJob.create!(
      class_name: failed_job_data['class'],
      arguments: failed_job_data['args'],
      error_message: failed_job_data['error_message'],
      failed_at: Time.current
    )

    # Optional: Attempt recovery for certain error types
    attempt_recovery(failed_job_data) if recoverable?(failed_job_data)
  end

  private

  def recoverable?(failed_job_data)
    error = failed_job_data['error_message']
    error.include?('Timeout') || error.include?('Network')
  end

  def attempt_recovery(failed_job_data)
    # Custom recovery logic
    RecoveryService.attempt(failed_job_data)
  end
end
# Sidekiq configuration: a death handler routes jobs that exhaust
# their retries to the dead letter job above
Sidekiq.configure_server do |config|
  config.death_handlers << lambda do |job, exception|
    DeadLetterJob.perform_async(
      'class' => job['class'],
      'args' => job['args'],
      'error_message' => exception.message
    )
  end
end
The death handler captures jobs that have exhausted their retries before they are lost for good, leaving a durable record to inspect. The recovery attempts automatically handle transient issues without manual intervention.
Job composition patterns help manage complex operations. I often break large jobs into smaller, focused units that can be composed together.
class OrderCreationJob
  include Sidekiq::Worker

  def perform(order_params)
    # Create the order first
    order = Order.create!(order_params)

    # Enqueue the follow-up jobs (push_bulk batches many argument sets for a
    # single class, so different job classes are enqueued individually)
    [InventoryReservationJob, PaymentAuthorizationJob, NotificationJob].each do |job_class|
      job_class.perform_async(order.id)
    end
  end
end

class ChainedExecution
  def self.execute_chain(*jobs)
    jobs.each do |job_class, job_args|
      job_class.perform_async(*job_args)
    end
  end
end

# Usage
ChainedExecution.execute_chain(
  [OrderCreationJob, [order_params]],
  [InventoryCheckJob, [order_id]],
  [ShippingCalculationJob, [order_id]]
)
This composition approach makes complex workflows easier to reason about and modify. Each job has a single responsibility, making testing and maintenance simpler.
Memory management becomes critical when processing large datasets. I developed patterns to process data in streams rather than loading everything into memory.
class LargeDatasetJob
  include Sidekiq::Worker
  sidekiq_options queue: 'large_data'

  def perform(dataset_id)
    dataset = LargeDataset.find(dataset_id)
    processed = 0

    # Process in batches to avoid memory issues
    dataset.records.find_each(batch_size: 500) do |record|
      process_record(record)
      processed += 1

      # Periodic garbage collection (a counter is more reliable than
      # record ids, which are rarely contiguous)
      GC.start if (processed % 1000).zero?
    end
  end

  private

  def process_record(record)
    # Lightweight processing to minimize memory usage
    RecordProcessor.new(record).process
  end
end

class StreamingDataJob
  include Sidekiq::Worker

  def perform(stream_source)
    # Process data as it streams rather than loading all at once
    DataStream.new(stream_source).each_batch(1000) do |batch|
      process_batch(batch)
    end
  end
end
The batch processing with periodic garbage collection prevented memory bloat during large operations. The streaming approach handles datasets that don’t fit in memory.
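DataStream is an application-level helper rather than a library class. A minimal sketch of the idea, assuming the source is a local file that can be read line by line; the real source could just as well be an API or an S3 stream:

# Read the source lazily and yield fixed-size batches, so memory usage
# stays proportional to the batch size rather than the dataset size
class DataStream
  def initialize(source_path)
    @source_path = source_path
  end

  def each_batch(batch_size)
    File.foreach(@source_path).each_slice(batch_size) do |lines|
      yield lines
    end
  end
end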
Monitoring and alerting patterns ensure we catch issues before they affect users. I built a comprehensive monitoring suite that tracks job health.
class JobHealthMonitor
  def self.check_queue_health
    Sidekiq::Queue.all.each do |queue|
      if queue.size > warning_threshold(queue.name)
        AlertService.queue_size_warning(queue.name, queue.size)
      end

      if queue.latency > max_latency(queue.name)
        AlertService.queue_latency_warning(queue.name, queue.latency)
      end
    end
  end

  def self.warning_threshold(queue_name)
    case queue_name
    when 'critical' then 100
    when 'default' then 1000
    when 'low' then 5000
    else 1000
    end
  end

  def self.max_latency(queue_name)
    # Acceptable latency in seconds; example values, tune per queue
    queue_name == 'critical' ? 30 : 300
  end

  def self.failed_job_analysis
    # Analyze failure patterns across job types
    failures_by_class = FailedJob.group(:class_name).count

    failures_by_class.each do |job_class, count|
      if count > failure_threshold(job_class)
        AlertService.high_failure_rate(job_class, count)
      end
    end
  end

  def self.failure_threshold(_job_class)
    # A single threshold is a reasonable starting point; tune per job class
    50
  end
end
# Regular health checks
Sidekiq::Cron::Job.create(
  name: 'Queue Health Check - every 5 minutes',
  cron: '*/5 * * * *',
  class: 'QueueHealthJob'
)

class QueueHealthJob
  include Sidekiq::Worker

  def perform
    JobHealthMonitor.check_queue_health
    JobHealthMonitor.failed_job_analysis
  end
end
The proactive monitoring catches queue backups and failure patterns early. The threshold-based alerts prevent notification fatigue while ensuring important issues get attention.
Testing strategies for Sidekiq jobs evolved significantly in my practice. I developed patterns that ensure job reliability without slowing down test suites.
# spec/jobs/payment_processing_job_spec.rb
require 'sidekiq/testing'

describe PaymentProcessingJob do
  let(:payment) { create(:payment) }

  before do
    Sidekiq::Testing.fake!
  end

  it 'processes payments idempotently' do
    # First execution
    described_class.perform_async(payment.id)
    described_class.drain

    # Second execution should not duplicate
    expect {
      described_class.perform_async(payment.id)
      described_class.drain
    }.not_to change { payment.reload.processed_count }
  end

  it 'marks payments as processed' do
    described_class.perform_async(payment.id)
    described_class.drain

    processed_key = "processed:payment:#{payment.id}"
    expect(Sidekiq.redis { |conn| conn.exists?(processed_key) }).to be(true)
  end
end

# Support code for testing
module JobTestingHelpers
  def with_sidekiq
    Sidekiq::Testing.inline! do
      yield
    end
  end
end
Fake mode keeps enqueued jobs in an in-memory array instead of pushing them to Redis, and the drain method then executes them inline so every job completes before the assertions run.
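For specs that need a job, and anything it enqueues, to run immediately, the inline helper reads naturally. A brief example, assuming a test Redis instance is available and the external API is stubbed:

RSpec.describe ExternalServiceJob do
  include JobTestingHelpers

  it 'calls the external API when run inline' do
    allow(ExternalApi).to receive(:call).and_return({})

    with_sidekiq { described_class.perform_async('id' => 1) }

    expect(ExternalApi).to have_received(:call)
  end
end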
Configuration management patterns help maintain consistency across environments. I use environment-specific configurations with sensible defaults.
# config/initializers/sidekiq.rb

# Custom middleware for additional functionality; defined before the
# configure blocks so the constant exists when it is registered
class JobMetricsMiddleware
  def call(worker, job, queue)
    start_time = Time.current
    yield
  ensure
    # Record the timing even when the job raises
    duration = Time.current - start_time
    Metrics.timing('sidekiq.job.duration', duration, tags: { class: worker.class.name, queue: queue })
  end
end

Sidekiq.configure_server do |config|
  config.redis = {
    url: ENV['REDIS_URL'] || 'redis://localhost:6379/0',
    network_timeout: 5,
    pool_timeout: 5
  }

  # Server middleware (retry handling is built into modern Sidekiq, so the
  # old RetryJobs middleware is no longer needed here)
  config.server_middleware do |chain|
    chain.add JobMetricsMiddleware
  end
end

Sidekiq.configure_client do |config|
  config.redis = {
    url: ENV['REDIS_URL'] || 'redis://localhost:6379/0',
    network_timeout: 5,
    pool_timeout: 5
  }
end
The environment-based configuration ensures consistent behavior across development, staging, and production. Custom middleware extends functionality without modifying job code.
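Client middleware works the same way on the enqueue side. A small sketch that stamps each job at push time; the key name is arbitrary:

# Stamps every job when it is pushed so server-side code can compute
# per-job queue latency from a custom field
class EnqueueTimestampMiddleware
  def call(worker_class, job, queue, redis_pool)
    job['enqueued_at_ms'] ||= (Time.current.to_f * 1000).to_i
    yield
  end
end

Sidekiq.configure_client do |config|
  config.client_middleware do |chain|
    chain.add EnqueueTimestampMiddleware
  end
end

Because jobs enqueued from inside other jobs are pushed by the server process, the same chain.add line belongs in the configure_server block as well.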
These patterns have served me well across numerous production systems. They provide the reliability, performance, and maintainability needed for robust background job processing. Each pattern addresses specific production challenges while maintaining simplicity and testability.
The key insight I’ve gained is that successful job processing requires thinking about failure modes from the beginning. Designing for reliability, monitoring comprehensively, and handling edge cases proactively makes the difference between a system that works and one that thrives under production loads.
Continuous refinement of these patterns based on actual usage data and performance metrics ensures they remain effective as systems evolve and grow in complexity.