How to Set up Kafka and Stream Data to MinIO in Kubernetes


Apache Kafka is an open-source distributed event streaming platform that is used for building real-time data pipelines and streaming applications. It was originally developed by LinkedIn and is now maintained by the Apache Software Foundation. Kafka is designed to handle high volume, high throughput, and low latency data streams, making it a popular choice for building scalable and reliable data streaming solutions.

Some of the benefits of Kafka include:

  • Scale and Speed: Handling large-scale data streams and millions of events per second, and scaling horizontally by adding more Kafka brokers to the cluster
  • Fault Tolerance: Replicating data across multiple brokers in a Kafka cluster ensures that data is highly available and can be recovered in case of failure, making Kafka a reliable choice for critical data streaming applications
  • Versatility: Support for a variety of data sources and data sinks making it highly versatile. It can be used for building a wide range of applications, such as real-time data processing, data ingestion, data streaming, and event-driven architectures
  • Durability: All published messages are stored for a configurable amount of time, allowing consumers to read data at their own pace. This makes Kafka suitable where data needs to be retained for historical analysis or replayed for recovery purposes.

Please see Apache Kafka for more information.

Deploying Kafka on Kubernetes, a widely-used container orchestration platform, offers several additional advantages. Kubernetes enables dynamic scaling of Kafka clusters based on demand, allowing for efficient resource utilization and automatic scaling of Kafka brokers to handle changing data stream volumes. This ensures that Kafka can handle varying workloads without unnecessary resource wastage or performance degradation.

Running Kafka clusters as containers provides easy deployment, management, and monitoring, and makes them highly portable across different environments. This allows for seamless migration of Kafka clusters across various cloud providers, data centers, or development environments.

Kubernetes includes built-in features for handling failures and ensuring high availability of Kafka clusters. For example, it automatically reschedules failed Kafka broker containers and supports rolling updates without downtime, ensuring continuous availability of Kafka for data streaming applications, thereby enhancing the reliability and fault tolerance of Kafka deployments.

Kafka and MinIO are commonly used to build data streaming solutions. MinIO is a high-performance, distributed object storage system designed to support cloud-native applications with S3-compatible storage for unstructured, semi-structured and structured data. When used as a data sink with Kafka, MinIO enables organizations to store and process large volumes of data in real-time.

Some benefits of combining Kafka with MinIO include:

  • High Performance: MinIO writes Kafka streams as fast as they come in. A recent benchmark achieved 325 GiB/s (349 GB/s) on GETs and 165 GiB/s (177 GB/s) on PUTs with just 32 nodes of off-the-shelf NVMe SSDs.
  • Scalability: MinIO handles large amounts of data and scales horizontally across multiple nodes, making it a perfect fit for storing data streams generated by Kafka. This allows organizations to store and process massive amounts of data in real-time, making it suitable for big data and high-velocity data streaming use cases.
  • Durability: MinIO provides durable storage, allowing organizations to retain data for long periods of time, such as for historical analysis, compliance requirements, or data recovery purposes.
  • Fault Tolerance: MinIO erasure codes data across multiple nodes, providing fault tolerance and ensuring data durability. This complements Kafka's fault tolerance capabilities, making the overall solution highly available, reliable and resilient.
  • Easy Integration: MinIO is easily integrated with Kafka using Kafka Connect, a built-in framework for connecting Kafka with external systems. This makes it straightforward to stream data from Kafka to MinIO for storage, and vice versa for data retrieval, enabling seamless data flow between Kafka and MinIO. We’ll see how straightforward this is in the tutorial below.
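As a quick sanity check on the unit conversion in those benchmark figures: GiB uses powers of two (2^30 bytes) while GB uses powers of ten (10^9 bytes), so the two sets of numbers describe the same throughput.

```python
# Verify the GiB/s -> GB/s conversion quoted in the benchmark above.
def gib_to_gb(gib: float) -> float:
    return gib * 2**30 / 10**9

print(round(gib_to_gb(325)))  # GETs -> 349 GB/s
print(round(gib_to_gb(165)))  # PUTs -> 177 GB/s
```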

In this post, we will walk through how to set up Kafka on Kubernetes using Strimzi, an open-source project that provides operators to run Apache Kafka and Apache ZooKeeper clusters on Kubernetes, including distributions such as OpenShift. Then we will use Kafka Connect to stream data to MinIO.


Prerequisites

Before we start, ensure that you have the following:

  • A running Kubernetes cluster
  • kubectl command-line tool
  • A running MinIO cluster
  • mc command-line tool for MinIO
  • Helm package manager

Install Strimzi Operator

The first step is to install the Strimzi operator on your Kubernetes cluster. The Strimzi operator manages the lifecycle of Kafka and ZooKeeper clusters on Kubernetes.

Add the Strimzi Helm chart repository

!helm repo add strimzi https://strimzi.io/charts/

"strimzi" already exists with the same configuration, skipping

Install the chart with release name my-release:

!helm install my-release strimzi/strimzi-kafka-operator --namespace=kafka --create-namespace

NAME: my-release
LAST DEPLOYED: Mon Apr 10 20:03:12 2023
STATUS: deployed
Thank you for installing strimzi-kafka-operator-0.34.0

To create a Kafka cluster refer to the following documentation.

This installs the latest version (0.34.0 at the time of this writing) of the operator in the newly created kafka namespace. For additional configurations refer to this page.

Create Kafka Cluster

Now that we have installed the Strimzi operator, we can create a Kafka cluster. In this example, we will create a Kafka cluster with three Kafka brokers and three ZooKeeper nodes.

Let's create a YAML file as shown here:

%%writefile deployment/kafka-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.4.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.4"
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Overwriting deployment/kafka-cluster.yaml

Let's create the cluster by deploying the YAML file. We’re deploying a cluster, so it will take some time before it is up and running.

!kubectl apply -f deployment/kafka-cluster.yaml

kafka.kafka.strimzi.io/my-kafka-cluster created

Check the status of the cluster with

!kubectl -n kafka get kafka my-kafka-cluster

NAME               DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY
my-kafka-cluster   3                        3                     True

Now that we have the cluster up and running, let’s produce and consume sample topic events, starting with the Kafka topic my-topic.

Create Kafka Topic

Create a YAML file for the Kafka topic my-topic as shown below and apply it.

%%writefile deployment/kafka-my-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 3
  replicas: 3

Overwriting deployment/kafka-my-topic.yaml

!kubectl apply -f deployment/kafka-my-topic.yaml

kafkatopic.kafka.strimzi.io/my-topic created

Check the status of the topic with

!kubectl -n kafka get kafkatopic my-topic

NAME       CLUSTER            PARTITIONS   REPLICATION FACTOR   READY
my-topic   my-kafka-cluster   3            3                    True

Produce and Consume Messages

With the Kafka cluster and topic set up, we can now produce and consume messages.

To create a Kafka producer pod that produces messages to the my-topic topic, run the command below in a terminal:

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.34.0-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-kafka-cluster-kafka-bootstrap:9092 --topic my-topic

This will give us a prompt to send messages to the producer. In parallel, we can bring up the consumer to start consuming the messages that we sent to the producer:

kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.34.0-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-kafka-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

The consumer will replay all the messages that we sent to the producer earlier and, if we add any new messages to the producer, they will also start showing up on the consumer side.

You can delete the my-topic topic with

!kubectl -n kafka delete kafkatopic my-topic

kafkatopic.kafka.strimzi.io "my-topic" deleted

Now that the Kafka cluster is up and running with a dummy topic producer/consumer, we can start consuming topics directly into MinIO using the Kafka Connector.

Set Up Kafka Connector with MinIO

Next we will use the Kafka Connector to stream topics directly to MinIO. First let's look at what connectors are and how to set one up. Here is a high-level overview of how the different Kafka components interact:


Kafka Connectors

Kafka Connect is an integration toolkit for streaming data between Kafka brokers and other systems. The other system is typically an external data source or target, such as MinIO.

Kafka Connect utilizes a plugin architecture to provide implementation artifacts for connectors, which are used for connecting to external systems and manipulating data. Plugins consist of connectors, data converters, and transforms. Connectors are designed to work with specific external systems and define a schema for their configuration. When configuring Kafka Connect, you configure the connector instance, and the connector instance then defines a set of tasks for data movement between systems.

In the distributed mode of operation, Strimzi operates Kafka Connect by distributing data streaming tasks across one or more worker pods. A Kafka Connect cluster consists of a group of worker pods, with each connector instantiated on a single worker. Each connector can have one or more tasks that are distributed across the group of workers, enabling highly scalable data pipelines.

Workers in Kafka Connect are responsible for converting data from one format to another, making it suitable for the source or target system. Depending on the configuration of the connector instance, workers may also apply transforms, also known as Single Message Transforms (SMTs), which can adjust messages, such as by filtering certain data, before they are converted. Kafka Connect comes with some built-in transforms, but additional transformations can be provided by plugins as needed.

Kafka Connect uses the following components while streaming data:

  • Connectors - create tasks
  • Tasks - move data
  • Workers - run tasks
  • Transformers - manipulate data
  • Converters - convert data

There are two types of connectors:

  1. Source Connectors - push data into Kafka
  2. Sink Connectors - extract data from Kafka to an external system like MinIO

Let’s configure a Sink Connector that extracts data from Kafka and stores it into MinIO as shown below


The Sink Connector streams data from Kafka and goes through the following steps:

  1. A plugin provides the implementation artifacts for the Sink Connector: In Kafka Connect, a Sink Connector is used to stream data from Kafka to an external system. The implementation artifacts for the Sink Connector, such as the code and configuration, are provided by a plugin. Plugins are used to extend the functionality of Kafka Connect and enable connections to different external data systems.
  2. A single worker initiates the Sink Connector instance: In a distributed mode of operation, Kafka Connect runs as a cluster of worker pods. Each worker pod can initiate a Sink Connector instance, which is responsible for streaming data from Kafka to the external data system. The worker manages the lifecycle of the Sink Connector instance, including its initialization and configuration.
  3. The Sink Connector creates tasks to stream data: Once the Sink Connector instance is initiated, it creates one or more tasks to stream data from Kafka to the external data system. Each task is responsible for processing a portion of the data and can run in parallel with other tasks for efficient data processing.
  4. Tasks run in parallel to poll Kafka and return records: The tasks retrieve records from Kafka topics and prepare them for forwarding to the external data system. The parallel processing of tasks enables high throughput and efficient data streaming.
  5. Converters put the records into a format suitable for the external data system: Before forwarding the records to the external data system, converters are used to put the records into a format that is suitable for the specific requirements of the external data system. Converters handle data format conversion, such as from Kafka's binary format to a format supported by the external data system.
  6. Transforms adjust the records, such as filtering or relabeling them: Depending on the configuration of the Sink Connector, transformations, Single Message Transforms (SMTs), can be applied to adjust the records before they are forwarded to the external data system. Transformations can be used for tasks such as filtering, relabeling, or enriching the data to be sent to the external system.
  7. The sink connector is managed using KafkaConnectors or the Kafka Connect API: The Sink Connector, along with its tasks, is managed using KafkaConnectors, or through the Kafka Connect API, which provides programmatic access for managing Kafka Connect. This allows for easy configuration, monitoring, and management of Sink Connectors and their tasks in a Kafka Connect deployment.
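The flow above can be sketched in a few lines of plain Python. This is an illustrative stand-in, not Kafka Connect code: `records` plays the role of a Kafka partition the task polls, the transform step drops null records (mirroring behavior.on.null.values: ignore), the converter renders JSON bytes, and each flush writes one "object" to a dict standing in for a MinIO bucket. All names here are hypothetical.

```python
import json

# Stand-in for records a sink task polls from one Kafka partition (made-up data).
records = [{"id": i, "fare": 2.5 * i} for i in range(7)] + [None]

object_store = {}      # stand-in for a MinIO bucket
FLUSH_SIZE = 3         # like the S3 sink's flush.size setting

batch, start_offset = [], 0
for offset, record in enumerate(records):
    if record is None:                             # transform: filter out null records
        continue
    batch.append(json.dumps(record).encode())      # convert: record -> JSON bytes
    if len(batch) == FLUSH_SIZE:                   # flush: one object per full batch
        object_store[f"batch-{start_offset}"] = b"\n".join(batch)
        batch, start_offset = [], offset + 1

print(sorted(object_store))  # → ['batch-0', 'batch-3']
```

The leftover partial batch stays in memory, just as the real sink holds records until flush.size is reached or an offset flush occurs.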


We will create a simple example which will perform the following steps:

  1. Create a Producer that will stream data from MinIO and produce events for a topic in JSON format
  2. Build a Kafka Connect image that has S3 dependencies
  3. Deploy Kafka Connect based on the above image
  4. Deploy a Kafka sink connector that consumes the Kafka topic and stores the data in a MinIO bucket

Getting Demo Data into MinIO

We will be using the NYC Taxi dataset that is available on MinIO. If you don't have the dataset, follow the instructions here.


Below is a simple Python script that consumes data from MinIO and produces events for the topic my-topic.

%%writefile sample-code/producer/src/main.py
import logging
import os

import fsspec
import pandas as pd
import s3fs

from kafka import KafkaProducer

logging.basicConfig(level=logging.INFO)

producer = KafkaProducer(bootstrap_servers="my-kafka-cluster-kafka-bootstrap:9092")

fsspec.config.conf = {
    "s3": {
        "key": os.getenv("AWS_ACCESS_KEY_ID", "openlakeuser"),
        "secret": os.getenv("AWS_SECRET_ACCESS_KEY", "openlakeuser"),
        "client_kwargs": {
            "endpoint_url": ""  # set this to your MinIO endpoint URL
        }
    }
}
s3 = s3fs.S3FileSystem()
total_processed = 0
i = 1
for df in pd.read_csv('s3a://openlake/spark/sample-data/taxi-data.csv', chunksize=1000):
    count = 0
    for index, row in df.iterrows():
        producer.send("my-topic", bytes(row.to_json(), 'utf-8'))
        count += 1
    total_processed += count
    if total_processed % (10000 * i) == 0:
        logging.info(f"total processed till now {total_processed}")
        i += 1

Overwriting sample-code/src/
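One subtle detail in the producer is the modulo guard that throttles progress logging. A stand-alone sketch (plain Python, no Kafka needed; `log_points` is a hypothetical helper written for this illustration) shows which running totals trigger a log line when rows arrive in 1,000-row chunks:

```python
# Stand-alone sketch of the producer's throttled-logging guard.
def log_points(num_rows: int, chunksize: int = 1000) -> list:
    """Return the running totals at which the producer would emit a log line."""
    points, total, i = [], 0, 1
    for _ in range(num_rows // chunksize):
        total += chunksize
        if total % (10000 * i) == 0:   # same condition as in the producer
            points.append(total)
            i += 1
    return points

print(log_points(35000))  # → [10000, 20000, 30000]
```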

Next, add the requirements.txt and Dockerfile based on which we will build the Docker image.

%%writefile sample-code/producer/requirements.txt
fsspec
kafka-python
pandas
s3fs
Overwriting sample-code/producer/requirements.txt

%%writefile sample-code/producer/Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip3 install -r requirements.txt

COPY src/ .
CMD ["python3", "-u", "./main.py"]

Overwriting sample-code/producer/Dockerfile

Build and push the Docker image for the producer using the above Dockerfile, or use the prebuilt openlake/kafka-demo-producer image available in the openlake repository.

Let's create a YAML file that deploys our producer in the Kubernetes cluster as a job

%%writefile deployment/producer.yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: producer-job
  namespace: kafka
spec:
  template:
    metadata:
      name: producer-job
    spec:
      containers:
      - name: producer-job
        image: openlake/kafka-demo-producer:latest
      restartPolicy: Never

Writing deployment/producer.yaml

Deploy the producer.yaml file

!kubectl apply -f deployment/producer.yaml

job.batch/producer-job created

Check the logs by using the below command

!kubectl logs -f job.batch/producer-job -n kafka # stop this shell once you are done

<jemalloc>: MADV_DONTNEED does not work (memset will be used instead)
<jemalloc>: (This is the expected behaviour if you are running under QEMU)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=my-kafka-cluster-kafka-bootstrap:9092 <connecting> [IPv4 ('', 9092)]>: connecting to my-kafka-cluster-kafka-bootstrap:9092 [('', 9092) IPv4]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=my-kafka-cluster-kafka-bootstrap:9092 <connecting> [IPv4 ('', 9092)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=0 <connecting> [IPv4 ('', 9092)]>: connecting to [('', 9092) IPv4]
INFO:kafka.conn:<BrokerConnection node_id=0 <connecting> [IPv4 ('', 9092)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=my-kafka-cluster-kafka-bootstrap:9092 <connected> [IPv4 ('', 9092)]>: Closing connection.
INFO:root:total processed till now 10000

Now that we have our basic producer sending JSON events to my-topic, let’s deploy Kafka Connect and the corresponding Connector that stores these events in MinIO.

Build Kafka Connect Image

Let's build a Kafka Connect image that has S3 dependencies

%%writefile sample-code/connect/Dockerfile
FROM confluentinc/cp-kafka-connect:7.0.9 as cp
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.4.2
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:7.3.3

# Strimzi Kafka base image
FROM quay.io/strimzi/kafka:0.34.0-kafka-3.4.0
USER root:root
# Add S3 dependency
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/ /opt/kafka/plugins/kafka-connect-s3/

Overwriting sample-code/connect/Dockerfile

Build and push the Docker image for Kafka Connect using the above Dockerfile, or use the prebuilt openlake/kafka-connect:0.34.0 image available in the openlake repository.

Before we deploy Kafka Connect, we need to create the storage topics (if not already present) that Kafka Connect requires to work as expected.

Create Storage Topics

Let's create the connect-status, connect-configs and connect-offsets topics and deploy them as shown below.

%%writefile deployment/connect-status-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-status
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact

Writing deployment/connect-status-topic.yaml

%%writefile deployment/connect-configs-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-configs
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact

Writing deployment/connect-configs-topic.yaml

%%writefile deployment/connect-offsets-topic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: connect-offsets
  namespace: kafka
  labels:
    strimzi.io/cluster: my-kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact

Writing deployment/connect-offsets-topic.yaml

Deploy the above topics:

!kubectl apply -f deployment/connect-status-topic.yaml
!kubectl apply -f deployment/connect-configs-topic.yaml
!kubectl apply -f deployment/connect-offsets-topic.yaml

Deploy Kafka Connect

Next, create a YAML file for Kafka Connect that uses the above image and deploys it in Kubernetes. Kafka Connect will have 1 replica and make use of the storage topics we created above.

NOTE: spec.template.connectContainer.env has the credentials defined so that Kafka Connect can store data in the MinIO cluster. Other details like the endpoint_url and bucket_name will be part of the KafkaConnector.

%%writefile deployment/connect.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  image: openlake/kafka-connect:0.34.0
  version: 3.4.0
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
      - secretName: my-kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    internal.key.converter: org.apache.kafka.connect.json.JsonConverter
    internal.value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    offset.storage.topic: connect-offsets
    offset.storage.replication.factor: 1
    config.storage.topic: connect-configs
    config.storage.replication.factor: 1
    status.storage.topic: connect-status
    status.storage.replication.factor: 1
    offset.flush.interval.ms: 10000
    plugin.path: /opt/kafka/plugins
  template:
    connectContainer:
      env:
        - name: AWS_ACCESS_KEY_ID
          value: "openlakeuser"
        - name: AWS_SECRET_ACCESS_KEY
          value: "openlakeuser"

Writing deployment/connect.yaml

!kubectl apply -f deployment/connect.yaml

kafkaconnect.kafka.strimzi.io/connect-cluster created

Deploy Kafka Sink Connector

Now that we have Kafka Connect up and running, the next step is to deploy the Sink Connector that will poll my-topic and store data into the MinIO bucket openlake-tmp.

connector.class - specifies what type of connector the Sink Connector will use; in our case it is io.confluent.connect.s3.S3SinkConnector

store.url - the MinIO endpoint URL where you want to store the data from Kafka Connect

storage.class - specifies which storage class to use; since we are storing to MinIO's S3-compatible API, io.confluent.connect.s3.storage.S3Storage will be used

format.class - the format in which to store data in MinIO; since we would like to store JSON we will use io.confluent.connect.s3.format.json.JsonFormat
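Before writing the YAML, it can help to sanity-check these settings programmatically. The sketch below is plain Python, not a Kafka Connect API: the key names mirror the Confluent S3 sink settings discussed above, while the validation itself is a hypothetical helper written for this post.

```python
# Illustrative sanity check for an S3 sink connector config (not Kafka Connect code).
REQUIRED = {"connector.class", "topics", "s3.bucket.name", "storage.class", "format.class"}

config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "s3.region": "us-east-1",
    "s3.bucket.name": "openlake-tmp",
    "s3.part.size": "5242880",       # 5 MiB, the S3 multipart-upload minimum
    "flush.size": "1000",            # records per object written to MinIO
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
}

missing = REQUIRED - config.keys()
assert not missing, f"missing keys: {missing}"
assert int(config["s3.part.size"]) >= 5 * 1024 * 1024   # multipart lower bound
print("config looks sane; one object per", config["flush.size"], "records")
```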

%%writefile deployment/connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: "minio-connector"
  namespace: "kafka"
  labels:
    strimzi.io/cluster: connect-cluster
spec:
  class: io.confluent.connect.s3.S3SinkConnector
  config:
    connector.class: io.confluent.connect.s3.S3SinkConnector
    tasks.max: '1'
    topics: my-topic
    s3.region: us-east-1
    s3.bucket.name: openlake-tmp
    s3.part.size: '5242880'
    flush.size: '1000'
    store.url: https://play.min.io  # replace with your MinIO endpoint URL
    storage.class: io.confluent.connect.s3.storage.S3Storage
    format.class: io.confluent.connect.s3.format.json.JsonFormat
    behavior.on.null.values: ignore

Overwriting deployment/connector.yaml

!kubectl apply -f deployment/connector.yaml

kafkaconnector.kafka.strimzi.io/minio-connector created

We can see files being added to the MinIO openlake-tmp bucket with

!mc ls --summarize --recursive play/openlake-tmp/topics/my-topic

[2023-04-11 19:53:29 PDT] 368KiB STANDARD partition=0/my-topic+0+0000000000.json
[2023-04-11 19:53:30 PDT] 368KiB STANDARD partition=0/my-topic+0+0000001000.json


[2023-04-11 19:54:07 PDT] 368KiB STANDARD partition=0/my-topic+0+0000112000.json
[2023-04-11 19:54:08 PDT] 368KiB STANDARD partition=0/my-topic+0+0000113000.json
[2023-04-11 19:54:08 PDT] 368KiB STANDARD partition=0/my-topic+0+0000114000.json

Total Size: 41 MiB
Total Objects: 115
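The object names in that listing follow the S3 sink's key pattern of topic, partition, and starting offset, with one object per flush.size (1,000) records. A quick sketch (a hypothetical helper in plain Python) reproduces the listing's shape: 115 objects covering offsets 0 through 114,000.

```python
# Generate the object keys the S3 sink writes for one partition (illustrative helper).
def object_keys(topic: str, partition: int, num_records: int, flush_size: int = 1000) -> list:
    """One object per flush, named <topic>+<partition>+<zero-padded start offset>.json."""
    return [
        f"topics/{topic}/partition={partition}/{topic}+{partition}+{start:010d}.json"
        for start in range(0, num_records, flush_size)
    ]

keys = object_keys("my-topic", 0, 115_000)
print(len(keys))   # → 115, matching the mc listing above
print(keys[0])     # → topics/my-topic/partition=0/my-topic+0+0000000000.json
print(keys[-1])    # → topics/my-topic/partition=0/my-topic+0+0000114000.json
```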

We created an end-to-end implementation that produces topics in Kafka and consumes them directly into MinIO using Kafka Connect. This is a great start toward learning how to use MinIO and Kafka together to build a streaming data repository. But wait, there’s more.

In my next post, I will show you how to take this tutorial and turn it into something much more efficient and performant.

Achieve Streaming Success with Kafka and MinIO

This blog post showed you how to get started building a streaming data lake. Of course, there are many more steps involved between this beginning and production.

MinIO is cloud-native object storage that forms the foundation for ML/AI, analytics, streaming video, and other demanding workloads running in Kubernetes. MinIO scales seamlessly, ensuring that you can simply expand storage to accommodate a growing data lake.

Customers frequently build data lakes using MinIO and expose them to a variety of cloud-native applications for business intelligence, dashboarding and other analysis. They build them using Apache Iceberg, Apache Hudi and Delta Lake. They use Snowflake, SQL Server, or a variety of databases to read data saved in MinIO as external tables. And they use Dremio, Apache Druid and Clickhouse for analytics, and Kubeflow and Tensorflow for ML.

MinIO can even replicate data between clouds to leverage specific applications and frameworks, while it is protected using access control, version control, encryption and erasure coding.

Don’t take our word for it though — build it yourself. You can download MinIO and you can join our Slack channel.
