Making the Most of Streaming with Kafka Schema Registry and MinIO

Making the Most of Streaming with Kafka Schema Registry and MinIO

Modern data lakes/lake houses fed by streaming data are a focus for many enterprises seeking to take control of their data and apply it to solve business problems. The data-driven business requires a keen understanding of itself both yesterday and today in order to thrive tomorrow. The state of the art in cloud-native event streaming is Apache Kafka with an object storage endpoint such as MinIO.

MinIO is an ideal complement to Kafka architectures given its combination of scalability and performance. These attributes give the architect or developer the ability to build anything they want on top of their streaming data lake/lake house, and the S3 API means that they can use cloud-native analytics and AI/ML frameworks to power their applications. MinIO takes complete advantage of underlying hardware (see Selecting the Best Hardware for Your MinIO Deployment) to deliver the greatest possible performance – we’ve benchmarked it at 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs. Erasure coding makes MinIO a durable and resilient distributed object storage solution, while a rich set of enterprise integrations mean that your data lake storage seamlessly fits into existing infrastructure.

Apache Kafka is a key element of streaming data architecture. In a very basic sense, Kafka is a distributed event streaming platform made up of a collection of processes called brokers. Producers send events to brokers where they are retained based on time, allowing Consumers to read and process events asynchronously. We went into more detail in, How to Set up Kafka and Stream Data to MinIO in Kubernetes, where we showed you how to get started using Kafka Connectors to stream events directly to MinIO.

That post explains the simplest way to stream data so you can get up and running quickly, but it’s important to remember that it may not be efficient and performant enough for production use cases involving large workloads. One shortcoming of the quick and dirty example provided earlier is that it lacks an automated way to verify data, evolve stream schema and add additional downstream consumers. None of these would be a problem in a development or test environment, but they present critical challenges in production. These issues go from minor to major when you add multiple Kafka developers, each making their own changes to streams. The result can be rather serious: When a Kafka topic changes schema – a new column gets added, an existing column gets removed or the data type of a given column gets modified – the consumer may not be aware of these changes, potentially resulting in data corruption.

In an enterprise environment with multiple developers and a multitude of end-users, a topic could break very easily. When a topic breaks, data stops streaming into the data lake. The team has to stop what they’re doing and start troubleshooting. They have to figure out why the original schema was written and which subsequent change broke it, then update the schema manually. Again, this would be fine in dev/test, but totally unacceptable in production environments.

The question becomes - what does a production-grade architecture look like to ensure there are no stream breaking changes associated with Kafka topics? That is the focus of the rest of the post.

Kafka Schema Registry to the Rescue

Kafka Schema Registry is a component in the Apache Kafka ecosystem that provides a centralized schema management service for Kafka producers and consumers. It allows producers to register schemas for the data they produce, and consumers to retrieve and use these schemas for data validation and deserialization. The Schema Registry helps ensure that data exchanged through Kafka is compliant with a predefined schema, enabling data consistency, compatibility, and evolution across different systems and applications.

When using Avro or other schema format, it is critical to manage schemas and evolve them thoughtfully. Schema compatibility checking is enabled in Kafka Schema Registry by versioning every single schema and comparing new schemas to previous versions. The type of compatibility required (backward, forward, full, none, etc) determines how Kafka Schema Registry evaluates each new schema. New schemas that fail compatibility checks are removed from service.

Some key benefits of using Kafka Schema Registry include:

  • Schema Evolution: As data formats and requirements evolve over time, it is common for producers and consumers to undergo changes to their data schemas. Kafka Schema Registry provides support for schema evolution, allowing producers to register new versions of schemas while maintaining compatibility with existing consumers. Consumers can retrieve the appropriate schema version for deserialization, ensuring that data is processed correctly even when schema changes occur.
  • Data Validation: Kafka Schema Registry enables data validation by allowing producers to register schemas with predefined data types, field names, and other constraints. Consumers can then retrieve and use these schemas to validate incoming data, ensuring that data conforms to the expected structure and format. This helps prevent data processing errors and improves data quality.
  • Schema Management: Kafka Schema Registry provides a centralized repository for managing schemas, making it easier to track, version, and manage changes. Producers and consumers can register, retrieve and manage schemas through a simple API, allowing for centralized schema governance and management.
  • Interoperability: Kafka Schema Registry promotes interoperability between different producers and consumers by providing a standardized way to define and manage data schemas. Producers and consumers written in different programming languages or using different serialization frameworks can use a common schema registry to ensure data consistency and compatibility across the ecosystem.
  • Backward and Forward Compatibility: Kafka Schema Registry allows producers to register backward and forward compatible schemas, enabling smooth upgrades and changes to data schemas without disrupting existing producers and consumers. Backward compatibility ensures that older consumers can still process data produced with a newer schema, while forward compatibility allows newer consumers to process data produced with an older schema.

Strimzi Operator doesn't come with Schema Registry yet, so we will use the one available in the Confluent Helm repository.

In this blog post we will do the following

  1. Set up Kafka Schema Registry using Helm charts
  2. Create and deploy a sample producer that uses an Apache Avro schema and sends events
  3. Build a KafkaConnect container that has an Avro dependency
  4. Deploy KafkaConnect using the above container
  5. Deploy a Kafka Connector that reads the schema from Kafka Schema Registry, consumes topic events from the producer and stores data into MinIO in Parquet format

Set up Schema Registry

We will clone the Confluent Helm repository using the following command

In [ ]:
!git clone https://github.com/confluentinc/cp-helm-charts.git

In [ ]:
#move to schema registry folder
%cd cp-helm-charts/charts/cp-schema-registry

Use the below command to install Schema Registry using the Helm charts, we will need to provide the bootstrap server endpoint of the existing Kafka cluster we deployed for the installation to be successful

In [ ]:
!helm install kafka-schema-registry --set kafka.bootstrapServers="PLAINTEXT://my-kafka-cluster-kafka-bootstrap:9092" . -n kafka

You can check if the Schema Registry is up and running by examining the logs as shown below

!kubectl -n kafka logs -f --selector=app=cp-schema-registry -c cp-schema-registry-server # stop this shell once you are done

Apr 12, 2023 4:52:25 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime

[2023-04-12 16:52:28,481] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)


Create Avro Topic

Next, we’ll create a YAML file for the Kafka topic nyc-avro-topic and apply it. You can find sample code here.

Producer with Avro Schema

We will create a simple Python producer then register the Avro schema with the Kafka Schema Registry and send Kafka topic events. This will be based on the producer that we already had in the previous blog post. You can find sample code here.

Then, add requirements and the Dockerfile on which we will build the Docker image (code).

Build and push the docker image for the producer using the above docker file into your docker registry or you can use the one available in openlake openlake/kafka-demo-avro-producer.

Let's create a YAML that deploys our producer in the Kubernetes cluster as a job (code)

Deploy the avro-producer.yaml file

In [9]:
!kubectl apply -f deployment/avro-producer.yaml

job.batch/avro-producer-job created

You can check the logs by using the below command

In [14]:
!kubectl logs -f job.batch/avro-producer-job -n kafka # stop this shell once you are done

Error from server (NotFound): jobs.batch "avro-producer-job" not found

Build Kafka Connect Image

Let's build a Kafka Connect image that has S3 and Avro dependencies (code)

Build and push the Docker image for the producer using the above Dockerfile into your Docker registry, or you can use the one available in the MinIO Openlake repository.

Before we deploy KafkaConnect, we first need to create storage topics, if not already present, for KafkaConnect to work as expected.

Deploy Kafka Connect

Create a YAML file for Kafka Connect that uses the above image and deploy it in Kubernetes. KafkaConnect will have 1 replica and make use of the storage topics that we created in the previous blog post. You can find sample code here.

NOTE: spec.template.connectContainer.env has the credentials defined in order for KafkaConnect to store data in our Minio cluster. Other details like the endpoint_url and bucket_name will be part of KafkaConnector.key.converter and value.converter is pointing to AvroConverter (io.confluent.connect.avro.AvroConverter)

!kubectl apply -f deployment/avro-connect.yaml

kafkaconnect.kafka.strimzi.io/avro-connect-cluster created

Deploy Kafka Sink Connector

Now that we have Kafka Connect up and running, the next step is to deploy the sink connector that will poll nyc-avro-topic and store the data into the MinIO bucket openlake-tmp in Parquet format. Let’s take a look at configurations

connector.class - specifies what type of connector the sink connector will use. In our case it is io.confluent.connect.s3.S3SinkConnector

store.url - the MinIO endpoint URL where you want to store the data from KafkaConnect

storage.class - specifies which storage class to use. In our case, since we are storing in MinIO, io.confluent.connect.s3.storage.S3Storage will be used

format.class - Format type in which the data will be stored into MinIO, since we would like to store Parquet we will use io.confluent.connect.s3.format.parquet.ParquetFormat implementation

value.converter - Since we want to convert the binary data to Avro we will use io.confluent.connect.avro.AvroConverter

parquet.codec - Specifies what type of compression we would like to use for the Parquet files, in our case we will use snappy

schema.registry.url - Specifies the endpoint from which the connector can pull, validate the schema and deserialize the data from the producer

You can find sample code here, which can then be applied.

!kubectl apply -f deployment/avro-connector.yaml

kafkaconnector.kafka.strimzi.io/avro-connector created

If all goes well, we will shortly see files being added to the Minio openlake-tmp bucket by executing the below command

!mc ls --summarize --recursive play/openlake-tmp/nyc-taxis-avro/nyc-avro-topic/

The current setup that we have is significantly faster, more robust and more storage efficient than the previous basic setup that we had in How to Set up Kafka and Stream Data to MinIO in Kubernetes.   You can try running both the producers and connectors to see the performance and memory utilization differences.

We now have an end-to-end setup for efficiently producing data Kafka topics using an Avro schema and consuming it directly into MinIO in Parquet format.

Experimental: Iceberg

Recently Iceberg connector support has been added to Kafka by getindata, and you can find it in the repository getindata/kafka-connect-iceberg-sink . Below we will explore how to store the nyc-avro-topic data directly as an Iceberg table into MinIO. From our testing, we believe that this is still experimental and not ready for production, but try it out to get a glimpse of the future.

Iceberg Kafka Connect

Let's create a KafkaConnect that has Iceberg dependencies.  Make sure to edit the spec.config.build.output.image and spec.config.build.output.pushSecret to point to your Docker Registry before deploying. (code)

Then deploy our new KafkaConnect CRD

!kubectl apply -f deployment/iceberg-connect.yaml

kafkaconnect.kafka.strimzi.io/iceberg-connect-cluster created

Deploy Iceberg Sink Connector

Now that we have the Iceberg KafkaConnect deployed, let's deploy the KafkaConnector that will store the Iceberg table directly into MinIO. Iceberg supports different catalog types, so you’ll need to select the catalog that meets your needs. There are three catalogs available as Connectors, and you’ll find example configurations below:

Hadoop Iceberg Sink Connector

This example shows how to use the Hadoop catalog to create and maintain Iceberg tables in MinIO.

Hive Iceberg Sink Connector

This example shows how to use the Hive catalog to create and maintain Iceberg tables in MinIO.

Note: iceberg.uri, iceberg.catalog-impl, iceberg.table-default.write.data.path, iceberg.table-default.write.metadata.path are required for Iceberg Hive catalog work.

Nessie Iceberg Sink Connector

This example shows how to use the Nessie catalog to create and maintain Iceberg tables in MinIO.

Note: iceberg.uri, iceberg.ref, iceberg.catalog-impl are the key changes required to make the Iceberg Nessie catalog work with MinIO.

Use any of the following commands to deploy the KafkaConnector with the Iceberg Catalog of your choice, by default the Hadoop catalog has been enabled below

!kubectl apply -f deployment/iceberg-hadoop-connector.yaml
# !kubectl apply -f deployment/iceberg-hive-connector.yaml
# !kubectl apply -f deployment/iceberg-nessie-connector.yaml

Kafka and Iceberg with MinIO

This blog post showed you how to build a production -grade, end-to-end architecture  to stream data from Kafka to MinIO directly as an Iceberg table. As mentioned earlier, the Iceberg connector for Kafka is experimental, and based on our initial experiments it is not yet ready for production; this could change soon as there is active development going on. If you have Spark already set up and would like a production ready solution for storing Iceberg tables in MinIO you can explore Spark Streaming.

Kafka and MinIO are both software-defined, yielding a portable multi-cloud solution for streaming data lakes. They run anywhere – on-premise, public/private cloud and at the edge – to provide the foundation of an event streaming architecture that supports cloud-native analytics and AI/ML applications wherever they are. You are now free to build anything you want, wherever you want.

Download MinIO today and start building your cloud-native data lake.