When we talk about building systems that move and transform data, the conversation often revolves around Python, Java, or Scala. For a long time, I did the same. But then I started looking at the bottlenecks—the memory errors, the runtime exceptions in production, the sheer cost of compute for simple transformations. That’s when I turned my attention to Rust. Its promise of performance without sacrificing safety isn’t just for operating systems; it’s a perfect fit for the heavy, repetitive workloads in data engineering. You get the speed of C++ with guardrails that prevent entire classes of bugs.
Over time, I’ve assembled a toolkit of Rust libraries that handle the core tasks. They let me build pipelines that are fast, reliable, and surprisingly pleasant to maintain. I want to share eight of these with you, not as an abstract list, but as practical tools I reach for when I need to get real work done.
First, let’s talk about databases. If you need to talk to PostgreSQL, MySQL, or SQLite, SQLx is a fantastic starting point. What I like most is that, through its query macros, it checks my SQL queries against the actual database schema at compile time. I make a typo in a column name, and my code won’t even build. This saves me from a whole category of runtime failures. It works asynchronously, which is great for handling many database connections without blocking threads.
Here’s a common pattern I use. I define a struct that represents a row from my table, and SQLx can map query results directly to it.
use sqlx::PgPool;
// This tells SQLx how to map database rows to this struct.
#[derive(sqlx::FromRow)]
struct SensorReading {
sensor_id: i32,
recorded_at: chrono::DateTime<chrono::Utc>,
temperature: f64,
}
async fn get_recent_readings(pool: &PgPool, hours: i32) -> Result<Vec<SensorReading>, sqlx::Error> {
    let query = "
        SELECT sensor_id, recorded_at, temperature
        FROM sensor_readings
        -- $1 is a bound parameter; it can't live inside a quoted INTERVAL literal.
        WHERE recorded_at > NOW() - make_interval(hours => $1)
        ORDER BY recorded_at DESC
    ";
    let readings = sqlx::query_as::<_, SensorReading>(query)
        .bind(hours)
        .fetch_all(pool) // Fetches all results as a vector.
        .await?; // The `?` propagates errors up.
    Ok(readings)
}
The .bind(hours) call safely passes my parameter to the database, avoiding SQL injection. The .await is there because this is an asynchronous operation; the function yields control until the database responds. This is a clean, type-safe way to interact with SQL.
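That compile-time checking I mentioned comes from SQLx’s query! and query_as! macros, which validate the SQL against a live database (via DATABASE_URL) while the project builds. Here is a sketch of the same lookup using query_as!, assuming the macros and chrono features are enabled and that these columns are declared NOT NULL:
async fn get_recent_readings_checked(
    pool: &PgPool,
    hours: i32,
) -> Result<Vec<SensorReading>, sqlx::Error> {
    // The macro checks this query, and the SensorReading field types, at build time.
    sqlx::query_as!(
        SensorReading,
        r#"
        SELECT sensor_id, recorded_at, temperature
        FROM sensor_readings
        WHERE recorded_at > NOW() - make_interval(hours => $1)
        ORDER BY recorded_at DESC
        "#,
        hours
    )
    .fetch_all(pool)
    .await
}
Misspell a column or change its type in the database, and cargo build fails instead of a pipeline failing in production.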
Sometimes, you want more structure. You want your database tables to feel like an integral part of your Rust code. That’s where Diesel comes in. It’s a full Object-Relational Mapper (ORM) and query builder. I use it when I have complex relationships between tables or when I want to manage my database schema through Rust code. Diesel uses a separate schema file that you generate, which acts as a source of truth.
Imagine I have a blog with posts and comments. Diesel helps me model this relationship clearly.
// This is typically in a `src/schema.rs` file, auto-generated by Diesel.
diesel::table! {
posts (id) {
id -> Integer,
title -> Text,
body -> Text,
published -> Bool,
}
}
diesel::table! {
comments (id) {
id -> Integer,
post_id -> Integer,
author -> Text,
content -> Text,
}
}
// Diesel CLI also generates these lines, which let the two tables appear in one query.
diesel::joinable!(comments -> posts (post_id));
diesel::allow_tables_to_appear_in_same_query!(posts, comments);
// My Rust structs for the application.
#[derive(diesel::Queryable, diesel::Identifiable)]
struct Post {
id: i32,
title: String,
body: String,
published: bool,
}
#[derive(diesel::Queryable, diesel::Associations)]
#[diesel(belongs_to(Post))] // This declares the foreign key relationship.
struct Comment {
id: i32,
post_id: i32,
author: String,
content: String,
}
use diesel::prelude::*;

// A function to get all comments for a published post.
fn get_comments_for_post(
    conn: &mut PgConnection,
    post_title: &str,
) -> QueryResult<Vec<(Comment, Post)>> {
    use crate::schema::{comments, posts};
    // This is the query builder. It's all Rust code, checked at compile time.
    comments::table
        .inner_join(posts::table.on(posts::id.eq(comments::post_id)))
        .filter(posts::published.eq(true).and(posts::title.eq(post_title)))
        .select((comments::all_columns, posts::all_columns))
        .load::<(Comment, Post)>(conn)
}
The query is built from Rust functions like .filter() and .eq(), and Diesel translates it into efficient SQL. The #[diesel(belongs_to(Post))] attribute declares the association to the compiler, which also gives me helpers like Comment::belonging_to for loading all the comments that belong to a given post.
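Writing rows is just as type-checked. Here is a minimal sketch of an insert against the posts table from the schema above, using Diesel 2.x syntax; NewPost and create_draft are names I made up for the example:
use diesel::prelude::*;
use crate::schema::posts;

// Insert-only struct: `id` is omitted so the database assigns it.
#[derive(diesel::Insertable)]
#[diesel(table_name = posts)]
struct NewPost<'a> {
    title: &'a str,
    body: &'a str,
    published: bool,
}

fn create_draft(conn: &mut PgConnection, title: &str, body: &str) -> QueryResult<usize> {
    diesel::insert_into(posts::table)
        .values(&NewPost { title, body, published: false })
        .execute(conn) // Returns the number of inserted rows.
}
A typo in any field name here fails to compile, because the struct is checked against the generated schema module.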
Now, what about the data itself, once it’s out of the database? For analytical work, you often need column-oriented data structures. The Apache Arrow ecosystem in Rust, primarily through the arrow and datafusion crates, is a game-changer. Arrow defines a language-independent columnar memory format. Data in this format can be shared between systems (like between Rust and Python) with zero copy overhead. DataFusion is a query engine that operates on this format.
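To make the columnar idea concrete, here is a small sketch of building an Arrow RecordBatch by hand; the column names and values are invented for the example. Each column is a typed, contiguous array, and a schema ties the columns together:
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Float64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;

// Two columns, each stored as its own contiguous, typed buffer.
let schema = Arc::new(Schema::new(vec![
    Field::new("region", DataType::Utf8, false),
    Field::new("amount", DataType::Float64, false),
]));
let columns: Vec<ArrayRef> = vec![
    Arc::new(StringArray::from(vec!["north", "south", "north"])),
    Arc::new(Float64Array::from(vec![120.0, 80.5, 99.9])),
];
let batch = RecordBatch::try_new(schema.clone(), columns)?;
println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
Because the layout is standardized, that same batch can be handed to DataFusion, written to Parquet, or passed to Python without copying the underlying buffers.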
I often use DataFusion to run SQL queries directly on CSV or Parquet files, or on data I’ve already loaded into memory. It’s like having a mini, embeddable database engine.
use datafusion::prelude::*;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::arrow::util::pretty::print_batches;
let ctx = SessionContext::new();
// Register a CSV file as a queryable table named "sales".
ctx.register_csv(
"sales",
"./data/daily_sales.csv",
CsvReadOptions::new()
).await?;
// Now I can run a SQL query on it.
let sql = "
SELECT
region,
SUM(amount) as total_sales,
COUNT(*) as transaction_count
FROM sales
WHERE date > '2023-10-01'
GROUP BY region
HAVING SUM(amount) > 10000
ORDER BY total_sales DESC
";
let df = ctx.sql(sql).await?; // `df` is a DataFrame.
// I can also manipulate it using the DataFrame API.
let filtered_df = df
.filter(col("total_sales").gt(lit(50000)))?
.select(vec![col("region"), col("transaction_count")])?;
// Show the results (clone first, since `show` consumes the DataFrame).
filtered_df.clone().show().await?;
// Or, I can collect the results as Arrow record batches for further processing.
let results: Vec<RecordBatch> = filtered_df.collect().await?;
print_batches(&results)?;
This is incredibly powerful for building data transformation steps within a Rust application. You’re not just shuffling bytes; you’re performing database-grade aggregations and filters in process.
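The same engine also works on batches you already hold in memory. Here is a sketch that reuses the ctx from above plus the schema and batch built earlier, with a table name I picked for the example:
use std::sync::Arc;
use datafusion::datasource::MemTable;

// Register in-memory Arrow batches as a SQL-queryable table.
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
ctx.register_table("in_memory_sales", Arc::new(provider))?;
let summary = ctx
    .sql("SELECT region, SUM(amount) AS total FROM in_memory_sales GROUP BY region")
    .await?;
summary.show().await?;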
When you’re dealing with massive datasets in cloud storage (like S3 or ADLS), managing consistency is hard. This is the problem Delta Lake solves, and delta-rs is the Rust library for it. It provides ACID transactions, schema enforcement, and time travel on top of standard Parquet files. You can think of it as Git for your data lake. I use it to create reliable, auditable tables that many processes can read from and write to safely.
use deltalake::{DeltaOps, DeltaTableBuilder};
// SaveMode lives in the `protocol` module in recent deltalake releases.
use deltalake::protocol::SaveMode;
// Open an existing Delta table.
let table_path = "s3://my-data-bucket/gold/transactions";
let table = DeltaTableBuilder::from_uri(table_path)
    .with_allow_http(true) // Allow plain-HTTP object store endpoints (e.g. a local MinIO).
    .load()
    .await?;
// Let's see the table's history. "Time travel" is a key feature.
let operations = table.history(None).await?;
for op in operations {
    // Each commit entry records the version, timestamp, operation, and more.
    println!("{:?}", op);
}
// Read data from a specific version (time travel).
let versioned_table = DeltaTableBuilder::from_uri(table_path)
.with_version(5) // Load the table as it was at version 5.
.load()
.await?;
let files = versioned_table.get_files();
for file in files {
println!("Reading from: {}", file);
// You would typically use `arrow` or `polars` to read these Parquet files.
}
// Write new data to the table.
// First, you'd prepare your data in Arrow record batches...
// let new_data: RecordBatch = ...
// Then hand the table to DeltaOps, which exposes transactional operations.
// (Extra commit metadata, such as a "source" tag, can also be attached; the
// exact builder method for that varies across deltalake versions.)
let table = DeltaOps(table)
    .write(vec![new_data])
    .with_save_mode(SaveMode::Append)
    .await?;
// This transactionally adds the data and updates the Delta log.
println!("Successfully wrote to table, new version: {}", table.version());
The Delta log is a series of JSON commit files (plus periodic Parquet checkpoints) that records every change. If a write fails halfway, its commit file is never written, so your table stays in a consistent state.
For more hands-on, programmatic data manipulation, I reach for Polars. It’s a DataFrame library, like a supercharged version of Pandas, but built from the ground up in Rust for speed and parallelism. Its secret weapon is a lazy API. Instead of executing operations immediately, it lets you build a whole query plan, which it then optimizes and executes in parallel.
I use Polars when I need to do complex joins, groupings, or custom transformations on datasets that fit in memory (or can be processed in chunks).
use polars::prelude::*;

// Contents of a hypothetical ./data/employees.csv file:
//
//   name,department,salary
//   Alice,Engineering,85000
//   Bob,Sales,72000
//   Carol,Engineering,92000
//   David,Marketing,68000
//   Eve,Engineering,88000

// Scan the file with LazyCsvReader for lazy evaluation. Nothing is loaded yet;
// the reader takes a path, so Polars can push filters and projections into the scan.
let lazy_df = LazyCsvReader::new("./data/employees.csv")
    .has_header(true)
    .finish()?;
// Build a query plan: filter, group, and aggregate.
let query = lazy_df
    .filter(col("salary").gt(lit(75000))) // Keep high salaries.
    .group_by([col("department")]) // Group by department.
    .agg([
        col("salary").mean().alias("avg_salary"), // Average salary.
        col("name").n_unique().alias("headcount"), // Number of unique names.
    ])
    .sort("avg_salary", SortOptions::default().with_order_descending(true)); // Sort high to low.
// Now, execute the optimized plan and collect the result.
let df: DataFrame = query.collect()?;
println!("{}", df);
// For files that don't fit in memory, the same plan can run through Polars'
// streaming engine (enabled with `with_streaming(true)`) and be collected in batches.
The beauty is in the .collect() line. That’s when all the optimizations kick in: Polars can apply predicate and projection pushdown, combine filters, and pick specialized algorithms for the aggregation, all across multiple threads.
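The lazy API is not tied to files, either. A DataFrame that is already in memory can be turned into a plan with .lazy(), and again nothing runs until .collect(). A small sketch with made-up data, using the same group_by naming as above:
use polars::prelude::*;

// Build an in-memory DataFrame, then run the same kind of lazy pipeline on it.
let df = df!(
    "department" => &["Engineering", "Sales", "Engineering"],
    "salary" => &[85_000i64, 72_000, 92_000]
)?;
let summary = df
    .lazy()
    .group_by([col("department")])
    .agg([col("salary").mean().alias("avg_salary")])
    .collect()?; // The plan is optimized and executed here.
println!("{}", summary);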
Data engineering isn’t just about batches; it’s about streams. For working with Apache Kafka, rdkafka is the robust, production-ready choice. It’s a wrapper around the C library librdkafka, so it’s very mature. I’ve used it to build both producers that publish data and consumers that process event streams in real-time.
Here’s a simple producer and a consumer.
// PRODUCER
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092")
.set("message.timeout.ms", "5000") // 5 second timeout.
.create()?;
let key = "sensor_42";
let payload = r#"{"temp": 23.7, "humidity": 65}"#;
let record = FutureRecord::to("sensor-readings") // Topic.
.key(key)
.payload(payload);
match producer.send(record, Timeout::Never).await {
Ok((partition, offset)) => println!("Sent to partition {}, offset {}", partition, offset),
Err((e, _original_record)) => eprintln!("Error sending: {}", e),
}
// CONSUMER
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::Message;
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", "rust-data-processor")
.set("bootstrap.servers", "localhost:9092")
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true") // Or false for manual commit.
.create()?;
consumer.subscribe(&["sensor-readings"])?;
// This is a simple loop. In reality, you'd use a tokio stream.
loop {
match consumer.recv().await {
Ok(msg) => {
if let Some(payload) = msg.payload() {
println!("Received: {:?}", std::str::from_utf8(payload)?);
// Process the message...
}
// Manually commit offset if auto-commit is disabled.
// consumer.commit_message(&msg, CommitMode::Async)?;
},
Err(e) => eprintln!("Kafka error: {}", e),
}
}
The FutureProducer returns a Future, which integrates neatly with async runtimes like Tokio. The consumer can be part of a larger async stream-processing topology.
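For that kind of topology, the consumer can be treated as an async stream. Here is a minimal sketch, assuming the futures crate is available and the consumer is configured as above:
use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;
use rdkafka::Message;

async fn run_pipeline(consumer: StreamConsumer) {
    // `stream()` yields messages as they arrive, so this loop composes with
    // other futures and streams inside a Tokio application.
    let mut stream = consumer.stream();
    while let Some(result) = stream.next().await {
        match result {
            Ok(msg) => {
                if let Some(Ok(text)) = msg.payload_view::<str>() {
                    // Hand the payload off to the rest of the pipeline here.
                    println!("processing: {}", text);
                }
            }
            Err(e) => eprintln!("Kafka error: {}", e),
        }
    }
}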
In any data pipeline, you need to convert data from one format to another. This is where Serde is indispensable. It’s not a data engineering library per se, but it’s the foundation for serialization in Rust. With derive macros, you can make your structs serializable to JSON, CSV, Avro, YAML, and dozens of other formats with minimal code.
I use it constantly—to parse configuration, to read/write intermediate data, to send messages over HTTP.
use serde::{Deserialize, Serialize};
use serde_json;
use csv;
// Define the structure of your data.
#[derive(Debug, Serialize, Deserialize)]
struct LogEntry {
#[serde(rename = "@timestamp")] // Map to a JSON field with a special character.
timestamp: String,
level: String,
message: String,
#[serde(skip_serializing_if = "Option::is_none")] // Omit if None.
user_id: Option<u64>,
}
// Serialize to JSON.
fn log_to_json(entry: &LogEntry) -> String {
serde_json::to_string_pretty(entry).expect("Failed to serialize to JSON")
}
// Deserialize from a CSV file (using the `csv` crate with Serde support).
fn read_from_csv(file_path: &str) -> Result<Vec<LogEntry>, Box<dyn std::error::Error>> {
let mut reader = csv::Reader::from_path(file_path)?;
let mut entries = Vec::new();
for result in reader.deserialize() {
let entry: LogEntry = result?; // Serde handles the CSV parsing.
entries.push(entry);
}
Ok(entries)
}
// Using it.
let entry = LogEntry {
timestamp: "2023-11-02T10:15:30Z".to_string(),
level: "ERROR".to_string(),
message: "Failed to connect to database".to_string(),
user_id: Some(12345),
};
let json_output = log_to_json(&entry);
println!("{}", json_output);
// This would print:
// {
// "@timestamp": "2023-11-02T10:15:30Z",
// "level": "ERROR",
// "message": "Failed to connect to database",
// "user_id": 12345
// }
The #[derive(Serialize, Deserialize)] macro does almost all the work. The annotations like #[serde(rename = "...")] give you fine-grained control over the format.
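The same derives also work in the other direction. Here is a sketch of writing entries back out as CSV with the csv crate; the output path is whatever you choose, and a field hidden by skip_serializing_if should be present (or absent) consistently so every CSV row has the same shape:
fn write_to_csv(entries: &[LogEntry], file_path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let mut writer = csv::Writer::from_path(file_path)?;
    for entry in entries {
        writer.serialize(entry)?; // Serde turns each struct into a CSV row.
    }
    writer.flush()?; // Ensure buffered rows reach the file.
    Ok(())
}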
Finally, once you have processed data, you often need to serve it. Writing a full REST API for every dataset is tedious. ROAPI automates this. You give it a configuration file pointing to your datasets (CSV, JSON, Parquet files, or even a database connection), and it spins up an HTTP server with automatic endpoints that support filtering, sorting, and pagination.
While you typically run ROAPI as a standalone binary, you can think of it as the final piece in your pipeline. You process data with Polars or DataFusion, write it as a Parquet file to S3, and then point ROAPI at it. Instantly, that data is queryable via a robust API.
A simple roapi.toml configuration:
# roapi.toml
server.host = "0.0.0.0"
server.port = 8080
[[tables]]
name = "stock_prices"
uri = "s3://my-bucket/data/stocks.parquet"
format = "parquet"
[[tables]]
name = "company_info"
uri = "postgres://user:pass@localhost/mydb"
db.table = "companies"
With this running, I can query my data using HTTP requests:
# Get all data (with default pagination)
curl "http://localhost:8080/api/tables/stock_prices"
# Filter for a specific symbol
curl "http://localhost:8080/api/tables/stock_prices?symbol=eq.AAPL"
# Select specific columns and order by date
curl "http://localhost:8080/api/tables/stock_prices?select=symbol,date,close&order=date.desc"
It uses a subset of the PostgREST syntax, which is very powerful for client-side queries. This is incredibly useful for creating quick internal tools or serving clean data to front-end applications without writing a line of backend logic.
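From the Rust side, consuming one of these endpoints is just an HTTP call plus a Serde struct. Here is a hypothetical client for the stock_prices table above, using reqwest with its json feature; the field names are my assumptions about the dataset, and it expects the endpoint to return rows as JSON:
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct StockPrice {
    symbol: String,
    date: String,
    close: f64,
}

async fn fetch_apple_prices() -> Result<Vec<StockPrice>, reqwest::Error> {
    let url = "http://localhost:8080/api/tables/stock_prices?symbol=eq.AAPL";
    // Fetch the filtered rows and deserialize them straight into structs.
    reqwest::get(url).await?.json::<Vec<StockPrice>>().await
}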
Together, these libraries form a cohesive stack. You can ingest streaming data with rdkafka, transform it in memory with Polars or DataFusion, store it reliably in a Delta Lake table on cloud storage, and finally expose it through an auto-generated API with ROAPI. SQLx or Diesel handle stateful metadata, and Serde glues all the data formats together. Each piece leverages Rust’s strengths—speed, safety, and expressiveness—to handle data not just as bytes, but as structured, reliable information. This is how you build data systems that are not only fast but also trustworthy and easy to reason about.