Building an ML Training Pipeline with MinIO and Kubeflow v2.0

Introduction

In a previous post, I covered Building an ML Data Pipeline with MinIO and Kubeflow v2.0. The data pipeline I created downloaded US Census data to a dedicated instance of MinIO. This is distinct from the MinIO instance Kubeflow Pipelines (KFP) uses internally. We could have tried to use KFP's instance of MinIO; however, that is not the best design for an ML model training pipeline. As GPUs get faster and faster, it becomes possible for a GPU to finish its calculations and then sit idle waiting for new data - and a waiting GPU is an underutilized GPU. You will want a high-speed storage solution that is entirely under your control. Below is a diagram of our MinIO deployments that illustrates the purpose of each instance.

It is important to note that my data pipeline code could be easily modified to interface with any data source - another external API, an SFTP location, or a queue. If you have an external data source that is slow or unreliable, and you need a copy of the data in a high-speed, resilient storage solution, then use my code as a starting point to get the data into MinIO.

In this post, I will take the next step and train a model. I’ll assume you are familiar with the Kubeflow decorators used to create components and pipelines. I’ll also assume you are familiar with passing data between components. If these constructs are new to you, check out my previous post on building a data pipeline.

If we are going to train a model, then we need something to predict.  In other words, we need a dataset with features and labels.

The Dataset

Kaggle is a great place to hunt for datasets. Not only will you find a wide variety of datasets, but many have been the subject of a challenge where individuals and teams have competed for prizes. Since the results of past challenges are preserved, we can compare our results to the winners.

The dataset we will use is from the GoDaddy - Microbusiness Density Forecasting challenge. I like this dataset for a handful of reasons. First, the goal of this competition was to predict monthly microbusiness density by US county. The winning model would be used by policymakers to gain visibility into microbusinesses, resulting in new policies and programs to improve the success and impact of these smallest businesses. Second, the data is broken down by US county - so we can augment this data with US Census data (remember the pipeline I built in my previous post) to increase accuracy. This was allowed in the competition - so we are not giving ourselves an unfair advantage when we compare our results to the leaderboard. Finally, while our model will predict microbusiness density, it could easily be adapted to provide other predictions for a dataset that contains information by geography.

Below is a sample of the dataset provided for this challenge. The `microbusiness_density` column is the label (what our model will try to predict), and all other columns are potential features.

Training a model is really a pipeline that moves data through a series of tasks, resulting in a trained model. This is why MinIO and KFP are powerful tools for training models: MinIO for performant and reliable access to your raw data, and KFP for repeatable pipelines with documented results.

Let’s build a logical design of our pipeline before writing code.

Logical Pipeline Design

Below is the logical design of our pipeline. It is self-explanatory, and if you have ever trained a model, you have seen these tasks before.

Each block shows a high-level task in our pipeline. When building our data pipeline, we were able to use Lightweight Python Components exclusively. Lightweight Python Components are created from hermetic functions - in other words, functions that do not call out to other functions and require minimal imports. These functions get built into an image, which gets deployed to a container, where the function runs. KFP takes care of the build, the deployment, and marshaling data across containers. This is the easiest way to get your code set up within KFP. However, if you have a lot of dependencies, or if you want an entire module (or modules) packaged into a single image, then you can use Containerized Python Components. We will use Containerized Python Components in this pipeline, because training a PyTorch model requires creating a `Dataset` class, a class for your model, and a handful of other helper functions for validation, testing, and moving data to a GPU if one is available.
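
For contrast, here is a minimal - and entirely hypothetical - Lightweight Python Component. Note that the import has to live inside the function body, because only the function itself gets serialized into the container:

from kfp import dsl

@dsl.component(base_image='python:3.10.0', packages_to_install=['pandas==1.3.5'])
def row_count(csv_url: str) -> int:
    # Imports go inside the function because KFP ships only the
    # function body to the component's container.
    import pandas as pd
    df = pd.read_csv(csv_url)
    return len(df)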

Additionally, notice the artifacts that are passed between our tasks in the graphic above: Dataset, Model, and Markdown. I will assume you are familiar with Dataset artifacts and setting them up within your code. (This is how we will pass DataFrames through the first part of the pipeline.) In this post, I will introduce a few of the other artifacts.

Let’s start off by creating a Containerized Python Component.

Containerized Python Components

Containerized Python Components allow us to dictate what gets built into the image that KFP uses. Let's look at a simple example demonstrating how to create a Containerized Python Component and its benefits. The first thing you need to do is create a subfolder within your project. I created a subfolder named `src`. Below is a screenshot showing my `src` folder and all the modules needed to create and train our model. All modules you place in this subfolder will become part of the image KFP uses for all pipeline components.
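
Based on the modules referenced throughout this post, the layout looks roughly like this:

src/
├── data_utilities.py
├── model_utilities.py
└── model_training_pipeline.py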

Next, consider the function below, which is in the `model_utilities.py` module. This function contains the epoch loop for training a model. It calls out to other functions - `get_optimizer()`, `train_epoch()`, and `validate_model()`. It also instantiates classes from this module - specifically `CountyDataset` and `RegressionModel`.

def train_model(df_train_X, df_train_y, df_valid_X, df_valid_y,
                epochs=5, lr=0.01, wd=0.0) -> Tuple[nn.Module, List]:
    config = du.get_config()
    categorical_cols = config['categorical_cols']
    continuous_cols = config['continuous_cols']
    embedding_sizes = [(3135, 50), (51, 26)]

    model = RegressionModel(embedding_sizes, len(continuous_cols))

    # Create the datasets.
    train_dataset = CountyDataset(df_train_X, df_train_y, categorical_cols)
    valid_dataset = CountyDataset(df_valid_X, df_valid_y, categorical_cols)

    # Create the loaders.
    batch_size = 10
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)

    optim = get_optimizer(model, lr=lr, wd=wd)
    results = []
    for i in range(epochs):
        loss = train_epoch(model, optim, train_loader)
        print('Training loss: ', loss)
        val_loss, val_accuracy = validate_model(model, valid_loader)
        results.append((loss, val_loss, val_accuracy))
    return model, results
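
The real implementations of the helpers ship with the downloadable code. As a rough illustration of the shape of one of them, `train_epoch` might look something like the sketch below - the MSE loss and the `(x_cat, x_cont, y)` batch layout are assumptions for illustration:

import torch.nn.functional as F

def train_epoch(model, optim, train_loader):
    '''Run one epoch of training and return the average loss per sample.'''
    model.train()
    total_loss, total_count = 0.0, 0
    for x_cat, x_cont, y in train_loader:
        optim.zero_grad()
        output = model(x_cat, x_cont)
        # Assumes y arrives shaped (batch, 1) to match the model's output.
        loss = F.mse_loss(output, y)
        loss.backward()
        optim.step()
        total_loss += loss.item() * y.size(0)
        total_count += y.size(0)
    return total_loss / total_count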

If we were to use Lightweight Python Components, then all of these function calls would occur in another container, and data would need to be marshaled across a container boundary. This is not ideal. We want model training to be a task that starts in its own container, but we want the helper functions listed above to run in that same container so that these calls are local and fast. This is what Containerized Python Components allow us to do. The component below encapsulates the function above.

@dsl.component(base_image='python:3.10.0',
               target_image='docker/model-training-project/model-training:v1',
               packages_to_install=['numpy==1.24.3', 'pandas==1.3.5', 'torch==1.13.1'])
def train_model(train_X: Input[Dataset], train_y: Input[Dataset],
                valid_X: Input[Dataset], valid_y: Input[Dataset],
                model: Output[Model], training_results: Output[Markdown]) -> None:
    df_train_X = pd.read_csv(train_X.path)
    df_train_y = pd.read_csv(train_y.path)
    df_valid_X = pd.read_csv(valid_X.path)
    df_valid_y = pd.read_csv(valid_y.path)
    mbd_model, results = mu.train_model(df_train_X, df_train_y, df_valid_X, df_valid_y)
    torch.save(mbd_model.state_dict(), model.path)
    with open(training_results.path, 'w') as f:
        for result in results:
            epoch_result = (f'Training Loss: {result[0]}, '
                            f'Validation Loss: {result[1]}, '
                            f'Validation accuracy: {result[2]}.<br>')
            f.write(epoch_result)

You do not need to use any KFP decorators in helper modules. In this example, my helper modules are `data_utilities.py` and `model_utilities.py`. These modules are plain old Python; there is nothing KFP-specific within them. This is a nice property of the approach: all the heavy lifting required to preprocess data and train models happens in helper functions written in plain old Python. The functions that need to be decorated with KFP decorators are in the `model_training_pipeline.py` module, and they are nothing more than shims between KFP and my helper modules. Consequently, all KFP-specific code is encapsulated in a single small module.

Notice that we still use the `dsl.component` decorator. A few additional parameters must be set on the component decorator when using Containerized Python Components. The `base_image` will be used in the FROM command within the Dockerfile that KFP creates for our image. This parameter is optional and defaults to Python 3.7 if not specified. The `target_image` parameter is a URI that tells KFP where to put your image. This URI needs to point to an image registry. I am running this demo within Docker Desktop, so the URI I used is a local registry within Docker Desktop. If you are in a public cloud, or your organization has its own internal registry, then change this accordingly. Below is an example of what this URI would look like if I needed my image in Google Cloud's Container Registry (notice the `gcr.io` host).

gcr.io/model-training-project/model-training:v1

Finally, you still need to use the `packages_to_install` parameter so that KFP can install any needed third-party libraries into your image. However, you no longer need to place imports within your functions; these imports can now live at the module level. The code for the remaining components is shown below. I am not showing the helper modules, for brevity, because they contain a lot of code; however, you can get them here. They are plain old Python and PyTorch and can be run outside of KFP in a script or notebook if you want to experiment before sending them to KFP.
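
As an illustration, the module-level imports for `model_training_pipeline.py` might look something like this - a sketch, since the exact list depends on which helpers you call:

import logging

import pandas as pd
import torch
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Markdown

import data_utilities as du
import model_utilities as mu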

@dsl.component(base_image='python:3.10.0',
              target_image='docker/model-training-project/model-training:v1',
              packages_to_install=['pandas==1.3.5', 'minio==7.1.14'])
def get_raw_data(bucket: str, object_name: str, table_df: Output[Dataset]):
    '''
    Read an object from MinIO into a Pandas DataFrame and emit it as a Dataset artifact.
    '''
    df = du.get_object(bucket, object_name)
    df.to_csv(table_df.path, index=False)


@dsl.component(base_image='python:3.10.0',
              target_image='docker/model-training-project/model-training:v1',
              packages_to_install=['numpy==1.24.3', 'pandas==1.3.5', 'scikit-learn==1.0.2'])
def preprocess(in_df: Input[Dataset], out_df: Output[Dataset]) -> None:
    '''
    Preprocess the dataframe.
    '''
    df = pd.read_csv(in_df.path)
    df = du.preprocess(df)
    df.to_csv(out_df.path, index=False)


@dsl.component(base_image='python:3.10.0',
              target_image='docker/model-training-project/model-training:v1',
              packages_to_install=['numpy==1.24.3', 'pandas==1.3.5', 'scikit-learn==1.0.2'])
def feature_engineering(pre: Input[Dataset],
                        train_X: Output[Dataset], train_y: Output[Dataset],
                        valid_X: Output[Dataset], valid_y: Output[Dataset],
                        test_X: Output[Dataset], test_y: Output[Dataset],
                        validation_size: int=1, test_size: int=1) -> None:
    '''
    Feature engineering.
    '''
    logger = logging.getLogger('kfp_logger')
    logger.setLevel(logging.INFO)
    df = pd.read_csv(pre.path)
    df_train_X, df_train_y, df_valid_X, df_valid_y, df_test_X, df_test_y = \
        du.feature_engineering(df, validation_size, test_size)
    df_train_X.to_csv(train_X.path, index=False)
    df_train_y.to_csv(train_y.path, index=False)
    df_valid_X.to_csv(valid_X.path, index=False)
    df_valid_y.to_csv(valid_y.path, index=False)
    df_test_X.to_csv(test_X.path, index=False)
    df_test_y.to_csv(test_y.path, index=False)
    logger.info('Feature engineering complete.')


@dsl.component(base_image='python:3.10.0',
              target_image='docker/model-training-project/model-training:v1',
              packages_to_install=['numpy==1.24.3', 'pandas==1.3.5', 'torch==1.13.1'])
def test_model(test_X: Input[Dataset], test_y: Input[Dataset], model: Input[Model],
               test_results: Output[Markdown]) -> None:
    df_test_X = pd.read_csv(test_X.path)
    df_test_y = pd.read_csv(test_y.path)
    mbd_model = mu.create_model()
    mbd_model.load_state_dict(torch.load(model.path))
    smape = mu.test_model(df_test_X, df_test_y, mbd_model)
    with open(test_results.path, 'w') as f:
        f.write('Symmetric Mean Absolute Percent Error (SMAPE): **' + str(smape) + '**')
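
SMAPE is the metric the Kaggle competition used for its leaderboard. For reference, here is a minimal NumPy implementation of the metric - a sketch, not necessarily how `mu.test_model` computes it:

import numpy as np

def smape(actual: np.ndarray, predicted: np.ndarray) -> float:
    '''Symmetric Mean Absolute Percent Error, expressed as a percentage.'''
    diff = np.abs(predicted - actual)
    denominator = (np.abs(actual) + np.abs(predicted)) / 2.0
    # Treat the 0/0 case as zero error rather than dividing by zero.
    ratio = np.divide(diff, denominator,
                      out=np.zeros_like(diff, dtype=float),
                      where=denominator != 0)
    return float(np.mean(ratio) * 100.0)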

Once all needed modules are in a common folder and the functions that represent pipeline components are decorated correctly, you are ready to build an image for KFP. The KFP command line utility makes this easy.

kfp component build src/ --component-filepattern model_training_pipeline.py --push-image

This command tells KFP the location of the folder containing all modules, as well as the module that contains the component definitions. Once this command completes successfully, you will notice that several files have been added to your subfolder, as shown below.
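
Among the generated files you should see a `Dockerfile` that starts from your `base_image` and a `runtime-requirements.txt` capturing the libraries listed in `packages_to_install`.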

Examining these files will give you a deeper understanding of how KFP works. Basically, KFP used all the information placed in the component decorators to create these files. These files were then used to create an image and push it to your registry. If you use Docker Desktop and you used the same `target_image` parameter, then you will see the following in your list of images.

We are now ready to build a pipeline that orchestrates all the functions shown above that were designated as pipeline components.

The Pipeline

The pipeline function that pulls everything together is shown below. It is very straightforward: the output of one component is the input of the next. Notice that each function called maps to a task in our logical pipeline design.

@dsl.pipeline(name='model-training-pipeline',
              description='Pipeline that will train a Neural Network.')
def training_pipeline(bucket: str, object_name: str) -> Markdown:
    raw_dataset = get_raw_data(bucket=bucket, object_name=object_name)
    processed_dataset = preprocess(in_df=raw_dataset.outputs['table_df'])
    final_datasets = feature_engineering(pre=processed_dataset.outputs['out_df'])
    training_results = train_model(train_X=final_datasets.outputs['train_X'],
                                   train_y=final_datasets.outputs['train_y'],
                                   valid_X=final_datasets.outputs['valid_X'],
                                   valid_y=final_datasets.outputs['valid_y'])
    testing_results = test_model(test_X=final_datasets.outputs['test_X'],
                                 test_y=final_datasets.outputs['test_y'],
                                 model=training_results.outputs['model'])
    return testing_results.outputs['test_results']

To submit this pipeline to KFP, run the following code.

from kfp.client import Client

def start_training_pipeline_run():
    client = Client()
    run = client.create_run_from_pipeline_func(training_pipeline,
                    experiment_name='Containerized Python Components',
                    enable_caching=False,
                    arguments={
                        'bucket': 'microbusiness-density',
                        'object_name': 'train.csv'
                    })
    # kfp_endpoint is the base URL of your KFP deployment.
    url = f'{kfp_endpoint}/#/runs/details/{run.run_id}'
    print(url)
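
Alternatively, if you would rather compile the pipeline to a YAML file - for example, to upload it through the KFP UI - the KFP compiler handles that:

from kfp import compiler

# Compile the pipeline definition to an IR YAML file that can be
# uploaded in the KFP UI or submitted with the KFP client.
compiler.Compiler().compile(
    pipeline_func=training_pipeline,
    package_path='model_training_pipeline.yaml')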

Running this pipeline in KFP produces the following visualization in KFP’s Run tab.

Using Artifacts for Reporting Results

Artifacts are used to pass large or complex data between components. Under the hood, KFP saves this data to its instance of MinIO to make it available to any component in your pipeline. A side benefit is that KFP can do type checking and let you know if your datatypes are incorrect when passing the output of one component to the input of another. KFP also provides artifacts for reporting results. Let’s explore these reporting artifacts further.

While training a model, it is a best practice to track the results of your loss function and the accuracy of your model against the validation set. The Metrics, HTML, and Markdown artifacts have a visual component to them: their contents are easily observable within KFP's UI. This post uses a Markdown artifact to report model performance during training and testing. Refer to the last few lines of code in the `train_model` component, which create a Markdown artifact named `training_results`.

Once this artifact is created, the data it contains will automatically appear in the `Visualization` tab within `training_results`. Below is a screenshot of this visualization.

It is also a best practice to do the same thing after testing your model against the test set.
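
The Metrics artifact works the same way for scalar values: a component logs named numbers, and KFP renders them in the UI. A hypothetical component illustrating the call:

from kfp import dsl
from kfp.dsl import Metrics, Output

@dsl.component(base_image='python:3.10.0')
def report_metrics(smape: float, metrics: Output[Metrics]) -> None:
    # log_metric attaches a named scalar to the artifact; KFP displays
    # it in the run's Visualization tab.
    metrics.log_metric('smape', smape)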

A Comment on the Model

I want to call out one slick feature within the model used in this post - even if it is slightly orthogonal to the topic at hand. Below is the code - the layer of interest is the embedding layer set up at the top of `__init__`. It is used to learn the relationship of counties and states to microbusiness density. Embeddings, as used here, act like small, in-memory vector databases. Within the layers of a neural network, these vectors are applied to categorical (or non-continuous) features, allowing the network to learn meaningful representations of discrete data. Understanding this layer is a good first step toward understanding vector databases, which enable efficient retrieval of similar items based on their vector representations - tasks like finding similar images, recommending similar products, or searching for similar documents. We at MinIO are looking at vectors and will have more to say in the coming months.

import torch
from torch import nn
import torch.nn.functional as F

class RegressionModel(nn.Module):

    def __init__(self, embedding_sizes, n_cont):
        super(RegressionModel, self).__init__()

        # Set up the embeddings - one per categorical feature.
        self.embeddings = nn.ModuleList([nn.Embedding(categories, size)
                                         for categories, size in embedding_sizes])
        n_emb = sum(e.embedding_dim for e in self.embeddings)  # length of all embeddings
        self.n_emb, self.n_cont = n_emb, n_cont

        # Set up the rest of the network.
        self.linear1 = nn.Linear(self.n_emb + self.n_cont, 20, bias=True)
        self.linear2 = nn.Linear(20, 20, bias=True)
        self.linear3 = nn.Linear(20, 1, bias=True)

    def forward(self, x_cat, x_cont):
        # Look up the embedding vector for each categorical feature.
        out = [e(x_cat[:, i]) for i, e in enumerate(self.embeddings)]
        out = torch.cat(out, 1)
        # Append the continuous features and run through the linear layers.
        out = torch.cat([out, x_cont], 1)
        out = F.relu(self.linear1(out))
        out = F.relu(self.linear2(out))
        out = self.linear3(out)
        return out
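
To make the embedding sizes concrete: `(3135, 50)` gives each of the 3,135 county codes a learned 50-dimensional vector, and `(51, 26)` does the same for the states. Here is a sketch of a forward pass with dummy tensors - the four continuous features are an assumption for illustration:

import torch

embedding_sizes = [(3135, 50), (51, 26)]
model = RegressionModel(embedding_sizes, n_cont=4)  # 4 continuous features (illustrative)

# One categorical index per embedding (county, state) for a batch of 10 samples.
x_cat = torch.stack([torch.randint(0, 3135, (10,)),
                     torch.randint(0, 51, (10,))], dim=1)
x_cont = torch.randn(10, 4)

predictions = model(x_cat, x_cont)
print(predictions.shape)  # torch.Size([10, 1])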

Summary

This post picked up where my last post on data pipelines with KFP left off. In this post, I showed how to use KFP to train a model. I introduced Containerized Python Components, which are better suited for training models. When training models, you need to create classes and use helper functions that are best run in the same container where training occurs.

Next Steps

Install MinIO and KFP, then download the code sample so you can run your own experiments. Try different model architectures, create new features via feature engineering, and see how your results compare to Kaggle's leaderboard.

If you have questions or you want to share your results, then drop us a line at hello@min.io or join the discussion on our general Slack channel.
