Model Training and MLOps using MLRun and MinIO

In my previous post on MLRun, we set up a development machine with all the tools needed to experiment with MLRun. Specifically, we used a docker-compose file to create containers for the MLRun UI, the MLRun API Service, Nuclio, MinIO, and a Jupyter service. Once our containers started, we ran a simple smoke test to ensure everything was working correctly. We did this by creating a simple serverless function that logged its inputs and connected to MinIO to get a list of buckets.

Below is a visualization of what we will build and what is in our docker-compose file. I extended the docker-compose file that you will find in the MLRun documentation so that it includes both an API service and MinIO.

This post picks up where my last post left off. We will create another serverless function, this time one that trains a model. Remember that MLRun aims to eliminate the need for boilerplate code, so we should also see a reduction in the code needed to train the model itself. If you think about the code you write to train a model - especially in PyTorch - it is basically the same regardless of the data used to train the model and the model itself. In other words, it's boilerplate code that could, in theory, be replaced with some added configuration. Deploying the code itself should be simple as well. We already saw some of this in my last post. However, deploying a function that drives model training is a little more complicated, so we will need more MLRun features to pull this off.

The code accompanying this post (described in the next section) contains utilities that allow models to be trained with datasets that cannot fit into memory. I will not go into these techniques here - that is a subject for another post.

About the Code Download

All the code shown in this post (and more) is here. For brevity, many of the utility functions in my code download are not shown in the code listings of this post. Many of them, however, are general-purpose utilities for integrating PyTorch with MinIO. In this section, I list the important ones, which I hope you find useful for any ML project that uses MinIO to store its datasets.

MinIO Utilities (data_utilities.py):

  • get_bucket_list - Returns a list of buckets in MinIO.
  • get_image_from_minio - Returns an image from MinIO. Uses PIL to convert the object back into its original format.
  • get_object_from_minio - Returns an object - as is - from MinIO.
  • get_object_list - Gets a list of objects from a bucket.
  • image_to_byte_stream - Transforms a PIL Image into a byte stream so that it can be sent to MinIO.
  • load_mnist_to_minio - This function loads the MNIST dataset into a MinIO bucket. Each image is sent as an individual object. This method is useful for performance testing in a cluster as it creates many small images.
  • put_image_to_minio - Puts an image byte stream to MinIO as an object.
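
As a rough illustration of what the put and get helpers in this list do, here is a minimal sketch. It is not the code from the download: the function names put_bytes and get_bytes are hypothetical, and client is assumed to be a minio.Minio instance created with your own endpoint and credentials. The only SDK calls used are put_object and get_object.

```python
import io
from typing import Any


def put_bytes(client: Any, bucket_name: str, object_name: str, payload: bytes,
              content_type: str = "application/octet-stream") -> None:
    """Upload raw bytes as a single object (hypothetical helper)."""
    # put_object reads from a file-like object and needs the length up front.
    client.put_object(
        bucket_name=bucket_name,
        object_name=object_name,
        data=io.BytesIO(payload),
        length=len(payload),
        content_type=content_type,
    )


def get_bytes(client: Any, bucket_name: str, object_name: str) -> bytes:
    """Download an object and return its content as is (hypothetical helper)."""
    response = client.get_object(bucket_name=bucket_name, object_name=object_name)
    try:
        return response.read()
    finally:
        # Close the response and release the HTTP connection back to the pool.
        response.close()
        response.release_conn()
```

An image would travel the same path: save the PIL image into an io.BytesIO buffer, upload the buffer's bytes, and open the downloaded bytes with PIL to get the image back.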

PyTorch Utilities (torch_utilities.py):

  • create_minio_data_loaders - Creates PyTorch data loaders from a list of MNIST objects in a MinIO bucket. Useful for simulating model training tests with datasets that cannot fit into memory.
  • create_memory_data_loaders - Creates PyTorch data loaders where the MNIST images are loaded into memory.
  • MNISTModel - The model used in this post.
  • ConvNet - Another model the reader can experiment with. It creates a Convolutional Neural Network (CNN) for the MNIST dataset.

Migrating Existing Code to MLRun

Let’s look at how we can migrate existing code to run as a serverless function within MLRun. This may be an approach you take to adopting MLRun if you have a lot of training code for several models but do not have time to change the code to take advantage of all of MLRun’s features. This is a viable approach to iteratively migrating to MLRun - get all your ML code managed by MLRun - then add code to take advantage of additional features. 

Consider the code below, which trains a model on the MNIST dataset. (Imports and logging code have been omitted for brevity. The code download has all imports and logging.) It is a script that you can run from any terminal app. The “train_model” function retrieves the data, creates the model, and then turns control over to the “epoch_loop” function for training.

def train_model(loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None):

    # Load the data and log loading metrics.
    if loader_type == 'memory':
        train_loader, test_loader, _ = tu.create_memory_data_loaders(training_parameters['batch_size'])
    elif loader_type == 'minio-by-batch':
        train_loader, test_loader, _ = tu.create_minio_data_loaders(bucket_name,
                                              training_parameters['batch_size'])
    else:
        raise Exception('Unknown loader type. Must be "memory" or "minio-by-batch"')
   
    # Create the model.
    model = tu.MNISTModel(training_parameters['input_size'],
                          training_parameters['hidden_sizes'],
                          training_parameters['output_size'])

    # Train the model.
    start_time = time()
    epoch_loop(model, train_loader, training_parameters)
    training_time_sec = (time()-start_time)

    # Test the model and log the accuracy as a metric.
    testing_metrics = tu.test_model_local(model, test_loader, training_parameters['device'])

   
def epoch_loop(model: nn.Module, loader: DataLoader, training_parameters: Dict[str, Any]) -> None:

    # Create the loss and optimizer functions.
    loss_func = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'],
                          momentum=training_parameters['momentum'])

    # Epoch loop
    for epoch in range(training_parameters['epochs']):
        total_loss = 0
        for images, labels in loader:
            # Move tensors to the specified device.
            images = images.to(training_parameters['device'])
            labels = labels.to(training_parameters['device'])
           
            # Flatten MNIST images into a 784 long vector.
            images = images.view(images.shape[0], -1)
       
            # Training pass
            optimizer.zero_grad()
            output = model(images)
            loss = loss_func(output, labels)
           
            # Backward pass
            loss.backward()
           
            # Update the model weights.
            optimizer.step()
           
            total_loss += loss.item()

        print("Epoch {} - Training loss: {}".format(epoch+1, total_loss/len(loader)))


if __name__ == "__main__":

    # training configuration
    training_parameters = {
        'batch_size': 32,
        'device': torch.device('cuda:0' if torch.cuda.is_available() else 'cpu'),
        'dropout_input': 0.2,
        'dropout_hidden': 0.5,
        'epochs': 2,
        'input_size': 784,
        'hidden_sizes': [1024, 1024, 1024, 1024],
        'lr': 0.025,
        'momentum': 0.5,
        'output_size': 10,
        'smoke_test_size': -1
        }

    train_model(loader_type='memory', bucket_name='mnist',
                training_parameters=training_parameters)

If we want to run this function as a serverless function within MLRun, we need only decorate the “train_model()” function with the “mlrun.handler()” decorator, as shown below.

@mlrun.handler()
def train_model(loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None) -> None:
    .....

Next, we need to set up the MLRun environment, create a project, register the training function with MLRun, and run the function. We will drive this from a Jupyter Notebook running within our docker-compose deployment. The cells needed to do this are shown below. (You can find this code in the mnist_training_setup.ipynb notebook in the code download.) The hyperparameters used in this simple demo are hard-coded; however, if you are building a model destined for production, you should use a hyperparameter search to find the optimal values.

import os
import mlrun

# Set the environment:
mlrun.set_environment(env_file='mlrun.env')

# Create the project:
project_name='mnist-training'
project_dir = os.path.abspath('./')
project = mlrun.get_or_create_project(project_name, project_dir, user_project=False)

# Create the training function.
trainer = project.set_function(
    "mnist_training_with_mlrun.py", name="trainer", kind="job",
    image="mlrun/mlrun",
    requirements=["minio", "torch", "torchvision"],
    handler="train_model_with_mlrun"
)

# Hyperparameters
training_parameters = {
    'batch_size': 32,
    'device': 'cpu',
    'dropout_input': 0.2,
    'dropout_hidden': 0.5,
    'epochs': 2,
    'input_size': 784,
    'hidden_sizes': [1024, 1024, 1024, 1024],
    'lr': 0.025,
    'momentum': 0.5,
    'output_size': 10,
    'smoke_test_size': -1
    }

# Run the function.
trainer_run = project.run_function(
    "trainer",
    inputs={"bucket_name": "mnist", "loader_type": "memory"},
    params={"training_parameters": training_parameters},
    local=True
)
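
As an aside on the hard-coded hyperparameters: they could be swept rather than fixed. The sketch below is plain Python, not MLRun's own hyperparameter features. The hypothetical helper expand_grid merges a base configuration with every combination from a small search space, and each resulting dictionary could be passed as training_parameters in a separate project.run_function() call. The candidate values are illustrative assumptions, not tuned recommendations.

```python
from itertools import product
from typing import Any, Dict, Iterator, List


def expand_grid(base: Dict[str, Any],
                search_space: Dict[str, List[Any]]) -> Iterator[Dict[str, Any]]:
    """Yield one copy of `base` per combination of values in `search_space`."""
    names = list(search_space)
    for values in product(*(search_space[name] for name in names)):
        # Values from the search space override the base configuration.
        yield {**base, **dict(zip(names, values))}


# Illustrative values only (assumptions, not recommendations).
base_parameters = {'batch_size': 32, 'epochs': 2, 'lr': 0.025, 'momentum': 0.5}
search_space = {'lr': [0.01, 0.025, 0.05], 'momentum': [0.5, 0.9]}

for candidate in expand_grid(base_parameters, search_space):
    print(candidate['lr'], candidate['momentum'])
```

None of this is required for the run above; it only shows one way to generate candidate configurations.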

That is all that needs to be done to run existing code within MLRun. However, if you look closely at the epoch_loop() function, you will notice that it is boilerplate code. Pretty much every PyTorch project has a similar function regardless of the model or the data being used to train the model. Let’s look at how we can use MLRun to remove this function.

Optimizing Training Code

We can remove the call to the epoch_loop function shown above by using MLRun’s mlrun_torch.train() function. The import for this function and a revised train_model() function are shown below. An “accuracy()” function is also passed to mlrun_torch.train(). 

import mlrun.frameworks.pytorch as mlrun_torch

@mlrun.handler()
def train_model_with_mlrun(context: mlrun.MLClientCtx, loader_type: str='memory', bucket_name: str=None, training_parameters: Dict=None):
  logger = du.create_logger()
  logger.info(loader_type)
  logger.info(bucket_name)
  logger.info(training_parameters)

  # Load the data and log loading metrics.
  if loader_type == 'memory':
      train_loader, test_loader, _ = tu.create_memory_data_loaders(training_parameters['batch_size'])
  elif loader_type == 'minio-by-batch':
      train_loader, test_loader, _ = tu.create_minio_data_loaders(bucket_name, training_parameters['batch_size'])
  else:
      raise Exception('Unknown loader type. Must be "memory" or "minio-by-batch"')
 
  # Train the model and log training metrics.
  logger.info('Creating the model.')  
  model = tu.MNISTModel(training_parameters['input_size'], training_parameters['hidden_sizes'], training_parameters['output_size'])

  loss_func = nn.NLLLoss()
  optimizer = optim.SGD(model.parameters(), lr=training_parameters['lr'], momentum=training_parameters['momentum'])

  # Train the model.
  logger.info('Training the model.')  
  mlrun_torch.train(
      model=model,
      training_set=train_loader,
      loss_function=loss_func,
      optimizer=optimizer,
      validation_set=test_loader,
      metric_functions=[accuracy],
      epochs=training_parameters['epochs'],
      custom_objects_map={"torch_utilities.py": "MNISTModel"},
      custom_objects_directory=os.getcwd(),
      context=context,
  )


def accuracy(y_pred, y_true):
  ps = torch.exp(y_pred)
  pred_label = ps.argmax(1)
  return (pred_label == y_true).float().mean().item()

There are a few things to keep in mind as you migrate your code to use this function.

  • First, if you declare your loss function and your optimizer in your “epoch_loop()” function, then you will need to move these declarations as they must be passed to “mlrun_torch.train()”. 
  • Second, if you perform any transformations within your epoch loop, you will need to move them into your data processing logic or, better yet, a data pipeline. This is especially true if you are performing image augmentation and feature engineering.
  • Finally, to have your model tested against a dataset it has not seen during training, you only need to pass that dataset as the validation_set and provide a metric function such as accuracy(), as shown above.
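
To make the loss and the metric concrete: nn.NLLLoss expects the model to output log-probabilities, which is why accuracy() can take the argmax of each output row (the torch.exp call does not change which index is largest). The dependency-free sketch below mirrors both calculations on plain Python lists for a tiny, made-up batch. The helper names are hypothetical, and this illustrates the arithmetic only; it is not a replacement for the PyTorch code.

```python
import math
from typing import List


def log_softmax(logits: List[float]) -> List[float]:
    """Convert raw scores to log-probabilities (numerically stable)."""
    peak = max(logits)
    log_total = peak + math.log(sum(math.exp(x - peak) for x in logits))
    return [x - log_total for x in logits]


def nll_loss(log_probs: List[List[float]], labels: List[int]) -> float:
    """Mean negative log-probability assigned to the correct class."""
    return -sum(row[label] for row, label in zip(log_probs, labels)) / len(labels)


def accuracy_from_lists(log_probs: List[List[float]], labels: List[int]) -> float:
    """Fraction of rows whose highest-scoring class matches the label."""
    predictions = [row.index(max(row)) for row in log_probs]
    return sum(p == y for p, y in zip(predictions, labels)) / len(labels)


# A made-up batch of three samples and three classes.
batch_logits = [[2.0, 0.5, 0.1], [0.2, 0.1, 3.0], [1.0, 1.5, 0.3]]
batch_labels = [0, 2, 0]

batch_log_probs = [log_softmax(row) for row in batch_logits]
print(accuracy_from_lists(batch_log_probs, batch_labels))  # two of three predictions are correct
print(nll_loss(batch_log_probs, batch_labels))
```

The third sample is the miss: its largest score is at index 1 while its label is 0, so it lowers the accuracy and contributes the largest term to the loss.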

While this function reduces the code you have to write, its biggest benefits are metric tracking and artifact management. Let’s take a quick tour of the MLRun UI to see what was saved from our first fully managed run, where we trained a model.

Reviewing Runs

The homepage of the MLRun UI shows a list of all projects. For each project, you can see the number of failed and running jobs.

Drilling into a project shows more details about it.

Clicking on a specific job will take you to the most recent run, where you can review parameters, inputs, artifacts, results, and any output your code logged.

Summary and Next Steps

In this post, I picked up where my last post on setting up MLRun left off. I showed how to use MLRun to host existing model code with minimal changes.

While moving existing code to MLRun is possible, this technique does not fully take advantage of MLRun’s capabilities for automated tracking. A better approach is to use MLRun’s “mlrun_torch.train()” function. This allows MLRun to fully manage training - artifacts, input parameters, and metrics will be logged.

If you have come this far with MLRun, consider playing with distributed training and Large Language Models next.

If you have any questions, be sure to reach out to us on Slack! The “mlrun” channel of the MLOps Live Slack Workspace is also a great place to get help if you get stuck.