When I first encountered connection pooling challenges in Ruby applications, I realized that proper database connection management makes the difference between applications that scale and those that crumble under load. Database connections are expensive resources, and managing them effectively requires the right tools and strategies.
After working with numerous production Rails applications, I’ve identified nine essential Ruby gems that transform how you handle database connection pooling. These tools address everything from basic connection management to sophisticated load balancing and health monitoring.
The Connection Pool Gem
The connection_pool gem provides the foundation for all connection pooling strategies in Ruby applications. I’ve used this gem extensively to manage not just database connections, but also Redis connections and HTTP client pools.
require 'connection_pool'
require 'pg'
class DatabasePool
def initialize(database_url, pool_size = 20)
@pool = ConnectionPool.new(size: pool_size, timeout: 5) do
PG.connect(database_url)
end
end
def with_connection(&block)
@pool.with(&block)
end
def stats
{
size: @pool.size,
available: @pool.available
}
end
end
# Usage example
db_pool = DatabasePool.new(ENV['DATABASE_URL'], 25)
db_pool.with_connection do |conn|
result = conn.exec('SELECT * FROM users WHERE active = true')
result.to_a
end
The connection pool gem excels at preventing connection leaks and ensuring proper resource cleanup. I configure it with specific timeout values to avoid indefinite blocking when all connections are busy.
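When every connection stays checked out past the timeout, the gem raises ConnectionPool::TimeoutError instead of blocking forever. Here is a minimal sketch of handling that case; the empty-result fallback is my own illustrative policy, not something the gem provides:
require 'connection_pool'
require 'pg'

# A deliberately small pool so exhaustion is easy to trigger; values are illustrative.
pool = ConnectionPool.new(size: 2, timeout: 5) { PG.connect(ENV['DATABASE_URL']) }

def fetch_active_users(pool)
  pool.with do |conn|
    conn.exec('SELECT id, email FROM users WHERE active = true').to_a
  end
rescue ConnectionPool::TimeoutError => e
  # Every connection stayed busy for the full five-second window.
  warn "Connection pool exhausted: #{e.message}"
  []
end

fetch_active_users(pool)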
PgBouncer Integration
Managing connection pooling at the application layer works well, but combining it with PgBouncer provides superior resource utilization. Connecting to PgBouncer’s admin console through the pg gem bridges that gap elegantly.
require 'pg'
class PgBouncerManager
def initialize(config)
@bouncer_config = {
host: config[:bouncer_host],
port: config[:bouncer_port],
user: config[:bouncer_user],
password: config[:bouncer_password],
pool_mode: 'transaction',
pool_size: 100,
reserve_pool_size: 10
}
setup_bouncer_connection
end
def configure_pools
databases = {
'production' => {
host: ENV['DB_HOST'],
port: ENV['DB_PORT'],
dbname: ENV['DB_NAME'],
auth_user: ENV['DB_USER']
}
}
apply_bouncer_config(databases)
end
def monitor_pool_usage
stats = fetch_bouncer_stats
{
active_clients: stats['cl_active'],
waiting_clients: stats['cl_waiting'],
server_connections: stats['sv_active'],
pool_utilization: calculate_utilization(stats)
}
end
private
def setup_bouncer_connection
@admin_conn = PG.connect(
host: @bouncer_config[:host],
port: @bouncer_config[:port],
dbname: 'pgbouncer',
user: @bouncer_config[:user],
password: @bouncer_config[:password]
)
end
def fetch_bouncer_stats
result = @admin_conn.exec('SHOW POOLS')
result.to_a.first
end
def calculate_utilization(stats)
active = stats['sv_active'].to_f
total = @bouncer_config[:pool_size].to_f
(active / total * 100).round(2)
end
end
This approach allows me to maintain hundreds of client connections while using only a fraction of actual database connections. The transaction-level pooling mode works particularly well for Rails applications.
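One caveat worth calling out: transaction-level pooling hands server connections to different clients between transactions, so session state such as server-side prepared statements cannot be relied on. When I point Rails at PgBouncer, I disable prepared statements and advisory locks; a sketch of the relevant settings, with host and port values as placeholders:
# config/database.yml expressed as a Ruby hash purely for illustration.
production_db_config = {
  'adapter'             => 'postgresql',
  'host'                => ENV.fetch('PGBOUNCER_HOST', 'localhost'),
  'port'                => ENV.fetch('PGBOUNCER_PORT', 6432).to_i, # PgBouncer's default port
  'database'            => ENV['DB_NAME'],
  'username'            => ENV['DB_USER'],
  'password'            => ENV['DB_PASSWORD'],
  'prepared_statements' => false, # session state does not survive transaction pooling
  'advisory_locks'      => false, # advisory locks are session-scoped as well
  'pool'                => 5      # keep the per-process pool small; PgBouncer multiplexes
}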
Seamless Database Pool for Read Replica Management
The seamless_database_pool gem provides automatic read/write splitting with built-in failover capabilities. I’ve found this particularly valuable for applications with heavy read workloads.
require 'seamless_database_pool'
class ReplicaManager
def initialize(config)
@config = config
setup_replica_pools
initialize_health_monitoring
end
def setup_replica_pools
SeamlessDatabase::Pool.set_persistent_read_pool do
@config[:replica_urls].map do |url|
create_replica_connection(url)
end
end
end
def execute_read_query(sql, binds = [])
with_replica_connection do |conn|
conn.exec_params(sql, binds)
end
rescue => e
handle_replica_failure(e)
execute_write_query(sql, binds) # fall back to the primary
end
def execute_write_query(sql, binds = [])
with_primary_connection do |conn|
conn.exec_params(sql, binds)
end
end
def replica_health_check
@config[:replica_urls].map do |url|
check_replica_health(url)
end
end
private
def create_replica_connection(url)
conn = PG.connect(url)
conn.exec('SET default_transaction_read_only = on')
conn
end
def with_replica_connection(&block)
SeamlessDatabase::Pool.use_random_read_connection(&block)
end
def with_primary_connection(&block)
SeamlessDatabase::Pool.use_master_connection(&block)
end
def check_replica_health(replica_url)
conn = PG.connect(replica_url)
result = conn.exec('SELECT pg_is_in_recovery()')
{
url: replica_url,
healthy: true,
lag: calculate_replication_lag(conn)
}
rescue => e
{
url: replica_url,
healthy: false,
error: e.message
}
ensure
conn&.close
end
def calculate_replication_lag(conn)
result = conn.exec("SELECT EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp()))")
result.getvalue(0, 0).to_f
end
def handle_replica_failure(error)
Rails.logger.warn "Replica connection failed: #{error.message}"
# Implement circuit breaker logic here
end
end
The seamless database pool automatically routes read queries to replicas while ensuring write queries always hit the primary database. This distribution significantly reduces load on the primary server.
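Replication lag is the other thing I gate on before trusting a replica with reads. A self-contained sketch of that check, using plain pg connections; the one-second budget is an arbitrary example value:
require 'pg'

MAX_REPLICA_LAG_SECONDS = 1.0 # illustrative lag budget

# Returns the replica URLs that are in recovery and within the lag budget.
def replicas_within_lag_budget(replica_urls)
  replica_urls.select do |url|
    conn = PG.connect(url)
    in_recovery = conn.exec('SELECT pg_is_in_recovery()').getvalue(0, 0) == 't'
    lag_seconds = conn.exec(
      'SELECT COALESCE(EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())), 0)'
    ).getvalue(0, 0).to_f
    in_recovery && lag_seconds <= MAX_REPLICA_LAG_SECONDS
  rescue PG::Error
    false
  ensure
    conn&.close
  end
end

# replicas_within_lag_budget([ENV['REPLICA_ONE_URL'], ENV['REPLICA_TWO_URL']])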
Robust Connection Health with PG Health Check
The pg_health_check gem provides comprehensive database health monitoring capabilities. I integrate this into my applications to detect connection issues before they impact users.
require 'pg_health_check'
class ConnectionHealthMonitor
def initialize(connections)
@connections = connections
@health_checker = PgHealthCheck.new
@circuit_breakers = {}
setup_monitoring
end
def check_all_connections
results = {}
@connections.each do |name, pool|
results[name] = check_connection_pool(name, pool)
end
update_circuit_breakers(results)
results
end
def connection_available?(name)
return false unless @circuit_breakers[name]
@circuit_breakers[name][:healthy]
end
def get_healthy_connection(preferred = nil)
if preferred && connection_available?(preferred)
return @connections[preferred]
end
healthy_pools = @connections.select do |name, _|
connection_available?(name)
end
return nil if healthy_pools.empty?
# Return the least loaded pool (the one with the most available connections)
healthy_pools.max_by { |_, pool| pool.available }[1]
end
private
def check_connection_pool(name, pool)
pool.with do |conn|
health_metrics = @health_checker.check_connection(conn)
{
healthy: health_metrics[:responsive],
response_time: health_metrics[:response_time],
active_connections: health_metrics[:active_connections],
errors: health_metrics[:errors]
}
end
rescue => e
{
healthy: false,
error: e.message,
timestamp: Time.current
}
end
def update_circuit_breakers(results)
results.each do |name, result|
@circuit_breakers[name] ||= {
failures: 0,
last_failure: nil,
healthy: true
}
if result[:healthy]
reset_circuit_breaker(name)
else
trip_circuit_breaker(name, result[:error])
end
end
end
def reset_circuit_breaker(name)
@circuit_breakers[name][:failures] = 0
@circuit_breakers[name][:healthy] = true
end
def trip_circuit_breaker(name, error)
breaker = @circuit_breakers[name]
breaker[:failures] += 1
breaker[:last_failure] = Time.current
if breaker[:failures] >= 3
breaker[:healthy] = false
schedule_recovery_attempt(name)
end
end
def schedule_recovery_attempt(name)
Thread.new do
sleep(30) # Wait before attempting recovery
begin
result = check_connection_pool(name, @connections[name])
reset_circuit_breaker(name) if result[:healthy]
rescue
# Retry will happen on next health check
end
end
end
def setup_monitoring
Thread.new do
loop do
check_all_connections
sleep(10) # Check every 10 seconds
end
end
end
end
This monitoring approach prevents cascading failures by detecting unhealthy connections early and routing traffic away from problematic database instances.
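The primitive underneath all of this is a cheap, time-boxed probe. A minimal sketch of the kind of check I run against a raw connection; the 250-millisecond budget is an arbitrary example value:
require 'pg'

# Returns a small health report for one connection; anything slower than the
# budget, or any error, marks the connection as unhealthy.
def probe_connection(conn, budget_seconds: 0.25)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  conn.exec('SELECT 1')
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  { healthy: elapsed <= budget_seconds, response_time: elapsed }
rescue PG::Error => e
  { healthy: false, error: e.message }
end

# probe_connection(PG.connect(ENV['DATABASE_URL']))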
Advanced Pooling with Sequel
Sequel’s built-in threaded connection pool offers sophisticated pooling features designed for high-performance applications. I reach for it when standard ActiveRecord pooling isn’t sufficient.
require 'sequel'
class AdvancedConnectionManager
def initialize(config)
@databases = setup_databases(config)
@connection_validator = ConnectionValidator.new
@metrics_collector = MetricsCollector.new
end
def setup_databases(config)
primary = Sequel.connect(
config[:primary_url],
max_connections: config[:primary_pool_size] || 20,
pool_timeout: 5,
pool_validate_connection: true,
pool_retry_times: 3,
pool_retry_delay: 1
)
replicas = config[:replica_urls].map do |url|
Sequel.connect(
url,
max_connections: config[:replica_pool_size] || 15,
pool_timeout: 5,
pool_validate_connection: true,
read_only: true
)
end
{ primary: primary, replicas: replicas }
end
def execute_query(sql, type: :read, timeout: 30)
database = select_database(type)
start_time = Time.current
result = nil
database.pool.hold do |conn|
validate_connection_health(conn)
conn.execute("SET statement_timeout = #{(timeout * 1000).to_i}")
result = database.fetch(sql).all
end
@metrics_collector.record_query(sql, Time.current - start_time, type)
result
rescue Sequel::PoolTimeout => e
@metrics_collector.increment('pool_timeout_errors')
handle_pool_timeout(sql, type, e)
rescue Sequel::DatabaseConnectionError => e
@metrics_collector.increment('connection_errors')
handle_connection_error(type, e)
end
def pool_statistics
stats = {}
stats[:primary] = {
size: @databases[:primary].pool.size,
max_size: @databases[:primary].pool.max_size,
allocated: @databases[:primary].pool.allocated.size,
available: @databases[:primary].pool.available_connections.size
}
stats[:replicas] = @databases[:replicas].map.with_index do |replica, index|
{
index: index,
size: replica.pool.size,
max_size: replica.pool.max_size,
allocated: replica.pool.allocated.size,
available: replica.pool.available_connections.size
}
end
stats
end
def optimize_pool_sizes
current_stats = pool_statistics
recommendations = analyze_usage_patterns(current_stats)
apply_optimizations(recommendations) if recommendations[:should_optimize]
recommendations
end
private
def select_database(type)
case type
when :write
@databases[:primary]
when :read
select_optimal_replica || @databases[:primary]
else
@databases[:primary]
end
end
def select_optimal_replica
available_replicas = @databases[:replicas].select do |replica|
replica_healthy?(replica)
end
return nil if available_replicas.empty?
# Select replica with most available connections
available_replicas.max_by do |replica|
replica.pool.available_connections.size
end
end
def replica_healthy?(replica)
replica.pool.hold(0.1) do |conn|
@connection_validator.healthy?(conn)
end
rescue Sequel::PoolTimeout
false
end
def validate_connection_health(conn)
return if @connection_validator.recently_validated?(conn)
unless @connection_validator.healthy?(conn)
raise Sequel::DatabaseConnectionError, 'Connection health check failed'
end
end
def handle_pool_timeout(sql, type, error)
if type == :read && !@databases[:replicas].empty?
# Retry on the primary when the replica pools are exhausted
execute_query(sql, type: :write)
else
raise error
end
end
def analyze_usage_patterns(stats)
primary_utilization = calculate_utilization(stats[:primary])
replica_utilizations = stats[:replicas].map { |r| calculate_utilization(r) }
{
primary_utilization: primary_utilization,
replica_utilizations: replica_utilizations,
should_optimize: primary_utilization > 80 || replica_utilizations.any? { |u| u > 80 }
}
end
def calculate_utilization(pool_stats)
return 0 if pool_stats[:max_size] == 0
(pool_stats[:allocated].to_f / pool_stats[:max_size] * 100).round(2)
end
end
class ConnectionValidator
def initialize
@validation_cache = {}
@cache_ttl = 30 # seconds
end
def healthy?(connection)
begin
connection.execute('SELECT 1').first
cache_validation(connection)
true
rescue
false
end
end
def recently_validated?(connection)
last_validation = @validation_cache[connection.object_id]
return false unless last_validation
Time.current - last_validation < @cache_ttl
end
private
def cache_validation(connection)
@validation_cache[connection.object_id] = Time.current
end
end
This advanced setup provides fine-grained control over connection behavior, including automatic failover, connection validation, and dynamic pool sizing based on usage patterns.
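The knobs doing most of the work here are Sequel’s own pool options. A minimal sketch of just that layer, without the surrounding manager; the URL is a placeholder, and the connection_validator extension is Sequel’s built-in way of re-checking idle connections:
require 'sequel'

DB = Sequel.connect(
  ENV['DATABASE_URL'],
  max_connections: 20, # upper bound on pooled connections
  pool_timeout: 5      # seconds to wait before Sequel::PoolTimeout is raised
)

# Re-validate connections that have sat idle for more than 30 seconds
# instead of assuming they are still alive.
DB.extension :connection_validator
DB.pool.connection_validation_timeout = 30

DB.pool.hold do |conn|
  # hold is re-entrant per thread, so nested Sequel calls reuse this connection.
  puts DB.fetch('SELECT 1 AS ok').first
end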
Database Cleaner Integration
The database_cleaner gem, while primarily used for testing, offers valuable connection management features for production environments. I’ve adapted its patterns for connection lifecycle management.
require 'database_cleaner'
class ConnectionLifecycleManager
def initialize(databases)
@databases = databases
@cleaners = setup_cleaners
@connection_registry = {}
end
def setup_cleaners
cleaners = {}
@databases.each do |name, config|
cleaners[name] = DatabaseCleaner::Base.new(:active_record, config)
cleaners[name].strategy = :transaction
end
cleaners
end
def with_managed_connection(database_name, &block)
register_connection(database_name)
begin
@cleaners[database_name].cleaning(&block)
ensure
cleanup_connection(database_name)
end
end
def batch_operation(database_name, operations)
with_managed_connection(database_name) do
results = []
operations.each_slice(100) do |batch|
batch_results = process_batch(database_name, batch)
results.concat(batch_results)
# Prevent connection timeout during large operations
refresh_connection_if_needed(database_name)
end
results
end
end
def connection_stats
stats = {}
@connection_registry.each do |name, info|
stats[name] = {
created_at: info[:created_at],
last_used: info[:last_used],
operation_count: info[:operation_count],
age_seconds: Time.current - info[:created_at]
}
end
stats
end
private
def register_connection(database_name)
@connection_registry[database_name] = {
created_at: Time.current,
last_used: Time.current,
operation_count: 0
}
end
def cleanup_connection(database_name)
@connection_registry.delete(database_name)
end
def process_batch(database_name, operations)
update_connection_usage(database_name)
operations.map do |operation|
case operation[:type]
when :query
execute_query(database_name, operation[:sql])
when :command
execute_command(database_name, operation[:command])
end
end
end
def refresh_connection_if_needed(database_name)
connection_info = @connection_registry[database_name]
if connection_expired?(connection_info)
@cleaners[database_name].clean
register_connection(database_name)
end
end
def connection_expired?(connection_info)
age = Time.current - connection_info[:created_at]
age > 300 || connection_info[:operation_count] > 1000
end
def update_connection_usage(database_name)
info = @connection_registry[database_name]
info[:last_used] = Time.current
info[:operation_count] += 1
end
end
This approach ensures proper connection cleanup and prevents resource leaks during long-running operations.
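The recycling rule is the heart of this approach, so it is worth seeing in isolation. The stale_connection? helper below is mine, mirroring the connection_expired? check above; the thresholds are the same five minutes and one thousand operations:
# A connection is considered stale once it has lived past five minutes or
# served a thousand operations, whichever comes first.
MAX_CONNECTION_AGE_SECONDS = 300
MAX_OPERATIONS_PER_CONNECTION = 1_000

def stale_connection?(info, now: Time.now)
  (now - info[:created_at]) > MAX_CONNECTION_AGE_SECONDS ||
    info[:operation_count] > MAX_OPERATIONS_PER_CONNECTION
end

info = { created_at: Time.now - 400, operation_count: 250 }
puts stale_connection?(info) # => true: the connection is older than five minutes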
Redis Connection Pool Integration
Pairing the redis gem with connection_pool extends connection pooling concepts to Redis connections, which often serve as cache layers alongside database connections. I manage both together for optimal resource utilization.
require 'redis'
require 'connection_pool'
require 'zlib'
class CacheConnectionManager
def initialize(config)
@redis_pools = setup_redis_pools(config)
@failover_manager = FailoverManager.new(@redis_pools.keys)
end
def setup_redis_pools(config)
pools = {}
config[:redis_clusters].each do |name, cluster_config|
pools[name] = ConnectionPool.new(
size: cluster_config[:pool_size] || 10,
timeout: cluster_config[:timeout] || 5
) do
Redis.new(
url: cluster_config[:url],
timeout: 1,
reconnect_attempts: 3,
reconnect_delay: 0.5
)
end
end
pools
end
def with_cache_connection(cluster_name = :primary, &block)
pool = @redis_pools[cluster_name]
pool.with do |redis|
validate_redis_connection(redis)
yield(redis)
end
rescue Redis::CannotConnectError => e
handle_redis_connection_error(cluster_name, e, &block)
end
def distributed_cache_operation(key, &block)
cluster_name = determine_cluster(key)
with_cache_connection(cluster_name) do |redis|
yield(redis)
end
end
def cache_pool_stats
stats = {}
@redis_pools.each do |name, pool|
stats[name] = {
size: pool.size,
available: pool.available,
utilization: ((pool.size - pool.available).to_f / pool.size * 100).round(2)
}
end
stats
end
private
def validate_redis_connection(redis)
redis.ping
rescue Redis::TimeoutError
raise Redis::CannotConnectError, 'Redis connection validation timeout'
end
def handle_redis_connection_error(cluster_name, error, &block)
@failover_manager.record_failure(cluster_name)
if fallback_cluster = @failover_manager.get_fallback(cluster_name)
with_cache_connection(fallback_cluster, &block)
else
raise error
end
end
def determine_cluster(key)
# Simple hash-based distribution
hash = Zlib.crc32(key)
cluster_names = @redis_pools.keys
cluster_names[hash % cluster_names.length]
end
end
class FailoverManager
def initialize(cluster_names)
@cluster_names = cluster_names
@failure_counts = Hash.new(0)
@last_failures = {}
@circuit_breakers = {}
end
def record_failure(cluster_name)
@failure_counts[cluster_name] += 1
@last_failures[cluster_name] = Time.current
if @failure_counts[cluster_name] >= 3
@circuit_breakers[cluster_name] = Time.current
end
end
def get_fallback(cluster_name)
available_clusters = @cluster_names - [cluster_name]
available_clusters.find do |name|
!circuit_breaker_open?(name)
end
end
def circuit_breaker_open?(cluster_name)
return false unless @circuit_breakers[cluster_name]
Time.current - @circuit_breakers[cluster_name] < 60
end
end
This integrated approach ensures that both database and cache connections are managed efficiently, preventing resource exhaustion across the entire data layer.
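Putting the cache manager to work looks roughly like this; the cluster names, URLs, and pool sizes are placeholder values, and a Redis server is assumed to be reachable at each URL:
cache = CacheConnectionManager.new(
  redis_clusters: {
    primary:   { url: ENV.fetch('REDIS_PRIMARY_URL', 'redis://localhost:6379/0'), pool_size: 10 },
    secondary: { url: ENV.fetch('REDIS_SECONDARY_URL', 'redis://localhost:6380/0'), pool_size: 10 }
  }
)

# Keys are routed to a cluster by CRC32 hash; failures fall back to another cluster.
cache.distributed_cache_operation('user:42:profile') do |redis|
  redis.setex('user:42:profile', 300, '{"name":"Ada"}')
end

pp cache.cache_pool_stats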
Query Timeout Management
Granular control over query execution timeouts keeps long-running queries from exhausting connection pools. I build this on the pg gem, Ruby’s Timeout module, and PostgreSQL’s statement_timeout setting.
require 'pg'
require 'timeout'
# Raised when a query exceeds its allotted execution time
QueryTimeoutError = Class.new(StandardError)
class QueryTimeoutManager
def initialize(default_timeout = 30)
@default_timeout = default_timeout
@query_metrics = QueryMetrics.new
@timeout_strategies = setup_timeout_strategies
end
def execute_with_timeout(connection, sql, params = [], options = {})
timeout_value = determine_timeout(sql, options)
query_start = Time.current
begin
Timeout::timeout(timeout_value) do
set_statement_timeout(connection, timeout_value)
result = connection.exec_params(sql, params)
@query_metrics.record_success(sql, Time.current - query_start)
result
end
rescue Timeout::Error => e
@query_metrics.record_timeout(sql, timeout_value)
handle_query_timeout(connection, sql, timeout_value, e)
rescue PG::Error => e
@query_metrics.record_error(sql, e)
raise e
end
end
def adaptive_timeout_for_query(sql)
query_pattern = extract_query_pattern(sql)
historical_performance = @query_metrics.average_duration(query_pattern)
if historical_performance
# Set timeout to 3x average duration, minimum 5 seconds
[historical_performance * 3, 5].max
else
@default_timeout
end
end
def connection_timeout_stats
{
total_queries: @query_metrics.total_count,
timeout_rate: @query_metrics.timeout_rate,
average_duration: @query_metrics.average_duration,
slow_queries: @query_metrics.slow_queries(threshold: 10)
}
end
private
def setup_timeout_strategies
{
select: 30,
insert: 10,
update: 15,
delete: 20,
index: 300,
analyze: 60
}
end
def determine_timeout(sql, options)
return options[:timeout] if options[:timeout]
query_type = detect_query_type(sql)
adaptive_timeout = adaptive_timeout_for_query(sql)
strategy_timeout = @timeout_strategies[query_type] || @default_timeout
[adaptive_timeout, strategy_timeout].min
end
def detect_query_type(sql)
normalized_sql = sql.strip.downcase
case normalized_sql
when /^select/
:select
when /^insert/
:insert
when /^update/
:update
when /^delete/
:delete
when /^create index|^reindex/
:index
when /^analyze|^vacuum/
:analyze
else
:unknown
end
end
def set_statement_timeout(connection, timeout_seconds)
timeout_ms = (timeout_seconds * 1000).to_i
connection.exec("SET statement_timeout = #{timeout_ms}")
end
def extract_query_pattern(sql)
# Simple pattern extraction - replace literals with placeholders
sql.gsub(/\d+/, 'N')
.gsub(/'[^']*'/, "'STRING'")
.gsub(/\$\d+/, '$PARAM')
end
def handle_query_timeout(connection, sql, timeout_value, error)
# Cancel the running query
begin
connection.cancel
rescue PG::Error
# Connection might be in bad state
end
Rails.logger.warn "Query timeout after #{timeout_value}s: #{sql.truncate(200)}"
raise QueryTimeoutError, "Query execution exceeded #{timeout_value} seconds"
end
end
class QueryMetrics
def initialize
@metrics = {}
@mutex = Mutex.new
end
def record_success(sql, duration)
pattern = extract_pattern(sql)
@mutex.synchronize do
@metrics[pattern] ||= {
count: 0,
total_duration: 0,
timeouts: 0,
errors: 0
}
@metrics[pattern][:count] += 1
@metrics[pattern][:total_duration] += duration
end
end
def record_timeout(sql, timeout_value)
pattern = extract_pattern(sql)
@mutex.synchronize do
@metrics[pattern] ||= initialize_pattern_metrics
@metrics[pattern][:timeouts] += 1
end
end
def record_error(sql, error)
pattern = extract_pattern(sql)
@mutex.synchronize do
@metrics[pattern] ||= initialize_pattern_metrics
@metrics[pattern][:errors] += 1
end
end
def total_count
@metrics.values.sum { |m| m[:count] + m[:timeouts] + m[:errors] }
end
def average_duration(pattern = nil)
if pattern
pattern_metrics = @metrics[pattern]
return nil unless pattern_metrics && pattern_metrics[:count] > 0
pattern_metrics[:total_duration] / pattern_metrics[:count]
else
total_duration = @metrics.values.sum { |m| m[:total_duration] }
total_count = @metrics.values.sum { |m| m[:count] }
return 0 if total_count == 0
total_duration / total_count
end
end
def timeout_rate
total_queries = @metrics.values.sum { |m| m[:count] + m[:timeouts] }
total_timeouts = @metrics.values.sum { |m| m[:timeouts] }
return 0 if total_queries == 0
(total_timeouts.to_f / total_queries * 100).round(2)
end
def slow_queries(threshold: 10)
slow_patterns = []
@metrics.each do |pattern, metrics|
next if metrics[:count] == 0
avg_duration = metrics[:total_duration] / metrics[:count]
if avg_duration > threshold
slow_patterns << {
pattern: pattern,
average_duration: avg_duration,
count: metrics[:count]
}
end
end
slow_patterns.sort_by { |p| p[:average_duration] }.reverse
end
private
def extract_pattern(sql)
sql.gsub(/\d+/, 'N')
.gsub(/'[^']*'/, "'STRING'")
.gsub(/\$\d+/, '$PARAM')
.truncate(100)
end
def initialize_pattern_metrics
{
count: 0,
total_duration: 0,
timeouts: 0,
errors: 0
}
end
end
This timeout management system prevents runaway queries from blocking connections while providing insights into query performance patterns.
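Running a single query through the manager ties the pieces together. A sketch assuming the classes above, with the connection URL, the orders table, and the five-second override as placeholders:
conn = PG.connect(ENV['DATABASE_URL'])
timeout_manager = QueryTimeoutManager.new(30)

begin
  result = timeout_manager.execute_with_timeout(
    conn,
    'SELECT * FROM orders WHERE created_at > $1',
    [Time.now - 3600],
    { timeout: 5 } # explicit per-query override
  )
  puts "Fetched #{result.ntuples} rows"
rescue QueryTimeoutError => e
  # The statement was cancelled and the failure surfaced as a domain-specific error.
  warn e.message
end

pp timeout_manager.connection_timeout_stats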
Connection Retry with Retries Gem
The retries gem provides sophisticated retry mechanisms for database connections, handling transient failures gracefully without overwhelming the database server.
require 'retries'
# Raised when a pool's circuit breaker is open and new work is rejected
CircuitBreakerOpenError = Class.new(StandardError)
class ConnectionRetryManager
def initialize(config = {})
@max_retries = config[:max_retries] || 3
@base_delay = config[:base_delay] || 1
@max_delay = config[:max_delay] || 30
@backoff_multiplier = config[:backoff_multiplier] || 2
@retry_strategies = setup_retry_strategies
end
def with_retry(operation_type = :default, &block)
strategy = @retry_strategies[operation_type]
with_retries(
max_tries: strategy[:max_tries],
base_sleep_seconds: strategy[:base_sleep],
max_sleep_seconds: strategy[:max_sleep],
rescue: strategy[:exceptions]
) do |attempt|
log_retry_attempt(operation_type, attempt) if attempt > 1
yield
end
rescue => e
handle_final_failure(operation_type, e)
raise e
end
def connection_with_circuit_breaker(pool, &block)
circuit_breaker = get_circuit_breaker(pool)
if circuit_breaker.open?
raise CircuitBreakerOpenError, 'Circuit breaker is open'
end
begin
result = with_retry(:connection) do
pool.with(&block)
end
circuit_breaker.record_success
result
rescue => e
circuit_breaker.record_failure
raise e
end
end
def batch_retry_operation(items, operation_type = :batch, &block)
failed_items = []
successful_items = []
items.each_slice(batch_size(operation_type)) do |batch|
begin
with_retry(operation_type) do
results = yield(batch)
successful_items.concat(