Apache Iceberg is an open table format that is multi-engine compatible and built to accommodate at-scale analytic data sets. Being multi-engine means that Spark, Trino, Presto, Hive and Impala can all operate on the same data independently at the same time. Previously, we published The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO as an introduction to the topic, including design goals and key features, and Digging Deeper into Iceberg: ACID Transactions on Tables that discussed how to work with data in Iceberg format.
In this blog post, we will build a Notebook that uses MinIO as object storage for Spark jobs to manage Iceberg tables. If you haven't already set up the spark-operator in your Kubernetes environment, please see Spark, MinIO and Kubernetes.
Apache Iceberg is an open-source table format that allows for the efficient storage of large, slow-changing datasets in cloud storage systems such as Amazon S3, Azure Blob Storage, Google Cloud Storage and MinIO. Originally developed by Netflix, Iceberg addresses some of the limitations of older table layouts such as Hive tables, while the data itself is still stored in file formats like Apache Parquet and Apache ORC.
Iceberg is designed to provide a number of benefits over traditional table formats, which has made it a go-to table format for data lakes. Some of these benefits are:
- Schema evolution: Data lakes are often characterized by their flexibility and ability to store a wide variety of data formats. However, this flexibility can make it challenging to manage schema changes over time. Iceberg provides a way to add, remove, or modify table columns without requiring a full rewrite of the data, making it easier to evolve schemas over time.
- Transactional writes: In a data lake it is important to ensure that data is accurate and consistent, especially if the data is used for business-critical purposes. Iceberg provides support for ACID transactions for write operations, ensuring that data is always in a consistent state.
- Query isolation: Data lakes are often used by many users or applications simultaneously. Iceberg allows multiple queries to run concurrently without interfering with each other, making it possible to scale data lake usage without sacrificing performance.
- Time travel: In a data lake it is often useful to be able to query data as it appeared at a specific point in time. Iceberg provides a time-travel API that enables users to query data as it existed at a specific version or timestamp, making it easier to analyze historical trends or track changes over time.
- Partition pruning: Data lakes often contain large amounts of data, which can make querying slow and resource-intensive. Iceberg supports partitioning data by one or more columns, which can significantly improve query performance by reducing the amount of data that needs to be read during queries.
Iceberg can be used with a variety of processing engines and frameworks, including Apache Spark, Dremio and Presto. It is also integrated with Apache Arrow, a cross-language in-memory data format, which enables efficient data serialization and deserialization across different processing engines.
The Apache Iceberg catalog is a metadata store that contains information about tables, including their schema, location, and partitioning scheme. It is responsible for managing the lifecycle of tables, including creating, updating, and deleting them, and provides APIs for querying metadata and accessing data.
Below are some of the catalogs supported by Apache Iceberg:
- JDBC catalog
- Hive catalog
- Nessie Catalog
- Hadoop catalog
- Glue catalog
- DynamoDB catalog
- REST catalog
To keep this walkthrough simple, we will be using the Hadoop catalog for our Iceberg tables.
MinIO Object Storage
MinIO guarantees durability for Iceberg tables and high performance for Spark operations on those tables. MinIO secures Iceberg tables using encryption and limits access to them with policy-based access controls. MinIO encrypts data in transit with TLS and data on drives with granular object-level encryption using modern, industry-standard encryption algorithms, such as AES-256-GCM, ChaCha20-Poly1305, and AES-CBC. MinIO integrates with external identity providers such as Active Directory/LDAP, Okta and Keycloak for IAM. Users and groups are then subject to AWS IAM-compatible PBAC as they attempt to access Iceberg tables.
Iceberg tables are protected after they are written to a MinIO bucket with:
- Erasure Coding splits data files into data and parity blocks and encodes them so that the primary data is recoverable even if part of the encoded data is not available. Horizontally scalable distributed storage systems save encoded data across multiple drives and nodes. If a drive or node fails or data becomes corrupted, the original can be reconstructed from the parity and data blocks saved on other drives and nodes.
- Bit Rot Protection captures and heals corrupted objects in the background to remove this hidden threat to data durability.
- Bucket and Object Immutability protects data saved to MinIO from deletion or modification using object locking, retention and other governance mechanisms. Objects written to MinIO are never overwritten.
- Bucket and Object Versioning further protect objects. MinIO stores every version of every object, even if it is deleted, unless a version is specifically removed. MinIO’s Data Lifecycle Management allows administrators to move buckets between tiers, for example to use NVMe for performance intensive workloads, and to set an expiration date on versions so they are purged from the system to improve storage efficiency.
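The erasure-coding idea in the list above can be illustrated with a deliberately simplified sketch. It uses a single XOR parity shard, which can recover only one lost shard; this is a stand-in chosen for brevity and is not the algorithm MinIO implements. All function names here are hypothetical.

```python
from functools import reduce


def split_into_shards(data: bytes, k: int) -> list[bytes]:
    """Split data into k equally sized shards, zero-padding the tail if needed."""
    shard_len = -(-len(data) // k)  # ceiling division
    padded = data.ljust(shard_len * k, b"\0")
    return [padded[i * shard_len:(i + 1) * shard_len] for i in range(k)]


def xor_bytes(a: bytes, b: bytes) -> bytes:
    """Byte-wise XOR of two equally sized byte strings."""
    return bytes(x ^ y for x, y in zip(a, b))


def parity_of(shards: list[bytes]) -> bytes:
    """Compute one parity shard as the XOR of all data shards."""
    return reduce(xor_bytes, shards)


def rebuild_missing(surviving: list[bytes], parity: bytes) -> bytes:
    """Recover the single missing data shard from the survivors plus parity."""
    return reduce(xor_bytes, surviving, parity)
```

If one of the data shards is lost, XOR-ing the remaining shards with the parity shard yields the missing one. Production erasure codes tolerate the loss of several shards at once, which a single parity shard cannot do.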
Getting Demo Data into MinIO
We will be using the NYC Taxi dataset that is available on MinIO. You can download the dataset from here; it has about 112M rows and is roughly 10GB in size. You can use any other dataset of your choice and upload it to MinIO using the following commands:
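As one possible way to do the upload from Python, here is a minimal sketch built on the MinIO Python SDK methods `bucket_exists`, `make_bucket` and `fput_object`. The bucket name, object key and credentials are placeholders, not values from this post, and `upload_dataset` is a hypothetical helper.

```python
def upload_dataset(client, bucket: str, object_name: str, file_path: str) -> str:
    """Create the bucket if it is missing, upload the file, return its s3a:// URI."""
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)
    client.fput_object(bucket, object_name, file_path)
    return f"s3a://{bucket}/{object_name}"


# Example wiring (requires `pip install minio`; all values are placeholders):
#   from minio import Minio
#   client = Minio("play.min.io:50000", access_key="<ACCESS_KEY>",
#                  secret_key="<SECRET_KEY>", secure=True)
#   upload_dataset(client, "my-bucket", "sample-data/taxi-data.csv", "taxi-data.csv")
```

The returned `s3a://` URI is the form Spark would typically use to read the file back.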
Sample PySpark Application that Manages an Iceberg Table
This is based on the Getting Started with Iceberg Notebook.
Building the Docker Image
We will now build the Docker image that contains the above Python application. You can use the following Dockerfile to build the image:
You can build your own Docker image or use the pre-built image openlake/sparkjob-demo:3.3.2 that is available on Docker Hub.
Deploy Spark Iceberg Application
We will build the Spark job YAML to define the specs and then deploy it in a Kubernetes cluster
You can deploy the above sparkjob-iceberg.yml using the below command
After the application is deployed, you can check the status of the application using the following command:
You can also check the logs of the application using the following command (Since we have disabled the INFO logs in the Spark application you may not see much activity until our application logs start showing up):
Once the application is completed, you can delete the application using the following command:
Now that we’ve seen the end-to-end code, let's look at the code snippets in detail.
Setup Iceberg Properties
The above snippet instructs Spark to use the Iceberg Spark session extensions and sets the catalog demo as the default catalog. The catalog is of type hadoop, uses S3FileIO as its IO implementation, and points to https://play.min.io:50000 as the S3 endpoint.
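A configuration along these lines might look like the following sketch. The property keys are standard Iceberg and Spark settings; the warehouse path passed in is an assumption, and credentials and the Iceberg runtime jars are deliberately left out. `iceberg_conf` and `build_session` are hypothetical helper names.

```python
def iceberg_conf(endpoint: str, warehouse: str, catalog: str = "demo") -> dict[str, str]:
    """Return Spark properties for a Hadoop-type Iceberg catalog backed by S3FileIO."""
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        "spark.sql.extensions":
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "hadoop",
        f"{prefix}.warehouse": warehouse,  # assumed location, e.g. "s3a://warehouse/"
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.s3.endpoint": endpoint,
        "spark.sql.defaultCatalog": catalog,
    }


def build_session(conf: dict[str, str], app_name: str = "iceberg-demo"):
    """Apply the properties to a SparkSession builder (pyspark is imported lazily)."""
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling `build_session(iceberg_conf("https://play.min.io:50000", "s3a://warehouse/"))` would then produce a session whose unqualified table names resolve against the demo catalog.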
Create Iceberg Table
In the above snippet, we read the taxi-data.csv file from the MinIO https://play.min.io:50000 endpoint and save it as an Iceberg table, nyc.taxis_large. After the Iceberg table is saved, we use Spark SQL to query nyc.taxis_large and get the total number of records present.
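The read-then-write step could be sketched as below, using Spark's `DataFrameWriterV2` (`writeTo(...).createOrReplace()`). It assumes an already configured `spark` session, and that the CSV has a header row whose types can be inferred; the `csv_uri` argument is whatever path the file was uploaded to. `load_csv_as_iceberg` is a hypothetical helper.

```python
TABLE = "nyc.taxis_large"


def load_csv_as_iceberg(spark, csv_uri: str, table: str = TABLE) -> int:
    """Read a CSV from object storage, save it as an Iceberg table, return the row count."""
    df = (
        spark.read
        .option("header", "true")       # assumption: first line holds column names
        .option("inferSchema", "true")  # assumption: let Spark infer column types
        .csv(csv_uri)
    )
    df.writeTo(table).createOrReplace()
    return spark.sql(f"SELECT COUNT(*) AS cnt FROM {table}").first()["cnt"]
```

Inferring the schema costs an extra pass over a file of this size, so supplying an explicit schema may be preferable in practice.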
The above code demonstrates schema evolution by renaming columns, changing column types, adding a new column fare_per_distance, and populating the new column based on values from the fare and distance columns.
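As a rough illustration of these steps, the statements below use Iceberg's Spark SQL DDL. The original column names (`fare_amount`, `trip_distance`, `tip_amount`) and the chosen types are assumptions about the dataset, and the type change assumes `tip_amount` currently has a narrower type such as float. Both helpers are hypothetical.

```python
TABLE = "nyc.taxis_large"


def schema_evolution_statements(table: str = TABLE) -> list[str]:
    """Build the SQL for renaming, widening, adding and populating columns."""
    return [
        # Rename columns (source names are assumed, not taken from the post).
        f"ALTER TABLE {table} RENAME COLUMN fare_amount TO fare",
        f"ALTER TABLE {table} RENAME COLUMN trip_distance TO distance",
        # Widen a column type; Iceberg only permits safe promotions.
        f"ALTER TABLE {table} ALTER COLUMN tip_amount TYPE double",
        # Add the derived column, then fill it from existing columns.
        f"ALTER TABLE {table} ADD COLUMN fare_per_distance double",
        f"UPDATE {table} SET fare_per_distance = fare / distance",
    ]


def run_all(spark, statements: list[str]) -> None:
    """Execute each statement in order on the given Spark session."""
    for statement in statements:
        spark.sql(statement)
```

The `UPDATE` statement depends on the Iceberg Spark session extensions being enabled.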
Deleting data from the table
In the above snippet, we delete records whose fare_per_distance is null or greater than 4.0, as well as records whose distance is greater than 2.0. Once the operation is complete, we query the snapshots table and see that 2 new snapshots were created. We also get the count of total records, which is significantly lower than what we started with (397014 vs 112234626).
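One way such deletes could be expressed is sketched below. Splitting the work into two `DELETE` statements is an assumption that matches the two new snapshots mentioned above, since each committed delete produces its own snapshot. The helper names are hypothetical; the `snapshots` metadata table and its columns are part of Iceberg.

```python
def delete_statements(table: str, max_ratio: float = 4.0,
                      max_distance: float = 2.0) -> list[str]:
    """Build two row-level deletes; each one commits a separate snapshot."""
    return [
        f"DELETE FROM {table} "
        f"WHERE fare_per_distance > {max_ratio} OR distance > {max_distance}",
        f"DELETE FROM {table} WHERE fare_per_distance IS NULL",
    ]


def snapshot_query(table: str) -> str:
    """Query the snapshots metadata table to see what each commit did."""
    return (
        f"SELECT snapshot_id, committed_at, operation "
        f"FROM {table}.snapshots ORDER BY committed_at"
    )
```

Each string can be passed to `spark.sql(...)`; the `operation` column of the snapshots table then shows which commits were deletes or overwrites.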
Partitioning the table
This bit of code adds a partition field on the VendorID column. The new partitioning applies only to rows inserted from this point on; existing data is not rewritten. We can also define partitions when we create the Iceberg table.
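Both variants can be sketched as SQL builders. `ADD PARTITION FIELD` and `PARTITIONED BY` are Iceberg Spark SQL syntax; the helper names, the example table name `nyc.taxis_partitioned` and the column list shown in the usage note are hypothetical.

```python
def add_partition_field(table: str, column: str) -> str:
    """Evolve the partition spec of an existing table (needs the Iceberg extensions)."""
    return f"ALTER TABLE {table} ADD PARTITION FIELD {column}"


def create_partitioned_table(table: str, columns_ddl: str, partition_column: str) -> str:
    """Declare the partitioning up front when the table is created."""
    return (
        f"CREATE TABLE {table} ({columns_ddl}) "
        f"USING iceberg PARTITIONED BY ({partition_column})"
    )
```

For example, `create_partitioned_table("nyc.taxis_partitioned", "VendorID bigint, fare double, distance double", "VendorID")` yields a `CREATE TABLE` statement that partitions by VendorID from the first write onward.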
Iceberg has metadata tables such as snapshots, files and history that we can query to understand what is going on behind the scenes. For instance, by querying the snapshots table, we can see the operations that were performed when a new snapshot was created. The files table gives us information about the data files stored in MinIO, such as the record count per file and the file format. The history table shows when each snapshot was made current and which snapshot is its parent.
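A small sketch of how these metadata tables might be queried follows. The column names are ones Iceberg exposes on each metadata table; selecting just this subset, and the helper name `metadata_query`, are choices made for illustration.

```python
# A subset of the columns Iceberg exposes on each metadata table.
METADATA_COLUMNS = {
    "snapshots": ["committed_at", "snapshot_id", "parent_id", "operation"],
    "files": ["file_path", "file_format", "record_count"],
    "history": ["made_current_at", "snapshot_id", "parent_id", "is_current_ancestor"],
}


def metadata_query(table: str, metadata_table: str) -> str:
    """Build a SELECT against <table>.<metadata_table> for the columns listed above."""
    columns = ", ".join(METADATA_COLUMNS[metadata_table])
    return f"SELECT {columns} FROM {table}.{metadata_table}"
```

Passing `metadata_query("nyc.taxis_large", "files")` to `spark.sql(...).show()` would list each data file with its format and record count.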
Time Travel with snapshots
It is possible to time travel in Iceberg using snapshots that capture the transactions made at a given time. In the above code, we query the history table to get the first snapshot that was ever created and call the rollback_to_snapshot system procedure with that snapshot_id. If we query the table once the rollback has been performed, we can clearly see that the fare_per_distance field is null and the record count is back to 112234626. Finally, the history table has a new record with the snapshot_id that we used.
This is a high-level overview of what we can do with Apache Iceberg. There is also support for table audit and maintenance, which we can explore later. Apache Iceberg is also adding support for tags and branches on top of snapshots, which has huge potential. We will explore that once it is fully functional.
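The rollback flow can be sketched as follows. Iceberg's Spark procedure for this is named `rollback_to_snapshot` and is invoked through `CALL <catalog>.system.rollback_to_snapshot(...)`. The helper names are hypothetical, the catalog name demo follows the configuration described earlier, and the `VERSION AS OF` read assumes Spark 3.3 or later.

```python
def first_snapshot_id(history_rows) -> int:
    """Pick the snapshot that was made current earliest."""
    earliest = min(history_rows, key=lambda row: row["made_current_at"])
    return earliest["snapshot_id"]


def rollback_call(catalog: str, table: str, snapshot_id: int) -> str:
    """Build the CALL statement that resets the table to a snapshot."""
    return f"CALL {catalog}.system.rollback_to_snapshot('{table}', {snapshot_id})"


def read_as_of(table: str, snapshot_id: int) -> str:
    """Read an old snapshot without changing the table's current state."""
    return f"SELECT * FROM {table} VERSION AS OF {snapshot_id}"


def rollback_to_first(spark, catalog: str = "demo",
                      table: str = "nyc.taxis_large") -> int:
    """Look up the first snapshot in the history table and roll back to it."""
    rows = spark.sql(
        f"SELECT made_current_at, snapshot_id FROM {table}.history"
    ).collect()
    snapshot_id = first_snapshot_id(rows)
    spark.sql(rollback_call(catalog, table, snapshot_id))
    return snapshot_id
```

A rollback changes which snapshot is current for every reader, whereas `read_as_of` only affects the one query, so the latter is the gentler option when the goal is just to inspect old data.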
It’s Cool How Well Spark and Iceberg Run On MinIO
Iceberg, Spark and MinIO are a powerful combination of technologies for building scalable high-performance data lakes. Iceberg’s open table format and support for multiple engines make it a great choice for enterprise data lakes. Using MinIO for Iceberg storage establishes a firm foundation for multi-cloud data lakes and analytics. MinIO includes active-active replication to synchronize data between locations (on-premises, in the public/private cloud and at the edge), enabling the features enterprises need, like geographic load balancing and fast hot-hot failover.
Try Iceberg on MinIO today. If you have any questions or want to share tips, please reach out through our Slack channel or drop us a note at email@example.com.