AI Data Workflows with Kafka and MinIO

AIStor is a foundational component for creating and executing complex data workflows. At the core of this event-driven functionality are MinIO bucket notifications delivered through Kafka. AIStor produces event notifications for HTTP requests such as PUT, POST, COPY, DELETE, GET, HEAD and CompleteMultipartUpload. You can use these notifications to trigger applications, scripts and Lambda functions that take action whenever an event, such as an object upload, occurs.

Event notifications provide a loosely-coupled paradigm for multiple microservices to interact and collaborate. In this paradigm, microservices do not make direct calls to each other, but instead communicate using event notifications. Once a notification is sent, the sending service can return to its tasks, while the receiving service takes action. This level of isolation makes it easier to maintain code — changing one service does not require changing other services as they communicate through notifications, not direct calls.

Several use cases rely on AIStor event notifications to execute data workflows. For example, we can run AI/ML pipelines on raw data stored as objects in AIStor.

  • The pipeline that processes the data will trigger whenever raw objects are added.
  • Based on the objects added, the models will run.
  • The final model can be saved to a bucket in AIStor, where other applications can consume it as the final product.

Building a workflow

We’re going to build an example workflow using AIStor and Kafka for a hypothetical image resizer app. It essentially takes incoming images and resizes them according to certain app specifications, then saves them to another bucket where they can be served. This might be done in the real world to resize images and make them available to mobile apps, or simply to resize images to alleviate the strain on resources that occurs when resizing them on the fly.

There are several components to it, and Kafka and AIStor are used together to power this workflow:

  • AIStor, the producer: Incoming raw objects are stored in AIStor. Any time an object is added, AIStor sends a message to a specific topic on the Kafka broker.
  • Kafka, the broker: The broker maintains the state of the queue, stores the incoming messages, and makes them available for consumers to consume.
  • AIStor, the consumer: The consumer reads the messages from the queue as they come in, in real time, processes the raw data, and uploads the result to an AIStor bucket.

AIStor is the foundation of all of this as it is the producer and the consumer of this workflow.
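The three roles above can be sketched in a few lines of Python. This is only an in-memory illustration under stated assumptions: `queue.Queue` stands in for the Kafka topic, a dict stands in for the buckets, and the function names `store_object` and `consume_events` are hypothetical and not part of AIStor or Kafka.

```python
import json
import queue

# In-memory stand-ins (assumptions for illustration only).
topic = queue.Queue()                       # plays the role of the Kafka topic
buckets = {"images": {}, "processed": {}}   # plays the role of AIStor buckets


def store_object(bucket, key, data):
    """Producer side: store the object, then publish an event describing it."""
    buckets[bucket][key] = data
    event = {"EventName": "s3:ObjectCreated:Put", "Key": f"{bucket}/{key}"}
    topic.put(json.dumps(event))


def consume_events(dest_bucket):
    """Consumer side: drain the topic and write a processed copy of each object."""
    handled = []
    while not topic.empty():
        event = json.loads(topic.get())
        bucket, key = event["Key"].split("/", 1)
        # Placeholder "processing": upper-case the payload.
        buckets[dest_bucket][key] = buckets[bucket][key].upper()
        handled.append(key)
    return handled


store_object("images", "rose.jpg", "raw bytes")
print(consume_events("processed"))   # ['rose.jpg']
print(buckets["processed"])          # {'rose.jpg': 'RAW BYTES'}
```

The producer never calls the consumer directly; the only thing they share is the topic, which is the loose coupling described earlier.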

Use a Kubernetes cluster

We need a Kubernetes cluster for our services to run on. You can use any Kubernetes cluster, but in this example we’ll use a kind cluster. If kind is not already installed, please follow this quickstart guide for instructions. Use the kind cluster configuration below to build a simple Kubernetes cluster with a single control-plane node and multiple workers.

Save this yaml as kind-config.yaml

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker

Launch the cluster (this may take several minutes)

$ kind create cluster --name minio-kafka --config kind-config.yaml

… [truncated]


Set kubectl context to "kind-minio-kafka"
You can now use your cluster with:

kubectl cluster-info --context kind-minio-kafka


… [truncated]

Verify the cluster is up

$ kubectl get no
NAME                        STATUS   ROLES           AGE   VERSION
minio-kafka-control-plane   Ready    control-plane   43s   v1.24.0
minio-kafka-worker          Ready    <none>          21s   v1.24.0
minio-kafka-worker2         Ready    <none>          21s   v1.24.0
minio-kafka-worker3         Ready    <none>          21s   v1.24.0
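If you would rather script this check than read the table, a minimal sketch follows. It assumes you feed it the JSON printed by `kubectl get no -o json`; the helper name `not_ready_nodes` is our own and not part of any Kubernetes tooling.

```python
import json


def not_ready_nodes(nodes_json):
    """Return the names of nodes whose Ready condition is not "True"."""
    not_ready = []
    for node in json.loads(nodes_json)["items"]:
        conditions = node["status"].get("conditions", [])
        ready = any(
            c["type"] == "Ready" and c["status"] == "True" for c in conditions
        )
        if not ready:
            not_ready.append(node["metadata"]["name"])
    return not_ready
```

An empty list means every node reports Ready, which matches the STATUS column above.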

Installing Kafka

Kafka requires a few supporting services to be running before it can start. These services are:

  • cert-manager
  • ZooKeeper

Let’s install cert-manager in our Kubernetes cluster

$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.6.2/cert-manager.yaml

Check the status to verify cert-manager resources have been created

$ kubectl get ns
NAME                 STATUS   AGE
cert-manager         Active   6s
default              Active   20m
kube-node-lease      Active   20m
kube-public          Active   20m
kube-system          Active   20m
local-path-storage   Active   20m

$ kubectl get po -n cert-manager
NAME                                       READY   STATUS    RESTARTS   AGE
cert-manager-74f9fd7fb6-kqhsq              1/1     Running   0          14s
cert-manager-cainjector-67977b8fcc-k49gj   1/1     Running   0          14s
cert-manager-webhook-7ff8d87f4-wg94l       1/1     Running   0          14s

Install ZooKeeper using Helm charts. If you don’t have Helm installed, you can follow the installation guide available in the Helm docs.

$ helm repo add pravega https://charts.pravega.io
"pravega" has been added to your repositories

$ helm repo update

$ helm install zookeeper-operator --namespace=zookeeper --create-namespace pravega/zookeeper-operator

… [truncated]


$ kubectl --namespace zookeeper create -f - <<EOF
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
    name: zookeeper
    namespace: zookeeper
spec:
    replicas: 1
EOF

You should see output similar to this, which means the cluster creation is in progress.

zookeepercluster.zookeeper.pravega.io/zookeeper created

Verify that both the zookeeper operator and cluster pods are running

$ kubectl -n zookeeper get po
NAME                                  READY   STATUS    RESTARTS   AGE
zookeeper-0                           1/1     Running   0          31s
zookeeper-operator-5857967dcc-kfxxt   1/1     Running   0          3m4s

Now that we have all the prerequisites out of the way, let’s install the actual Kafka cluster components.

Kafka has an operator called Koperator which we’ll use to manage our Kafka install. It will take about 4-5 minutes for the Kafka cluster to come up.

$ kubectl create --validate=false -f https://github.com/banzaicloud/koperator/releases/download/v0.21.2/kafka-operator.crds.yaml

$ helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/


$ helm repo update
… [truncated]

$ helm install kafka-operator --namespace=kafka --create-namespace banzaicloud-stable/kafka-operator

… [truncated]

$ kubectl create -n kafka -f https://raw.githubusercontent.com/banzaicloud/koperator/master/config/samples/simplekafkacluster.yaml

Run kubectl -n kafka get po to confirm that Kafka has started. It takes a few minutes for Kafka to be operational. Please wait before proceeding.

Configuring Kafka topic

Let’s create the topic before referencing it in MinIO; the topic must exist before MinIO can publish events to it.

Create a topic called my-topic

$ kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
    name: my-topic
spec:
    clusterRef:
        name: kafka
    name: my-topic
    partitions: 1
    replicationFactor: 1
    config:
        "retention.ms": "604800000"
        "cleanup.policy": "delete"
EOF

It should return the following output. If it does not, the topic was not created. In that case, wait a few minutes for the Kafka cluster to come online and then rerun the command.

kafkatopic.kafka.banzaicloud.io/my-topic created
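The retention.ms value in the topic spec is given in milliseconds, which is not easy to read at a glance. As a quick sanity check, the sketch below converts it to days and back. The helper name `days_to_retention_ms` is hypothetical.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000

retention_ms = 604_800_000  # the "retention.ms" value from the KafkaTopic spec
print(retention_ms // MS_PER_DAY)  # 7


def days_to_retention_ms(days):
    """Convert a retention period in whole days to a retention.ms string."""
    return str(days * MS_PER_DAY)


print(days_to_retention_ms(7))  # 604800000
```

So with this configuration, messages on my-topic are kept for seven days before the delete cleanup policy removes them.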

We need the IP and port of one of the Kafka pods for the next few steps.

To get the IP:

$ kubectl -n kafka describe po kafka-0- | grep -i IP:
IP:           10.244.1.5
  IP:           10.244.1.5

Note: The IP will be different for you and might not match above.

There are a few ports we’re interested in

$ kubectl -n kafka get po kafka-0- -o yaml | grep -iA1 containerport
    - containerPort: 29092
      name: tcp-internal
--
    - containerPort: 29093
      name: tcp-controller


… [truncated]

  • tcp-internal (29092): This is the port used when you act as a consumer wanting to process the incoming messages to the Kafka cluster.
  • tcp-controller (29093): This is the port used when a producer, such as MinIO, wants to send messages to the Kafka cluster.

These IPs and Ports might change in your own setup so please be sure to get the correct value for your cluster.
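Before going further, it can help to confirm that the address you noted is reachable from where your client will run. The following is a minimal sketch using only the Python standard library; the helper name `can_connect` is our own. It only shows that the TCP port accepts connections, not that Kafka is healthy.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False
```

For example, `can_connect("10.244.1.5", 29092)` would check the consumer port shown above, assuming you substitute the IP from your own cluster and run it from a pod inside the cluster.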

Install MinIO

We’ll install MinIO in its own namespace in the same Kubernetes cluster as our other resources.

Fetch the MinIO repo

$ git clone https://github.com/minio/operator.git

Apply the resources to install MinIO

$ kubectl apply -k operator/resources


$ kubectl apply -k operator/examples/kustomization/tenant-lite

Verify MinIO is up and running. You can get the port of the MinIO console, in this case 9443.

$ kubectl -n tenant-lite get svc | grep -i console

storage-lite-console             ClusterIP   10.96.0.215     <none>        9443/TCP   6m53s

Set up Kubernetes port forwarding. We chose port 39443 for the host here, but any free port works; just be sure to use the same port when accessing the console through a web browser.

$ kubectl -n tenant-lite port-forward svc/storage-lite-console 39443:9443


Forwarding from 127.0.0.1:39443 -> 9443

Forwarding from [::1]:39443 -> 9443

Access through the web browser using the following credentials

URL: https://localhost:39443

User: minio

Pass: minio123

Configure MinIO producer

We’ll configure MinIO to send events to my-topic in the Kafka cluster we created earlier using the mc admin tool.

I’m launching an Ubuntu pod here so I have a clean workspace to work off of and more importantly I’ll have access to all the pods in the cluster without having to port-forward each individual service.

$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: ubuntu
  labels:
    app: ubuntu
spec:
  containers:
  - image: ubuntu
    command:
      - "sleep"
      - "604800"
    imagePullPolicy: IfNotPresent
    name: ubuntu
  restartPolicy: Always
EOF

Shell into the Ubuntu pod to make sure it’s up

$ kubectl exec -it ubuntu -- /bin/bash


root@ubuntu:/#

If you see any command prefixed with root@ubuntu:/ that means it is being run from inside this ubuntu pod.

Fetch mc binary and install using the following commands

root@ubuntu:/# apt-get update
apt-get -y install wget
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
mv mc /usr/local/bin/

Verify it’s installed properly

root@ubuntu:/# mc --version


mc version RELEASE.2022-08-05T08-01-28Z (commit-id=351d021b924b4d19f1eb716b9e2bd74644c402d8)

Runtime: go1.18.5 linux/amd64

Copyright (c) 2015-2022 MinIO, Inc.

License GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>

Configure mc admin to use our MinIO cluster

  • mc alias set <alias_name> <minio_tenant_url> <minio_username> <minio_password>

In our case that would translate to

root@ubuntu:/# mc alias set myminio https://minio.tenant-lite.svc.cluster.local minio minio123


Added `myminio` successfully.

Verify the configuration is working as expected by running the command below; you should see something similar to 8 drives online, 0 drives offline

root@ubuntu:/# mc admin info myminio


… [truncated]

Pools:
  1st, Erasure sets: 1, Disks per erasure set: 8

8 drives online, 0 drives offline

Set the Kafka configuration in MinIO through mc admin. You will need to customize the command below with the broker address and topic name for your own cluster.

root@ubuntu:/# mc admin config set myminio \
notify_kafka:1 \
brokers="10.244.1.5:29093" \
topic="my-topic" \
tls_skip_verify="off" \
queue_dir="" \
queue_limit="0" \
sasl="off" \
sasl_password="" \
sasl_username="" \
tls_client_auth="0" \
tls="off" \
client_tls_cert="" \
client_tls_key="" \
version="" --insecure

There are a few of these configurations you have to pay particular attention to

  • brokers="10.244.1.5:29093": These are the Kafka servers with the format server1:port1,server2:port2,serverN:portN. Note: If you decide to give more than one Kafka server, you need to give the IPs of all the servers; if you give a partial list it will fail. You can give a single server, but the downside is that if that server goes down then the config will not be aware of the other Kafka servers in the cluster. As we mentioned before there are two ports: TCP-internal 29092 and TCP-controller 29093. Since we are configuring MinIO as the Producer, we’ll use 29093.
  • topic="my-topic": The topic name should match the topic we created earlier in the Kafka cluster. As a reminder, MinIO does not create this topic automatically; it has to be available beforehand.
  • notify_kafka:1: This is the configuration name that will be used to actually add the events later.

Please visit our documentation for more details on these parameters.
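If you script this step, the brokers value is the part most likely to be mistyped. Here is a minimal sketch that assembles the argument list for the command above from a list of (host, port) pairs. The function name `kafka_target_args` is hypothetical, and it includes only the brokers and topic settings discussed here; the remaining keys from the full command would be appended the same way.

```python
def kafka_target_args(alias, target_id, brokers, topic):
    """Build an `mc admin config set` argument list with brokers and topic only."""
    if not brokers:
        raise ValueError("at least one broker is required")
    # Brokers use the server1:port1,server2:port2 format described above.
    broker_list = ",".join(f"{host}:{port}" for host, port in brokers)
    return [
        "mc", "admin", "config", "set", alias,
        f"notify_kafka:{target_id}",
        f"brokers={broker_list}",
        f"topic={topic}",
    ]


args = kafka_target_args("myminio", 1, [("10.244.1.5", 29093)], "my-topic")
print(" ".join(args))
# mc admin config set myminio notify_kafka:1 brokers=10.244.1.5:29093 topic=my-topic
```

Returning a list rather than a single string means the arguments can be passed to `subprocess.run` without shell quoting concerns.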

Once it is successful you should see the output below

Successfully applied new settings.

As required after a configuration change, let’s restart the MinIO service

root@ubuntu:/# mc admin service restart myminio


Restart command successfully sent to `myminio`. Type Ctrl-C to quit or wait to follow the status of the restart process.

....

Restarted `myminio` successfully in 2 seconds

Create a bucket in MinIO called images. This is where the raw objects will be stored.

root@ubuntu:/# mc mb myminio/images --insecure


Bucket created successfully `myminio/images`.

We want to limit the messages being sent to the queue only to .jpg images; this could be expanded as needed, for example if you wanted to set the message to fire based on another file extension such as .png.

root@ubuntu:/# mc event add myminio/images arn:minio:sqs::1:kafka --suffix .jpg


Successfully added arn:minio:sqs::1:kafka


# Verify it has been added properly
root@ubuntu:/# mc event list myminio/images


arn:minio:sqs::1:kafka   s3:ObjectCreated:*,s3:ObjectRemoved:*,s3:ObjectAccessed:*   Filter: suffix=".jpg"

For more details on how to configure Kafka with MinIO, please visit our documentation.
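To make the effect of the rule concrete, here is a rough model of the filtering shown in the mc event list output: an event is forwarded only if its name matches one of the configured patterns and the object key ends with the suffix. This is our own illustration, not MinIO's implementation, and the function name `should_notify` is hypothetical.

```python
from fnmatch import fnmatchcase


def should_notify(event_name, key, event_patterns, suffix=""):
    """Return True if the event name matches a pattern and the key has the suffix."""
    name_matches = any(fnmatchcase(event_name, p) for p in event_patterns)
    return name_matches and key.endswith(suffix)


patterns = ["s3:ObjectCreated:*", "s3:ObjectRemoved:*", "s3:ObjectAccessed:*"]
print(should_notify("s3:ObjectCreated:Put", "rose.jpg", patterns, ".jpg"))  # True
print(should_notify("s3:ObjectCreated:Put", "rose.png", patterns, ".jpg"))  # False
```

Under this model, uploading rose.png to the images bucket would produce no Kafka message until a rule with a .png suffix is added.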

Build MinIO consumer

It would be very cool if we actually had a script that could consume these events produced by MinIO and do some operation with those objects. So why not do that? This way we have a holistic view of the workflow.

While still logged into our ubuntu pod, install python3 and python3-pip for our script to run. Since this is Ubuntu minimal we also need vim to edit our script.

root@ubuntu:/# apt-get -y install python3 python3-pip vim

For our Python consumer script we need to install a few Python packages through pip

root@ubuntu:/# pip3 install minio kafka-python

Collecting minio

  Downloading minio-7.1.11-py3-none-any.whl (76 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 76.1/76.1 KB 4.1 MB/s eta 0:00:00

Collecting kafka-python

  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 8.6 MB/s eta 0:00:00

Collecting certifi

  Downloading certifi-2022.6.15-py3-none-any.whl (160 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 160.2/160.2 KB 11.6 MB/s eta 0:00:00

Collecting urllib3

  Downloading urllib3-1.26.11-py2.py3-none-any.whl (139 kB)

     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 139.9/139.9 KB 18.4 MB/s eta 0:00:00

Installing collected packages: kafka-python, urllib3, certifi, minio

Successfully installed certifi-2022.6.15 kafka-python-2.0.2 minio-7.1.11 urllib3-1.26.11

If you see the above messages then we have successfully installed the required dependencies for our script.

We’ll show the entire script here, then walk you through the different components that are in play. For now save this script as minio_consumer.py

from minio import Minio
import urllib3

from kafka import KafkaConsumer
import json

# Convenient dict for basic config
config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
  "minio_username": "minio",
  "minio_password": "minio123",
  "kafka_servers":  "10.244.1.5:29092",
  "kafka_topic":    "my-topic", # This needs to be created manually
}

# Since we are using self-signed certs we need to disable TLS verification
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

# Initialize MinIO client
minio_client = Minio(config["minio_endpoint"],
              secure=True,
              access_key=config["minio_username"],
              secret_key=config["minio_password"],
              http_client = http_client
              )

# Create destination bucket if it does not exist
if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

# Initialize Kafka consumer
consumer = KafkaConsumer(
  bootstrap_servers=config["kafka_servers"],
  value_deserializer = lambda v: json.loads(v.decode('ascii'))
)

consumer.subscribe(topics=config["kafka_topic"])

try:
  print("Ctrl+C to stop Consumer\n")
  for message in consumer:
    message_from_topic = message.value

    request_type = message_from_topic["EventName"]
    bucket_name, object_path = message_from_topic["Key"].split("/", 1)

    # Only process the request if a new object is created via PUT
    if request_type == "s3:ObjectCreated:Put":
      minio_client.fget_object(bucket_name, object_path, object_path)
     
      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)


      minio_client.fput_object(config["dest_bucket"], object_path, object_path)

      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))

except KeyboardInterrupt:

  print("\nConsumer stopped.")

  • We’ll import the pip packages we installed earlier in the process

from minio import Minio
import urllib3

from kafka import KafkaConsumer
import json

  • Rather than modifying parameters within the code each time, we’ve surfaced some common configurable parameters in this config dict.

config = {
  "dest_bucket":    "processed", # This will be auto created
  "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
  "minio_username": "minio",
  "minio_password": "minio123",
  "kafka_servers":  "10.244.1.5:29092",
  "kafka_topic":    "my-topic", # This needs to be created manually
}

  • The MinIO cluster we launched is using a self-signed cert, so the client must be configured to accept self-signed certificates when it connects.

http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
urllib3.disable_warnings()

  • We’ll check to see if our destination bucket to store the processed data exists; if it does not then we’ll go ahead and create one.

if not minio_client.bucket_exists(config["dest_bucket"]):
  minio_client.make_bucket(config["dest_bucket"])
  print("Destination Bucket '%s' has been created" % (config["dest_bucket"]))

  • Configure the Kafka brokers to connect to along with the topic to subscribe to

consumer = KafkaConsumer(
  bootstrap_servers=config["kafka_servers"],
  value_deserializer = lambda v: json.loads(v.decode('ascii'))
)

consumer.subscribe(topics=config["kafka_topic"])

  • A consumer is meant to run forever, so stopping it normally produces a stack trace. Catching KeyboardInterrupt lets us exit the consumer cleanly.

try:
  print("Ctrl+C to stop Consumer\n")


… [truncated]


except KeyboardInterrupt:
  print("\nConsumer stopped.")

As mentioned earlier, we’ll be continuously waiting and listening for new messages on the topic. Once we get a message, we break it down into three components

  • request_type: The event name that MinIO reports for the request, for example s3:ObjectCreated:Put
  • bucket_name: Name of the bucket where the new object was added
  • object_path: Full path to the object where it was added in the bucket

  for message in consumer:
    message_from_topic = message.value

    request_type = message_from_topic["EventName"]
    bucket_name, object_path = message_from_topic["Key"].split("/", 1)
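The parsing step can be tried on its own without a running Kafka cluster. In the sketch below, the sample payload is a trimmed, hypothetical message that contains only the two fields the script reads (a real notification carries more), and the helper name `parse_event` is our own. It decodes as UTF-8 so that object names with non-ASCII characters do not raise a decode error.

```python
import json


def parse_event(raw):
    """Decode one notification and return (event_name, bucket, object_path)."""
    event = json.loads(raw.decode("utf-8"))
    # "Key" has the form <bucket>/<object path>; split only on the first slash
    # so that nested object paths stay intact.
    bucket, object_path = event["Key"].split("/", 1)
    return event["EventName"], bucket, object_path


sample = b'{"EventName": "s3:ObjectCreated:Put", "Key": "images/deeper/path/rose.jpg"}'
print(parse_event(sample))
# ('s3:ObjectCreated:Put', 'images', 'deeper/path/rose.jpg')
```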

  • Every time you make any request, MinIO will add a message to the topic which will be read by our minio_consumer.py script. So to avoid an infinite loop, let's only process when new objects are added, which in this case is the request type PUT.

    if request_type == "s3:ObjectCreated:Put":
      minio_client.fget_object(bucket_name, object_path, object_path)

  • This is where you would add your custom code to build your ML models, resize your images, and process your ETL/ELT jobs.

      print("- Doing some pseudo image resizing or ML processing on %s" % object_path)
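For the image resizer example, one piece of that custom code would be deciding the output size. As a small sketch, the function below computes dimensions that fit inside a bounding box while keeping the aspect ratio. The name `fit_within` and the 800x800 box are assumptions for illustration; the actual pixel resizing would be done by an imaging library of your choice.

```python
def fit_within(width, height, max_width, max_height):
    """Scale (width, height) down to fit the box, preserving aspect ratio."""
    # Capping the scale at 1.0 means small images are never enlarged.
    scale = min(max_width / width, max_height / height, 1.0)
    return max(1, round(width * scale)), max(1, round(height * scale))


print(fit_within(4000, 3000, 800, 800))  # (800, 600)
print(fit_within(640, 480, 800, 800))    # (640, 480)
```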

  • Once the object is processed, it will be uploaded to the destination bucket we configured earlier. If the bucket does not exist our script will auto-create it.

      minio_client.fput_object(config["dest_bucket"], object_path, object_path)
      print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))

There you have it. Other than some boilerplate code, we are essentially doing two things:

  • Listening for messages on a Kafka topic
  • Putting the object in a MinIO bucket

The script is not perfect — you need to add some additional error handling, but it's pretty straightforward. The rest you can modify with your own code base. For more details please visit our MinIO Python SDK documentation.
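As one possible starting point for that error handling, here is a hedged sketch of a per-event handler. It downloads into a temporary directory rather than the working directory, and it logs failures so that one bad object does not stop the consumer loop. The name `handle_event` is hypothetical, and the download and upload steps are passed in as callables so the function can be exercised without a MinIO server.

```python
import logging
import os
import tempfile

log = logging.getLogger("minio_consumer")


def handle_event(event_name, bucket, object_path, download, upload, dest_bucket):
    """Process one event; return True on success, False if skipped or failed.

    download(bucket, object_path, local_path) and
    upload(dest_bucket, object_path, local_path) are injected callables.
    """
    if event_name != "s3:ObjectCreated:Put":
        return False  # ignore reads, deletes, and other event types
    with tempfile.TemporaryDirectory() as workdir:
        local_path = os.path.join(workdir, os.path.basename(object_path))
        try:
            download(bucket, object_path, local_path)
            # ... custom processing of local_path would go here ...
            upload(dest_bucket, object_path, local_path)
        except Exception:
            # Log the traceback and keep consuming instead of crashing.
            log.exception("Failed to process %s/%s", bucket, object_path)
            return False
    return True
```

Inside the consumer loop this could be called as `handle_event(request_type, bucket_name, object_path, minio_client.fget_object, minio_client.fput_object, config["dest_bucket"])`, since both client methods take the bucket, object name, and file path in that order.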

Consume MinIO events

We’ve built it, now let’s see it in action. Create two terminals:

  • Terminal 1 (T1): Ubuntu pod running minio_consumer.py
  • Terminal 2 (T2): Ubuntu pod with mc.

Open T1 and run the minio_consumer.py script we wrote earlier using python3. If at any time you want to quit the script you can type Ctrl+C

root@ubuntu:/# python3 minio_consumer.py


Ctrl+C to stop Consumer

Now let’s open T2 and PUT some objects into the MinIO images bucket we created earlier using mc.

Start by creating a test object

root@ubuntu:/# touch rose.jpg
root@ubuntu:/# echo "a" > rose.jpg

Upload the test object to the images bucket to a few different paths

root@ubuntu:/# mc cp rose.jpg myminio/images --insecure


/rose.jpg:            2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 55 B/s 0s


root@ubuntu:/# mc cp rose.jpg myminio/images/deeper/path/rose.jpg --insecure


/rose.jpg:            2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 63 B/s 0s

In our other terminal T1 where the MinIO consumer script is running, you should see a few messages similar to below

root@ubuntu:/# python3 minio_consumer.py


… [truncated]


- Doing some pseudo image resizing or ML processing on rose.jpg

- Uploaded processed object 'rose.jpg' to Destination Bucket 'processed'

- Doing some pseudo image resizing or ML processing on deeper/path/rose.jpg

- Uploaded processed object 'deeper/path/rose.jpg' to Destination Bucket 'processed'

Let’s also verify that the processed objects have been uploaded to the processed bucket

root@ubuntu:/# mc ls myminio/processed

[2022-08-12 01:03:46 UTC]     2B STANDARD rose.jpg


root@ubuntu:/# mc ls myminio/processed/deeper/path

[2022-08-12 01:09:04 UTC]     2B STANDARD rose.jpg

As you can see we’ve successfully uploaded the object to a processed bucket from unprocessed raw data.

Build workflows on AIStor with notifications

What we showed here is just a sample of what you can achieve with this workflow. By leveraging Kafka’s durable messaging and AIStor's resilient storage, you can build complex AI applications that are backed by an infrastructure that can scale and keep up with workloads such as:

  • Machine learning models
  • Image resizing
  • Processing ETL / ELT jobs

Don’t take our word for it though — build it yourself. You can download AIStor here and you can join our Slack channel here.