Apache Iceberg is an open table format that is multi-engine compatible and built to accommodate at-scale analytic data sets. Being multi-engine means that Spark, Trino, Presto, Hive and Impala can all operate on the same data independently at the same time. Previously, we published The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO as an introduction to the topic, including design goals and key features, and Digging Deeper into Iceberg: ACID Transactions on Tables that discussed how to work with data in Iceberg format.
In this blog post, we will build a Notebook that uses MinIO as object storage for Spark jobs to manage Iceberg tables. If you haven't already set up the spark-operator in your Kubernetes environment, please see Spark, MinIO and Kubernetes.
Apache Iceberg
Apache Iceberg is an open-source table format that allows for the efficient storage of large, slow-changing datasets in cloud storage systems such as Amazon S3, Azure Blob Storage, Google Cloud Storage and MinIO. Originally developed by Netflix, Iceberg addresses some of the limitations of older approaches to table management, such as the Hive table format layered over file formats like Apache Parquet and Apache ORC.
Iceberg is designed to provide a number of benefits over traditional table formats, making it a go-to table format for the data lake. Some of them are listed below, followed by a short illustrative sketch:
- Schema evolution: Data lakes are often characterized by their flexibility and ability to store a wide variety of data formats. However, this flexibility can make it challenging to manage schema changes over time. Iceberg provides a way to add, remove, or modify table columns without requiring a full rewrite of the data, making it easier to evolve schemas over time
- Transactional writes: In a data lake it is important to ensure that data is accurate and consistent, especially if the data is used for business-critical purposes. Iceberg provides support for ACID transactions for write operations, ensuring that data is always in a consistent state
- Query isolation: Data lakes are often used by many users or applications simultaneously. Iceberg allows multiple queries to run concurrently without interfering with each other, making it possible to scale data lake usage without sacrificing performance
- Time travel: In a data lake it is often useful to be able to query data as it appeared at a specific point in time. Iceberg provides a time-travel API that enables users to query data as it existed at a specific version or timestamp, making it easier to analyze historical trends or track changes over time
- Partition pruning: Data lakes often contain large amounts of data, which can make querying slow and resource-intensive. Iceberg supports partitioning data by one or more columns, which can significantly improve query performance by reducing the amount of data that needs to be read during queries.
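For a quick feel of the last two points, here is an illustrative sketch only; it assumes a SparkSession named spark with an Iceberg catalog already configured and an existing table demo.nyc.taxis, and both the table name and the snapshot id are placeholders that are not part of the walkthrough below:

# Illustrative sketch; `demo.nyc.taxis` and the snapshot id are placeholders

# Time travel: query the table as it existed at an earlier snapshot (Spark 3.3+ SQL syntax)
spark.sql("SELECT COUNT(*) FROM demo.nyc.taxis VERSION AS OF 1234567890123456789").show()

# Partition pruning: filtering on a partition column lets Iceberg skip whole data files
spark.table("demo.nyc.taxis").filter("VendorID = 1").count()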
Iceberg can be used with a variety of processing engines and frameworks, including Apache Spark, Dremio and Presto. It is also integrated with Apache Arrow, a cross-language in-memory data format, which enables efficient data serialization and deserialization across different processing engines.
Iceberg Catalogs
The Apache Iceberg catalog is a metadata store that contains information about tables, including their schema, location, and partitioning scheme. It is responsible for managing the lifecycle of tables, including creating, updating, and deleting them, and provides APIs for querying metadata and accessing data.
Below are some of the catalogs supported by Apache Iceberg:
- JDBC catalog
- Hive catalog
- Nessie catalog
- Hadoop catalog
- Glue catalog
- DynamoDB catalog
- REST catalog
To keep this walkthrough simple, we will be using the Hadoop catalog for our Iceberg tables.
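For comparison, switching catalogs is mostly a matter of changing a few Spark properties. Below is a sketch of what a REST catalog configuration might look like instead; the URI is a placeholder and this configuration is not used anywhere else in this walkthrough:

from pyspark import SparkConf

# Sketch only: an Iceberg REST catalog instead of the Hadoop catalog used in this post
rest_conf = (
    SparkConf()
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.type", "rest")                     # catalog type: rest instead of hadoop
    .set("spark.sql.catalog.demo.uri", "http://iceberg-rest:8181")  # REST catalog endpoint (placeholder)
    .set("spark.sql.catalog.demo.warehouse", "s3a://openlake/warehouse/")
    .set("spark.sql.defaultCatalog", "demo")
)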
MinIO Object Storage
MinIO guarantees durability for Iceberg tables and high performance for Spark operations on those tables. MinIO secures Iceberg tables using encryption and limits access to them with policy-based access controls. MinIO encrypts data in transit with TLS and data on drives with granular object-level encryption using modern, industry-standard encryption algorithms such as AES-256-GCM, ChaCha20-Poly1305 and AES-CBC. MinIO integrates with external identity providers such as Active Directory/LDAP, Okta and Keycloak for IAM. Users and groups are then subject to AWS IAM-compatible PBAC as they attempt to access Iceberg tables.
Iceberg tables are protected after they are written to a MinIO bucket with:
- Erasure Coding splits data files into data and parity blocks and encodes them so that the primary data is recoverable even if part of the encoded data is not available. Horizontally scalable distributed storage systems save encoded data across multiple drives and nodes. If a drive or node fails or data becomes corrupted, the original can be reconstructed from the parity and data blocks saved on other drives and nodes.
- Bit Rot Protection captures and heals corrupted objects in the background to remove this hidden threat to data durability
- Bucket and Object Immutability protects data saved to MinIO from deletion or modification using object locking, retention and other governance mechanisms. Objects written to MinIO are never overwritten.
- Bucket and Object Versioning further protect objects. MinIO stores every version of every object, even if it is deleted, unless a version is specifically removed. MinIO’s Data Lifecycle Management allows administrators to move buckets between tiers, for example to use NVMe for performance intensive workloads, and to set an expiration date on versions so they are purged from the system to improve storage efficiency.
Getting Demo Data into MinIO
We will be using the NYC Taxi dataset that is available on MinIO. You can download the dataset from here; it has about 112M rows and is roughly 10GB in size. You can also use any other dataset of your choice and upload it to MinIO using the following commands:
!mc mb play/openlake
!mc mb play/openlake/spark
!mc mb play/openlake/spark/sample-data
!mc cp nyc-taxi-data.csv play/openlake/spark/sample-data/nyc-taxi-data.csv
Sample PySpark Application that Manages an Iceberg Table
This is based on the Getting Started with Iceberg Notebook.
%%writefile sample-code/src/main-iceberg.py
import logging
import os

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("MinIOSparkJob")

# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")  # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://openlake/warehouse/")
    .set("spark.sql.catalog.demo.s3.endpoint", "https://play.min.io:50000")
    .set("spark.sql.defaultCatalog", "demo")  # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop")  # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Disable below line to see INFO logs
spark.sparkContext.setLogLevel("ERROR")


def load_config(spark_context: SparkContext):
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.endpoint", os.getenv("ENDPOINT", "play.min.io:50000"))
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.attempts.maximum", "1")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.establish.timeout", "5000")
    spark_context._jsc.hadoopConfiguration().set("fs.s3a.connection.timeout", "10000")


load_config(spark.sparkContext)

# Define schema for NYC Taxi Data
schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', StringType(), True),
    StructField('tpep_dropoff_datetime', StringType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True)])

# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from the DataFrame
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")

# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (2 now) since the previous operation created a new snapshot

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID,
                             tpep_pickup_datetime,
                             tpep_dropoff_datetime,
                             fare,
                             distance,
                             fare_per_distance
                      FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since the previous operations created 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")

# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")

# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
             FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()

# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID,
                             tpep_pickup_datetime,
                             tpep_dropoff_datetime,
                             fare,
                             distance,
                             fare_per_distance
                      FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
Building the Docker Image
We will now build the Docker image that contains the above Python application. You can use the following Dockerfile to build the image:
%%writefile sample-code/Dockerfile
FROM openlake/spark-py:3.3.2

USER root

WORKDIR /app

RUN pip3 install pyspark==3.3.2

COPY src/*.py .
You can build your own Docker image or use the pre-built image openlake/sparkjob-demo:3.3.2 that is available on Docker Hub.
Deploy Spark Iceberg Application
We will build the Spark job YAML that defines the specs, and then deploy it to the Kubernetes cluster.
%%writefile sample-code/spark-job/sparkjob-iceberg.yml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-iceberg
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/sparkjob-demo:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main-iceberg.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    memory: "1024m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser
  executor:
    cores: 1
    instances: 3
    memory: "2048m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/taxi-data.csv"
      - name: AWS_REGION
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_REGION
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: AWS_SECRET_ACCESS_KEY
      - name: ENDPOINT
        valueFrom:
          secretKeyRef:
            name: minio-secret
            key: ENDPOINT
You can deploy the above sparkjob-iceberg.yml using the command below:
!kubectl apply -f sample-code/spark-job/sparkjob-iceberg.yml
After the application is deployed, you can check the status of the application using the following command:
!kubectl get sparkapplications -n spark-operator
You can also check the logs of the application using the following command (Since we have disabled the INFO logs in the Spark application you may not see much activity until our application logs start showing up):
!kubectl logs -f spark-iceberg-driver -n spark-operator # stop this shell once you are done
Once the application has completed, you can delete it using the following command:
!kubectl delete sparkapplications spark-iceberg -n spark-operator
Code Walkthrough
Now that we’ve seen the end-to-end code, let's look at the code snippets in detail.
Set Up Iceberg Properties
# adding iceberg configs
conf = (
    SparkConf()
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")  # Use Iceberg with Spark
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://openlake/warehouse/")
    .set("spark.sql.catalog.demo.s3.endpoint", "https://play.min.io:50000")
    .set("spark.sql.defaultCatalog", "demo")  # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop")  # Iceberg catalog type
    .set("spark.executor.heartbeatInterval", "300000")
    .set("spark.network.timeout", "400000")
)
The above snippet instructs Spark to use the Iceberg Spark session extensions and defines demo as the default catalog. The catalog is of type hadoop, uses S3FileIO as the I/O implementation, and points to https://play.min.io:50000 as the S3 endpoint.
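A quick sanity check after the SparkSession comes up is to confirm that the demo catalog is really the one in use. Something like the following sketch (output will vary) can save debugging time:

# Sketch: confirm the default catalog and list its namespaces
spark.sql("SHOW CURRENT NAMESPACE").show()  # should show the demo catalog
spark.sql("SHOW NAMESPACES").show()         # the nyc namespace appears once the table is created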
Create Iceberg Table
# Read CSV file from MinIO
df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

# Create Iceberg table "nyc.taxis_large" from the DataFrame
df.write.mode("overwrite").saveAsTable("nyc.taxis_large")

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data: {total_rows_count}")
In the above snippet, we read the taxi-data.csv file from the MinIO https://play.min.io:50000 endpoint and save it as the Iceberg table nyc.taxis_large. After the Iceberg table is saved, we use Spark SQL to query nyc.taxis_large and get the total number of records present.
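Because demo is the default catalog, saveAsTable is enough to create an Iceberg table. An equivalent, more explicit route is Spark's DataFrameWriterV2 API; a sketch under the same assumptions:

# Sketch: equivalent table creation with the DataFrameWriterV2 API
df.writeTo("nyc.taxis_large").using("iceberg").createOrReplace()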
Schema Evolution
# Rename column "fare_amount" in nyc.taxis_large to "fare"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN fare_amount TO fare")

# Rename column "trip_distance" in nyc.taxis_large to "distance"
spark.sql("ALTER TABLE nyc.taxis_large RENAME COLUMN trip_distance TO distance")

# Add description to the new column "distance"
spark.sql(
    "ALTER TABLE nyc.taxis_large ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'")

# Move "distance" next to "fare" column
spark.sql("ALTER TABLE nyc.taxis_large ALTER COLUMN distance AFTER fare")

# Add new column "fare_per_distance" of type float
spark.sql("ALTER TABLE nyc.taxis_large ADD COLUMN fare_per_distance FLOAT AFTER distance")

# Check the snapshots available
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (1 till now)

# Populate the new column "fare_per_distance"
logger.info("Populating fare_per_distance column...")
spark.sql("UPDATE nyc.taxis_large SET fare_per_distance = fare/distance")
The above code demonstrates schema evolution: renaming columns, adding a column comment, reordering columns, adding a new column fare_per_distance, and populating the new column based on values from the fare and distance columns.
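The renames, the column comment, the reorder and the new column are metadata-only changes in Iceberg, so no data files are rewritten; only the UPDATE that populates fare_per_distance touches data. A quick way to verify the evolved schema is a sketch like the following:

# Sketch: inspect the evolved schema, including the comment on "distance"
spark.sql("DESCRIBE TABLE nyc.taxis_large").show(truncate=False)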
Deleting data from the table
# Delete rows from "fare_per_distance" based on criteria
logger.info("Deleting rows from fare_per_distance column...")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance > 4.0 OR distance > 2.0")
spark.sql("DELETE FROM nyc.taxis_large WHERE fare_per_distance IS NULL")

# Check the snapshots available
logger.info("Checking snapshots...")
snap_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots")
snap_df.show()  # prints all the available snapshots (4 now) since the previous operations created 2 new snapshots

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after delete operations: {total_rows_count}")
In the above snippet, we delete records where fare_per_distance is greater than 4.0 or distance is greater than 2.0, and then records where fare_per_distance is null. Once the operations are complete, we query the snapshots table and see that 2 new snapshots were created. We also get the total record count, which is significantly smaller than what we started with (397,014 vs. 112,234,626).
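To see which operation produced each snapshot, the snapshots metadata table exposes an operation column; a sketch:

# Sketch: inspect which operation (append, overwrite, delete, ...) produced each snapshot
spark.sql("""SELECT committed_at, snapshot_id, operation
             FROM nyc.taxis_large.snapshots
             ORDER BY committed_at""").show(truncate=False)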
Partitioning the table
# Partition table based on "VendorID" column
logger.info("Partitioning table based on VendorID column...")
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD VendorID")
This bit of code adds a new partition field based on the VendorID column. The partitioning applies to new rows inserted from this point forward; existing data is not rewritten. We can also define partitions when we create the Iceberg table, using something like the following:
CREATE TABLE IF NOT EXISTS nyc.taxis_large (
    VendorID              BIGINT,
    tpep_pickup_datetime  STRING,
    tpep_dropoff_datetime STRING,
    passenger_count       DOUBLE,
    trip_distance         DOUBLE,
    RatecodeID            DOUBLE,
    store_and_fwd_flag    STRING,
    PULocationID          BIGINT,
    DOLocationID          BIGINT,
    payment_type          BIGINT,
    fare_amount           DOUBLE,
    extra                 DOUBLE,
    mta_tax               DOUBLE,
    tip_amount            DOUBLE,
    tolls_amount          DOUBLE,
    improvement_surcharge DOUBLE,
    total_amount          DOUBLE)
USING iceberg
PARTITIONED BY (VendorID);
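Iceberg partitioning also supports hidden partition transforms such as bucket, truncate, years, months, days and hours. As a sketch that is not part of the job above, a high-cardinality column could be spread across buckets like this:

# Sketch: add a bucketed partition field using an Iceberg partition transform
spark.sql("ALTER TABLE nyc.taxis_large ADD PARTITION FIELD bucket(16, PULocationID)")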
# Query Metadata tables like snapshot, files, history
logger.info("Querying Snapshot table...")
snapshots_df = spark.sql("SELECT * FROM nyc.taxis_large.snapshots ORDER BY committed_at")
snapshots_df.show()  # shows all the snapshots in ascending order of committed_at column

logger.info("Querying Files table...")
files_count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large.files")
total_files_count = files_count_df.first().cnt
logger.info(f"Total Data Files for NYC Taxi Data: {total_files_count}")

spark.sql("""SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
             FROM nyc.taxis_large.files LIMIT 1""").show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()
Iceberg has metadata tables like snapshots, files and history that we can query to understand what is going on behind the scenes. For instance, by querying the snapshots table, we can see the operations that were performed when each snapshot was created. The files table gives us information about the data files stored in MinIO, such as the record count per file and the file format. In the history table, we get all the information about when each snapshot was made current, what its parent is, and so on.
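Since these metadata tables are regular Spark tables, they can be aggregated like any other. As a sketch, a per-format summary of the data files could look like:

# Sketch: summarize data files and row counts per file format
spark.sql("""SELECT file_format,
                    COUNT(*)          AS data_files,
                    SUM(record_count) AS total_records
             FROM nyc.taxis_large.files
             GROUP BY file_format""").show()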
Time Travel with snapshots
# Time travel to initial snapshot
logger.info("Time Travel to initial snapshot...")
snap_df = spark.sql("SELECT snapshot_id FROM nyc.taxis_large.history LIMIT 1")
spark.sql(f"CALL demo.system.rollback_to_snapshot('nyc.taxis_large', {snap_df.first().snapshot_id})")

# Query the table to see the results
res_df = spark.sql("""SELECT VendorID,
                             tpep_pickup_datetime,
                             tpep_dropoff_datetime,
                             fare,
                             distance,
                             fare_per_distance
                      FROM nyc.taxis_large LIMIT 15""")
res_df.show()

# Query history table
logger.info("Querying History table...")
hist_df = spark.sql("SELECT * FROM nyc.taxis_large.history")
hist_df.show()  # 1 new row

# Query table row count
count_df = spark.sql("SELECT COUNT(*) AS cnt FROM nyc.taxis_large")
total_rows_count = count_df.first().cnt
logger.info(f"Total Rows for NYC Taxi Data after time travel: {total_rows_count}")
It is possible to time travel in Iceberg using snapshots, which capture the state of the table at the time of each transaction. In the above code, we query the history table to get the first snapshot that was ever created and make a rollback_to_snapshot system call with that snapshot_id. If we query the table once the rollback has been performed, we can clearly see that the fare_per_distance field is null and the record count is back to 112,234,626. Finally, the history table has a new record with the snapshot_id that we rolled back to.
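Note that rollback_to_snapshot changes the current state of the table for every reader. If you only want to look at an older snapshot without modifying the table, Spark 3.3 with a recent Iceberg runtime also supports read-only time travel in SQL; a sketch that reuses the snap_df from the snippet above:

# Sketch: read-only time travel -- query an old snapshot without rolling the table back
first_snapshot_id = snap_df.first().snapshot_id
spark.sql(f"SELECT COUNT(*) AS cnt FROM nyc.taxis_large VERSION AS OF {first_snapshot_id}").show()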
This is a high-level overview of what we can do with Apache Iceberg. There is also support for table auditing and maintenance, which we can explore later. Apache Iceberg is also adding support for tags and branches on top of snapshots, which has huge potential; we will explore that once it is fully functional.
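As a taste of the maintenance side, Iceberg ships Spark procedures for housekeeping tasks such as expiring old snapshots and compacting small files. A sketch against the demo catalog (the timestamp and file-size values are placeholders):

# Sketch: routine Iceberg maintenance via Spark procedures (values are placeholders)
spark.sql("""CALL demo.system.expire_snapshots(
               table => 'nyc.taxis_large',
               older_than => TIMESTAMP '2023-01-01 00:00:00',
               retain_last => 5)""")

spark.sql("""CALL demo.system.rewrite_data_files(
               table => 'nyc.taxis_large',
               options => map('target-file-size-bytes', '536870912'))""")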
It’s Cool How Well Spark and Iceberg Run On MinIO
Iceberg, Spark and MinIO are a powerful combination of technologies for building scalable, high-performance data lakes. Iceberg’s open table format and support for multiple engines make it a great choice for enterprise data lakes. Using MinIO for Iceberg storage establishes a firm foundation for multi-cloud data lakes and analytics. MinIO includes active-active replication to synchronize data between locations — on-premises, in the public/private cloud and at the edge — enabling the features enterprises need like geographic load balancing and fast hot-hot failover.
Try Iceberg on MinIO today. If you have any questions or want to share tips, please reach out through our Slack channel or drop us a note on hello@min.io.