Spark Structured Streaming With Kafka and MinIO

Spark Structured Streaming With Kafka and MinIO

Apache Kafka is the leading open-source distributed event-streaming platform, and it is used to build data pipelines, streaming analytics, data integration and applications. Enterprises love Kafka because of its high availability, high throughput and scalability. They also love Apache Spark to process data and build analytics because it is fast, distributed and fault-tolerant. Spark has evolved over the years, adding functionality like Spark SQL, a module for structured data processing with relational queries. Spark Structured Streaming is built on the Spark SQL API for data stream processing. Combining Kafka with Spark Structured Streaming allows developers to express streaming computations the same as they would write a batch computation on static data.

Kafka and Spark Structured Streaming are used together to build data lakes/lake houses fed by streaming data and provide real time business insights. Feeding the data lake is only part of the equation – to make the most of a data lake the underlying object storage must be highly available, performant, scalable and API-driven.

MinIO makes a great home for streaming data lakes. Industry-leading S3 API compatibility gives developers confidence that they can use their own custom software, cloud-native analytics or AI/ML without a problem. MinIO takes complete advantage of underlying hardware (see Selecting the Best Hardware for Your MinIO Deployment) to deliver the greatest possible performance – we’ve benchmarked it at 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.

Spark Streaming

Apache Spark Streaming is a powerful and scalable stream processing framework that is part of the larger Apache Spark ecosystem.  It provides a high-level and expressive API for processing data streams from various sources such as Kafka, Flume, Kinesis, or custom sources, and enables a wide range of use cases including real-time analytics, machine learning, fraud detection, and more. Spark Streaming follows a micro-batch processing model, where incoming data is divided into small time-based batches, processed in parallel, and the results are aggregated to generate the final output. Spark Streaming provides strong guarantees of fault-tolerance, reliability, and exactly-once processing semantics, making it a popular choice for building scalable and robust stream processing applications.

Spark Structured Streaming

Spark Structured Streaming is a newer addition to the Apache Spark ecosystem that provides a more advanced and unified stream processing API based on the concept of "continuous processing". Spark Structured Streaming extends the familiar DataFrame and Dataset API, which is used for batch processing in Spark, to seamlessly support processing of streaming data as well. It provides a higher-level abstraction for processing data streams, allowing developers to write stream processing code that is similar to batch processing code, making it more intuitive and user-friendly. Spark Structured Streaming offers advanced features such as built-in support for fault-tolerance, event time processing, and state management, making it a powerful and convenient choice for building scalable, reliable, and complex stream processing applications. Spark Structured Streaming also provides tight integration with the larger Apache Spark ecosystem, enabling seamless integration with other Spark modules for end-to-end data processing pipelines that span batch and streaming data.

Structured Streaming is the way to go for modern stream processing needs because

  • It's a true streaming model with continuous processing, as opposed to the micro-batch model of Spark Streaming.
  • It has a much richer API and set of stream processing features by leveraging Spark SQL.
  • It has stronger fault tolerance and consistency guarantees. Data loss is prevented through checkpointing and recovery.
  • It supports event-time based processing and reasoning about data across time.
  • The API is higher-level and easier to work with versus the lower-level DStreams API.

Structured Streaming is the future of stream processing with Spark and where continued investment and improvements are being made. So for any new streaming application, it is highly recommended to start with Structured Streaming.

In this blog post, we will explore how to process events streamed from Kafka with Spark Structured Streaming. We will also explore the MinIO checkpointer and show the significant performance improvements it has to offer. We will also explore how to save event data from Kafka directly as Iceberg Table into MinIO.

If you haven't set up Kafka yet take a look at this blog post. If you haven't already set up the Kafka topic nyc-avro-topic and producing events using avro-producer, please see Making the Most of Streaming with Kafka Schema Registry and MinIO.

PySpark Structured Streaming Application

Below we will write a simple PySpark application that will continuously stream events for the topic nyc-avro-topic from Kafka and process each record and save it as Parquet files into MinIO.

Note: We assume that Kafka, Kafka Schema Registry, the Kafka topic nyc-avro-topic and the Avro producer are up and running. You can refer to Making the Most of Streaming with Kafka Schema Registry and MinIO for detailed instructions.

You can find sample code here.

In the above code:

load_config has all the Hadoop configurations required for Spark to connect with MinIO to read and write data.

value_schema_dict has the Avro schema that Spark will use to deserialize data from Kafka.

Configuring the Spark Stream

Let’s dig deeper into how we configure the Spark stream:

stream_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092") \
.option("subscribe", "nyc-avro-topic") \
.option("startingOffsets", "earliest") \
.option("failOnDataLoss", "false") \
.option("mode", "PERMISSIVE") \
.option("truncate", False) \
.option("newRows", 100000) \
.load()


In the above snippet, we specify the Spark Stream format as kafka and some of the key options we included are:

kafka.bootstrap.servers - Kafka server endpoint from which to consume the events

subscribe - list of comma separated strings pointing to the topics which the Spark stream will process. In our case, it is nyc-avro-topic.

startingOffsets - specifies from when Spark Stream should start processing the events for the subscribed topics. In our case, it will be earliest, which is beginning of time for the subscribed topics

Preprocessing Avro Data Stream

# special pre-processing to ignore first 6 bits for Confluent Avro data stream topics
taxi_df = stream_df.selectExpr("substring(value, 6) as avro_value").select(
    from_avro("avro_value", json.dumps(value_schema_dict)).alias("data")).select("data.*")

In our case, since we are using the Confluent Avro data stream, before we start consuming the data in Spark we need to preprocess the byte stream. The first 6 bits contain the Avro schema metadata which is used by the confluent API in Kafka by consumers/connectors. In our case, we can skip these bits for Spark streaming

Write Parquet data to MinIO

taxi_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "s3a://warehouse/k8/spark-stream/") \
    .option("checkpointLocation", "s3a://warehouse/k8/checkpoint") \
    .start() \
    .awaitTermination()

We open a writestream in Spark that writes the preprocessed dataframe taxi_df to parquet format into MinIO in the path specified in the options, we also give the checkpointLocation which is a MinIO bucket where Spark will continuously create checkpoints. In case of job failure, Spark will continue from where it left off based on the checkpoints.

%%writefile sample-code/Dockerfile
FROM openlake/spark-py:3.3.2

USER root

WORKDIR /app

RUN pip3 install pyspark==3.3.2

# Add avro dependency
ADD https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.2/spark-avro_2.12-3.3.2.jar $SPARK_HOME/jars

COPY src/*.py .

Build your own image or use the openlake/sparkjob-demo:3.3.2 from docker hub which has the above code.

You can find example code here.

Apply it

!kubectl apply -f sample-code/spark-job/sparkjob-streaming.yaml


Note: Since our Kafka Consumer approximately runs for 3hrs to stream all 112M rows, Spark structured streaming will also take close to the same time when both the producer and consumer are started at the same time.

Performance Benchmarks

We measured the number of S3 API calls made by the Spark structured streaming consumer alone and recorded the results below

API                             RX      TX      CALLS   ERRORS
s3.CopyObject                   6.7 MiB 4.5 MiB 20220   0
s3.DeleteMultipleObjects        11 MiB  3.0 MiB 26938   0
s3.DeleteObject                 9.9 MiB 0 B     39922   0
s3.GetObject                    1.8 MiB 496 MiB 6736    0
s3.HeadObject                   84 MiB  0 B     336680  0
s3.ListObjectsV2                60 MiB  1.6 GiB 241903  0
s3.PutObject                    2.3 GiB 0 B     26975   0

Summary:

Total: 699374 CALLS, 2.5 GiB RX, 2.1 GiB TX - in 11999.80s

As you can see from the above table, we make approximately 700K calls to the MinIO endpoint. There is a simple change that we can make to the consumer code to optimize this. If we added a 1 min delay trigger to the consumer code, instead of continuously polling for new events from Kafka we can significantly reduce the total number of API calls. Here is the optimized code.

taxi_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .trigger(processingTime='1 minute') \
    .option("path", "s3a://warehouse-v/k8/spark-stream/") \
    .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \
    .start() \
    .awaitTermination()

The key thing to note here is .trigger(processingTime='1 minute') which will add a 1 min before polling Kafka events each time. Here is the number with the optimized code

API                             RX      TX      CALLS   ERRORS
s3.CopyObject                   207 KiB 139 KiB 614     0       
s3.DeleteMultipleObjects        335 KiB 92 KiB  812     0       
s3.DeleteObject                 235 KiB 0 B     921     0       
s3.GetObject                    54 KiB  469 KiB 199     0       
s3.HeadObject                   2.5 MiB 0 B     9867    0       
s3.ListObjectsV2                1.7 MiB 12 MiB  6910    0       
s3.PutObject                    2.0 GiB 0 B     814     0       

Summary:

Total: 20137 CALLS, 2.0 GiB RX, 13 MiB TX - in 12126.59s

As you can see from the above table we went from ~700K API calls to ~20k API calls. Just by adding a simple 1 line code change we are able to make big changes to the number of S3 API calls.

MinIO Checkpoint Manager

The above optimization is a huge improvement. If we run the same set of code in a versioned bucket and after all the rows are stored into MinIO if we perform a mc ls --versions --recursive opl/warehouse-v/k8 --summarize you will still notice objects with v1 and v2 where the v2 are delete markers for the objects that should have been deleted. As consumers continue to add records, delete marker objects build up and waste storage space, and this can create a problem over time.

Enter MinIO Checkpoint Manager, io.minio.spark.checkpoint.S3BasedCheckpointFileManager , which takes advantage of MinIO's strictly consistent atomic transactions. MinIO's Checkpoint Manager takes full advantage of the native object APIs and avoids the unnecessary baggage from a POSIX-based implementation.

You can easily use the checkpoint manager in your code by adding this 1 line to your Spark config

SparkConf().set("spark.sql.streaming.checkpointFileManagerClass", "io.minio.spark.checkpoint.S3BasedCheckpointFileManager")

Here is the sample code that you can run to see the results

Build your own image or use the openlake/sparkjob-demo:3.3.2 from Docker Hub, which has the above code. You can find code here.

Deploy the optimized consumer with the new MinIO's checkpoint manager as shown below

!kubectl apply -f sample-code/spark-job/sparkjob-streaming-optimized.yaml

Performance Benchmarks

We measured the number of S3 API calls made by the Spark structured streaming consumer with the optimized checkpoint manager and recorded the following results

API                             RX      TX      CALLS   ERRORS
s3.DeleteMultipleObjects        5.8 MiB 1.7 MiB 15801   0       
s3.DeleteObject                 12 MiB  0 B     46465   0       
s3.GetObject                    4.0 MiB 2.7 GiB 15802   0       
s3.HeadObject                   43 MiB  0 B     172825  0       
s3.ListObjectsV1                7.8 MiB 7.8 GiB 31402   0       
s3.ListObjectsV2                3.9 MiB 5.2 MiB 15782   0       
s3.PutObject                    4.7 GiB 0 B     63204   0       

Summary:

Total: 361281 CALLS, 4.8 GiB RX, 10 GiB TX - in 12160.25s

We can already see that we went from ~700K API calls to ~361K API calls, on top of this if we add the 1 min delay before each polling we will see further improvements

API                             RX      TX      CALLS   ERRORS
s3.DeleteMultipleObjects        75 KiB  23 KiB  200     0       
s3.DeleteObject                 100 KiB 0 B     394     0       
s3.GetObject                    52 KiB  469 KiB 199     0       
s3.HeadObject                   508 KiB 0 B     1995    0       
s3.ListBuckets                  150 B   254 B   1       0       
s3.ListObjectsV1                75 KiB  2.8 MiB 293     0       
s3.ListObjectsV2                51 KiB  67 KiB  200     0       
s3.PutObject                    2.0 GiB 0 B     803     0       

Summary:

Total: 4085 CALLS, 2.0 GiB RX, 3.3 MiB TX - in 11945.35s

From ~20K API calls earlier we are now down to ~4K API calls. Another major improvement that we will notice on the versioned bucket is that no v2 delete marker objects are present and we only have v1 objects.

Spark Structured Streaming and Data Lakes

In this blog post we saw  how to use Spark structured streaming to consume events from Kafka, and we also dug into some of the optimizations that can be done to reduce the number of S3 API calls. We also saw the benefits of using MinIO checkpoint manager and how the implementation avoids all the POSIX baggage and utilizes object storage’s native strict consistency. In the next blog post, we will see how to have the end-to-end Kafka producer and consumer implemented in Spark structured streaming and how that can speed up the entire flow.

Kafka, Spark and MinIO are frequently combined to build data lakes and analytics. All software-defined, they provide a portable multi-cloud home for streaming data, enabling you to feed analytics and AI/ML applications wherever they are.

Download MinIO today and start building your cloud-native streaming data lake.