End to End Spark Structured Streaming for Kafka Topics

Apache Kafka and Apache Spark are two leading technologies used to build the streaming data pipelines that feed data lakes and lakehouses. At a high level, Kafka streams messages to Spark, where they are transformed into a format that applications can read and saved to storage. This architecture makes it possible to build a wide variety of real-time, event-driven analytics and AI/ML applications.

We wrote about this architecture in an earlier post, Spark Structured Streaming With Kafka and MinIO, demonstrating how to leverage its unified batch and streaming API to create a dataframe from data published to Kafka. This architecture alleviates the burden of optimizing the low-level elements of streaming and provides end-to-end functionality. Kafka acts as the central hub for real-time data streams that are processed programmatically by Spark Structured Streaming. Once data is processed, Spark Structured Streaming publishes the results to MinIO, where they are saved as objects; collectively, these objects are the data lake.

MinIO was designed and built to provide performant, resilient and scalable cloud-native object storage for data lakes and the applications they enable. The best S3 API compatibility outside of S3 itself gives developers confidence that their own custom software, cloud-native analytics and AI/ML workloads will run without a problem. Erasure coding protects data in the data lake, while replication makes it available wherever it is needed. MinIO takes complete advantage of the underlying hardware (see Selecting the Best Hardware for Your MinIO Deployment) to deliver the greatest possible performance – we’ve benchmarked it at 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.

In the previous blog post, we saw how to consume Apache Kafka events in Apache Spark Structured Streaming. In this blog post, we will look at how to produce Kafka topic events and consume them into MinIO end-to-end with Spark Structured Streaming, without using standalone Kafka producers or connectors.

Prerequisites

Before we get started, we need to have the following ready:

  1. Kafka
  2. Kafka Schema Registry
  3. Spark Operator
  4. MinIO cluster

After we have the above prerequisites ready, we will do the following:

  • Modify Kafka Topic Partition
  • Set up Spark Structured Streaming Consumer
  • Set up Spark Structured Streaming Producer

Modify Kafka Topic Partition

In order to gain full parallelism from Spark, we need to set the number of partitions for the nyc-avro-topic Kafka topic to 10, so that Spark Structured Streaming can use 10 executors to pull data from Kafka simultaneously, as shown below.

%%writefile sample-code/spark-job/kafka-nyc-avro-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: nyc-avro-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 10
  replicas: 3

Note: Before you apply the above change, it is highly recommended to delete the nyc-avro-topic topic if it already exists from the run in the previous blog post.

Spark Structured Streaming Consumer

We’ve written a sample Spark Structured Streaming consumer in Python that uses Spark to connect to our MinIO backend. It listens to the Kafka broker for new messages on the topic nyc-avro-topic and writes them to the MinIO bucket s3a://warehouse-v/k8/spark-stream/.

A snippet of the code is below, and the full code is here.

%%writefile sample-code/src/main-streaming-spark-consumer.py
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
import json

...

taxi_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .trigger(processingTime='1 second') \
    .option("path", "s3a://warehouse-v/k8/spark-stream/") \
    .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \
    .start() \
    .awaitTermination()

Let’s dig into what changed in the above code. In this case, the Avro messages are encoded with Spark’s own Avro implementation (the producer below uses to_avro), so there is no need for the Confluent dependency and we don’t have to skip the wire-format header bytes at the start of each message as we did earlier. We also set a one-second processing-time trigger, so Spark polls Kafka for new events every second.

taxi_df = stream_df.select(from_avro("value", json.dumps(value_schema_dict)).alias("data")).select("data.*")

taxi_df.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .trigger(processingTime='1 second') \
    .option("path", "s3a://warehouse-v/k8/spark-stream/") \
    .option("checkpointLocation", "s3a://warehouse-v/k8/checkpoint") \
    .start() \
    .awaitTermination()
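For completeness, the portions elided above (the ... in the snippet) set up the SparkSession for MinIO over s3a and create the Kafka source stream stream_df. Below is a minimal sketch of what that setup typically looks like, assuming the in-cluster Kafka bootstrap service used elsewhere in this post; the MinIO endpoint and credential values are illustrative placeholders, not a copy of the full script linked above.

import os
from pyspark.sql import SparkSession

# Sketch only: point the s3a filesystem at MinIO (endpoint and credential values are placeholders)
spark = SparkSession.builder \
    .appName("spark-streaming-consumer") \
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("ENDPOINT", "minio.minio.svc.cluster.local:9000")) \
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser")) \
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser")) \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()

# Subscribe to nyc-avro-topic; stream_df exposes each Kafka record with a binary `value` column
stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092") \
    .option("subscribe", "nyc-avro-topic") \
    .option("startingOffsets", "earliest") \
    .load()

The binary value column of stream_df is then decoded with from_avro, as shown above, before the resulting dataframe is written out to MinIO.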

Now, let’s build a Docker image containing the dependencies our Python code above needs to run, along with the .py script that will be executed.

Build the Docker image with the Dockerfile here or use the prebuilt openlake/sparkjob-demo:3.3.2, then deploy the consumer with the SparkApplication spec below.

%%writefile sample-code/spark-job/sparkjob-streaming-consumer.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-stream-optimized
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/sparkjob-demo:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/main-streaming-spark-consumer.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 1
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 3
    memory: "2048m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser
  executor:
    cores: 1
    instances: 10
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser

Spark Structured Streaming Producer

Now that we have the Kafka topic configured correctly, let’s create a Kafka producer using Spark Structured Streaming, as shown below. Click here for the full code snippet.

%%writefile sample-code/src/spark-streaming-kafka-producer.py
import json
from io import BytesIO
import os
import avro.schema
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import to_avro
from pyspark.sql.functions import struct
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

...

df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))
df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value"))

df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            os.getenv("KAFKA_BOOTSTRAM_SERVER", "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092")) \
    .option("flushInterval", "100ms") \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \
    .option("schema.registry.url",
            os.getenv("KAFKA_SCHEMA_REGISTRY", "http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081")) \
    .option("topic", "nyc-avro-topic") \
    .save()


In the above code, we read the taxi-data.csv data from the MinIO bucket in this block:

df = spark.read.option("header", "true").schema(schema).csv(
    os.getenv("INPUT_PATH", "s3a://openlake/spark/sample-data/taxi-data.csv"))

We then transform the dataframe to Avro in this code block:

df = df.select(to_avro(struct([df[x] for x in df.columns]), value_schema_str).alias("value"))
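The value_schema_str passed to to_avro is an Avro schema, expressed as a JSON string, that describes the taxi records. Below is a minimal sketch of what such a schema might look like; the record name and the trimmed field list are illustrative assumptions, not the full schema from the linked code.

import json

# Illustrative, trimmed Avro schema for the taxi records (field names and types are assumptions)
value_schema_str = json.dumps({
    "namespace": "nyc.avro",
    "type": "record",
    "name": "NycTaxiRide",
    "fields": [
        {"name": "VendorID", "type": "long"},
        {"name": "tpep_pickup_datetime", "type": "string"},
        {"name": "tpep_dropoff_datetime", "type": "string"},
        {"name": "passenger_count", "type": "double"},
        {"name": "trip_distance", "type": "double"},
        {"name": "fare_amount", "type": "double"}
    ]
})

On the consumer side, the same schema is passed to from_avro (as json.dumps(value_schema_dict) in the consumer snippet earlier) so that the binary value column can be decoded back into typed columns.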

Finally, we use the following code block to write the events for the topic nyc-avro-topic to the Kafka bootstrap server at my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092, with the Kafka Schema Registry URL http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081:

df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers",
            os.getenv("KAFKA_BOOTSTRAM_SERVER", "my-kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092")) \
    .option("flushInterval", "100ms") \
    .option("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") \
    .option("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer") \
    .option("schema.registry.url",
            os.getenv("KAFKA_SCHEMA_REGISTRY", "http://kafka-schema-registry-cp-schema-registry.kafka.svc.cluster.local:8081")) \
    .option("topic", "nyc-avro-topic") \
    .save()

Build the Docker image with the above code or use the one we built, openlake/sparkjob-demo:3.3.2, then deploy the producer with the SparkApplication spec below.

%%writefile sample-code/spark-job/sparkjob-kafka-producer.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: kafka-stream-producer
  namespace: spark-operator
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "openlake/sparkjob-demo:3.3.2"
  imagePullPolicy: Always
  mainApplicationFile: local:///app/spark-streaming-kafka-producer.py
  sparkVersion: "3.3.2"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 3
    # coreLimit: "1200m"
    memory: "2048m"
    labels:
      version: 3.3.2
    serviceAccount: my-release-spark
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/taxi-data.csv"
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser
  executor:
    cores: 1
    instances: 10
    memory: "1024m"
    labels:
      version: 3.3.2
    env:
      - name: INPUT_PATH
        value: "s3a://openlake/spark/sample-data/taxi-data.csv"
      - name: AWS_REGION
        value: us-east-1
      - name: AWS_ACCESS_KEY_ID
        value: openlakeuser
      - name: AWS_SECRET_ACCESS_KEY
        value: openlakeuser

With 10 partitions configured for nyc-avro-topic and 10 executors each for the Spark Kafka producer and consumer, all of the ~112M rows are streamed and consumed in less than 10 minutes, a significant decrease from the ~3 hours it took with our previous Kafka/Spark streaming setup. This is a big performance gain, and processing streams at this speed is crucial when developing real-time applications.

Note: Even if we did not apply the 10-partition change to nyc-avro-topic, the Spark producer would still complete in under 10 minutes, but the consumer would take longer, since its parallelism is limited by the number of partitions at the topic level.

Performance Benchmarks

We measured the number of S3 API calls made by the Spark Structured Streaming consumer alone and obtained the numbers below:

API                             RX        TX        CALLS    ERRORS
s3.CompleteMultipartUpload      7.5 KiB   7.3 KiB   16       0
s3.DeleteMultipleObjects        22 KiB    6.8 KiB   60       0
s3.HeadObject                   51 KiB    0 B       200      0
s3.ListObjectsV1                1.5 KiB   9.0 KiB   6        0
s3.ListObjectsV2                15 KiB    20 KiB    60       0
s3.NewMultipartUpload           4.1 KiB   6.1 KiB   16       0
s3.PutObject                    1.1 GiB   0 B       63       0
s3.PutObjectPart                1.2 GiB   0 B       32       0

Summary: 453 CALLS, 2.4 GiB RX, 49 KiB TX in 160.71s


If we repeat this exercise without MinIO's checkpoint manager, we end up with the numbers below:

API                             RX        TX        CALLS    ERRORS
s3.CompleteMultipartUpload      6.1 KiB   5.9 KiB   13       0
s3.CopyObject                   6.1 KiB   4.1 KiB   18       0
s3.DeleteMultipleObjects        30 KiB    8.8 KiB   78       0
s3.DeleteObject                 4.6 KiB   0 B       18       0
s3.HeadObject                   110 KiB   0 B       432      0
s3.ListObjectsV2                63 KiB    124 KiB   248      0
s3.NewMultipartUpload           3.3 KiB   4.9 KiB   13       0
s3.PutObject                    1.3 GiB   0 B       66       0
s3.PutObjectPart                1.1 GiB   0 B       26       0

Summary: 912 CALLS, 2.3 GiB RX, 147 KiB TX in 166.55s


We can clearly see the performance improvement from the end-to-end Spark Structured Streaming producer and consumer for Kafka, and MinIO's checkpoint manager enhances it further by cutting the number of S3 API calls roughly in half (453 versus 912).

Furthermore, if the above sample code is run on a versioned bucket, mc ls --versions --recursive opl/warehouse-v/k8 --summarize reports 84 objects when using MinIO's checkpoint manager versus 140 objects with the default checkpoint manager, which, as before, does not clean up delete markers.

High-Performance Streaming Opens a World of Possibilities

This blog post demonstrated how to produce Kafka topic events and consume them into MinIO end-to-end with Spark Structured Streaming, without using standalone Kafka producers or connectors. Doing so simplified our streaming architecture and, combined with MinIO's checkpoint manager, cut the time needed from roughly three hours to less than ten minutes.

You now have the knowledge to build very fast streaming data pipelines using Kafka, Spark and MinIO. In the streaming world, faster is better because it makes for more timely analyses. All three of these cloud-native applications are software-defined, making them an awesomely powerful combination for building multi-cloud streaming data lakes.

Download MinIO today and start building a cloud-native streaming data lake to support your real-time analytics and AI/ML initiatives.