The Apache Iceberg data lake storage format enables ACID transactions on tables saved to MinIO. ACID transactions enable multiple users and services to concurrently and reliably add and remove records atomically. At the same time, queries are isolated to maintain read consistency against tables that are in the process of being altered. You can put MinIO and Iceberg, in conjunction with a PostgreSQL as a metadata database, to work using ACID transaction support for write, delete, update, time travel and schema modification.
The great news about all of this is that you can continue using the SQL and DML that you already know and love. For example, you can UPDATE a row with new information or DELETE a row to remove it. Even time travel is a SELECT statement.
Pairing the Iceberg table format with MinIO creates a powerful, flexible and extensible lakehouse platform. The Iceberg Table Spec declares a table format that is designed to manage “a large, slow-changing collection” of files or objects stored in a distributed system. Version 1 of the Iceberg spec defines the management of large analytic tables using the immutable file formats, Parquet, Avro, and ORC. Version 2 of the spec added row-level updates and deletes for analytic tables with immutable files.
The Iceberg table format tracks individual data files in a table instead of directories. Data files are created in-place and files are only added to a table explicitly. Table state is maintained in metadata. Iceberg metadata and manifest lists are merely objects stored in MinIO, which also maintains its own metadata that is stored with objects. Every change to table state requires that a new Iceberg metadata file be created that replaces the old metadata with an atomic swap. Atomicity is enabled by MinIO.
The Iceberg table format requires:
- In-place write: files/objects are not moved or altered once they are written
- Seekable reads: data file formats require seek support
- Deletes: tables delete files/objects that are no longer needed (or, in the case of MinIO, objects are marked as deleted, but retained)
Such requirements are compatible with object stores such as MinIO. Once written, data and metadata files are immutable until they are deleted. MinIO continues to save the outdated versions of data and metadata objects, making sure that data is never deleted and extending Iceberg’s capabilities for time travel.
In this blog post, we’re going to delve into ACID transactions with Iceberg to create an Iceberg table, update and delete records in it, and evolve its schema. This post is a follow-up to The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO which included an explanation of lakehouse architecture, a deep discussion of Spark, Iceberg, PostgreSQL and MinIO work together, and a tutorial that taught readers how to install them, create a table, evolve table schema, and how to use time travel and rollback.
ACID transactions with Iceberg and MinIO tutorial
I am using the Online Retail Data Set hosted at the UCI Machine Learning Repository. This is a transactional data set which contains all the transactions occurring between January 12, 2010 and September 12, 2011 for a UK-based and registered non-store online retail operation. The data set contains about 550,000 records with eight attributes:
- InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with letter 'c', it indicates a cancellation.
- StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
- Description: Product (item) name. Nominal.
- Quantity: The quantities of each product (item) per transaction. Numeric.
- InvoiceDate: Invoice Date and time. Numeric, the day and time when each transaction was generated.
- UnitPrice: Unit price. Numeric, Product price per unit in sterling.
- CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
- Country: Country name. Nominal, the name of the country where each customer resides.
The file is in Microsoft Excel format, so the first step is to convert it to a .CSV so it can be read into Spark as a dataframe and then saved to Iceberg. After converting the file, I’ve saved it to a MinIO bucket where I store raw data.
I’m using the Iceberg installation from the earlier tutorial, The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO. If you haven’t read that blog post, please refer to it or have your own Iceberg and MinIO environment set up.
We’re going to use Spark and Spark-SQL to work with Iceberg.
Read in data and save as an Iceberg table
The first task we’ll accomplish is to read our .CSV into Spark as a dataframe and then save it to Iceberg.
There’s a lot going on in just 3 lines of code, so let’s set the stage with a short synopsis of how Spark works with data. Spark is a unified analytics engine for large-scale data processing with a set of high-level APIs in Java, Scala, Python and R. Spark can use legacy libraries for HDFS and YARN, or run using object storage such as MinIO via the S3 API. Spark-SQL is a Spark module for structured data processing. It uses the same execution engine as Spark, and the combination of Spark and Spark-SQL make a powerful toolkit for analytics.
A dataframe is a dataset (a collection of data) that is organized into columns. Dataframes can be thought of as tables in relational databases, with some built-in optimizations for the Spark engine. Dataframes can be built from a wide variety of sources and we’re going to build a dataframe from a .CSV, organizing our retail data into columns and rows.
Spark will read in the .CSV and organize it into a dataframe as a temporary view. Spark creates temporary views from SELECT statements. These views are session-scoped and read-only. Then we’ll write the temporary view as an Iceberg table, allowing Spark to automatically create a schema for us.
val df spark.read.csv("online-retail.csv") df.createOrReplaceTempView("tempview"); spark.sql("CREATE or REPLACE TABLE retail USING iceberg AS SELECT * FROM tempview");
It’s not a very big dataset so reading in the .CSV and writing out the Iceberg Parquet takes little time.
A quick query in SparkSQL verifies data made it into the Iceberg table
spark-sql> SELECT * from local.retail limit 10; 22/08/24 18:08:56 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException InvoiceNo StockCode Description Quantity InvoiceDate UnitPrice CustomerID Country 536365 85123A WHITE HANGING HEART T-LIGHT HOLDER 6 12/1/2010 8:26 2.55 17850 United Kingdom 536365 71053 WHITE METAL LANTERN 6 12/1/2010 8:26 3.39 17850 United Kingdom 536365 84406B CREAM CUPID HEARTS COAT HANGER 8 12/1/2010 8:26 2.75 17850 United Kingdom 536365 84029G KNITTED UNION FLAG HOT WATER BOTTLE 6 12/1/2010 8:26 3.39 17850 United Kingdom 536365 84029E RED WOOLLY HOTTIE WHITE HEART. 6 12/1/2010 8:26 3.39 17850 United Kingdom 536365 22752 SET 7 BABUSHKA NESTING BOXES 2 12/1/2010 8:26 7.65 17850 United Kingdom 536365 21730 GLASS STAR FROSTED T-LIGHT HOLDER 6 12/1/2010 8:26 4.25 17850 United Kingdom 536366 22633 HAND WARMER UNION JACK 6 12/1/2010 8:28 1.85 17850 United Kingdom 536366 22632 HAND WARMER RED POLKA DOT 6 12/1/2010 8:28 1.85 17850 United Kingdom Time taken: 7.944 seconds, Fetched 10 row(s)
In MinIO, we can also see the data files have been saved to the
Update a record in the Iceberg table
We’re going to update the table using a SELECT statement followed by an UPDATE statement. Updates and deletes are common operations in OLTP databases such as the online-retail dataset, and the Iceberg table format brings this capability to data lakes. Iceberg is able to perform row level updates and maintain data consistency.
We’ll update the
retail table to change the quantity purchased for a particular invoice. It’s easy to imagine this change taking place in the real world, for example the client could return to the site and buy one more item to be included in the original order. We’re going to add a unit to the
retail._c0) 559340 for
retail._c1) 22413 (METAL SIGN TAKE IT OR LEAVE IT).
First, query for the invoice and item
SELECT retail.c0,retail.c1,retail.c3 FROM retail where retail._c0='559340' and retail._c1='22413';
Then update using an UPDATE statement
UPDATE retail SET retail._c3='7' WHERE retail._c0='559340' and retail._c1='22413';
Finally, verify that the UPDATE took place
SELECT retail._c0, retail._c1, retail._c2, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';
You should see the following output, with
retail._c3) changed to
559340 22413 METAL SIGN TAKE IT OR LEAVE IT 7 Time taken: 0.464 seconds, Fetched 1 row(s)
Delete records from the Iceberg table
ACID properties make delete operations possible in an Iceberg data lake.
Imagine that a customer has requested that their user data be deleted from our retail data lake. In order to comply with GDPR, we must promptly locate and delete this data. A request has come in to delete all records related to customer
First, let’s count the number of records related to this customer
SELECT count(*) FROM retail WHERE retail._c6=’13269’; 320 Time taken: 0.242 seconds, Fetched 1 row(s)
We’re going to delete those 320 records from the table
DELETE FROM retail WHERE retail._c6='13269';
Verify that the records were removed. The following query should report 0 records:
SELECT count(*) FROM retail WHERE retail._c6='13269';
Evolve the schema of an Iceberg table
We let Iceberg automatically create the table schema when we initially saved the Spark dataframe as an Iceberg table, resulting in some not-so-straightforward column names,
_c7. We’re going to change these to be human readable, for example
_c0 will be renamed to
Iceberg schema updates only change metadata. Columns can be added, dropped, renamed, updated and reordered without requiring a full rewrite of the table, a costly proposition.
Let’s take a look at the current schema
SHOW CREATE TABLE retail; CREATE TABLE iceberg.retail ( _c0 STRING, _c1 STRING, _c2 STRING, _c3 STRING, _c4 STRING, _c5 STRING, _c6 STRING, _c7 STRING) USING iceberg LOCATION 'S3://iceberg/retail' TBLPROPERTIES ( 'current-snapshot-id' = '6565073876818863127', 'format' = 'iceberg/parquet', 'format-version' = '1') Time taken: 0.082 seconds, Fetched 1 row(s)
Let’s rename columns to be human readable. As you execute each command, notice how fast Iceberg executes the metadata change – we’re talking about milliseconds to change metadata vs. about a minute to write the entire table.
ALTER TABLE retail RENAME COLUMN _c0 TO InvoiceNo; ALTER TABLE retail RENAME COLUMN _c1 TO StockCode; ALTER TABLE retail RENAME COLUMN _c2 TO Description; ALTER TABLE retail RENAME COLUMN _c3 TO Quantity; ALTER TABLE retail RENAME COLUMN _c4 TO InvoiceDate; ALTER TABLE retail RENAME COLUMN _c5 TO UnitPrice; ALTER TABLE retail RENAME COLUMN _c6 TO CustomerID; ALTER TABLE retail RENAME COLUMN _c7 TO Country;
Let’s verify that the table schema has been changed
SHOW CREATE TABLE retail; CREATE TABLE iceberg.retail ( InvoiceNo STRING, StockCode STRING, Description STRING, Quantity STRING, InvoiceDate STRING, UnitPrice STRING, CustomerID STRING, Country STRING) USING iceberg LOCATION 'S3://iceberg/retail' TBLPROPERTIES ( 'current-snapshot-id' = '6565073876818863127', 'format' = 'iceberg/parquet', 'format-version' = '1') Time taken: 0.034 seconds, Fetched 1 row(s)
MinIO and Iceberg for multicloud data lakes
Iceberg and MinIO are powerful technologies for building enterprise lakehouses. Both are performant, highly-scalable and reliable open source components with multitudes of users running analytics workloads across a wide variety of hardware, software and cloud instances.
Lakehouses built on Iceberg, Delta and HUDI open table formats are propelling data lake analytics to the next level. There’s no limit to what you can build with these open table formats, MinIO and the analytics or ML package of your choice. It’s all open and MinIO is the S3-API compatible layer that ties it all together - and extends the data lake across the multicloud, from edge to datacenter to public/private cloud.