Spark Structured Streaming With Kafka and MinIO
Apache Kafka is the leading open-source distributed event-streaming platform, and it is used to build data pipelines, streaming analytics, data integration and applications. Enterprises love Kafka because of its high availability, high throughput and scalability. They also love Apache Spark to process data and build analytics because it is fast, distributed and fault-tolerant. Spark has evolved over the years, adding functionality like Spark SQL, a module for structured data processing with relational queries. Spark Structured Streaming is built on the Spark SQL API for data stream processing. Combining Kafka with Spark Structured Streaming allows developers to express streaming computations the same way they would write a batch computation on static data.
Kafka and Spark Structured Streaming are used together to build data lakes and lakehouses that are fed by streaming data and provide real-time business insights. Feeding the data lake is only part of the equation – to make the most of a data lake, the underlying object storage must be highly available, performant, scalable and API-driven.
MinIO makes a great home for streaming data lakes. Industry-leading S3 API compatibility gives developers confidence that they can use their own custom software, cloud-native analytics or AI/ML without a problem. MinIO takes complete advantage of underlying hardware (see Selecting the Best Hardware for Your MinIO Deployment) to deliver the greatest possible performance – we’ve benchmarked it at 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.
Spark Streaming
Apache Spark Streaming is a powerful and scalable stream processing framework that is part of the larger Apache Spark ecosystem. It provides a high-level and expressive API for processing data streams from various sources such as Kafka, Flume, Kinesis, or custom sources, and enables a wide range of use cases including real-time analytics, machine learning, fraud detection, and more. Spark Streaming follows a micro-batch processing model, where incoming data is divided into small time-based batches, processed in parallel, and the results are aggregated to generate the final output. Spark Streaming provides strong guarantees of fault-tolerance, reliability, and exactly-once processing semantics, making it a popular choice for building scalable and robust stream processing applications.
Spark Structured Streaming
Spark Structured Streaming is a newer addition to the Apache Spark ecosystem that provides a unified stream processing API built on the Spark SQL engine. By default it executes queries as a series of small micro-batches, and since Spark 2.3 it also offers an experimental low-latency "continuous processing" mode. Spark Structured Streaming extends the familiar DataFrame and Dataset API, which is used for batch processing in Spark, to support processing of streaming data as well. It provides a higher-level abstraction for processing data streams, allowing developers to write stream processing code that is similar to batch processing code, which makes it more intuitive. Spark Structured Streaming offers built-in support for fault tolerance, event-time processing, and state management, making it a convenient choice for building scalable, reliable, and complex stream processing applications. It also integrates tightly with the other Spark modules, enabling end-to-end data processing pipelines that span batch and streaming data.
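Structured Streaming is the way to go for modern stream processing needs because:
- It treats a stream as an unbounded table that is queried incrementally, and it offers an experimental continuous processing mode in addition to its default micro-batch execution, whereas Spark Streaming (DStreams) is micro-batch only.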
- It has a much richer API and set of stream processing features by leveraging Spark SQL.
- It has stronger fault tolerance and consistency guarantees. Data loss is prevented through checkpointing and recovery.
- It supports event-time based processing and reasoning about data across time.
- The API is higher-level and easier to work with versus the lower-level DStreams API.
Structured Streaming is the future of stream processing with Spark and where continued investment and improvements are being made. So for any new streaming application, it is highly recommended to start with Structured Streaming.
In this blog post, we will explore how to process events streamed from Kafka with Spark Structured Streaming. We will look at the MinIO checkpoint manager and the significant performance improvements it has to offer, and at how to save event data from Kafka directly into MinIO as an Iceberg table.
If you haven't set up Kafka yet, take a look at this blog post. If you haven't already set up the Kafka topic nyc-avro-topic and started producing events using avro-producer, please see Making the Most of Streaming with Kafka Schema Registry and MinIO.
PySpark Structured Streaming Application
Below we will write a simple PySpark application that continuously streams events for the topic nyc-avro-topic from Kafka, processes each record and saves it as Parquet files into MinIO.
Note: We assume that Kafka, Kafka Schema Registry, the Kafka topic nyc-avro-topic and the Avro producer are up and running. You can refer to Making the Most of Streaming with Kafka Schema Registry and MinIO for detailed instructions.
You can find sample code here.
In the above code:
- load_config has all the Hadoop configurations required for Spark to connect with MinIO to read and write data.
- value_schema_dict has the Avro schema that Spark will use to deserialize data from Kafka.
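Since the sample code itself is not reproduced here, the following is only a minimal sketch of what a load_config-style helper could look like. The function signatures, the endpoint and the credentials are assumptions made for illustration; the fs.s3a.* keys are standard Hadoop S3A settings.

```python
def minio_s3a_settings(endpoint, access_key, secret_key, use_ssl=True):
    """Build the Hadoop S3A settings Spark needs to talk to an S3-compatible store."""
    return {
        "fs.s3a.endpoint": endpoint,
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        # MinIO is usually addressed as endpoint/bucket rather than bucket.endpoint.
        "fs.s3a.path.style.access": "true",
        "fs.s3a.connection.ssl.enabled": "true" if use_ssl else "false",
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }


def load_config(spark_context, settings):
    """Hypothetical stand-in for the post's load_config: copy settings into Hadoop's config."""
    hadoop_conf = spark_context._jsc.hadoopConfiguration()
    for key, value in settings.items():
        hadoop_conf.set(key, value)


if __name__ == "__main__":
    # Placeholder endpoint and credentials, for illustration only.
    example = minio_s3a_settings("https://minio.example.local:9000", "ACCESS", "SECRET")
    for key in sorted(example):
        print(key)
```

With a live session, the call would be load_config(spark.sparkContext, settings) before the stream is created.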
Configuring the Spark Stream
Let’s dig deeper into how we configure the Spark stream:
stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092") \
    .option("subscribe", "nyc-avro-topic") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("mode", "PERMISSIVE") \
    .option("truncate", False) \
    .option("newRows", 100000) \
    .load()
In the above snippet, we specify the Spark stream format as kafka. Some of the key options we included are:
- kafka.bootstrap.servers - the Kafka server endpoint from which to consume the events.
- subscribe - a comma-separated list of the topics that the Spark stream will process. In our case, it is nyc-avro-topic.
- startingOffsets - specifies where the Spark stream should start processing events for the subscribed topics. In our case, it is earliest, which means the beginning of each subscribed topic.
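The options described above can also be collected in a dictionary and passed to the reader in one call. The helper below is a sketch; its name and parameters are illustrative and not part of the sample code. It keeps only options documented for the Spark Kafka source, so the snippet's mode, truncate and newRows settings are left out here on the assumption that the Kafka source does not use them.

```python
import json


def kafka_source_options(bootstrap_servers, topics, starting_offsets="earliest"):
    """Build options for spark.readStream.format("kafka").

    starting_offsets may be "earliest", "latest", or a dict of
    {topic: {partition: offset}}; in that form -2 means earliest and -1 latest.
    """
    if isinstance(starting_offsets, dict):
        starting_offsets = json.dumps(
            {topic: {str(p): off for p, off in parts.items()}
             for topic, parts in starting_offsets.items()}
        )
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": ",".join(topics),  # comma-separated topic list
        "startingOffsets": starting_offsets,
        "failOnDataLoss": "false",
    }


if __name__ == "__main__":
    opts = kafka_source_options(
        "my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
        ["nyc-avro-topic"],
    )
    print(opts)
    # With a SparkSession named spark:
    # stream_df = spark.readStream.format("kafka").options(**opts).load()
```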
Preprocessing Avro Data Stream
# special pre-processing to skip the first 5 bytes for Confluent Avro data stream topics
In our case, since we are using the Confluent Avro data stream, we need to preprocess the byte stream before we start consuming the data in Spark. The first 5 bytes of each message (a magic byte followed by a 4-byte schema ID) carry the schema metadata that Confluent-aware consumers and connectors use to look up the schema. Spark does not need them here, so we skip these bytes and decode the remainder as Avro.
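To make the framing concrete, here is a small sketch of how a Confluent-framed message splits into header and Avro payload, followed by one possible Spark equivalent. The function names are illustrative, and the Spark part assumes the spark-avro package is available on the cluster. Spark SQL's substring is 1-based, so position 6 is the first byte after the 5-byte header.

```python
import struct

MAGIC_BYTE = 0
HEADER_SIZE = 5  # 1 magic byte + 4-byte big-endian schema ID


def split_confluent_frame(message: bytes):
    """Return (schema_id, avro_payload) for one Confluent-framed Kafka value."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message is shorter than the Confluent header")
    magic, schema_id = struct.unpack(">bI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, message[HEADER_SIZE:]


def decode_avro_column(stream_df, avro_schema_json):
    """Sketch of the same step in Spark: drop the header, then decode the Avro payload."""
    from pyspark.sql.functions import expr
    from pyspark.sql.avro.functions import from_avro  # requires spark-avro

    payload = expr("substring(value, 6, length(value) - 5)")
    return (
        stream_df
        .select(from_avro(payload, avro_schema_json).alias("data"))
        .select("data.*")
    )


if __name__ == "__main__":
    frame = b"\x00" + (42).to_bytes(4, "big") + b"avro-bytes"
    print(split_confluent_frame(frame))
```

In the post's setup, avro_schema_json would be json.dumps(value_schema_dict).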
Write Parquet data to MinIO
taxi_df.writeStream \
We open a write stream in Spark that writes the preprocessed dataframe taxi_df in Parquet format to the MinIO path specified in the options. We also set checkpointLocation, which is a location in a MinIO bucket where Spark continuously creates checkpoints. If the job fails, Spark resumes from where it left off based on those checkpoints.
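A write stream of the kind described above might be set up as in the sketch below. The function name is made up for illustration, and the two s3a:// locations shown in the usage comment are placeholders, not the paths used by the sample job.

```python
def start_parquet_sink(taxi_df, output_path, checkpoint_path):
    """Start a streaming query that appends taxi_df to Parquet files.

    Both paths are expected to be s3a:// locations on MinIO.
    """
    return (
        taxi_df.writeStream
        .format("parquet")
        .outputMode("append")
        .option("path", output_path)
        # Spark records its progress here so a restarted job can resume.
        .option("checkpointLocation", checkpoint_path)
        .start()
    )


# Example call with placeholder locations:
# query = start_parquet_sink(
#     taxi_df,
#     "s3a://example-bucket/taxi-data/",
#     "s3a://example-bucket/taxi-checkpoints/",
# )
# query.awaitTermination()
```

Keeping the checkpoint location separate from the data path makes it easy to reset one without touching the other.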
%%writefile sample-code/Dockerfile
Build your own image or use openlake/sparkjob-demo:3.3.2 from Docker Hub, which has the above code.
You can find example code here.
Apply it
!kubectl apply -f sample-code/spark-job/sparkjob-streaming.yaml
Note: Since our Kafka producer runs for approximately 3 hours to stream all 112M rows, Spark Structured Streaming will take close to the same time when the producer and the consumer are started together.
Performance Benchmarks
We measured the number of S3 API calls made by the Spark Structured Streaming consumer alone and recorded the results below:
API                        RX        TX        CALLS    ERRORS
s3.CopyObject              6.7 MiB   4.5 MiB   20220    0
s3.DeleteMultipleObjects   11 MiB    3.0 MiB   26938    0
s3.DeleteObject            9.9 MiB   0 B       39922    0
s3.GetObject               1.8 MiB   496 MiB   6736     0
s3.HeadObject              84 MiB    0 B       336680   0
s3.ListObjectsV2           60 MiB    1.6 GiB   241903   0
s3.PutObject               2.3 GiB   0 B       26975    0
Summary:
Total: 699374 CALLS, 2.5 GiB RX, 2.1 GiB TX - in 11999.80s
As the table above shows, we make approximately 700K calls to the MinIO endpoint. A simple change to the consumer code reduces this. By default, Spark starts the next micro-batch as soon as the previous one finishes, so the consumer polls Kafka for new events continuously. Adding a 1-minute trigger to the consumer code significantly reduces the total number of API calls. Here is the optimized code.
taxi_df.writeStream \
The key thing to note here is .trigger(processingTime='1 minute'), which makes Spark start a micro-batch at most once per minute instead of polling Kafka continuously. Here are the numbers with the optimized code:
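As a sketch of where the trigger sits in the writer chain, the helper below applies it to an existing DataStreamWriter. The helper name is illustrative and does not come from the sample code; trigger(processingTime=...) is the standard PySpark call.

```python
def with_processing_time_trigger(writer, interval="1 minute"):
    """Return the writer configured to start a micro-batch at most once per interval."""
    return writer.trigger(processingTime=interval)


# Usage with a streaming dataframe taxi_df:
# writer = taxi_df.writeStream.format("parquet").outputMode("append")
# writer = with_processing_time_trigger(writer, "1 minute")
# The path and checkpointLocation options and start() follow as before.
```

The trigger must be set before start() is called; a longer interval means fewer, larger micro-batches and therefore fewer checkpoint and file-commit operations against the object store.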
API                        RX        TX        CALLS    ERRORS
s3.CopyObject              207 KiB   139 KiB   614      0
s3.DeleteMultipleObjects   335 KiB   92 KiB    812      0
s3.DeleteObject            235 KiB   0 B       921      0
s3.GetObject               54 KiB    469 KiB   199      0
s3.HeadObject              2.5 MiB   0 B       9867     0
s3.ListObjectsV2           1.7 MiB   12 MiB    6910     0
s3.PutObject               2.0 GiB   0 B       814      0
Summary:
Total: 20137 CALLS, 2.0 GiB RX, 13 MiB TX - in 12126.59s
As the table above shows, we went from ~700K API calls to ~20K API calls. A single-line code change makes a big difference to the number of S3 API calls.
MinIO Checkpoint Manager
The above optimization is a huge improvement. However, if we run the same code against a versioned bucket and, after all the rows are stored in MinIO, run mc ls --versions --recursive opl/warehouse-v/k8 --summarize, we still see objects with v1 and v2 versions, where the v2 entries are delete markers for objects that should have been deleted. As consumers continue to add records, delete markers build up and waste storage space, which can become a problem over time.
Enter MinIO Checkpoint Manager, io.minio.spark.checkpoint.S3BasedCheckpointFileManager, which takes advantage of MinIO's strictly consistent atomic transactions. MinIO's Checkpoint Manager takes full advantage of the native object APIs and avoids the unnecessary baggage from a POSIX-based implementation.
You can use the checkpoint manager in your code by adding this one line to your Spark config:
SparkConf().set("spark.sql.streaming.checkpointFileManagerClass", "io.minio.spark.checkpoint.S3BasedCheckpointFileManager")
Here is the sample code that you can run to see the results
Build your own image or use openlake/sparkjob-demo:3.3.2 from Docker Hub, which has the above code. You can find code here.
Deploy the optimized consumer with MinIO's checkpoint manager as shown below:
!kubectl apply -f sample-code/spark-job/sparkjob-streaming-optimized.yaml
Performance Benchmarks
We measured the number of S3 API calls made by the Spark Structured Streaming consumer with the optimized checkpoint manager and recorded the following results:
API                        RX        TX        CALLS    ERRORS
s3.DeleteMultipleObjects   5.8 MiB   1.7 MiB   15801    0
s3.DeleteObject            12 MiB    0 B       46465    0
s3.GetObject               4.0 MiB   2.7 GiB   15802    0
s3.HeadObject              43 MiB    0 B       172825   0
s3.ListObjectsV1           7.8 MiB   7.8 GiB   31402    0
s3.ListObjectsV2           3.9 MiB   5.2 MiB   15782    0
s3.PutObject               4.7 GiB   0 B       63204    0
Summary:
Total: 361281 CALLS, 4.8 GiB RX, 10 GiB TX - in 12160.25s
We can already see that we went from ~700K API calls to ~361K API calls. If we also add the 1-minute trigger between micro-batches, we see further improvements:
API                        RX        TX        CALLS    ERRORS
s3.DeleteMultipleObjects   75 KiB    23 KiB    200      0
s3.DeleteObject            100 KiB   0 B       394      0
s3.GetObject               52 KiB    469 KiB   199      0
s3.HeadObject              508 KiB   0 B       1995     0
s3.ListBuckets             150 B     254 B     1        0
s3.ListObjectsV1           75 KiB    2.8 MiB   293      0
s3.ListObjectsV2           51 KiB    67 KiB    200      0
s3.PutObject               2.0 GiB   0 B       803      0
Summary:
Total: 4085 CALLS, 2.0 GiB RX, 3.3 MiB TX - in 11945.35s
From ~20K API calls earlier, we are now down to ~4K API calls. Another major improvement shows up on the versioned bucket: no v2 delete-marker objects are present, and only v1 objects remain.
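To put the four benchmark runs side by side, the short script below recomputes the reduction factors from the totals reported in the summaries above. The calls-per-trigger figure is only a rough estimate that assumes one micro-batch per 60-second trigger interval for the whole run.

```python
# (total S3 API calls, run time in seconds), copied from the summaries above.
DEFAULT = "default checkpointing, default trigger"
DEFAULT_1MIN = "default checkpointing, 1 minute trigger"
MINIO = "MinIO checkpoint manager, default trigger"
MINIO_1MIN = "MinIO checkpoint manager, 1 minute trigger"

RUNS = {
    DEFAULT: (699374, 11999.80),
    DEFAULT_1MIN: (20137, 12126.59),
    MINIO: (361281, 12160.25),
    MINIO_1MIN: (4085, 11945.35),
}


def reduction_factor(before, after):
    """How many times fewer API calls the 'after' run made than the 'before' run."""
    return RUNS[before][0] / RUNS[after][0]


def calls_per_trigger(run, interval_seconds=60.0):
    """Rough average of API calls per micro-batch, assuming one batch per interval."""
    calls, seconds = RUNS[run]
    return calls / (seconds / interval_seconds)


if __name__ == "__main__":
    for after in (DEFAULT_1MIN, MINIO, MINIO_1MIN):
        print(f"{DEFAULT} -> {after}: {reduction_factor(DEFAULT, after):.1f}x fewer calls")
    for run in (DEFAULT_1MIN, MINIO_1MIN):
        print(f"{run}: about {calls_per_trigger(run):.0f} calls per trigger")
```

The estimate of roughly 200 trigger intervals per run is consistent with the 199 GetObject and 200 DeleteMultipleObjects calls recorded in the final table.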
Spark Structured Streaming and Data Lakes
In this blog post we saw how to use Spark structured streaming to consume events from Kafka, and we also dug into some of the optimizations that can be done to reduce the number of S3 API calls. We also saw the benefits of using MinIO checkpoint manager and how the implementation avoids all the POSIX baggage and utilizes object storage’s native strict consistency. In the next blog post, we will see how to have the end-to-end Kafka producer and consumer implemented in Spark structured streaming and how that can speed up the entire flow.
Kafka, Spark and MinIO are frequently combined to build data lakes and analytics. All software-defined, they provide a portable multi-cloud home for streaming data, enabling you to feed analytics and AI/ML applications wherever they are.
Download MinIO today and start building your cloud-native streaming data lake.