AI Data Workflows with Kafka and MinIO
AIStor is a foundational component for creating and executing complex data workflows. At the core of this event-driven functionality are MinIO bucket notifications using Kafka. AIStor produces event notifications for all HTTP requests such as PUT, POST, COPY, DELETE, GET, HEAD and CompleteMultipartUpload. You can use these notifications to trigger appropriate applications, scripts and Lambda functions to take action after an object upload triggers an event notification.
Event notifications provide a loosely-coupled paradigm for multiple microservices to interact and collaborate. In this paradigm, microservices do not make direct calls to each other, but instead communicate using event notifications. Once a notification is sent, the sending service can return to its tasks, while the receiving service takes action. This level of isolation makes it easier to maintain code — changing one service does not require changing other services as they communicate through notifications, not direct calls.
There are several use cases that rely on AIStor event notifications to execute data workflows. For example, we can run AI/ML pipelines on raw data stored as objects in AIStor.
- The pipeline that processes the data triggers whenever raw objects are added.
- Based on the objects added, the models are run.
- The final model can be saved to a bucket in AIStor, which can then be consumed by other applications as the final product.
Building a workflow
We’re going to build an example workflow using AIStor and Kafka for a hypothetical image resizer app. It takes incoming images, resizes them according to certain app specifications, then saves them to another bucket where they can be served. In the real world this might be done to make resized images available to mobile apps, or simply to resize images ahead of time and avoid the resource strain of resizing them on the fly.
There are several components to it; Kafka and AIStor are used together to power this workflow:
- AIStor, the producer: Incoming raw objects are stored in AIStor. Any time an object is added, AIStor sends a message to a specific topic on the Kafka broker.
- Kafka, the broker: The broker maintains the state of the queue, stores the incoming messages, and makes them available for consumers to consume.
- AIStor, the consumer: The consumer reads these messages from the queue as they arrive in real time, processes the raw data, and uploads the result to an AIStor bucket.
AIStor is the foundation of all of this as it is the producer and the consumer of this workflow.
Use a Kubernetes cluster
We need a Kubernetes cluster for our services to run on. You can use any Kubernetes cluster, but in this example we’ll use a kind cluster. If kind is not already installed, please follow this quickstart guide for instructions. Use the kind cluster configuration below to build a simple Kubernetes cluster with a single control plane and multiple workers.
Save this YAML as kind-config.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
Launch the cluster (this may take several minutes)
$ kind create cluster --name minio-kafka --config kind-config.yaml
… [truncated]
Verify the cluster is up
$ kubectl get no
Installing Kafka
Kafka requires a few supporting services to be running before it can be installed. These services are:
- cert-manager
- ZooKeeper
Let’s install cert-manager in our Kubernetes cluster
$ kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.6.2/cert-manager.yaml
Check the status to verify cert-manager resources have been created
$ kubectl get ns
Install ZooKeeper using Helm charts. If you don’t have Helm installed, you can follow the installation guide in the Helm docs.
$ helm repo add pravega https://charts.pravega.io
$ kubectl --namespace zookeeper create -f - <<EOF
You should see output similar to this, which means the cluster creation is in progress.
zookeepercluster.zookeeper.pravega.io/zookeeper created
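The operator install step and the heredoc body are truncated above. Under typical use of the Pravega ZooKeeper operator, the missing pieces would look roughly like the sketch below; the namespace, release name, and single-replica spec are assumptions rather than values from the original.
$ kubectl create namespace zookeeper
$ helm install zookeeper-operator pravega/zookeeper-operator --namespace=zookeeper
$ kubectl --namespace zookeeper create -f - <<EOF
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
  name: zookeeper
  namespace: zookeeper
spec:
  replicas: 1
EOF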
Verify that both the zookeeper operator and cluster pods are running
$ kubectl -n zookeeper get po
Now that we have all the prerequisites out of the way, let’s install the actual Kafka cluster components.
Kafka has an operator called Koperator which we’ll use to manage our Kafka install. It will take about 4-5 minutes for the Kafka cluster to come up.
$ kubectl create --validate=false -f https://github.com/banzaicloud/koperator/releases/download/v0.21.2/kafka-operator.crds.yaml
$ helm repo add banzaicloud-stable https://kubernetes-charts.banzaicloud.com/
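The rest of the install is truncated above. With Koperator, the remaining steps would typically look something like this; the namespace, release name, and sample KafkaCluster manifest path are assumptions based on the Koperator documentation, not commands from the original.
$ kubectl create namespace kafka
$ helm install kafka-operator banzaicloud-stable/kafka-operator --namespace=kafka
$ kubectl create -n kafka -f https://raw.githubusercontent.com/banzaicloud/koperator/v0.21.2/config/samples/simplekafkacluster.yaml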
Run kubectl -n kafka get po to confirm that Kafka has started. It takes a few minutes for Kafka to be operational. Please wait before proceeding.
Configuring Kafka topic
Let’s create the topic in Kafka before configuring it in MinIO; the topic is a prerequisite.
Create a topic called my-topic
$ kubectl apply -n kafka -f - <<EOF
It should return the following output; if it does not, the topic creation was not successful. In that case, wait a few minutes for the Kafka cluster to come online and then rerun the command.
kafkatopic.kafka.banzaicloud.io/my-topic created
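For reference, the heredoc body that creates the topic is truncated above. A KafkaTopic resource for the Banzai Cloud operator consistent with that output would look roughly like this; the cluster reference, partition count, and replication factor are assumptions:
$ kubectl apply -n kafka -f - <<EOF
apiVersion: kafka.banzaicloud.io/v1alpha1
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
spec:
  clusterRef:
    name: kafka
  name: my-topic
  partitions: 1
  replicationFactor: 1
EOF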
We need the IP and port of one of the Kafka pods for the next few steps.
To get the IP:
$ kubectl -n kafka describe po kafka-0- | grep -i IP:
Note: The IP will be different for you and might not match above.
There are a few ports we’re interested in
$ kubectl -n kafka get po kafka-0- -o yaml | grep -iA1 containerport
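The output of that command is not shown above; it should include two named container ports, roughly like the illustration below (exact names and neighboring fields depend on your Koperator version, so treat this as an assumption rather than verbatim output):
- containerPort: 29092
  name: tcp-internal
--
- containerPort: 29093
  name: tcp-controller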
- tcp-internal (29092): This is the port used when you act as a consumer wanting to process the incoming messages to the Kafka cluster.
- tcp-controller (29093): This is the port used when a producer, such as MinIO, wants to send messages to the Kafka cluster.
These IPs and Ports might change in your own setup so please be sure to get the correct value for your cluster.
Install MinIO
We’ll install MinIO in its own namespace in the same Kubernetes cluster as our other resources.
Fetch the MinIO Operator repo
$ git clone https://github.com/minio/operator.git
Apply the resources to install MinIO
$ kubectl apply -k operator/resources
$ kubectl apply -k operator/examples/kustomization/tenant-lite
Verify MinIO is up and running. You can get the port of the MinIO console, in this case 9443.
$ kubectl -n tenant-lite get svc | grep -i console
Set up Kubernetes port forwarding. We chose port 39443 here for the host, but this could be anything; just be sure to use the same port when accessing the console through a web browser.
$ kubectl -n tenant-lite port-forward svc/storage-lite-console 39443:9443
Forwarding from 127.0.0.1:39443 -> 9443
Forwarding from [::1]:39443 -> 9443
Access through the web browser using the following credentials
URL: https://localhost:39443
User: minio
Pass: minio123
Configure MinIO producer
We’ll configure MinIO to send events to my-topic in the Kafka cluster we created earlier using the mc admin tool.
I’m launching an Ubuntu pod here so I have a clean workspace to work from and, more importantly, access to all the pods in the cluster without having to port-forward each individual service.
$ kubectl apply -f - <<EOF
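The heredoc body is truncated above. A minimal Pod manifest that fits this walkthrough might look like the following; the ubuntu:22.04 image tag and the sleep command are assumptions:
$ kubectl apply -f - <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: ubuntu
spec:
  containers:
  - name: ubuntu
    image: ubuntu:22.04
    command: ["sleep", "infinity"]
EOF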
Shell into the Ubuntu pod to make sure it’s up
$ kubectl exec -it ubuntu -- /bin/bash
If you see any command prefixed with root@ubuntu:/, that means it is being run from inside this Ubuntu pod.
Fetch the mc binary and install it using the following commands
root@ubuntu:/# apt-get update
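The download and install commands are truncated above. A typical way to get mc onto a minimal Ubuntu image uses the standard MinIO client release URL; the use of wget and the install path here are assumptions about this environment:
root@ubuntu:/# apt-get -y install wget
root@ubuntu:/# wget https://dl.min.io/client/mc/release/linux-amd64/mc -O /usr/local/bin/mc
root@ubuntu:/# chmod +x /usr/local/bin/mc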
Verify it’s installed properly
root@ubuntu:/# mc --version
Runtime: go1.18.5 linux/amd64
Copyright (c) 2015-2022 MinIO, Inc.
License GNU AGPLv3 <https://www.gnu.org/licenses/agpl-3.0.html>
Configure mc to use our MinIO cluster
mc alias set <alias_name> <minio_tenant_url> <minio_username> <minio_password>
In our case that would translate to
root@ubuntu:/# mc alias set myminio https://minio.tenant-lite.svc.cluster.local minio minio123
Added `myminio` successfully.
Verify the configuration is working as expected by running the command below; you should see something similar to 8 drives online, 0 drives offline
root@ubuntu:/# mc admin info myminio
… [truncated]
Set the Kafka configuration in MinIO through mc admin. You will need to customize the command below with the values from your own cluster.
root@ubuntu:/# mc admin config set myminio \
There are a few of these configurations you have to pay particular attention to:
- brokers="10.244.1.5:29093": These are the Kafka servers, given in the format server1:port1,server2:port2,serverN:portN. Note: if you decide to give more than one Kafka server, you need to give the IPs of all the servers; if you give a partial list it will fail. You can give a single server, but the downside is that if that server goes down, the config will not be aware of the other Kafka servers in the cluster. As we mentioned before, there are two ports: tcp-internal 29092 and tcp-controller 29093. Since we are configuring MinIO as the producer, we'll use 29093.
- topic="my-topic": The topic name should match the topic we created earlier in the Kafka cluster. As a reminder, MinIO does not create this topic automatically; it has to be available beforehand.
- notify_kafka:1: This is the configuration name that will be used to actually add the events later.
Please visit our documentation for more details on these parameters.
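Putting those settings together, the truncated command above would look roughly like this; the broker IP comes from our cluster and will differ in yours:
root@ubuntu:/# mc admin config set myminio notify_kafka:1 \
  brokers="10.244.1.5:29093" \
  topic="my-topic"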
Once it is successful you should see the output below
Successfully applied new settings.
And as required, let’s restart the MinIO service
root@ubuntu:/# mc admin service restart myminio
Restart command successfully sent to `myminio`. Type Ctrl-C to quit or wait to follow the status of the restart process.
....
Restarted `myminio` successfully in 2 seconds
Create a bucket in MinIO called images. This is where the raw objects will be stored.
root@ubuntu:/# mc mb myminio/images --insecure
Bucket created successfully `myminio/images`.
We want to limit the messages being sent to the queue only to .jpg images; this could be expanded as needed, for example if you wanted the message to fire based on another file extension such as .png.
root@ubuntu:/# mc event add myminio/images arn:minio:sqs::1:kafka --suffix .jpg
# Verify it has been added properly
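The verification command itself is truncated above; with standard mc it would be along these lines:
root@ubuntu:/# mc event list myminio/images arn:minio:sqs::1:kafka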
For more details on how to configure Kafka with MinIO, please visit our documentation.
Build MinIO consumer
It would be very cool if we actually had a script that could consume these events produced by MinIO and do some operation with those objects. So why not do that? This way we have a holistic view of the workflow.
While still logged into our Ubuntu pod, install python3 and python3-pip for our script to run. Since this is Ubuntu minimal, we also need vim to edit our script.
root@ubuntu:/# apt-get -y install python3 python3-pip vim
For our Python consumer script we need to install a few Python packages through pip
root@ubuntu:/# pip3 install minio kafka-python
Collecting minio
  Downloading minio-7.1.11-py3-none-any.whl (76 kB)
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Collecting certifi
  Downloading certifi-2022.6.15-py3-none-any.whl (160 kB)
Collecting urllib3
  Downloading urllib3-1.26.11-py2.py3-none-any.whl (139 kB)
Installing collected packages: kafka-python, urllib3, certifi, minio
Successfully installed certifi-2022.6.15 kafka-python-2.0.2 minio-7.1.11 urllib3-1.26.11
If you see the above messages then we have successfully installed the required dependencies for our script.
We’ll show the entire script here, then walk you through the different components that are in play. For now save this script as minio_consumer.py
from minio import Minio
… [truncated]
        minio_client.fput_object(config["dest_bucket"], object_path, object_path)
        print("- Uploaded processed object '%s' to Destination Bucket '%s'" % (object_path, config["dest_bucket"]))
except KeyboardInterrupt:
    print("\nConsumer stopped.")
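Most of the script body is truncated above. Below is a reconstruction consistent with the fragments walked through in the rest of this section; the endpoint, credentials, broker address, and the fget_object download step are assumptions, so treat it as a sketch rather than the original script.
import json
from urllib.parse import unquote

import urllib3
from kafka import KafkaConsumer
from minio import Minio

# Commonly tweaked parameters surfaced in one place. These values are
# assumptions matching the cluster built in this walkthrough -- adjust
# them for your own environment.
config = {
    "minio_endpoint": "minio.tenant-lite.svc.cluster.local",
    "minio_username": "minio",
    "minio_password": "minio123",
    "dest_bucket": "processed",
    "kafka_servers": "10.244.1.5:29092",  # tcp-internal consumer port
    "kafka_topic": "my-topic",
}

# The tenant uses a self-signed certificate, so skip certificate verification.
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')

minio_client = Minio(
    config["minio_endpoint"],
    access_key=config["minio_username"],
    secret_key=config["minio_password"],
    secure=True,
    http_client=http_client,
)

# Create the destination bucket if it does not already exist.
if not minio_client.bucket_exists(config["dest_bucket"]):
    minio_client.make_bucket(config["dest_bucket"])

# Subscribe to the topic MinIO publishes bucket notifications to.
consumer = KafkaConsumer(
    config["kafka_topic"],
    bootstrap_servers=config["kafka_servers"],
)

print("Ctrl+C to stop Consumer")
try:
    for message in consumer:
        event = json.loads(message.value)
        for record in event.get("Records", []):
            request_type = record["eventName"]
            bucket_name = record["s3"]["bucket"]["name"]
            object_path = unquote(record["s3"]["object"]["key"])

            # Only react to newly created objects so our own requests
            # don't trigger further processing.
            if request_type == "s3:ObjectCreated:Put":
                # Download the raw object, "process" it, then upload the result.
                minio_client.fget_object(bucket_name, object_path, object_path)
                print("- Doing some pseudo image resizing or ML processing on %s" % object_path)
                minio_client.fput_object(config["dest_bucket"], object_path, object_path)
                print("- Uploaded processed object '%s' to Destination Bucket '%s'"
                      % (object_path, config["dest_bucket"]))
except KeyboardInterrupt:
    print("\nConsumer stopped.")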
- We’ll import the pip packages we installed earlier in the process
from minio import Minio
- Rather than modifying parameters within the code each time, we’ve surfaced some common configurable parameters in this config dict.
config = {
- The MinIO cluster we launched uses a self-signed cert. When connecting, we’ll need to ensure the client accepts self-signed certificates.
http_client = urllib3.PoolManager(cert_reqs='CERT_NONE')
- We’ll check to see if our destination bucket to store the processed data exists; if it does not then we’ll go ahead and create one.
if not minio_client.bucket_exists(config["dest_bucket"]):
- Configure the Kafka brokers to connect to along with the topic to subscribe to
consumer = KafkaConsumer(
- When you stop the consumer, it generally spits out a stack trace, because a consumer is meant to run forever consuming messages. Wrapping the loop in a try/except lets us exit the consumer cleanly.
try: … [truncated]
As mentioned earlier, we’ll be continuously waiting and listening for new messages on the topic. Once we get a message, we break it down into three components (see the parsing sketch below):
- request_type: The type of HTTP request: GET, PUT, HEAD
- bucket_name: Name of the bucket where the new object was added
- object_path: Full path to the object within the bucket
for message in consumer:
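A hedged sketch of that parsing step, using the S3-style event structure MinIO publishes (json and unquote are imported in the full script above; the field names are assumptions based on that structure, not taken from the truncated original):
for message in consumer:
    event = json.loads(message.value)
    for record in event.get("Records", []):
        request_type = record["eventName"]                    # e.g. s3:ObjectCreated:Put
        bucket_name = record["s3"]["bucket"]["name"]          # bucket that fired the event
        object_path = unquote(record["s3"]["object"]["key"])  # URL-decoded object key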
- Every time you make any request, MinIO will add a message to the topic, which will be read by our minio_consumer.py script. So to avoid an infinite loop, let's only process when new objects are added, which in this case is the request type PUT.
if request_type == "s3:ObjectCreated:Put":
- This is where you would add your custom code to build your ML models, resize your images, and process your ETL/ELT jobs.
print("- Doing some pseudo image resizing or ML processing on %s" % object_path)
- Once the object is processed, it will be uploaded to the destination bucket we configured earlier. If the bucket does not exist our script will auto-create it.
minio_client.fput_object(config["dest_bucket"], object_path, object_path)
There you have it. Other than a bit of boilerplate code, we are essentially doing two things:
- Listening for messages on a Kafka topic
- Putting the object in a MinIO bucket
The script is not perfect — you need to add some additional error handling, but it's pretty straightforward. You can adapt the rest to your own code base. For more details please visit our MinIO Python SDK documentation.
Consume MinIO events
We’ve built it, now let’s see it in action. Create two terminals:
- Terminal 1 (T1): Ubuntu pod running minio_consumer.py
- Terminal 2 (T2): Ubuntu pod with mc
Open T1 and run the minio_consumer.py script we wrote earlier using python3. If at any time you want to quit the script, you can type Ctrl+C.
root@ubuntu:/# python3 minio_consumer.py
Ctrl+C to stop Consumer
Now let’s open T2 and PUT some objects into the MinIO images bucket we created earlier using mc.
Start by creating a test object
root@ubuntu:/# touch rose.jpg
Upload the test object to the images bucket to a few different paths
root@ubuntu:/# mc cp rose.jpg myminio/images --insecure
/rose.jpg: 2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 55 B/s 0s
root@ubuntu:/# mc cp rose.jpg myminio/images/deeper/path/rose.jpg --insecure
/rose.jpg: 2 B / 2 B ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 63 B/s 0s
In our other terminal T1 where the MinIO consumer script is running, you should see a few messages similar to below
root@ubuntu:/# python3 minio_consumer.py
… [truncated]
- Doing some pseudo image resizing or ML processing on rose.jpg
- Uploaded processed object 'rose.jpg' to Destination Bucket 'processed'
- Doing some pseudo image resizing or ML processing on deeper/path/rose.jpg
- Uploaded processed object 'deeper/path/rose.jpg' to Destination Bucket 'processed'
We should also verify that the processed objects have been uploaded to the processed bucket
root@ubuntu:/# mc ls myminio/processed
[2022-08-12 01:03:46 UTC] 2B STANDARD rose.jpg
root@ubuntu:/# mc ls myminio/processed/deeper/path
[2022-08-12 01:09:04 UTC] 2B STANDARD rose.jpg
As you can see, we’ve successfully taken unprocessed raw data, processed it, and uploaded the result to the processed bucket.
Build workflows on AIStor with notifications
What we showed here is just a sample of what you can achieve with this workflow. By leveraging Kafka’s durable messaging and AIStor's resilient storage, you can build complex AI applications that are backed by an infrastructure that can scale and keep up with workloads such as:
- Machine learning models
- Image resizing
- Processing ETL / ELT jobs
Don’t take our word for it though — build it yourself. You can download AIStor here and you can join our Slack channel here.