Building an ML Data Pipeline with MinIO and Kubeflow v2.0

Building an ML Data Pipeline with MinIO and Kubeflow v2.0

Kubeflow Pipelines (KFP) is the most popular feature of Kubeflow. A Python engineer can turn a function written in plain old Python into a component that runs in Kubernetes using the KFP decorators. If you used KFP v1, be warned - the programming model in KFP v2 is very different - however, it is a big improvement. Transforming plain old Python into reusable components and orchestrating these components into pipelines is a lot easier.

In this post I want to go beyond the obligatory “Hello World” demo and present something that I hope you will find either directly usable or at the very least a framework for plugging in your own logic.

What I will do is show how to build a KFP Pipeline that downloads US Census Bureau Data (which is a public data set that is free to access) and saves this data to MinIO. MinIO is a great way to store your ML data and models. Using MinIO, you can save training sets, validation sets, test sets, and models without worrying about scale or performance. Also, someday AI will be regulated; when this day comes, you will need MinIO’s enterprise features (object locking, versioning, encryption, and legal locks) to secure your data at rest and to make sure you do not accidentally delete something that a regulatory agency may request.

You can learn more about the data we will be using here. To get an API key for the Census API, go to the Census Bureau’s site for developers. This is very simple. All you need to do is specify an email address.

What We Will Build

In this post, I will build a pipeline that takes a table code (an identifier within the Census Bureau’s dataset) and a year as parameters. It will then download the table via an API, if we have not previously downloaded it.

We will only call the Census API if we have not previously downloaded the table. When we call the ACS API, we will save the data in an instance of MinIO that we set up for storing raw data. This is different from the MinIO instance KFP uses internally. We could have tried to use KFP’s instance of MinIO - however, this is not the best design for an ML Data Pipeline. You will want a storage solution that is totally under your control for the reasons I described earlier. Below is a diagram of our Kubeflow and MinIO deployments that illustrates the purpose of each instance of MinIO.

Before we start writing code, let's create a logical design of our pipeline.

Logical Pipeline Design

The pipelines you run in KFP are known as Directed Acyclic Graphs (DAG). They move in one direction and do not backtrack - no closed loops. This is what you would expect of a data pipeline. Below is the logical design of the DAG we will build and run in KFP. It is self-explanatory. Starting with a conceptual workflow is a good way to help you transform your logic into functions that will leverage KFP to the fullest.

Now that we have a logical design, let’s start coding. I am going to assume you have KFP installed and that you also have set up your own instance of MinIO. If you do not have KFP 2.0 and MinIO installed, check out Setting up a Development Machine with Kubeflow Pipeline 2.0 and MinIO.

Creating Python Functions from a Logical Design

Each task in the logical design above is going to become a Python function. The function signatures below show how the parameters and return values would be designed if we were writing a Python script or stand alone service without KFP. I want to discuss this in case you are migrating existing code to KFP.

def survey_data_exists(survey_code: str, year: int) -> bool:
'''Check MinIO to see if the survey data exists.'''
pass

def download_survey_data(table_code: str, year: int) -> pd.DataFrame:
'''Download the survey data using the CB API and return a Pandas dataframe.'''
pass

def save_survey_data(bucket: str, object_name: str, survey_df: pd.DataFrame) -> None:
'''Save the survey data which is a Pandas dataframe to the MinIO bucket.'''
pass

def get_survey_data(bucket: str, object_name: str) -> pd.DataFrame:
pass


A few comments about the functions above. They use type hints. If you are writing plain old Python, you can opt out of type hints because they are optional. In Kubeflow Pipelines, they are not - you must use type hints so that KFP can tell you if your parameters and return values do not match when assembling functions into a pipeline. This is a good thing. KFP will find type mismatch errors when you compile your pipeline. These same errors would be very hard to track down at runtime within a cluster.

It may be tempting to combine functions so that you have fewer functions to manage. For example, the last three functions could be combined into one by using a simple “if else” statement and then the first function would not be needed. This is not a best practice when using a tool like KFP. As we will see, KFP has constructs for conditions and loops. By using KFPs constructs you will get better visualizations of your pipeline in the KFP UI. Parallelisms are also possible which will improve pipeline performance. Finally, if we keep our functions simple we will get better reuse.

We are now ready to create Kubeflow Pipeline components using our Python functions.

Creating KFP Components from Python Functions

The code below is the complete implementation of our Pipeline components. When you use tools like KFP and MinIO, you really do not have a lot of plumbing code to write.

@dsl.component(packages_to_install=['minio==7.1.14'])
def table_data_exists(bucket: str, table_code: str, year: int) -> bool:
  '''
  Check for the existence of Census table data in MinIO.
  '''
  from minio import Minio
  from minio.error import S3Error
  import logging

  object_name=f'{table_code}-{year}.csv'

  logger = logging.getLogger('kfp_logger')
  logger.setLevel(logging.INFO)
  logger.info(bucket)
  logger.info(table_code)
  logger.info(year)
  logger.info(object_name)
 
  try:
      # Create client with access and secret key.
      client = Minio('host.docker.internal:9000',
                  'Access key here.',
                  'Secret key here.',
                  secure=False)

      bucket_found = client.bucket_exists(bucket)
      if not bucket_found:
          return False

      objects = client.list_objects(bucket)
      found = False
      for obj in objects:
          logger.info(obj.object_name)
          if object_name == obj.object_name: found = True

  except S3Error as s3_err:
      logger.error(f'S3 Error occurred: {s3_err}.')
  except Error as err:
      logger.error(f'Error occurred: {err}.')

  return found


@dsl.component(packages_to_install=['pandas==1.3.5', 'requests'])
def download_table_data(dataset: str, table_code: str, year: int, table_df: Output[Dataset]):
  '''
  Returns all fields for the specified table. The output is a DataFrame saved to csv.
  '''
  import logging
  import pandas as pd
  import requests

  logger = logging.getLogger('kfp_logger')
  logger.setLevel(logging.INFO)

  census_endpoint = f'https://api.census.gov/data/{year}/{dataset}'
  census_key = 'Census key here.'
 
  # Setup a simple dictionary for the requests parameters.
  get_token = f'group({table_code})'
  params = {'key': census_key,
            'get': get_token,
            'for': 'county:*'
            }

  # sending get request and saving the response as response object
  response = requests.get(url=census_endpoint, params=params)
 
  # Extract the data in json format.
  # The first row of our matrix contains the column names. The remaining rows
  # are the data.
  survey_data = response.json()
  df = pd.DataFrame(survey_data[1:], columns = survey_data[0])
  df.to_csv(table_df.path, index=False)
  logger.info(f'Table {table_code} for {year} has been downloaded.')


@dsl.component(packages_to_install=['pandas==1.3.5', 'minio==7.1.14'])
def save_table_data(bucket: str, table_code: str, year: int, table_df: Input[Dataset]):
  import io
  import logging
  from minio import Minio
  from minio.error import S3Error
  import pandas as pd

  object_name=f'{table_code}-{year}.csv'

  logger = logging.getLogger('kfp_logger')
  logger.setLevel(logging.INFO)
  logger.info(bucket)
  logger.info(table_code)
  logger.info(year)
  logger.info(object_name)

  df = pd.read_csv(table_df.path)

  try:
      # Create client with access and secret key
      client = Minio('host.docker.internal:9000',
                  'Access key here.',
                  'Secret key here.',
                  secure=False)

      # Make the bucket if it does not exist.
      found = client.bucket_exists(bucket)
      if not found:
          logger.info(f'Creating bucket: {bucket}.')
          client.make_bucket(bucket)

      # Upload the dataframe as an object.
      encoded_df = df.to_csv(index=False).encode('utf-8')
      client.put_object(bucket, object_name, data=io.BytesIO(encoded_df), length=len(encoded_df), content_type='application/csv')
      logger.info(f'{object_name} successfully uploaded to bucket {bucket}.')
      logger.info(f'Object length: {len(df)}.')

  except S3Error as s3_err:
      logger.error(f'S3 Error occurred: {s3_err}.')
  except Error as err:
      logger.error(f'Error occurred: {err}.')


@dsl.component(packages_to_install=['pandas==1.3.5', 'minio==7.1.14'])
def get_table_data(bucket: str, table_code: str, year: int, table_df: Output[Dataset]):
  import io
  import logging
  from minio import Minio
  from minio.error import S3Error
  import pandas as pd

  object_name=f'{table_code}-{year}.csv'

  logger = logging.getLogger('kfp_logger')
  logger.setLevel(logging.INFO)
  logger.info(bucket)
  logger.info(table_code)
  logger.info(year)
  logger.info(object_name)

  # Get data of an object.
  try:
      # Create client with access and secret key
      client = Minio('host.docker.internal:9000',
                  'Access key here.',
                  'Secret key here.',
                  secure=False)

      response = client.get_object(bucket, object_name)
      df = pd.read_csv(io.BytesIO(response.data))
      df.to_csv(table_df.path, index=False)
      logger.info(f'Object: {object_name} has been retrieved from bucket: {bucket} in MinIO object storage.')
      logger.info(f'Object length: {len(df)}.')

  except S3Error as s3_err:
      logger.error(f'S3 Error occurred: {s3_err}.')
  except Error as err:
      logger.error(f'Error occurred: {err}.')

  finally:
      response.close()
      response.release_conn()

The most important fact to keep in mind as you implement and troubleshoot these functions is that at runtime they are not functions at all. They will be components. In other words, KFP will take each function and deploy it to its own container. This sample uses Lightweight Python Components. You can also use containerized Python components which give you more control over what is put into the container. There is also a containerized components option for non-Python code.

KFP introduces several constructs to help you seamlessly create functions that can behave as standalone components running in a container. They are the component decorator, parameters, and artifacts. Let’s walk through these tools so that you understand how KFP deploys functions and passes data between them at run time.

Components

The component decorator tells KFP that a function should be deployed as a component. Carefully look at how this decorator is used in the code above. Since the function will be deployed separately to a container you need to tell KFP its dependencies. This is done using the packages_to_install parameter of the decorator. This only ensures that dependencies are installed (via pip). It does not import them for you. You need to do this yourself within the function definition. This may look a little unorthodox as most of us are used to importing dependencies at the module level - but is OK when using a tool like KFP that turns functions into services.

Passing data between components must be done with care. KFP v2 makes the distinction between parameters and artifacts. Parameters are for simple data that is passed between function calls (int, bool, str, float, list, dict). Artifacts, on the other hand, represent data that your functions retrieve from an external source or create - such as datasets, models, and metrics that depict the accuracy of your model. You can even use artifact to create HTML and Markdown if you want to style your output such that it is more presentable in the Kubeflow UI. Since artifacts can be large KFP uses its own instance of MinIO to store them.

Parameters (and return values)

KFP makes use of Python type hints for specifying simple input parameters and simple return values. You are limited to using str, int, float, bool, list, and dict. The table_data_exists function above shows how parameters are specified in a function signature. Syntactically, you specify these the same way you would with standard Python. Remember using type hints is a requirement. At runtime, KFP takes care of marshaling these values between components - which are running in different containers.

If a function requires a more complicated data type as an input or if it returns a complicated data type then use artifacts.

Artifacts

Artifacts are different from input parameters and output values in that they may get large. Examples of an artifact are: a dataset, a model, metrics (the results of ML training efforts), HTML, and Mark Down. Under the hood, KFP uses its own instance of MinIO to store artifacts. When you pass an artifact from one component to another KFP does not pass the artifact directly - rather it stores the artifact in MinIO and passes a reference to the artifact (object) in MinIO. This is really clever. It means that if you have a large artifact that needs to be accessed by several components then the artifact can be efficiently accessed by these components - since MinIO is purpose built for efficient object storage and access.

Let’s look at what happens when you pass an artifact to a component. In the code sample above, save_table_data shows how this is done. Before your function is invoked, KFP copies the artifact from its instance of MinIO to the local file system of the container your component is running in. Your code will need to read this file. This is done using the path attribute of the parameter you declared to be of type Input[Dataset]. In the save_table_data function, I read this file into a Pandas DataFrame.

Output artifacts are specified as function parameters and cannot be the return value of a function. In the code above, get_table_data shows how to use output artifacts. Notice that the table_df parameter has a datatype of  Output[Dataset]. To successfully return data from a function, you must write the data to the location specified in the parameter’s path attribute.  Again, this is a reference to the local file system in your container - KFP will take care of moving this file to its instance of MinIO when your function completes.

We are now ready to assemble our components into a pipeline.

Creating Pipelines from Components

The code below creates our pipeline (or DAG) from the components we implemented in the previous section.

@dsl.pipeline(
  name='census-pipeline',
  description='Pipeline that will download Census data and save to MinIO.'
)
def census_pipeline(bucket: str, dataset: str, table_code: str, year: int) -> Dataset:
  # Positional arguments are not allowed.
  # When I set the name parameter of the condition that task in the DAG fails.

  exists = table_data_exists(bucket=bucket, table_code=table_code, year=year)

  with dsl.Condition(exists.output == False):
      table_data = download_table_data(dataset=dataset, table_code=table_code, year=year)
      save_table_data(bucket=bucket,
                      table_code=table_code,
                      year=year,
                      table_df=table_data.outputs['table_df'])

  with dsl.Condition(exists.output == True):
      table_data = get_table_data(bucket=bucket,
                      table_code=table_code,
                      year=year)

  return table_data.outputs['table_df']

There are a few things worth noting in this function. First, the pipeline decorator is telling KFP that this function contains our pipeline definition. The name and description you specify here will show up in the KFP UI.

Next, the return value of this pipeline function is a Dataset. It turns out that pipelines can be used just like components. When a pipeline has a return value then it can be used within another pipeline. This is a great way to reuse components.

Finally, we are using the dsl.Condition (which is a Python context manager) to only call our download component if the data we need is not already in our instance of MinIO. We could have used a conventional if statement here. However, if we did then KFP would not have any way of knowing that we have a branch in our logic. By using the dsl.Condition construct we are telling KFP about a branch in our pipeline. This will allow the KFP UI to give us a better visual representation.

Running a Pipeline

Once you have your components and your pipeline implemented you are two lines of code away from running your pipeline.

client = Client()

run = client.create_run_from_pipeline_func(census_pipeline, experiment_name='Implementing functions', enable_caching=False,
  arguments={
      'bucket': 'census-data',
      'table_code': 'B01001',
      'year': 2020
  }
)

Choose a meaningful experiment name. The KFP UI has an experiments tab that will group runs with the same experiment name. The code above “compiles” your pipeline and components - which is merely the act of putting everything into a YAML file (including your source code). If you have any type mismatches that I described earlier, then you will find out about these problems while creating the run. This code will also send your pipeline to KFP and run it. Below is a screenshot showing a few successful runs of our pipeline.

Summary

In this post we created a data pipeline that uses KFP and MinIO to download and save US Census data. To do this we set up our own instance of MinIO for storing raw data. This is an important piece of an ML pipeline - someday AI will be regulated and having a storage solution under your control allows you to version, lock, and encrypt data used for training and the models themselves.

We also discussed how KFP uses its own instance of MinIO to efficiently save and access artifacts during pipeline runs.

In my next post, I will show how this data pipeline can be used as input to another pipeline that uses Census data to train a model. If you have questions, drop us a line at hello@min.io or join the discussion on our general Slack channel.

Previous Post Next Post