Problem
Last year I joined a project managing a document processing service handling a volume of over 12 million files, totaling almost 12 TB of data. Almost immediately, several opportunities to optimize the data architecture stood out to me. A key area for improvement was the file management system, which lacked an efficient mechanism for handling duplicate files across this massive dataset.
Files were stored in S3 with unique keys, but the metadata in Postgres wasn’t normalized, resulting in multiple database rows for the same file as well as multiple copies of the file in S3. At the scale of millions of files, this redundancy caused significant inefficiencies in storage and processing. Addressing it was a substantial opportunity to improve the system’s performance and resource utilization, potentially freeing up terabytes of storage and reducing processing overhead. The first step was to make duplicates identifiable by content, which meant calculating a cryptographic hash, such as SHA256, for each file in S3.
Approach
To begin, I added a sha256 column to the files table and updated the code to calculate the hash whenever a new row is inserted. However, there were still over 12 million existing files (approximately 12 TB of data) to backfill. This is where our Rust-based solution comes into play.
Given the enormous volume of data, downloading and hashing each object sequentially would have been prohibitively slow; a parallel approach was clearly necessary. That requirement led naturally to Rust, which has great concurrency primitives and is well suited to this kind of task.
To keep concerns separate and optimize performance, I designed the hasher program so that it doesn’t connect to the database directly. Instead, I used psql to dump a text file containing all the S3 keys and used that as input. The program reads these keys, fetches the corresponding objects from S3, calculates the SHA256 hash in memory, and writes the results to a CSV file. That output can then be imported into a temporary database table to backfill the existing file rows. The files from S3 are never written to disk.
The program’s design consists of three types of tasks:
- Input task (1): Reads keys from the input file and sends them to the worker tasks.
- Worker tasks (n): Read keys from the input channel, download files from S3, calculate hashes, and send results to the output channel.
- Output task (1): Reads results from the worker tasks’ channel and writes them to the output file.
The number of worker tasks (n) is set to eight times the number of available CPU cores, striking a balance between keeping the CPU busy and avoiding excessive context switching (see the sketch below).
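For reference, here is a minimal sketch of how that worker count can be derived (my illustration; the real program may take it from a command-line flag instead):

// Derive the worker count from the number of available cores;
// available_parallelism() falls back to 1 if the count can't be determined.
let cores = std::thread::available_parallelism()
    .map(|n| n.get())
    .unwrap_or(1);
let num_workers = cores * 8;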
Channels of the appropriate type and direction are established:
- Input to worker channel: multi-producer, multi-consumer (MPMC)
- Worker to output channel: multi-producer, single consumer (MPSC)
- A final oneshot channel passes statistics from the output task back to the main task
Sample CSV output:
key1,\\xb5a91391ee293f10c5dbccf058e86a2c5f9f8d3b37d627687828fd02c91dfd78
key2,\\xb5a91391ee293f10c5dbccf058e86a2c5f9f8d3b37d627687828fd02c91dfd78
key3,\\x559ba28c59b861790babae704fc07c4494ae8b3a6b2a7b042d36708b68cc9ece
This format is easily imported into a Postgres table, with the first column as the S3 key and the second as the SHA256 hash of the file content.
Initially, I attempted a batch approach, reading 200 keys at a time and using Tokio’s async runtime to fetch and hash them concurrently. However, this led to bottlenecks where tasks sat idle waiting for each batch to finish. So I pivoted from the batched approach to a streaming approach, using channels to avoid locking and the need for Arc/Mutexes. We still use Tokio, of course, but channels make the design much smoother and more efficient.
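For context, here is a rough sketch of the batched pattern I moved away from (reconstructed for illustration only, not the original code; keys, s3agent, and sha256 stand in for the same pieces used in the excerpts below):

// Process keys in fixed-size batches of 200. Every task in a batch must
// finish before the next batch starts, so the pipeline stalls on the
// slowest download in each batch.
for batch in keys.chunks(200) {
    let handles: Vec<_> = batch
        .iter()
        .cloned()
        .map(|key| {
            let s3agent = s3agent.clone();
            tokio::spawn(async move {
                match s3agent.get_object(&key).await {
                    Ok(bytes) => Some((key, sha256(&bytes))),
                    Err(_) => None,
                }
            })
        })
        .collect();
    // Wait for the whole batch before moving on
    let _results = futures::future::join_all(handles).await;
    // ... write the batch's results, then start the next batch
}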
Implementation
The implementation of the S3 hasher program leverages Rust’s async capabilities and the Tokio runtime for efficient concurrent processing. The program is structured around several key components:
- Channel setup for inter-task communication,
- An input task for streaming S3 keys,
- An output task for writing results, and
- Multiple worker tasks for fetching and hashing files
I’ve included a sample (simplified) main function to show how they tie together. Below are key excerpts from the implementation, highlighting the core functionality of each component:
// 1. Channel setup
// An MPMC channel for streaming lines from the input file into worker tasks.
// Bounded in order to handle backpressure.
let (tx, rx) = async_channel::bounded(num_workers);
// An MPSC channel for sending results from the workers to the output task
let (writer_tx, mut writer_rx) = mpsc::channel::<(String, Vec<u8>, usize)>(num_workers);
// A oneshot channel for sending the count of written hashes from the output
// task back to the main task once it's finished
let (count_tx, count_rx) = oneshot::channel::<(u64, u64)>();
// 2. Input task function (simplified)
// Spawn a task to stream lines from the input file into a channel for
// worker tasks to read from
tokio::spawn(async move {
    if let Ok(lines_stream) = lines_reader.stream_lines().await {
        lines_stream
            .for_each(|line| {
                let tx = tx.clone();
                async move {
                    if let Ok(line) = line {
                        // Send each line to the channel
                        if tx.send(line).await.is_err() {
                            error!("error sending line to channel");
                        }
                    }
                }
            })
            .await;
        // Close the input channel so the workers' recv() loops end once
        // every key has been sent
        tx.close();
    }
});
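// Note: lines_reader.stream_lines() is a helper from the full codebase that
// isn't shown in this excerpt. As a rough stand-in (an assumption, not the
// original implementation), it could wrap Tokio's buffered line reader in a
// stream:
//
//     use tokio::{fs::File, io::{AsyncBufReadExt, BufReader}};
//     use tokio_stream::wrappers::LinesStream;
//
//     let file = File::open(&input_path).await?;
//     let lines_stream = LinesStream::new(BufReader::new(file).lines());
//
// which yields io::Result<String> items, matching the Ok(line) check above.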
// 3. Output task function (simplified)
//
// Spawn a task to read results from the workers and write them to the
// output file
let output_handle = tokio::spawn(async move {
    // Counters for the summary stats (declared here in this simplified
    // excerpt); outfh is the output file handle, opened earlier
    let mut count_hashes_written: u64 = 0;
    let mut total_bytes_hashed: u64 = 0;
    while let Some((key, hash, bytes_len)) = writer_rx.recv().await {
        // Write key + hash to a CSV output file in the format that psql can
        // read and insert into a table
        let str_out = format!("{},\\\\x{}\n", key, hex::encode(hash));
        match outfh.write_all(str_out.as_bytes()).await {
            Ok(_) => {
                total_bytes_hashed += bytes_len as u64;
                count_hashes_written += 1;
            }
            Err(e) => {
                error!("error writing sha256 hash to outfile: {}", e);
            }
        }
    }
    // Send the count of hashes written to the output file back to the main
    // task
    let _ = count_tx.send((count_hashes_written, total_bytes_hashed));
});
// 4. Worker task function (simplified)
//
// Spawn num_workers tasks to fetch objects from s3, hash them and write
// results to the output channel
// Collect task handles so we can wait for all tasks to finish
let mut handles = Vec::with_capacity(num_workers + 1);
handles.push(output_handle);
for _ in 0..num_workers {
    let rx = rx.clone();
    let writer_tx = writer_tx.clone();
    let s3agent = s3agent.clone();
    let handle = tokio::spawn(async move {
        // Each line received from the input channel is an S3 key
        while let Ok(key) = rx.recv().await {
            // Fetch the object bytes from S3
            match s3agent.get_object(&key).await {
                Ok(bytes) => {
                    let bytes_len = bytes.len();
                    let hash = sha256(&bytes);
                    // Send the results to the output channel for appending
                    // to the outfile
                    if writer_tx
                        .send((key.to_owned(), hash, bytes_len))
                        .await
                        .is_err()
                    {
                        error!("error sending result back to channel");
                    }
                }
                Err(e) => {
                    error!("Got an S3 Error for key: '{}', {}", key, e);
                }
            };
        }
        // Drop this worker's cloned writer channel tx
        drop(writer_tx);
    });
    // Add to the vec of handles so we can wait for all the workers to
    // finish before exiting the main task
    handles.push(handle);
}
// Drop the writer channel tx in the main task; only the workers use this
// channel, and the output task's recv() loop ends once every clone is dropped
drop(writer_tx);
// Wait for all the worker and output tasks to finish
futures::future::join_all(handles).await;
// Now collect stats from the oneshot channel
let (count_hashes_written, count_bytes_hashed) = match count_rx.await {
    Ok((count_hashes, count_bytes_hashed)) => {
        (count_hashes.to_string(), count_bytes_hashed.to_string())
    }
    Err(_e) => ("some".to_string(), "some".to_string()),
};
// 5. Main function structure
#[tokio::main]
async fn main() {
    // Setup channels
    // Spawn tasks
    // Wait for completion and log stats
}
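One piece not shown above is the sha256 helper the workers call. A minimal version (my sketch, assuming the sha2 crate; the actual project’s helper may differ) could look like this:

use sha2::{Digest, Sha256};

// Hash a byte buffer and return the raw 32-byte digest; the output task
// hex-encodes it before writing the CSV line.
fn sha256(bytes: &[u8]) -> Vec<u8> {
    let mut hasher = Sha256::new();
    hasher.update(bytes);
    hasher.finalize().to_vec()
}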
Results
To test the performance of the S3 hasher, I used a personal S3 bucket containing a diverse dataset. The test set comprised 50,000 objects with an average file size of 970 KB, totaling approximately 46 GB.
Testing Environment
- EC2 instance types: Primarily c7g and m7g series
- Network setup: Within a VPC in the same region as the S3 bucket and with an S3 endpoint
- This configuration eliminates data transfer costs and minimizes latency
Performance Observations
- Optimal thread count: Best performance was achieved at 8x the number of available cores
- Diminishing returns: Performance decreased slightly beyond the optimal thread count
- S3 rate limits: Encountered at more than 512 total threads
Best Performance Scenario
- Instance: c7g.metal (64 cores, 128 GiB memory)
- Processing time: 14.72 seconds for the 46 GB dataset
- Processing rate: 3.125 GB/second
Cost and Scalability Analysis
Using the best-performing c7g.metal instance as our benchmark:
- EC2 instance cost: $2.32/hour for c7g.metal (spot price at time of benchmark)
- Extrapolated performance for production dataset (12 TB):
- Estimated processing time: 12,000 GB / 3.125 GB/s = 3840 seconds (64 minutes)
- Total cost: $2.32 * (64/60) hours = $2.47
These results demonstrate the efficiency and cost-effectiveness of the solution: extrapolating from the benchmark, the entire 12 TB production dataset can be processed in just over an hour, at a rate of 3.125 GB/second and a cost of less than $3. This represents a significant improvement in our file management capabilities, enabling rapid deduplication and optimization of our storage resources.
Instance type | On-demand price (us-east-1) | vCPUs | Mem (GiB) | Net perf (Gigabit) | 50k payload process time (seconds) | Total threads
---|---|---|---|---|---|---
m7g.4xlarge | $0.6528 | 16 | 64 | up to 15 | 39.21 | 128
m7g.4xlarge | $0.6528 | 16 | 64 | up to 15 | 27.43 | 256
m7g.4xlarge | $0.6528 | 16 | 64 | up to 15 | 27.84 | 512
m7g.8xlarge | $1.3056 | 32 | 128 | 15 | 34.28 | 128
m7g.8xlarge | $1.3056 | 32 | 128 | 15 | 27.19 | 256
m7g.8xlarge | $1.3056 | 32 | 128 | 15 | 26.92 | 512
m7g.metal | $2.6112 | 64 | 256 | 30 | 57.99 | 128
m7g.metal | $2.6112 | 64 | 256 | 30 | 21.60 | 256
m7g.metal | $2.6112 | 64 | 256 | 30 | 16.30 | 512
c7g.8xlarge | $1.16 | 32 | 64 | 15 | 50.46 | 64
c7g.8xlarge | $1.16 | 32 | 64 | 15 | 30.23 | 128
c7g.8xlarge | $1.16 | 32 | 64 | 15 | 26.97 | 256
c7g.8xlarge | $1.16 | 32 | 64 | 15 | 26.98 | 512
c7g.metal | $2.32 | 64 | 128 | 30 | 49.25 | 128
c7g.metal | $2.32 | 64 | 128 | 30 | 21.27 | 256
c7g.metal | $2.32 | 64 | 128 | 30 | 14.72 | 512
Reflections
These results demonstrate the efficiency and cost-effectiveness of this approach. We were able to process the entire 12 TB production dataset in just over an hour, at a cost of less than $3 and a processing rate of 3.1042 GB/second, just under the benchmark rate of 3.125 GB/second. The S3 hasher utility enables swift deduplication and significantly improves storage optimization, showcasing the power of Rust’s async capabilities with Tokio for hashing massive datasets stored in S3.
Anyone who has used Go is familiar with its built-in concurrency model based on goroutines. I was curious how Go would stack up against this Rust tool, so I wrote a version in Go using the same basic design. My next post will detail that implementation and include benchmarks.