End to End Spark Structured Streaming for Kafka Topics
Apache Kafka and Apache Spark are two leading technologies used to build the streaming data pipelines that feed data lakes and lakehouses. At a high level, Kafka streams messages to Spark, where they are transformed into a format that applications can read and then saved to storage. This architecture makes it possible to build a wide variety of real-time, event-driven analytics and AI/ML applications.
We wrote about this architecture in an earlier post, Spark Structured Streaming With Kafka and MinIO, demonstrating how to leverage Spark's unified batch and streaming API to create a dataframe from data published to Kafka. This architecture removes the burden of optimizing the low-level elements of streaming and provides end-to-end functionality. Kafka acts as the central hub for real-time data streams, which are processed programmatically by Spark Structured Streaming. Once the data is processed, Spark Structured Streaming publishes the results to MinIO, where they are saved as objects; collectively, these objects make up the data lake.
MinIO was designed and built to provide performant, resilient and scalable cloud-native object storage for data lakes and the applications they enable. The best S3 API compatibility outside of S3 itself gives developers confidence that they can use their own custom software, cloud-native analytics or AI/ML without a problem. Erasure coding protects data in the data lake, while replication makes it available wherever it is needed. MinIO takes complete advantage of underlying hardware (see Selecting the Best Hardware for Your MinIO Deployment) to deliver the greatest possible performance – we’ve benchmarked it at 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.
In the previous blog post, we saw how to consume Apache Kafka events in Apache Spark Structured Streaming. In this post, we will look at how to create Kafka topic events and consume them into MinIO end-to-end with Spark Structured Streaming, without using the Kafka producers or connectors.
Prerequisites
Before we get started, we need to have the following ready
Once the above prerequisites are ready, we will do the following:
- Modify Kafka Topic Partition
- Set up Spark Structured Streaming Consumer
- Set up Spark Structured Streaming Producer
Modify Kafka Topic Partition
To get full parallelism from Spark, we need to set the number of partitions of the nyc-avro-topic Kafka topic to 10, so that Spark Structured Streaming can use 10 workers to pull data from Kafka simultaneously, as shown below.
%%writefile sample-code/spark-job/kafka-nyc-avro-topic.yaml
Note: Before applying the above change, we highly recommend deleting nyc-avro-topic if it already exists from runs in the previous blog posts.
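To see why the partition count matters, here is a simplified model in Python. It assumes Spark's default one-to-one mapping of Kafka topic partitions to read tasks, assumes each executor runs one task at a time (cores_per_executor=1 is an assumption, not a setting from this post), and only approximates the effect of the Kafka source's minPartitions option. The function names are hypothetical, and real scheduling also depends on cluster resources and data skew.

```python
from typing import Optional


def read_tasks_per_batch(topic_partitions: int, min_partitions: Optional[int] = None) -> int:
    """Approximate number of Spark tasks that read one micro-batch from Kafka."""
    if min_partitions is not None and min_partitions > topic_partitions:
        # minPartitions is only a hint: Spark splits large Kafka partitions into
        # smaller offset ranges, giving roughly (not exactly) this many tasks.
        return min_partitions
    # Default: one Spark task per Kafka topic partition.
    return topic_partitions


def concurrent_readers(topic_partitions: int, executors: int,
                       cores_per_executor: int = 1,
                       min_partitions: Optional[int] = None) -> int:
    """Upper bound on how many Kafka read tasks can run at the same time."""
    tasks = read_tasks_per_batch(topic_partitions, min_partitions)
    slots = executors * cores_per_executor  # one task per core at a time
    return min(tasks, slots)


for partitions in (1, 10):
    n = concurrent_readers(partitions, executors=10)
    print(f"{partitions} partition(s), 10 executors -> {n} concurrent reader(s)")
```

Under this model, a single-partition topic leaves nine of the ten executors idle on the read side, while ten partitions let all ten pull data at once.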
Spark Structured Streaming Consumer
We’ve written a sample Spark Structured Streaming consumer in Python that connects to our MinIO backend, listens to the Kafka broker for new messages on the nyc-avro-topic topic, and writes them to the MinIO bucket path s3a://warehouse-v/k8/spark-stream/.
A snippet of the code is below; the full code is here.
%%writefile sample-code/src/main-streaming-spark-consumer.py
Let’s dig into what changed in the above code. This time the Avro data is produced by Spark's own Avro implementation, so there is no need for the Confluent dependency, and it isn’t necessary to skip the leading bytes of each message (the 5-byte Confluent wire-format header) as we did earlier. We also added a 1-second delay before each poll for Kafka events:
taxi_df = stream_df.select(from_avro("value", json.dumps(value_schema_dict)).alias("data")).select("data.*")
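For reference, a message written by the Confluent Avro serializer starts with a 5-byte header: a magic byte of 0 followed by the 4-byte big-endian schema ID, with the Avro-encoded record after it. Spark's to_avro writes only the Avro bytes, which is why from_avro can decode the value column directly. The plain-Python sketch below illustrates that framing; the helper names and the sample bytes are hypothetical.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # first byte of every Confluent-framed message
HEADER_LEN = 5   # 1 magic byte + 4-byte big-endian schema ID


def frame_confluent(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend a Confluent wire-format header to raw Avro bytes."""
    return struct.pack(">BI", MAGIC_BYTE, schema_id) + avro_payload


def strip_confluent_header(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, raw Avro bytes)."""
    if len(message) < HEADER_LEN or message[0] != MAGIC_BYTE:
        raise ValueError("not a Confluent wire-format message")
    (schema_id,) = struct.unpack(">I", message[1:HEADER_LEN])
    return schema_id, message[HEADER_LEN:]


# Hypothetical payload standing in for Avro bytes produced by to_avro.
raw_avro = b"\x02\x06abc"
framed = frame_confluent(42, raw_avro)
print(len(framed) - len(raw_avro))     # 5 header bytes
print(strip_confluent_header(framed))  # (42, b'\x02\x06abc')
```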
Now, let’s build the Docker image, using a Dockerfile that installs the dependencies our Python code needs and includes the .py script that the image will execute. Build the image using the Dockerfile here, or use the prebuilt openlake/sparkjob-demo:3.3.2 image.
%%writefile sample-code/spark-job/sparkjob-streaming-consumer.yaml
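Putting the consumer pieces together, a minimal PySpark sketch might look like the following. It assumes the spark-sql-kafka and spark-avro packages are on the classpath and reuses the broker address given for the producer later in this post. Writing Parquet, starting from the earliest offsets, modeling the 1-second delay as a processing-time trigger, and the checkpoint path are all assumptions, not details confirmed by the snippet above; the helper names are hypothetical.

```python
import json

# Broker, topic and output path come from this post; the checkpoint path is hypothetical.
BOOTSTRAP = "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
TOPIC = "nyc-avro-topic"
OUTPUT_PATH = "s3a://warehouse-v/k8/spark-stream/"
CHECKPOINT_PATH = "s3a://warehouse-v/k8/checkpoint/"  # assumed location


def kafka_source_options(bootstrap: str = BOOTSTRAP, topic: str = TOPIC) -> dict:
    """Options for Spark's built-in Kafka source."""
    return {
        "kafka.bootstrap.servers": bootstrap,
        "subscribe": topic,
        "startingOffsets": "earliest",  # assumption: replay the whole topic
    }


def start_consumer(spark, value_schema_dict: dict):
    """Decode Avro values from Kafka and append them to MinIO (Parquet assumed)."""
    # Imported here so the options helper above works without PySpark installed.
    from pyspark.sql.avro.functions import from_avro

    stream_df = spark.readStream.format("kafka").options(**kafka_source_options()).load()
    taxi_df = (stream_df
               .select(from_avro("value", json.dumps(value_schema_dict)).alias("data"))
               .select("data.*"))
    return (taxi_df.writeStream
            .format("parquet")
            .option("path", OUTPUT_PATH)
            .option("checkpointLocation", CHECKPOINT_PATH)
            .trigger(processingTime="1 second")  # assumed form of the 1-second delay
            .start())
```

Calling start_consumer(spark, value_schema_dict) returns a StreamingQuery; in a standalone job you would typically block on its awaitTermination().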
Spark Structured Streaming Producer
Now that we have the Kafka topic configured correctly, let’s create a Kafka producer using Spark Structured Streaming, as shown below. Click here for the full code snippet.
%%writefile sample-code/src/spark-streaming-kafka-producer.py
In the above code, we read the taxi-data.csv data from the MinIO bucket in this block:
df = spark.read.option("header", "true").schema(schema).csv(
We then encode each row of the dataframe as Avro in this block:
df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value"))
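The value_schema_str passed to to_avro (and the value_schema_dict passed to from_avro in the consumer) is an Avro record schema serialized as JSON. The post does not show how it is built; one hypothetical way is to generate it from a list of (column, Avro type) pairs, marking every field nullable. The column subset and the record name below are for illustration only, not the full taxi schema.

```python
import json
from typing import Dict, List, Tuple


def avro_record_schema(name: str, columns: List[Tuple[str, str]]) -> Dict:
    """Build an Avro record schema in which every column is nullable."""
    return {
        "type": "record",
        "name": name,
        "fields": [
            # "null" comes first so that a null default is valid for the union.
            {"name": col, "type": ["null", avro_type], "default": None}
            for col, avro_type in columns
        ],
    }


# Hypothetical subset of NYC taxi columns; keep names and order aligned with the dataframe.
value_schema_dict = avro_record_schema(
    "NycTaxiRecord",
    [("VendorID", "long"), ("trip_distance", "double"), ("store_and_fwd_flag", "string")],
)
value_schema_str = json.dumps(value_schema_dict)  # what to_avro / from_avro receive
```

Generating both strings from the same dict keeps the producer and consumer schemas identical, which matters here because the messages carry raw Avro with no embedded schema ID, so from_avro must decode with the schema the data was written with.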
Finally, we use the following block to write the events for the nyc-avro-topic topic to the Kafka server at my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092, with the Kafka Schema Registry at http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081:
df.write \
Build the Docker image with the above code, or use the prebuilt openlake/sparkjob-demo:3.3.2 image.
%%writefile sample-code/spark-job/sparkjob-kafka-producer.yaml
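The write step can be sketched with Spark's built-in Kafka sink (from the spark-sql-kafka package), which expects a binary or string value column like the one produced by the to_avro select earlier. This is a hypothetical helper, not the post's full code: it hard-codes the broker and topic named above and leaves out the Schema Registry URL, since the snippet does not show how that URL is used.

```python
# Broker and topic are taken from this post; the helper names are hypothetical.
BOOTSTRAP = "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
TOPIC = "nyc-avro-topic"


def kafka_sink_options(bootstrap: str = BOOTSTRAP, topic: str = TOPIC) -> dict:
    """Options for a batch write through Spark's built-in Kafka sink."""
    # "topic" applies to every row because the dataframe has no topic column.
    return {"kafka.bootstrap.servers": bootstrap, "topic": topic}


def write_to_kafka(avro_df) -> None:
    """Publish a dataframe that has a binary 'value' column to Kafka.

    avro_df is assumed to be the result of the to_avro(...) select; rows have
    no key, so the Kafka producer chooses the partition for each record.
    """
    avro_df.write.format("kafka").options(**kafka_sink_options()).save()
```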
With 10 partitions for nyc-avro-topic and 10 executors for the Spark Kafka producer and consumer, all of the ~112M rows are streamed and consumed in less than 10 minutes, a significant decrease from the ~3 hours it took with our previous Kafka/Spark streaming setup. This is a big performance gain, and processing streams at this speed is crucial when developing real-time applications.
Note: If we had not applied the 10-partition change to nyc-avro-topic, the Spark producer would still complete in under 10 minutes, but the consumer would take longer, because its read parallelism is limited by the number of partitions in the topic.
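As a quick sanity check on the timing numbers above, the arithmetic below treats "~112M rows" as exactly 112 million and "less than 10 minutes" as 600 seconds. Because the real duration is under 600 seconds, the new throughput and the speedup it prints are lower bounds.

```python
ROWS = 112_000_000          # "~112M" rows, approximate
NEW_SECONDS = 10 * 60       # "less than 10 minutes", an upper bound on duration
OLD_SECONDS = 3 * 60 * 60   # "~3 hours"

new_rate = ROWS / NEW_SECONDS   # lower bound on the new throughput
old_rate = ROWS / OLD_SECONDS
speedup = OLD_SECONDS / NEW_SECONDS

print(f"new: >= {new_rate:,.0f} rows/s, old: ~{old_rate:,.0f} rows/s, speedup: >= {speedup:.0f}x")
# new: >= 186,667 rows/s, old: ~10,370 rows/s, speedup: >= 18x
```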
Performance Benchmarks
We measured the number of S3 API calls made by the Spark Structured Streaming consumer alone, using MinIO's checkpoint manager, and obtained the following numbers:
API                          RX        TX        CALLS  ERRORS
s3.CompleteMultipartUpload   7.5 KiB   7.3 KiB   16     0
s3.DeleteMultipleObjects     22 KiB    6.8 KiB   60     0
s3.HeadObject                51 KiB    0 B       200    0
s3.ListObjectsV1             1.5 KiB   9.0 KiB   6      0
s3.ListObjectsV2             15 KiB    20 KiB    60     0
s3.NewMultipartUpload        4.1 KiB   6.1 KiB   16     0
s3.PutObject                 1.1 GiB   0 B       63     0
s3.PutObjectPart             1.2 GiB   0 B       32     0
Summary: Total: 453 CALLS, 2.4 GiB RX, 49 KiB TX - in 160.71s
If we repeat this exercise without MinIO's checkpoint manager, we get the following numbers:
API                          RX        TX        CALLS  ERRORS
s3.CompleteMultipartUpload   6.1 KiB   5.9 KiB   13     0
s3.CopyObject                6.1 KiB   4.1 KiB   18     0
s3.DeleteMultipleObjects     30 KiB    8.8 KiB   78     0
s3.DeleteObject              4.6 KiB   0 B       18     0
s3.HeadObject                110 KiB   0 B       432    0
s3.ListObjectsV2             63 KiB    124 KiB   248    0
s3.NewMultipartUpload        3.3 KiB   4.9 KiB   13     0
s3.PutObject                 1.3 GiB   0 B       66     0
s3.PutObjectPart             1.1 GiB   0 B       26     0
Summary: Total: 912 CALLS, 2.3 GiB RX, 147 KiB TX - in 166.55s
The end-to-end Spark Structured Streaming producer and consumer deliver a significant speedup, and MinIO's checkpoint manager improves performance further by roughly halving the number of S3 API calls (453 versus 912).
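To quantify the difference, the snippet below tallies the CALLS column of the two measurements above. The per-API breakdown shows where the savings come from: HeadObject and ListObjectsV2 calls drop sharply, and the CopyObject and DeleteObject calls seen with the default checkpoint manager (likely the result of S3A emulating renames as a copy plus a delete) disappear entirely.

```python
# CALLS column from the two consumer-only measurements above ("s3." prefix dropped).
with_minio = {
    "CompleteMultipartUpload": 16, "DeleteMultipleObjects": 60, "HeadObject": 200,
    "ListObjectsV1": 6, "ListObjectsV2": 60, "NewMultipartUpload": 16,
    "PutObject": 63, "PutObjectPart": 32,
}
default_mgr = {
    "CompleteMultipartUpload": 13, "CopyObject": 18, "DeleteMultipleObjects": 78,
    "DeleteObject": 18, "HeadObject": 432, "ListObjectsV2": 248,
    "NewMultipartUpload": 13, "PutObject": 66, "PutObjectPart": 26,
}

total_minio = sum(with_minio.values())
total_default = sum(default_mgr.values())
reduction = 1 - total_minio / total_default

print(f"{total_minio} vs {total_default} calls: {reduction:.1%} fewer")
# 453 vs 912 calls: 50.3% fewer
print("API (default -> MinIO checkpoint manager)")
for api in sorted(set(with_minio) | set(default_mgr)):
    print(f"{api:<24}{default_mgr.get(api, 0):>5} -> {with_minio.get(api, 0):>5}")
```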
Furthermore, if the above sample code is run against a versioned bucket and we then run mc ls --versions --recursive opl/warehouse-v/k8 --summarize, we find 84 objects with MinIO's checkpoint manager versus 140 objects with the default checkpoint manager, which does not clean up delete markers.
High-Performance Streaming Opens a World of Possibilities
This blog post demonstrated how to create Kafka topic events and consume them into MinIO end-to-end with Spark Structured Streaming, without using the Kafka producers or connectors. Doing so simplified our streaming architecture and cut the time needed from about three hours to less than ten minutes, while MinIO's checkpoint manager roughly halved the number of S3 API calls made by the consumer.
You now have the knowledge to build very fast streaming data pipelines using Kafka, Spark and MinIO. In the streaming world, faster is better because it makes for more timely analyses. These three cloud-native applications are software-defined, making them a powerful combination for building multi-cloud streaming data lakes.
Download MinIO today and start building a cloud-native streaming data lake to support your real-time analytics and AI/ML initiatives.