ACID Transactions with Iceberg on AIStor

AJ on Apache Iceberg

The Apache Iceberg data lake table format enables ACID transactions on tables saved to AIStor. ACID transactions let multiple users and services add and remove records concurrently, reliably and atomically. At the same time, queries are isolated so that reads stay consistent against tables that are in the process of being altered. You can put AIStor and Iceberg to work, with PostgreSQL as the metadata database, using ACID transaction support for writes, deletes, updates, time travel and schema modification.

The great news about all of this is that you can continue using the SQL and DML that you already know and love. For example, you can UPDATE a row with new information or DELETE a row to remove it. Even time travel is a SELECT statement.

Pairing the Iceberg table format with AIStor creates a powerful, flexible and extensible lakehouse platform. The Iceberg Table Spec declares a table format that is designed to manage “a large, slow-changing collection” of files or objects stored in a distributed system. Version 1 of the Iceberg spec defines the management of large analytic tables using immutable file formats: Parquet, Avro and ORC. Version 2 of the spec adds row-level updates and deletes for analytic tables with immutable files.
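To see how row-level deletes can coexist with immutable files, here is a minimal Python sketch of the idea behind the position delete files that version 2 of the spec introduces. It is a toy model rather than Iceberg's own code: the file names, row values and the scan function are all made up for illustration.

```python
# Toy model of row-level deletes over immutable files (not Iceberg's classes).
# Data files are never edited; a separate delete file lists the
# (data file, row position) pairs that readers must skip.

# Hypothetical data files and their rows.
data_files = {
    "data-001.parquet": ["row-a", "row-b", "row-c"],
    "data-002.parquet": ["row-d", "row-e"],
}

# A "position delete file": each entry names a data file and a row ordinal.
position_deletes = {("data-001.parquet", 1), ("data-002.parquet", 0)}


def scan(files, deletes):
    """Return the live rows by filtering out deleted positions at read time."""
    live = []
    for name in sorted(files):
        for pos, row in enumerate(files[name]):
            if (name, pos) not in deletes:
                live.append(row)
    return live


live_rows = scan(data_files, position_deletes)
print(live_rows)
```

The data files themselves are untouched; only the reader's view of them changes.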

The Iceberg table format tracks individual data files in a table instead of directories. Data files are created in-place and files are only added to a table explicitly. Table state is maintained in metadata. Iceberg metadata and manifest lists are merely objects stored in AIStor, which also maintains its own metadata that is stored with objects. Every change to table state requires that a new Iceberg metadata file be created that replaces the old metadata with an atomic swap. Atomicity is enabled by AIStor.
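To make the atomic swap concrete, here is a minimal Python sketch of a compare-and-swap commit. It is a toy model under stated assumptions, not Iceberg's or AIStor's actual code: the MetadataPointer class, its method names and the metadata file names are all hypothetical.

```python
import threading


class MetadataPointer:
    """Toy stand-in for the record that names the current metadata file."""

    def __init__(self, location):
        self._location = location
        self._lock = threading.Lock()

    def current(self):
        with self._lock:
            return self._location

    def swap(self, expected, new):
        """Replace the pointer only if it still equals `expected`."""
        with self._lock:
            if self._location != expected:
                return False  # someone else committed first
            self._location = new
            return True


pointer = MetadataPointer("metadata/v1.metadata.json")

# Two writers start from the same table state.
base = pointer.current()

# Each writer prepares its own new metadata file, then tries to swap it in.
writer_a_ok = pointer.swap(base, "metadata/v2-a.metadata.json")
writer_b_ok = pointer.swap(base, "metadata/v2-b.metadata.json")

print(writer_a_ok, writer_b_ok, pointer.current())
```

Only one of the two writers wins. The losing writer would normally re-read the current metadata, reapply its change on top of it and try the swap again, so readers never see a half-applied change.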

The Iceberg table format requires:

  • In-place write: files/objects are not moved or altered once they are written
  • Seekable reads: data file formats require seek support
  • Deletes: tables delete files/objects that are no longer needed (or, in the case of AIStor, objects are marked as deleted, but retained)

Such requirements are compatible with object stores such as AIStor. Once written, data and metadata files are immutable until they are deleted. With bucket versioning enabled, AIStor continues to retain the outdated versions of data and metadata objects, so data is not lost when an object is deleted, which extends Iceberg’s capabilities for time travel.
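The link between immutable files and time travel can be sketched in a few lines of Python. This is a toy model, not Iceberg's implementation: the ToyTable class is hypothetical, and it uses small sequential snapshot ids where real Iceberg snapshot ids are long integers.

```python
class ToyTable:
    """Each snapshot is an immutable tuple of data file names."""

    def __init__(self):
        self.snapshots = [()]  # snapshot 0 is the empty table

    def commit(self, add=(), remove=()):
        """Create a new snapshot; earlier snapshots are left untouched."""
        current = self.snapshots[-1]
        new = tuple(f for f in current if f not in remove) + tuple(add)
        self.snapshots.append(new)
        return len(self.snapshots) - 1  # id of the new snapshot

    def files(self, snapshot_id=None):
        """List the files of the latest snapshot, or of an older one."""
        if snapshot_id is None:
            snapshot_id = len(self.snapshots) - 1
        return self.snapshots[snapshot_id]


table = ToyTable()
s1 = table.commit(add=["a.parquet", "b.parquet"])
# An update replaces b.parquet with a rewritten file in a new snapshot.
s2 = table.commit(add=["b2.parquet"], remove=["b.parquet"])

print(table.files())    # current state
print(table.files(s1))  # "time travel" to the earlier snapshot
```

Reading the older snapshot only works as long as the files it lists are still retrievable, which is why retaining old objects matters.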

In this blog post, we’re going to delve into ACID transactions with Iceberg: we’ll create an Iceberg table, update and delete records in it, and evolve its schema. This post is a follow-up to The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO, which included an explanation of lakehouse architecture, a deep discussion of how Spark, Iceberg, PostgreSQL and MinIO work together, and a tutorial that taught readers how to install them, create a table, evolve the table schema, and use time travel and rollback.

ACID transactions with Iceberg and MinIO tutorial

I am using the Online Retail Data Set hosted at the UCI Machine Learning Repository. This is a transactional data set which contains all the transactions occurring between December 1, 2010 and December 9, 2011 (written as 01/12/2010 and 09/12/2011 in the UK day/month/year format) for a UK-based and registered non-store online retail operation. The data set contains about 540,000 records with eight attributes:

  • InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with letter 'c', it indicates a cancellation.
  • StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
  • Description: Product (item) name. Nominal.
  • Quantity: The quantities of each product (item) per transaction. Numeric.
  • InvoiceDate: Invoice Date and time. Numeric, the day and time when each transaction was generated.
  • UnitPrice: Unit price. Numeric, Product price per unit in sterling.
  • CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
  • Country: Country name. Nominal, the name of the country where each customer resides.

The file is in Microsoft Excel format, so the first step is to convert it to a .CSV so it can be read into Spark as a dataframe and then saved to Iceberg. After converting the file, I’ve saved it to a MinIO bucket where I store raw data.

I’m using the Iceberg installation from the earlier tutorial, The Definitive Guide to Lakehouse Architecture with Iceberg and MinIO. If you haven’t read that blog post, please refer to it or have your own Iceberg and MinIO environment set up.

We’re going to use Spark and Spark-SQL to work with Iceberg.

Read in data and save as an Iceberg table

The first task we’ll accomplish is to read our .CSV into Spark as a dataframe and then save it to Iceberg.

There’s a lot going on in just three lines of code, so let’s set the stage with a short synopsis of how Spark works with data. Spark is a unified analytics engine for large-scale data processing with a set of high-level APIs in Java, Scala, Python and R. Spark can use legacy libraries for HDFS and YARN, or run using object storage such as MinIO via the S3 API. Spark-SQL is a Spark module for structured data processing. It uses the same execution engine as Spark, and the combination of Spark and Spark-SQL makes a powerful toolkit for analytics.

A dataframe is a dataset (a collection of data) that is organized into columns. Dataframes can be thought of as tables in relational databases, with some built-in optimizations for the Spark engine. Dataframes can be built from a wide variety of sources and we’re going to build a dataframe from a .CSV, organizing our retail data into columns and rows.

Spark will read the .CSV into a dataframe, which we then register as a temporary view so that it can be queried with SQL. Temporary views are session-scoped and read-only. Then we’ll write the temporary view out as an Iceberg table with a CREATE TABLE ... AS SELECT statement, allowing Spark to automatically create a schema for us.

val df = spark.read.csv("online-retail.csv")

df.createOrReplaceTempView("tempview")

spark.sql("CREATE OR REPLACE TABLE retail USING iceberg AS SELECT * FROM tempview")

It’s not a very big dataset so reading in the .CSV and writing out the Iceberg Parquet takes little time.  

A quick query in Spark-SQL verifies that the data made it into the Iceberg table:

spark-sql> SELECT * from local.retail limit 10;

22/08/24 18:08:56 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

InvoiceNo       StockCode       Description     Quantity        InvoiceDate     UnitPrice       CustomerID      Country

536365  85123A  WHITE HANGING HEART T-LIGHT HOLDER      6       12/1/2010 8:26  2.55    17850   United Kingdom

536365  71053   WHITE METAL LANTERN     6       12/1/2010 8:26  3.39    17850   United Kingdom

536365  84406B  CREAM CUPID HEARTS COAT HANGER  8       12/1/2010 8:26  2.75    17850   United Kingdom

536365  84029G  KNITTED UNION FLAG HOT WATER BOTTLE     6       12/1/2010 8:26  3.39    17850   United Kingdom

536365  84029E  RED WOOLLY HOTTIE WHITE HEART.  6       12/1/2010 8:26  3.39    17850   United Kingdom

536365  22752   SET 7 BABUSHKA NESTING BOXES    2       12/1/2010 8:26  7.65    17850   United Kingdom

536365  21730   GLASS STAR FROSTED T-LIGHT HOLDER       6       12/1/2010 8:26  4.25    17850   United Kingdom

536366  22633   HAND WARMER UNION JACK  6       12/1/2010 8:28  1.85    17850   United Kingdom

536366  22632   HAND WARMER RED POLKA DOT       6       12/1/2010 8:28  1.85    17850   United Kingdom

Time taken: 7.944 seconds, Fetched 10 row(s)
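Notice that the first row of the output above is the .CSV header text, and that the ten rows fetched include it. When Spark reads a .CSV without a header option, it names the columns _c0, _c1 and so on, types them all as strings, and treats the header line as an ordinary data row. The following plain-Python sketch mimics that naming convention with the standard csv module. It does not use Spark, and the three-column sample is a trimmed-down, hypothetical excerpt of the file.

```python
import csv
import io

# A hypothetical three-column excerpt of the retail file, header included.
raw = io.StringIO(
    "InvoiceNo,StockCode,Quantity\n"
    "536365,85123A,6\n"
    "536365,71053,6\n"
)

rows = list(csv.reader(raw))

# With no header handling, columns get positional names: _c0, _c1, ...
columns = [f"_c{i}" for i in range(len(rows[0]))]

# Every line, including the header line, becomes a record of strings.
records = [dict(zip(columns, row)) for row in rows]

print(columns)
print(records[0])
```

This is why the queries later in this post refer to columns such as _c0 and _c3 and compare them against quoted string values.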

In MinIO, we can also see the data files have been saved to the iceberg bucket

Update a record in the Iceberg table

We’re going to update the table using a SELECT statement followed by an UPDATE statement. Updates and deletes are common operations on transactional data such as the online retail dataset, and the Iceberg table format brings this capability to data lakes. Iceberg is able to perform row-level updates and maintain data consistency.

We’ll update the retail table to change the quantity purchased for a particular invoice. It’s easy to imagine this change taking place in the real world; for example, the client could return to the site and buy one more item to be included in the original order. We’re going to add one unit to the quantity (retail._c3) for invoiceno (retail._c0) 559340 and stockcode (retail._c1) 22413 (METAL SIGN TAKE IT OR LEAVE IT).

First, query for the invoice and item

SELECT retail._c0, retail._c1, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';

Then update using an UPDATE statement

UPDATE retail SET retail._c3='7' WHERE retail._c0='559340' and retail._c1='22413';

Finally, verify that the UPDATE took place

SELECT retail._c0, retail._c1, retail._c2, retail._c3 FROM retail WHERE retail._c0='559340' and retail._c1='22413';

You should see the following output, with quantity (retail._c3) changed to 7

559340  22413   METAL SIGN TAKE IT OR LEAVE IT  7

Time taken: 0.464 seconds, Fetched 1 row(s)
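Because data files are immutable, an UPDATE like this one cannot edit a row in place. On a format-version 1 table, the engine rewrites the data files that contain matching rows and commits a new snapshot that points at the rewritten files. Here is a minimal Python sketch of that copy-on-write idea. It is a toy model, not Spark's or Iceberg's code: the file names and the helper functions are hypothetical, and the starting quantity of 6 is inferred from the update that adds one unit to reach 7.

```python
def copy_on_write_update(files, predicate, change):
    """Return a new file map; files with no matching rows are reused as-is."""
    new_files = {}
    for name, rows in files.items():
        if any(predicate(r) for r in rows):
            # Write a new file with the changed rows; the old file is not edited.
            rewritten = [change(dict(r)) if predicate(r) else r for r in rows]
            new_files[name.replace(".parquet", "-rewritten.parquet")] = rewritten
        else:
            new_files[name] = rows
    return new_files


# Hypothetical data files, using the column names from the table above.
before = {
    "f1.parquet": [{"_c0": "559340", "_c1": "22413", "_c3": "6"}],
    "f2.parquet": [{"_c0": "536365", "_c1": "71053", "_c3": "6"}],
}


def matches(row):
    return row["_c0"] == "559340" and row["_c1"] == "22413"


def set_quantity(row):
    row["_c3"] = "7"
    return row


after = copy_on_write_update(before, matches, set_quantity)
print(sorted(after))
```

The old file map still describes the table as it was before the update, which is what lets a reader that started earlier keep a consistent view.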

Delete records from the Iceberg table

ACID properties make delete operations possible in an Iceberg data lake.  

Imagine that a customer has requested that their user data be deleted from our retail data lake. In order to comply with GDPR, we must promptly locate and delete this data. A request has come in to delete all records related to customer 13269.

First, let’s count the number of records related to this customer

SELECT count(*) FROM retail WHERE retail._c6='13269';

320

Time taken: 0.242 seconds, Fetched 1 row(s)

We’re going to delete those 320 records from the table

DELETE FROM retail WHERE retail._c6='13269';

Verify that the records were removed. The following query should report 0 records:

SELECT count(*) FROM retail WHERE retail._c6='13269';

Evolve the schema of an Iceberg table

We let Spark automatically create the table schema when we initially saved the Spark dataframe as an Iceberg table. Because the .CSV was read without a header option, this resulted in some not-so-straightforward column names, _c0 through _c7, all typed as STRING. We’re going to change these to be human readable; for example, _c0 will be renamed to InvoiceNo.

Iceberg schema updates only change metadata. Columns can be added, dropped, renamed, updated and reordered without requiring a full rewrite of the table, a costly proposition.
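A rename can be metadata-only because Iceberg tracks each column by a unique field id rather than by its name. The Python sketch below illustrates the idea with a toy model. The dictionaries and helper functions are hypothetical and are not Iceberg's API.

```python
# Toy model: the schema maps field id -> current column name,
# and data files store values keyed by field id, not by name.
schema = {1: "_c0", 2: "_c1"}
data_file = [{1: "536365", 2: "85123A"}]


def read(schema, data_file):
    """Resolve field ids to the names in the given schema at read time."""
    return [{schema[fid]: value for fid, value in row.items()} for row in data_file]


def rename_column(schema, old, new):
    """Return a new schema; no data file is touched."""
    return {fid: (new if name == old else name) for fid, name in schema.items()}


new_schema = rename_column(schema, "_c0", "InvoiceNo")

print(read(schema, data_file))      # rows as seen before the rename
print(read(new_schema, data_file))  # same file, new column name
```

The same data file serves both schemas, so the cost of the rename does not depend on the size of the table.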

Let’s take a look at the current schema

SHOW CREATE TABLE retail;

CREATE TABLE iceberg.retail (

_c0 STRING,

_c1 STRING,

_c2 STRING,

_c3 STRING,

_c4 STRING,

_c5 STRING,

_c6 STRING,

_c7 STRING)

USING iceberg

LOCATION 'S3://iceberg/retail'

TBLPROPERTIES (

'current-snapshot-id' = '6565073876818863127',

'format' = 'iceberg/parquet',

'format-version' = '1')

Time taken: 0.082 seconds, Fetched 1 row(s)

Let’s rename columns to be human readable. As you execute each command, notice how fast Iceberg executes the metadata change – we’re talking about milliseconds to change metadata vs. about a minute to write the entire table.

ALTER TABLE retail RENAME COLUMN _c0 TO InvoiceNo;

ALTER TABLE retail RENAME COLUMN _c1 TO StockCode;

ALTER TABLE retail RENAME COLUMN _c2 TO Description;

ALTER TABLE retail RENAME COLUMN _c3 TO Quantity;

ALTER TABLE retail RENAME COLUMN _c4 TO InvoiceDate;

ALTER TABLE retail RENAME COLUMN _c5 TO UnitPrice;

ALTER TABLE retail RENAME COLUMN _c6 TO CustomerID;

ALTER TABLE retail RENAME COLUMN _c7 TO Country;

Let’s verify that the table schema has been changed

SHOW CREATE TABLE retail;

CREATE TABLE iceberg.retail (

InvoiceNo STRING,

StockCode STRING,

Description STRING,

Quantity STRING,

InvoiceDate STRING,

UnitPrice STRING,

CustomerID STRING,

Country STRING)

USING iceberg

LOCATION 'S3://iceberg/retail'

TBLPROPERTIES (

'current-snapshot-id' = '6565073876818863127',

'format' = 'iceberg/parquet',

'format-version' = '1')

Time taken: 0.034 seconds, Fetched 1 row(s)

MinIO and Iceberg for multicloud data lakes

Iceberg and MinIO are powerful technologies for building enterprise lakehouses. Both are performant, highly scalable and reliable open source components with multitudes of users running analytics workloads across a wide variety of hardware, software and cloud instances.

Lakehouses built on the Iceberg, Delta and Hudi open table formats are propelling data lake analytics to the next level. There’s no limit to what you can build with these open table formats, MinIO and the analytics or ML package of your choice. It’s all open, and MinIO is the S3 API-compatible layer that ties it all together and extends the data lake across the multicloud, from edge to datacenter to public/private cloud.

Download AIStor and put the world’s fastest object storage to work in your data lake. Share your experiences or ask questions through our Slack channel. We would love to hear what you’re building!