SQL Query on Parquet Files with DataFusion
The Rust big data ecosystem is all set for the big time, with Apache Arrow and its surrounding projects (DataFusion, Ballista) leading the pack. In this post we'll take a look at how you can use DataFusion to run SQL queries on Parquet files.

Before we get into the code, let me briefly introduce the tools we're going to use.

  • Apache Arrow is a language-independent, in-memory columnar format. It provides a standardized specification for representing structured, table-like datasets, with a rich type system (including nested and user-defined data types) designed to support the needs of analytic database systems, data frame libraries, and more.
  • Apache Parquet is a language-independent columnar format for on-disk storage. Parquet is already quite popular, with almost all major players in the Hadoop ecosystem supporting reading and writing it. It is well established that columnar formats are superior for analytics workloads, and they compress well, making better use of disk space. Parquet is arguably the most widely adopted columnar format, with support across several programming languages and big data tools.
  • Apache Arrow DataFusion is a query execution framework written in Rust that uses Apache Arrow as its internal, in-memory format. DataFusion is designed to be extensible and can be used either as a standalone tool or as a library inside other Rust programs. Either way, it brings excellent query performance and a simple interface for running queries, as the short sketch after this list shows.
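
For a quick taste of that interface, here's a minimal sketch that registers a single Parquet file as a table and queries it with SQL. The file name is just a placeholder; point it at any Parquet file you have.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // create a local execution context
    let mut ctx = ExecutionContext::new();
    // register one Parquet file as a table (placeholder file name)
    ctx.register_parquet("users", "userdata1.parquet").await?;
    // run a SQL query and print the result
    let df = ctx.sql("SELECT COUNT(*) FROM users").await?;
    df.show().await?;
    Ok(())
}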

Querying multiple Parquet files

Let's take a look at a working example of how to query multiple Parquet files (with the same schema) using DataFusion.

Set up a Rust Project

We'll use DataFusion as a library in a standalone Rust project to perform this query. To start, create a Rust project using cargo init. Note that you'll need Rust and Cargo installed for this to work.

Once the project boilerplate is created, open the Cargo.toml file and paste in the contents below. This adds all the dependencies required for our query project.

[package]
name = "datafusion-parquet"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion = "6.0.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] }
futures = "0.3"

Sample Data

We'll use three Parquet files as the sample data for querying.

All three files share the schema below, and each contains 1,000 rows.

column#    column_name          hive_datatype
=============================================
1          registration_dttm    timestamp
2          id                   int
3          first_name           string
4          last_name            string
5          email                string
6          gender               string
7          ip_address           string
8          cc                   string
9          country              string
10         birthdate            string
11         salary               double
12         title                string
13         comments             string
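
For reference, here's a sketch of roughly how that schema maps to Arrow data types in Rust. The exact timestamp unit depends on how the files were written; nanoseconds is an assumption here.

use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};

// Approximate Arrow equivalent of the Hive schema above.
// All fields are assumed nullable; the timestamp unit is a guess.
let schema = Schema::new(vec![
    Field::new("registration_dttm", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
    Field::new("id", DataType::Int32, true),
    Field::new("first_name", DataType::Utf8, true),
    Field::new("last_name", DataType::Utf8, true),
    Field::new("email", DataType::Utf8, true),
    Field::new("gender", DataType::Utf8, true),
    Field::new("ip_address", DataType::Utf8, true),
    Field::new("cc", DataType::Utf8, true),
    Field::new("country", DataType::Utf8, true),
    Field::new("birthdate", DataType::Utf8, true),
    Field::new("salary", DataType::Float64, true),
    Field::new("title", DataType::Utf8, true),
    Field::new("comments", DataType::Utf8, true),
]);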

Download all three Parquet files into a directory called data inside the project directory. We'll use this directory as the data source for our query.

Finally, open the src/main.rs file and add the content below.

use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use std::sync::Arc;

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results
#[tokio::main]
async fn main() -> Result<()> {
    // create local execution context
    let mut ctx = ExecutionContext::new();

    // enable pruning so row groups can be skipped based on statistics
    let file_format = ParquetFormat::default().with_enable_pruning(true);

    // scan every .parquet file under the table path
    let listing_options = ListingOptions {
        file_extension: ".parquet".to_owned(),
        format: Arc::new(file_format),
        table_partition_cols: vec![],
        collect_stat: true,
        target_partitions: 1,
    };

    // register all files in ./data/ as a single table named "my_table"
    ctx.register_listing_table(
        "my_table",
        &format!("file://{}", "./data/"),
        listing_options,
        None,
    )
    .await?;
    
    // execute the query
    let df = ctx.sql("SELECT COUNT(*) FROM my_table").await?;

    // print the results
    df.show().await?;
    Ok(())
}

Now, run this using cargo run. You should see output like:

+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 3000            |
+-----------------+

This was a simple query counting the rows across all files in the input directory. (DataFusion rewrites COUNT(*) internally, which is why the column header reads COUNT(UInt8(1)).) You can now run more complex queries as you see fit. For example, SELECT id, first_name, last_name FROM my_table WHERE country = 'China' lists the matching users, and SELECT COUNT(*) FROM my_table WHERE country = 'China' returns

+-----------------+
| COUNT(UInt8(1)) |
+-----------------+
| 551             |
+-----------------+
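
If you'd rather not write SQL, the same filter can also be expressed with DataFusion's DataFrame API. Here's a sketch of the equivalent call chain, assuming ctx is the execution context with my_table registered as above:

// select id, first_name, last_name for rows where country = 'China'
let df = ctx
    .table("my_table")?
    .filter(col("country").eq(lit("China")))?
    .select_columns(&["id", "first_name", "last_name"])?;
df.show().await?;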

Summary

I hope this small tutorial gave you an overview of how to query multiple Parquet files using the DataFusion library. If you're interested, here is a great overview of DataFusion.