Making the Most of Streaming with Kafka Schema Registry and MinIO
Modern data lakes/lake houses fed by streaming data are a focus for many enterprises seeking to take control of their data and apply it to solve business problems. The data-driven business requires a keen understanding of itself both yesterday and today in order to thrive tomorrow. The state of the art in cloud-native event streaming is Apache Kafka with an object storage endpoint such as MinIO.
MinIO is an ideal complement to Kafka architectures given its combination of scalability and performance. These attributes give the architect or developer the ability to build anything they want on top of their streaming data lake/lake house, and the S3 API means that they can use cloud-native analytics and AI/ML frameworks to power their applications. MinIO takes complete advantage of underlying hardware (see Selecting the Best Hardware for Your MinIO Deployment) to deliver the greatest possible performance – we’ve benchmarked it at 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs. Erasure coding makes MinIO a durable and resilient distributed object storage solution, while a rich set of enterprise integrations means that your data lake storage seamlessly fits into existing infrastructure.
Apache Kafka is a key element of streaming data architecture. In a very basic sense, Kafka is a distributed event streaming platform made up of a collection of processes called brokers. Producers send events to brokers where they are retained based on time, allowing Consumers to read and process events asynchronously. We went into more detail in How to Set up Kafka and Stream Data to MinIO in Kubernetes, where we showed you how to get started using Kafka Connectors to stream events directly to MinIO.
That post explains the simplest way to stream data so you can get up and running quickly, but it’s important to remember that it may not be efficient and performant enough for production use cases involving large workloads. One shortcoming of that quick-and-dirty approach is that it lacks an automated way to verify data, evolve stream schemas and add additional downstream consumers. None of these is a problem in a development or test environment, but they present critical challenges in production. These issues go from minor to major when you add multiple Kafka developers, each making their own changes to streams. The result can be rather serious: when a Kafka topic changes schema – a new column gets added, an existing column gets removed or the data type of a given column gets modified – the consumer may not be aware of these changes, potentially resulting in data corruption.
In an enterprise environment with multiple developers and a multitude of end-users, a topic could break very easily. When a topic breaks, data stops streaming into the data lake. The team has to stop what they’re doing and start troubleshooting. They have to figure out why the original schema was written and which subsequent change broke it, then update the schema manually. Again, this would be fine in dev/test, but totally unacceptable in production environments.
The question becomes - what does a production-grade architecture look like to ensure there are no stream breaking changes associated with Kafka topics? That is the focus of the rest of the post.
Kafka Schema Registry to the Rescue
Kafka Schema Registry is a component in the Apache Kafka ecosystem that provides a centralized schema management service for Kafka producers and consumers. It allows producers to register schemas for the data they produce, and consumers to retrieve and use these schemas for data validation and deserialization. The Schema Registry helps ensure that data exchanged through Kafka is compliant with a predefined schema, enabling data consistency, compatibility, and evolution across different systems and applications.
When using Avro or another schema format, it is critical to manage schemas and evolve them thoughtfully. Kafka Schema Registry enables schema compatibility checking by versioning every single schema and comparing each new schema against previous versions. The type of compatibility required (backward, forward, full, none, etc.) determines how Kafka Schema Registry evaluates each new schema; new schemas that fail the compatibility check are rejected at registration time.
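For example, the compatibility mode for a subject, and new schema versions themselves, are managed through the registry's REST API. The calls below are a minimal sketch, assuming the registry has been port-forwarded to localhost:8081 and that the topic we create later in this post (nyc-avro-topic) uses the default TopicNameStrategy, so its value schemas live under the subject nyc-avro-topic-value:
!curl -s -X PUT -H "Content-Type: application/json" --data '{"compatibility": "BACKWARD"}' http://localhost:8081/config/nyc-avro-topic-value
!curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\":\"record\",\"name\":\"Ride\",\"fields\":[{\"name\":\"vendor_id\",\"type\":\"string\"}]}"}' http://localhost:8081/subjects/nyc-avro-topic-value/versions
The first call pins the subject to backward compatibility; the second registers a schema version, and any later version that breaks backward compatibility is rejected with an HTTP 409 error.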
Some key benefits of using Kafka Schema Registry include:
- Schema Evolution: As data formats and requirements evolve over time, it is common for producers and consumers to undergo changes to their data schemas. Kafka Schema Registry provides support for schema evolution, allowing producers to register new versions of schemas while maintaining compatibility with existing consumers. Consumers can retrieve the appropriate schema version for deserialization, ensuring that data is processed correctly even when schema changes occur.
- Data Validation: Kafka Schema Registry enables data validation by allowing producers to register schemas with predefined data types, field names, and other constraints. Consumers can then retrieve and use these schemas to validate incoming data, ensuring that data conforms to the expected structure and format. This helps prevent data processing errors and improves data quality.
- Schema Management: Kafka Schema Registry provides a centralized repository for managing schemas, making it easier to track, version, and manage changes. Producers and consumers can register, retrieve and manage schemas through a simple API, allowing for centralized schema governance and management.
- Interoperability: Kafka Schema Registry promotes interoperability between different producers and consumers by providing a standardized way to define and manage data schemas. Producers and consumers written in different programming languages or using different serialization frameworks can use a common schema registry to ensure data consistency and compatibility across the ecosystem.
- Backward and Forward Compatibility: Kafka Schema Registry allows producers to register backward- and forward-compatible schemas, enabling smooth upgrades and changes to data schemas without disrupting existing producers and consumers. Backward compatibility means that consumers using the new schema can still read data produced with an older schema, while forward compatibility means that consumers using an older schema can still read data produced with a newer schema; a concrete example follows this list.
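As a concrete sketch of a backward-compatible change (the field names here are illustrative, not the full NYC taxi schema used later), consider evolving a value schema by adding a field that has a default.
Version 1:
{"type": "record", "name": "Ride", "fields": [
  {"name": "vendor_id", "type": "string"},
  {"name": "trip_distance", "type": "double"}
]}
Version 2, accepted under BACKWARD compatibility because the new field has a default:
{"type": "record", "name": "Ride", "fields": [
  {"name": "vendor_id", "type": "string"},
  {"name": "trip_distance", "type": "double"},
  {"name": "fare_amount", "type": "double", "default": 0.0}
]}
Consumers on version 2 can still read records written with version 1, filling in the default for the missing field. Had fare_amount been added without a default, the registry would reject version 2 while the subject is set to BACKWARD compatibility.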
The Strimzi Operator doesn't ship with a Schema Registry yet, so we will use the one available in the Confluent Helm repository.
In this blog post, we will do the following:
- Set up Kafka Schema Registry using Helm charts
- Create and deploy a sample producer that uses an Apache Avro schema and sends events
- Build a KafkaConnect container that has an Avro dependency
- Deploy KafkaConnect using the above container
- Deploy a Kafka Connector that reads the schema from Kafka Schema Registry, consumes topic events from the producer and stores data into MinIO in Parquet format
Set up Schema Registry
We will clone the Confluent Helm repository (confluentinc/cp-helm-charts) using the following command:
!git clone https://github.com/confluentinc/cp-helm-charts.git
Use the command below to install Schema Registry from the Helm charts. For the installation to succeed, we need to provide the bootstrap server endpoint of the existing Kafka cluster we deployed (the command below assumes a Strimzi cluster named my-kafka-cluster in the kafka namespace; adjust it to match yours):
!helm install kafka-schema-registry --set kafka.bootstrapServers="PLAINTEXT://my-kafka-cluster-kafka-bootstrap:9092" cp-helm-charts/charts/cp-schema-registry -n kafka
You can check if the Schema Registry is up and running by examining the logs as shown below
!kubectl -n kafka logs -f --selector=app=cp-schema-registry -c cp-schema-registry-server # stop this shell once you are done
[2023-04-12 16:52:28,481] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain)
Create Avro Topic
Next, we’ll create a YAML file for the Kafka topic nyc-avro-topic and apply it. You can find sample code here.
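As a rough sketch (assuming a Strimzi-managed cluster named my-kafka-cluster in the kafka namespace, with illustrative partition and replica counts), the topic manifest might look like this:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: nyc-avro-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster  # must match your Kafka cluster name
spec:
  partitions: 3
  replicas: 3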
Producer with Avro Schema
We will create a simple Python producer that registers the Avro schema with the Kafka Schema Registry and sends Kafka topic events. It builds on the producer from the previous blog post. You can find sample code here.
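For orientation, a minimal sketch of such a producer using the confluent-kafka Python client is shown below; the bootstrap server, registry URL and the tiny two-field schema are assumptions, and the linked sample code uses the full NYC taxi schema instead:
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Simplified two-field Avro schema, used only for illustration
schema_str = """
{
  "type": "record",
  "name": "Ride",
  "fields": [
    {"name": "vendor_id", "type": "string"},
    {"name": "trip_distance", "type": "double"}
  ]
}
"""

# The serializer registers the schema under the subject nyc-avro-topic-value on first use
registry = SchemaRegistryClient({"url": "http://kafka-schema-registry-cp-schema-registry:8081"})
avro_serializer = AvroSerializer(registry, schema_str)

producer = SerializingProducer({
    "bootstrap.servers": "my-kafka-cluster-kafka-bootstrap:9092",
    "value.serializer": avro_serializer,
})

# Each value is a dict matching the registered schema; it is serialized to Avro binary
producer.produce(topic="nyc-avro-topic", value={"vendor_id": "CMT", "trip_distance": 2.5})
producer.flush()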
Then, add the requirements file and the Dockerfile from which we will build the Docker image (code).
Build and push the producer's Docker image to your Docker registry using the above Dockerfile, or use the one available in Openlake: openlake/kafka-demo-avro-producer.
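For reference, building and pushing the image follows the usual pattern (the tag below is a placeholder for your own registry):
!docker build -t <your-registry>/kafka-demo-avro-producer:latest .
!docker push <your-registry>/kafka-demo-avro-producer:latest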
Let's create a YAML file that deploys our producer in the Kubernetes cluster as a Job (code); a sketch follows.
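A minimal sketch of such a Job (the namespace and image tag are assumptions; swap in your own image if you built one):
apiVersion: batch/v1
kind: Job
metadata:
  name: avro-producer
  namespace: kafka
spec:
  backoffLimit: 4
  template:
    spec:
      containers:
        - name: avro-producer
          image: openlake/kafka-demo-avro-producer  # or your own image from the step above
      restartPolicy: Never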
Deploy the avro-producer.yaml file (adjust the path if your file lives elsewhere):
!kubectl apply -f deployment/avro-producer.yaml
You can check the producer logs with the command below (this assumes the Job is named avro-producer; adjust to match your YAML):
!kubectl -n kafka logs -f job/avro-producer
Build Kafka Connect Image
Let's build a Kafka Connect image that has S3 and Avro dependencies (code)
Build and push this Kafka Connect Docker image to your Docker registry using the above Dockerfile, or use the one available in the MinIO Openlake repository.
Before we deploy KafkaConnect, we first need to create the storage topics, if not already present, for KafkaConnect to work as expected.
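If you need to create them, a Strimzi KafkaTopic sketch for the config topic is shown below; the topic names are assumptions and must match config.storage.topic, offset.storage.topic and status.storage.topic in the KafkaConnect spec, and Kafka Connect requires the config topic to have a single partition and compaction enabled:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-configs
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1  # the Connect config topic must have exactly one partition
  replicas: 3
  config:
    cleanup.policy: compact
# connect-offsets and connect-status are defined the same way, typically with
# more partitions (e.g. 25 and 5) and cleanup.policy: compact as well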
Deploy Kafka Connect
Create a YAML file for Kafka Connect that uses the above image and deploy it in Kubernetes. KafkaConnect will have 1 replica and make use of the storage topics that we created in the previous blog post. You can find sample code here.
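A trimmed-down sketch of that KafkaConnect resource is shown below; the image, bootstrap server, schema registry URL and MinIO credentials are assumptions to adapt to your environment:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: avro-connect
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"  # manage connectors via KafkaConnector CRs
spec:
  image: <your-registry>/kafka-connect-avro:latest  # the image built above
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    group.id: connect-cluster
    config.storage.topic: connect-configs
    offset.storage.topic: connect-offsets
    status.storage.topic: connect-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
    key.converter: io.confluent.connect.avro.AvroConverter
    value.converter: io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url: http://kafka-schema-registry-cp-schema-registry:8081
    value.converter.schema.registry.url: http://kafka-schema-registry-cp-schema-registry:8081
  template:
    connectContainer:
      env:
        - name: AWS_ACCESS_KEY_ID
          value: <minio-access-key>       # placeholder MinIO credentials
        - name: AWS_SECRET_ACCESS_KEY
          value: <minio-secret-key>       # placeholder MinIO credentials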
NOTE: spec.template.connectContainer.env has the credentials defined so that KafkaConnect can store data in our MinIO cluster. Other details like the endpoint URL and bucket name will be part of the KafkaConnector. key.converter and value.converter point to the AvroConverter (io.confluent.connect.avro.AvroConverter).
!kubectl apply -f deployment/avro-connect.yaml
Deploy Kafka Sink Connector
Now that we have Kafka Connect up and running, the next step is to deploy the sink connector that will poll nyc-avro-topic and store the data in the MinIO bucket openlake-tmp in Parquet format. Let’s take a look at the key configurations:
- connector.class - the connector implementation the sink uses; in our case, io.confluent.connect.s3.S3SinkConnector
- store.url - the MinIO endpoint URL where you want KafkaConnect to store the data
- storage.class - the storage class to use; since we are storing in MinIO, io.confluent.connect.s3.storage.S3Storage is used
- format.class - the format in which the data is written to MinIO; since we want Parquet, we use the io.confluent.connect.s3.format.parquet.ParquetFormat implementation
- value.converter - since the topic data is Avro-encoded binary, we use io.confluent.connect.avro.AvroConverter to deserialize it
- parquet.codec - the compression to use for the Parquet files; in our case, snappy
- schema.registry.url - the endpoint from which the connector pulls the schema to validate and deserialize the data from the producer
You can find sample code here, which can then be applied.
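As a rough sketch, with the MinIO endpoint, registry URL, key converter and flush size as assumptions, the KafkaConnector spec might look like this:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: avro-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: avro-connect  # must match the KafkaConnect name
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  tasksMax: 1
  config:
    topics: nyc-avro-topic
    topics.dir: nyc-taxis-avro                 # prefix under the bucket
    store.url: https://play.min.io             # assumed MinIO endpoint
    s3.bucket.name: openlake-tmp
    s3.region: us-east-1
    flush.size: "10000"                        # records per Parquet file, illustrative
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.parquet.ParquetFormat
    parquet.codec: snappy
    key.converter: org.apache.kafka.connect.storage.StringConverter  # assumption: string keys
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: http://kafka-schema-registry-cp-schema-registry:8081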
!kubectl apply -f deployment/avro-connector.yaml
If all goes well, we will shortly see files being added to the MinIO openlake-tmp bucket, which we can verify by executing the command below
!mc ls --summarize --recursive play/openlake-tmp/nyc-taxis-avro/nyc-avro-topic/
The current setup is significantly faster, more robust and more storage-efficient than the basic setup from How to Set up Kafka and Stream Data to MinIO in Kubernetes. You can run both producers and connectors to compare their performance and memory utilization.
We now have an end-to-end setup for efficiently producing data to Kafka topics using an Avro schema and consuming it directly into MinIO in Parquet format.
Experimental: Iceberg
Recently, Iceberg sink support for Kafka Connect was added by getindata, and you can find it in the repository getindata/kafka-connect-iceberg-sink. Below we will explore how to store the nyc-avro-topic data directly as an Iceberg table in MinIO. From our testing, we believe this is still experimental and not ready for production, but try it out to get a glimpse of the future.
Iceberg Kafka Connect
Let's create a KafkaConnect that has the Iceberg dependencies. Make sure to edit spec.build.output.image and spec.build.output.pushSecret to point to your Docker registry before deploying (code).
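For orientation, the build section of that resource roughly follows the Strimzi KafkaConnect build schema; the image name, push secret and plugin artifact URL below are placeholders to edit:
spec:
  build:
    output:
      type: docker
      image: <your-registry>/iceberg-connect:latest   # where Strimzi pushes the built image
      pushSecret: <your-registry-push-secret>         # docker-registry secret with push access
    plugins:
      - name: iceberg-sink
        artifacts:
          - type: zip
            url: <url-of-a-kafka-connect-iceberg-sink-release-archive>  # placeholder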
Then deploy our new KafkaConnect CRD
!kubectl apply -f deployment/iceberg-connect.yaml
Deploy Iceberg Sink Connector
Now that we have the Iceberg KafkaConnect deployed, let's deploy the KafkaConnector that will store the Iceberg table directly into MinIO. Iceberg supports different catalog types, so you’ll need to select the catalog that meets your needs. There are three catalogs available as Connectors, and you’ll find example configurations below:
Hadoop Iceberg Sink Connector
This example shows how to use the Hadoop catalog to create and maintain Iceberg tables in MinIO.
Hive Iceberg Sink Connector
This example shows how to use the Hive catalog to create and maintain Iceberg tables in MinIO.
Note: iceberg.uri, iceberg.catalog-impl, iceberg.table-default.write.data.path and iceberg.table-default.write.metadata.path are required for the Iceberg Hive catalog to work.
Nessie Iceberg Sink Connector
This example shows how to use the Nessie catalog to create and maintain Iceberg tables in MinIO.
Note: iceberg.uri, iceberg.ref and iceberg.catalog-impl are the key changes required to make the Iceberg Nessie catalog work with MinIO.
Deploy the KafkaConnector with the Iceberg catalog of your choice; below, the Hadoop catalog connector is deployed by default.
!kubectl apply -f deployment/iceberg-hadoop-connector.yaml
Kafka and Iceberg with MinIO
This blog post showed you how to build a production-grade, end-to-end architecture to stream data from Kafka to MinIO, and how to experiment with landing it directly as an Iceberg table. As mentioned earlier, the Iceberg connector for Kafka is experimental, and based on our initial experiments it is not yet ready for production; this could change soon, as development is active. If you already have Spark set up and would like a production-ready solution for storing Iceberg tables in MinIO, you can explore Spark Streaming.
Kafka and MinIO are both software-defined, yielding a portable multi-cloud solution for streaming data lakes. They run anywhere – on-premise, public/private cloud and at the edge – to provide the foundation of an event streaming architecture that supports cloud-native analytics and AI/ML applications wherever they are. You are now free to build anything you want, wherever you want.
Download MinIO today and start building your cloud-native data lake.