Building high-performance database engines requires careful attention to both algorithmic efficiency and systems-level optimization. Rust provides an excellent foundation for this work through its zero-cost abstractions, memory safety guarantees, and fine-grained control over system resources. I have spent considerable time implementing these patterns in production systems, and each technique addresses specific performance bottlenecks that commonly arise in database workloads.
Memory-Mapped Storage Management
Memory-mapped files offer direct access to disk pages without explicit read and write system calls. This approach reduces copy overhead and allows the operating system’s virtual memory manager to handle page replacement policies. When implementing a page manager, I focus on creating clean abstractions that hide the complexity of memory mapping while providing safe access patterns.
use memmap2::{MmapMut, MmapOptions};
use std::fs::OpenOptions;
struct PageManager {
file: std::fs::File,
mmap: MmapMut,
page_size: usize,
page_count: usize,
}
impl PageManager {
fn new(path: &str, page_size: usize, initial_pages: usize) -> std::io::Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
let total_size = page_size * initial_pages;
file.set_len(total_size as u64)?;
let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
Ok(Self {
file,
mmap,
page_size,
page_count: initial_pages,
})
}
fn get_page(&self, page_id: usize) -> Option<&[u8]> {
if page_id >= self.page_count {
return None;
}
let start = page_id * self.page_size;
let end = start + self.page_size;
Some(&self.mmap[start..end])
}
fn get_page_mut(&mut self, page_id: usize) -> Option<&mut [u8]> {
if page_id >= self.page_count {
return None;
}
let start = page_id * self.page_size;
let end = start + self.page_size;
Some(&mut self.mmap[start..end])
}
fn expand_storage(&mut self, additional_pages: usize) -> std::io::Result<()> {
let new_page_count = self.page_count + additional_pages;
let new_size = new_page_count * self.page_size;
self.file.set_len(new_size as u64)?;
// Remap the file with new size
self.mmap = unsafe { MmapOptions::new().map_mut(&self.file)? };
self.page_count = new_page_count;
Ok(())
}
}
The key insight here is maintaining page boundaries and providing bounds checking while allowing zero-copy access to page data. When working with memory-mapped storage, I always ensure proper error handling for file operations and consider the implications of page faults on query performance.
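To make the access pattern concrete, here is a minimal usage sketch of the page manager above; the function name, file name, page size, and header bytes are arbitrary placeholders rather than part of any real format.

fn page_manager_demo() -> std::io::Result<()> {
    let mut pm = PageManager::new("demo.db", 4096, 16)?;

    // Write a small header into the first page through the mutable accessor.
    if let Some(page) = pm.get_page_mut(0) {
        page[0..4].copy_from_slice(&0xDEAD_BEEFu32.to_le_bytes());
    }

    // Read it back without copying the page.
    if let Some(page) = pm.get_page(0) {
        assert_eq!(&page[0..4], &0xDEAD_BEEFu32.to_le_bytes());
    }

    // Grow the file once the initial allocation fills up; page 31 now exists.
    pm.expand_storage(16)?;
    assert!(pm.get_page(31).is_some());
    Ok(())
}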
Lock-Free Buffer Pool Implementation
Buffer pools manage the in-memory cache of database pages and represent one of the most performance-critical components in any database system. Traditional implementations rely heavily on mutexes, creating contention hotspots. I prefer lock-free designs that use atomic operations and careful memory ordering to achieve high concurrency.
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::sync::Arc;
use dashmap::DashMap;
struct BufferFrame {
    page_id: AtomicUsize,
    // The page bytes live behind an UnsafeCell so a claimed frame can be
    // refilled through a shared reference without undefined behavior.
    data: UnsafeCell<Vec<u8>>,
    dirty: AtomicBool,
    pin_count: AtomicUsize,
    access_time: AtomicUsize,
}
// Safety: access to `data` is coordinated through `pin_count` and the
// `page_id` CAS in `find_victim_frame`; a frame is never written while shared.
unsafe impl Sync for BufferFrame {}
struct BufferPool {
frames: Vec<BufferFrame>,
clock_hand: AtomicUsize,
frame_table: DashMap<usize, usize>,
page_manager: Arc<PageManager>,
}
impl BufferPool {
fn new(pool_size: usize, page_size: usize, page_manager: Arc<PageManager>) -> Self {
let frames = (0..pool_size)
.map(|_| BufferFrame {
page_id: AtomicUsize::new(usize::MAX),
                data: UnsafeCell::new(vec![0; page_size]),
dirty: AtomicBool::new(false),
pin_count: AtomicUsize::new(0),
access_time: AtomicUsize::new(0),
})
.collect();
Self {
frames,
clock_hand: AtomicUsize::new(0),
frame_table: DashMap::new(),
page_manager,
}
}
fn get_page(&self, page_id: usize) -> Option<BufferGuard> {
if let Some(frame_idx) = self.frame_table.get(&page_id) {
let frame = &self.frames[*frame_idx];
frame.pin_count.fetch_add(1, Ordering::SeqCst);
frame.access_time.store(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as usize,
Ordering::Relaxed
);
return Some(BufferGuard::new(frame, *frame_idx, &self.frame_table));
}
let victim_idx = self.find_victim_frame()?;
self.load_page_into_frame(page_id, victim_idx)
}
fn find_victim_frame(&self) -> Option<usize> {
for _ in 0..self.frames.len() * 2 {
let pos = self.clock_hand.fetch_add(1, Ordering::SeqCst) % self.frames.len();
let frame = &self.frames[pos];
if frame.pin_count.load(Ordering::SeqCst) == 0 {
                // Claim the frame by swapping its page id for the sentinel; the CAS
                // fails if another thread claimed or refilled the frame concurrently.
                let old_page_id = frame.page_id.load(Ordering::SeqCst);
                if frame
                    .page_id
                    .compare_exchange(old_page_id, usize::MAX, Ordering::SeqCst, Ordering::Relaxed)
                    .is_ok()
                {
                    if old_page_id != usize::MAX {
                        self.frame_table.remove(&old_page_id);
                    }
                    return Some(pos);
                }
}
}
None
}
fn load_page_into_frame(&self, page_id: usize, frame_idx: usize) -> Option<BufferGuard> {
let frame = &self.frames[frame_idx];
if let Some(page_data) = self.page_manager.get_page(page_id) {
            // Safety: the frame was claimed in find_victim_frame (pin count zero,
            // page id swapped to the sentinel), so no other thread touches this
            // buffer while it is being filled.
            unsafe {
                let dst = (*frame.data.get()).as_mut_ptr();
                std::ptr::copy_nonoverlapping(page_data.as_ptr(), dst, page_data.len());
            }
}
frame.page_id.store(page_id, Ordering::SeqCst);
frame.pin_count.store(1, Ordering::SeqCst);
frame.dirty.store(false, Ordering::SeqCst);
self.frame_table.insert(page_id, frame_idx);
Some(BufferGuard::new(frame, frame_idx, &self.frame_table))
}
}
struct BufferGuard<'a> {
frame: &'a BufferFrame,
frame_idx: usize,
frame_table: &'a DashMap<usize, usize>,
}
impl<'a> BufferGuard<'a> {
fn new(frame: &'a BufferFrame, frame_idx: usize, frame_table: &'a DashMap<usize, usize>) -> Self {
Self { frame, frame_idx, frame_table }
}
    fn data(&self) -> &[u8] {
        // Safety: the frame stays pinned for the guard's lifetime, so eviction
        // cannot reuse this buffer while the borrow is alive.
        unsafe { (*self.frame.data.get()).as_slice() }
    }
fn mark_dirty(&self) {
self.frame.dirty.store(true, Ordering::SeqCst);
}
}
impl<'a> Drop for BufferGuard<'a> {
fn drop(&mut self) {
self.frame.pin_count.fetch_sub(1, Ordering::SeqCst);
}
}
This implementation uses a clock algorithm for page replacement combined with atomic pin counts to avoid locks on the hot path. The RAII pattern through BufferGuard ensures frames are unpinned automatically and prevents pages from being evicted while in use. A production pool would also re-validate a frame's page id after pinning it, since a frame can be claimed for eviction between the table lookup and the pin.
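As a usage sketch, a read path built on this pool looks like the following; read_header is a hypothetical helper that assumes pages store a little-endian header in their first four bytes.

fn read_header(pool: &BufferPool, page_id: usize) -> Option<u32> {
    let guard = pool.get_page(page_id)?;
    let bytes = guard.data();
    if bytes.len() < 4 {
        return None;
    }
    let header = u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
    Some(header)
    // The guard drops here, decrementing pin_count so the frame becomes
    // evictable again.
}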
B+ Tree Index Implementation
B+ trees remain the most widely used indexing structure in database systems due to their excellent performance characteristics for both point queries and range scans. My implementation focuses on cache-friendly node layouts and efficient split operations.
const BTREE_ORDER: usize = 64;
struct BTreeNode {
keys: Vec<u64>,
values: Vec<u64>,
children: Vec<Box<BTreeNode>>,
is_leaf: bool,
next_leaf: Option<Box<BTreeNode>>,
}
impl BTreeNode {
fn new(is_leaf: bool) -> Self {
Self {
keys: Vec::with_capacity(BTREE_ORDER),
values: Vec::with_capacity(BTREE_ORDER),
children: Vec::with_capacity(BTREE_ORDER + 1),
is_leaf,
next_leaf: None,
}
}
fn search(&self, key: u64) -> Option<u64> {
match self.keys.binary_search(&key) {
Ok(idx) => {
if self.is_leaf {
Some(self.values[idx])
} else {
self.children[idx + 1].search(key)
}
}
Err(idx) => {
if self.is_leaf {
None
} else {
if idx < self.children.len() {
self.children[idx].search(key)
} else {
None
}
}
}
}
}
fn range_scan(&self, start_key: u64, end_key: u64) -> Vec<(u64, u64)> {
let mut results = Vec::new();
if self.is_leaf {
for (i, &key) in self.keys.iter().enumerate() {
if key >= start_key && key <= end_key {
results.push((key, self.values[i]));
}
}
if let Some(ref next) = self.next_leaf {
if self.keys.last().map_or(false, |&k| k < end_key) {
results.extend(next.range_scan(start_key, end_key));
}
}
} else {
let start_idx = self.keys.binary_search(&start_key).unwrap_or_else(|e| e);
for i in start_idx..self.children.len() {
results.extend(self.children[i].range_scan(start_key, end_key));
                if i < self.keys.len() && self.keys[i] > end_key {
break;
}
}
}
results
}
fn insert(&mut self, key: u64, value: u64) -> Option<(u64, Box<BTreeNode>)> {
if self.is_leaf {
match self.keys.binary_search(&key) {
Ok(idx) => {
self.values[idx] = value;
None
}
Err(idx) => {
self.keys.insert(idx, key);
self.values.insert(idx, value);
if self.keys.len() > BTREE_ORDER {
Some(self.split_leaf())
} else {
None
}
}
}
} else {
            let idx = match self.keys.binary_search(&key) {
                // An equal key lives in the right subtree, matching search().
                Ok(i) => i + 1,
                Err(i) => i,
            };
if let Some((split_key, new_node)) = self.children[idx].insert(key, value) {
self.keys.insert(idx, split_key);
self.children.insert(idx + 1, new_node);
if self.keys.len() > BTREE_ORDER {
Some(self.split_internal())
} else {
None
}
} else {
None
}
}
}
    fn split_leaf(&mut self) -> (u64, Box<BTreeNode>) {
        let mid = self.keys.len() / 2;
        let mut new_node = BTreeNode::new(true);
        new_node.keys = self.keys.split_off(mid);
        new_node.values = self.values.split_off(mid);
        // The new right sibling inherits the old forward link. With Box-owned
        // nodes the left leaf cannot also point at the sibling it returns to the
        // parent; a disk-backed tree would link siblings by page id instead.
        new_node.next_leaf = self.next_leaf.take();
        let split_key = new_node.keys[0];
        (split_key, Box::new(new_node))
    }
fn split_internal(&mut self) -> (u64, Box<BTreeNode>) {
let mid = self.keys.len() / 2;
let mut new_node = BTreeNode::new(false);
let split_key = self.keys[mid];
new_node.keys = self.keys.split_off(mid + 1);
new_node.children = self.children.split_off(mid + 1);
self.keys.pop();
(split_key, Box::new(new_node))
}
}
Leaf nodes carry a forward link so that range scans can continue across siblings; with Box-owned children this link is only illustrative, since a left leaf cannot own its right sibling, and an on-disk tree would chain leaves by page id instead. The split operations preserve tree invariants while minimizing data movement.
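A thin wrapper makes the calling convention clear: the caller owns the root and handles the case where a root split adds a new level. The BTree type below is a sketch added for illustration and is not part of the node implementation above.

struct BTree {
    root: Box<BTreeNode>,
}

impl BTree {
    fn new() -> Self {
        Self {
            root: Box::new(BTreeNode::new(true)),
        }
    }

    fn insert(&mut self, key: u64, value: u64) {
        if let Some((split_key, right)) = self.root.insert(key, value) {
            // The old root split: create a new internal root with two children.
            let mut new_root = BTreeNode::new(false);
            new_root.keys.push(split_key);
            let old_root = std::mem::replace(&mut self.root, Box::new(new_root));
            self.root.children.push(old_root);
            self.root.children.push(right);
        }
    }

    fn get(&self, key: u64) -> Option<u64> {
        self.root.search(key)
    }
}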
Write-Ahead Logging System
Write-ahead logging ensures durability and enables crash recovery by recording all changes before they are applied to the main database. My implementation focuses on minimizing I/O overhead while maintaining strict ordering guarantees.
use std::io::{Write, Seek, SeekFrom, Read};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
struct WriteAheadLog {
log_file: Mutex<std::fs::File>,
current_lsn: AtomicU64,
buffer: Mutex<Vec<u8>>,
page_size: usize,
}
#[derive(Debug, Clone)]
struct LogRecord {
lsn: u64,
transaction_id: u64,
record_type: LogRecordType,
page_id: u64,
offset: u32,
old_data: Vec<u8>,
new_data: Vec<u8>,
}
#[derive(Debug, Clone)]
enum LogRecordType {
Insert = 1,
Update = 2,
Delete = 3,
Commit = 4,
Abort = 5,
Checkpoint = 6,
}
impl WriteAheadLog {
fn new(log_path: &str, page_size: usize) -> std::io::Result<Self> {
let log_file = Mutex::new(
OpenOptions::new()
.create(true)
.append(true)
.read(true)
.open(log_path)?
);
Ok(Self {
log_file,
current_lsn: AtomicU64::new(1),
buffer: Mutex::new(Vec::with_capacity(8192)),
page_size,
})
}
fn append_record(&self, mut record: LogRecord) -> std::io::Result<u64> {
let lsn = self.current_lsn.fetch_add(1, Ordering::SeqCst);
record.lsn = lsn;
let mut buffer = self.buffer.lock().unwrap();
buffer.clear();
self.serialize_record(&record, &mut buffer);
let checksum = crc32fast::hash(&buffer);
buffer.extend_from_slice(&checksum.to_le_bytes());
let mut file = self.log_file.lock().unwrap();
file.write_all(&buffer)?;
file.sync_all()?;
Ok(lsn)
}
fn serialize_record(&self, record: &LogRecord, buffer: &mut Vec<u8>) {
buffer.extend_from_slice(&record.lsn.to_le_bytes());
buffer.extend_from_slice(&record.transaction_id.to_le_bytes());
buffer.push(record.record_type.clone() as u8);
buffer.extend_from_slice(&record.page_id.to_le_bytes());
buffer.extend_from_slice(&record.offset.to_le_bytes());
buffer.extend_from_slice(&(record.old_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(&record.old_data);
buffer.extend_from_slice(&(record.new_data.len() as u32).to_le_bytes());
buffer.extend_from_slice(&record.new_data);
}
fn replay_from_lsn(&self, start_lsn: u64) -> std::io::Result<Vec<LogRecord>> {
let mut file = self.log_file.lock().unwrap();
file.seek(SeekFrom::Start(0))?;
let mut records = Vec::new();
let mut buffer = vec![0u8; self.page_size];
loop {
match file.read(&mut buffer[..8]) {
Ok(0) => break,
Ok(8) => {
let lsn = u64::from_le_bytes([
buffer[0], buffer[1], buffer[2], buffer[3],
buffer[4], buffer[5], buffer[6], buffer[7]
]);
if lsn >= start_lsn {
if let Ok(record) = self.read_complete_record(&mut file, lsn) {
records.push(record);
}
} else {
self.skip_record(&mut file)?;
}
}
_ => break,
}
}
Ok(records)
}
fn read_complete_record(&self, file: &mut std::fs::File, lsn: u64) -> std::io::Result<LogRecord> {
let mut header = [0u8; 21];
file.read_exact(&mut header)?;
let transaction_id = u64::from_le_bytes([
header[0], header[1], header[2], header[3],
header[4], header[5], header[6], header[7]
]);
let record_type = match header[8] {
1 => LogRecordType::Insert,
2 => LogRecordType::Update,
3 => LogRecordType::Delete,
4 => LogRecordType::Commit,
5 => LogRecordType::Abort,
6 => LogRecordType::Checkpoint,
_ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid record type")),
};
let page_id = u64::from_le_bytes([
header[9], header[10], header[11], header[12],
header[13], header[14], header[15], header[16]
]);
let offset = u32::from_le_bytes([
header[17], header[18], header[19], header[20]
]);
let mut len_buf = [0u8; 4];
file.read_exact(&mut len_buf)?;
let old_data_len = u32::from_le_bytes(len_buf) as usize;
let mut old_data = vec![0u8; old_data_len];
file.read_exact(&mut old_data)?;
file.read_exact(&mut len_buf)?;
let new_data_len = u32::from_le_bytes(len_buf) as usize;
let mut new_data = vec![0u8; new_data_len];
file.read_exact(&mut new_data)?;
let mut checksum_buf = [0u8; 4];
file.read_exact(&mut checksum_buf)?;
Ok(LogRecord {
lsn,
transaction_id,
record_type,
page_id,
offset,
old_data,
new_data,
})
}
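    fn skip_record(&self, file: &mut std::fs::File) -> std::io::Result<()> {
        // Minimal sketch of the skip path used by replay_from_lsn: walk past a
        // record whose LSN has already been consumed, mirroring the layout
        // written by serialize_record (fixed header, old image, new image, CRC).
        let mut header = [0u8; 21];
        file.read_exact(&mut header)?;
        let mut len_buf = [0u8; 4];
        file.read_exact(&mut len_buf)?;
        let old_len = u32::from_le_bytes(len_buf) as i64;
        file.seek(SeekFrom::Current(old_len))?;
        file.read_exact(&mut len_buf)?;
        let new_len = u32::from_le_bytes(len_buf) as i64;
        // Skip the new image plus the trailing 4-byte checksum.
        file.seek(SeekFrom::Current(new_len + 4))?;
        Ok(())
    }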
}
The log format includes checksums for integrity verification and structured records that enable both redo and undo operations during recovery. I always ensure that log writes are flushed to disk before acknowledging transaction commits.
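A typical transaction tail then looks like the sketch below: log the page change, then a commit record, and only report success once append_record (which calls sync_all) has returned. The function name, page id, offset, and payloads are illustrative values, not a prescribed format.

fn log_update_and_commit(wal: &WriteAheadLog, txn_id: u64) -> std::io::Result<u64> {
    wal.append_record(LogRecord {
        lsn: 0, // assigned by append_record
        transaction_id: txn_id,
        record_type: LogRecordType::Update,
        page_id: 42,
        offset: 128,
        old_data: vec![0u8; 8],
        new_data: 7u64.to_le_bytes().to_vec(),
    })?;
    // The commit record is durable once this call returns, so the transaction
    // can be acknowledged afterwards.
    wal.append_record(LogRecord {
        lsn: 0,
        transaction_id: txn_id,
        record_type: LogRecordType::Commit,
        page_id: 0,
        offset: 0,
        old_data: Vec::new(),
        new_data: Vec::new(),
    })
}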
Multi-Version Concurrency Control
MVCC allows multiple transactions to access the same data concurrently without blocking by maintaining multiple versions of each data item. This approach maximizes concurrency while providing strong isolation guarantees.
use std::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering};
use dashmap::{DashMap, DashSet};
struct VersionChain {
versions: Vec<Version>,
latest_version: AtomicUsize,
}
struct Version {
data: Vec<u8>,
transaction_id: u64,
commit_timestamp: AtomicU64,
is_deleted: bool,
next_version: Option<usize>,
}
struct Transaction {
id: u64,
start_timestamp: u64,
read_set: DashSet<Vec<u8>>,
write_set: DashMap<Vec<u8>, Vec<u8>>,
status: AtomicU8,
}
#[derive(Clone, Copy)]
enum TransactionStatus {
Active = 0,
Committed = 1,
Aborted = 2,
}
struct TransactionManager {
active_transactions: DashMap<u64, Transaction>,
next_transaction_id: AtomicU64,
global_timestamp: AtomicU64,
version_chains: DashMap<Vec<u8>, VersionChain>,
}
impl TransactionManager {
fn new() -> Self {
Self {
active_transactions: DashMap::new(),
next_transaction_id: AtomicU64::new(1),
global_timestamp: AtomicU64::new(1),
version_chains: DashMap::new(),
}
}
fn begin_transaction(&self) -> u64 {
let txn_id = self.next_transaction_id.fetch_add(1, Ordering::SeqCst);
let timestamp = self.global_timestamp.load(Ordering::SeqCst);
let transaction = Transaction {
id: txn_id,
start_timestamp: timestamp,
read_set: DashSet::new(),
write_set: DashMap::new(),
status: AtomicU8::new(TransactionStatus::Active as u8),
};
self.active_transactions.insert(txn_id, transaction);
txn_id
}
fn read(&self, txn_id: u64, key: &[u8]) -> Option<Vec<u8>> {
let transaction = self.active_transactions.get(&txn_id)?;
if let Some(value) = transaction.write_set.get(key) {
return Some(value.clone());
}
let version_chain = self.version_chains.get(key)?;
        // Walk versions from newest to oldest and return the most recent one
        // visible to this transaction's snapshot.
        for version in version_chain.versions.iter().rev() {
let commit_ts = version.commit_timestamp.load(Ordering::SeqCst);
if commit_ts > 0 && commit_ts <= transaction.start_timestamp {
transaction.read_set.insert(key.to_vec());
if version.is_deleted {
return None;
} else {
return Some(version.data.clone());
}
}
}
None
}
fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> bool {
if let Some(transaction) = self.active_transactions.get(&txn_id) {
transaction.write_set.insert(key, value);
true
} else {
false
}
}
    fn commit(&self, txn_id: u64) -> bool {
        let transaction = match self.active_transactions.get(&txn_id) {
            Some(txn) => txn,
            None => return false,
        };
        // Validate the read set: abort if any key this transaction read has been
        // overwritten by a transaction that committed after it started.
        let has_conflict = transaction
            .read_set
            .iter()
            .any(|key| self.has_conflicting_write(key.key(), transaction.start_timestamp));
        if has_conflict {
            transaction.status.store(TransactionStatus::Aborted as u8, Ordering::SeqCst);
            // Release the map reference before removing the entry; removing while
            // holding a reference into the same DashMap can deadlock.
            drop(transaction);
            self.active_transactions.remove(&txn_id);
            return false;
        }
        // Allocate a commit timestamp strictly greater than any start timestamp
        // handed out so far, so concurrent snapshots do not observe this commit.
        let commit_timestamp = self.global_timestamp.fetch_add(1, Ordering::SeqCst) + 1;
        for entry in transaction.write_set.iter() {
            self.install_version(
                entry.key().clone(),
                entry.value().clone(),
                txn_id,
                commit_timestamp,
            );
        }
        transaction.status.store(TransactionStatus::Committed as u8, Ordering::SeqCst);
        drop(transaction);
        self.active_transactions.remove(&txn_id);
        true
    }
fn install_version(&self, key: Vec<u8>, data: Vec<u8>, txn_id: u64, commit_ts: u64) {
let new_version = Version {
data,
transaction_id: txn_id,
commit_timestamp: AtomicU64::new(commit_ts),
is_deleted: false,
next_version: None,
};
let mut chain = self.version_chains.entry(key).or_insert_with(|| VersionChain {
versions: Vec::new(),
latest_version: AtomicUsize::new(0),
});
chain.versions.push(new_version);
chain.latest_version.store(chain.versions.len() - 1, Ordering::SeqCst);
}
fn has_conflicting_write(&self, key: &[u8], read_timestamp: u64) -> bool {
if let Some(chain) = self.version_chains.get(key) {
for version in &chain.versions {
let commit_ts = version.commit_timestamp.load(Ordering::SeqCst);
if commit_ts > read_timestamp {
return true;
}
}
}
false
}
}
This MVCC implementation provides snapshot isolation by ensuring each transaction sees a consistent view of the database as of its start time. The validation phase during commit re-checks the read set against versions committed after the transaction started and aborts transactions whose reads have become stale.
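The snapshot behavior is easiest to see in a small scenario: a writer commits after a reader has started, the reader keeps seeing its original snapshot, and a later transaction observes the new version. The function below is an illustrative sketch built on the manager above.

fn snapshot_example(tm: &TransactionManager) {
    let reader = tm.begin_transaction();

    let writer = tm.begin_transaction();
    assert!(tm.write(writer, b"k".to_vec(), b"v1".to_vec()));
    assert!(tm.commit(writer));

    // The reader's snapshot predates the writer's commit timestamp, so the key
    // is still invisible to it.
    assert_eq!(tm.read(reader, b"k"), None);

    // A transaction that starts after the commit sees the new version.
    let later = tm.begin_transaction();
    assert_eq!(tm.read(later, b"k"), Some(b"v1".to_vec()));
}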
Vectorized Query Execution
Modern query processors benefit greatly from vectorized execution, which processes data in batches rather than tuple-at-a-time. This approach improves CPU cache utilization and enables SIMD optimizations.
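Before the operator scaffolding, the core idea fits in a few lines: evaluate a predicate over an entire column at once and carry the result as a selection vector. The helper names below are illustrative; tight loops over flat arrays like these are what give the compiler room to auto-vectorize. The operators that follow produce the batches such helpers would consume.

// Evaluate `value > threshold` across a whole Int64 column, producing a
// selection bitmap for the batch.
fn filter_gt(column: &[i64], threshold: i64) -> Vec<bool> {
    column.iter().map(|&v| v > threshold).collect()
}

// Apply the selection to another column from the same batch.
fn take_selected(values: &[i64], selection: &[bool]) -> Vec<i64> {
    values
        .iter()
        .zip(selection)
        .filter_map(|(&v, &keep)| if keep { Some(v) } else { None })
        .collect()
}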
struct RecordBatch {
schema: Schema,
columns: Vec<ColumnArray>,
row_count: usize,
}
enum ColumnArray {
Int64(Vec<i64>),
String(Vec<String>),
Bool(Vec<bool>),
Null(usize),
}
#[derive(Clone)]
struct Schema {
    fields: Vec<Field>,
}
#[derive(Clone)]
struct Field {
    name: String,
    data_type: DataType,
    nullable: bool,
}
#[derive(Clone, Copy)]
enum DataType {
    Int64,
    String,
    Boolean,
}
trait ExecutionOperator: Send + Sync {
fn next_batch(&mut self) -> Option<RecordBatch>;
fn schema(&self) -> &Schema;
}
struct TableScanOperator {
page_manager: Arc<PageManager>,
current_page: usize,
schema: Schema,
batch_size: usize,
predicate: Option<Expression>,
}
impl ExecutionOperator for TableScanOperator {
fn next_batch(&mut self) -> Option<RecordBatch> {
let page = self.page_manager.get_page(self.current_page)?;
let records = self.parse_page_records(page)?;
if records.is_empty() {
return None;
}
let batch = self.vectorize_records(records);
self.current_page += 1;
if let Some(ref predicate) = self.predicate {
Some(self.apply_filter(batch, predicate))
} else {
Some(batch)
}
}
fn schema(&self) -> &Schema {
&self.schema
}
}
impl TableScanOperator {
fn parse_page_records(&self, page_data: &[u8]) -> Option<Vec<Record>> {
let mut records = Vec::new();
let mut offset = 0;
while offset < page_data.len() {
if let Some(record) = self.parse_record(&page_data[offset..]) {
                // Advance by the on-page size before handing ownership to the batch.
                offset += record.size();
                records.push(record);
} else {
break;
}
}
Some(records)
}
fn vectorize_records(&self, records: Vec<Record>) -> RecordBatch {
let mut columns = Vec::new();
for field in &self.schema.fields {
match field.data_type {
DataType::Int64 => {
let values: Vec<i64> = records.iter()
.map(|r| r.get_int64(&field.name).unwrap_or(0))
.collect();
columns.push(ColumnArray::Int64(values));
}
DataType::String => {
let values: Vec<String> = records.iter()
.map(|r| r.get_string(&field.name).unwrap_or_default())
.collect();
columns.push(ColumnArray::String(values));
}
DataType::Boolean => {
let values: Vec<bool> = records.iter()
.map(|r| r.get_bool(&field.name).unwrap_or(false))
.collect();
columns.push(ColumnArray::Bool(values));
}
}
}
RecordBatch {
schema: self.schema.clone(),
columns,
row_count: records.len(),
}
}
fn apply_filter(&self, batch: RecordBatch, predicate: &Expression) -> RecordBatch {
let