Iterable-Style Datasets using Amazon’s S3 Connector for PyTorch and MinIO

In November 2023, Amazon announced the S3 Connector for PyTorch. The connector provides implementations of PyTorch's dataset primitives (Datasets and DataLoaders) that are purpose-built for S3 object storage. It supports map-style datasets for random data access patterns and iterable-style datasets for streaming, sequential data access patterns.

In a previous post, I introduced the S3 Connector for PyTorch and described in detail the problem it is designed to solve. I also covered the older libraries it replaces - specifically, the Amazon S3 Plugin for PyTorch and the CPP-based S3 IO DataPipes, both of which are being deprecated and should no longer be used. Finally, I covered map-style datasets. I will not recap all of that introductory material here, so if you have yet to read the previous post, check it out at your earliest convenience. In this post, I will focus on iterable-style datasets. The documentation for this connector only shows examples of loading data from Amazon S3 - here, I will show you how to use it against MinIO.

The S3 Connector for PyTorch also includes a checkpointing interface for saving and loading checkpoints directly to an S3 bucket without first saving them to local storage. This is a nice touch if you are not ready to adopt a formal MLOps tool and just need an easy way to save your models. I will cover this feature in a future post.
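As a quick preview based on the connector's documentation, saving and loading a checkpoint looks roughly like the sketch below. The bucket name, object key, and model are placeholders I chose for illustration.

import torch
from s3torchconnector import S3Checkpoint

model = torch.nn.Linear(4, 2)  # stand-in for a real model
checkpoint = S3Checkpoint(region='us-east-1')

# save a model's state directly to an S3/MinIO bucket (no local file needed)
with checkpoint.writer('s3://checkpoints/epoch-0.ckpt') as writer:
    torch.save(model.state_dict(), writer)

# load it back later
with checkpoint.reader('s3://checkpoints/epoch-0.ckpt') as reader:
    state_dict = torch.load(reader)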

Building an Iterable-Style Dataset Manually

An iterable-style dataset is created by implementing a class that overrides the __iter__() method of PyTorch's IterableDataset base class. Unlike a map-style dataset, there is no __len__() method and no __getitem__() method. If you query an iterable-style dataset using Python's len() function, you will get an error because the __len__() method does not exist.
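As a quick illustration of that last point, here is a tiny, hypothetical iterable-style dataset; calling len() on it fails because __len__() is not defined.

from torch.utils.data import IterableDataset


class NumbersDataset(IterableDataset):
    def __iter__(self):
        return iter(range(5))


ds = NumbersDataset()
print(list(ds))  # [0, 1, 2, 3, 4]
print(len(ds))   # raises TypeError because __len__() is not defined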

An iterable-style dataset can return multiple samples each time it is called during a training loop. Specifically, the __iter__() method returns an iterator object that the data loader loops over to create the needed batches. Let's build a very simple custom iterable-style dataset to better understand how this works. The code below shows how to override the __iter__() method. The full code download can be found here.

import math

import torch
from torch.utils.data import DataLoader, IterableDataset


class MyIterableDataset(IterableDataset):
    def __init__(self, start: int, end: int, transpose):
        self.start = start
        self.end = end
        self.transpose = transpose

    def __iter__(self):
        worker_info = torch.utils.data.get_worker_info()

        if worker_info is None:  # single-process data loading, return the full iterator
            iter_start = self.start
            iter_end = self.end
            worker_id = -1

        else:  # in a worker process
            worker_id = worker_info.id

            # split the workload so each worker gets its own shard
            per_worker = int(math.ceil((self.end - self.start) / float(worker_info.num_workers)))
            iter_start = self.start + worker_id * per_worker
            iter_end = min(iter_start + per_worker, self.end)

        # collect this worker's samples and return an iterator that applies the transform
        samples = []
        for sample in range(iter_start, iter_end):
            samples.append(sample)

        return map(self.transpose, samples)


def my_transpose(x):
    return torch.tensor(x)

Notice that you have to handle sharding yourself. When you create a data loader with more than one worker, it is up to you to determine each worker's share of the data (its shard). This is done using the worker_info object and some simple math.
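To make the arithmetic concrete, here is the same split computed by hand for the dataset created below (start=0, end=10) with two workers. It also explains why the batches printed later alternate between the two shards.

import math

start, end, num_workers = 0, 10, 2
per_worker = int(math.ceil((end - start) / float(num_workers)))  # 5
for worker_id in range(num_workers):
    iter_start = start + worker_id * per_worker
    iter_end = min(iter_start + per_worker, end)
    print(f'worker {worker_id} processes samples {iter_start} through {iter_end - 1}')
# worker 0 processes samples 0 through 4
# worker 1 processes samples 5 through 9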

We can create this dataset and loop through it using the code below, which is similar to a training loop.

batch_size = 2
ds = MyIterableDataset(start=0, end=10, transpose=my_transpose)
dl = DataLoader(ds, batch_size=batch_size, num_workers=2)

for sample in dl:
    print(sample)

The output will be:

tensor([0, 1])
tensor([5, 6])
tensor([2, 3])
tensor([7, 8])
tensor([4])
tensor([9])

Now that we understand iterable datasets, let’s use the S3 connector’s iterable dataset. But before we do that, let’s look at how to get the S3 Connector to connect to MinIO.

Connecting the S3 Connector to MinIO

Connecting the S3 Connector to MinIO is as simple as setting up environment variables; once they are in place, everything just works. The trick is knowing which variables to set and what values to give them.

The code download for this post uses a .env file to set up environment variables, as shown below. This file also shows the environment variables I used to connect to MinIO directly using the MinIO Python SDK. Notice that the AWS_ENDPOINT_URL needs the protocol, whereas the MinIO variable does not. Also, you may notice some odd behavior with the AWS_REGION variable. Technically, it is not needed when accessing MinIO, but internal checks within the S3 Connector may fail if you pick the wrong value for this variable. If you get one of these errors, read the message carefully and specify the value it requests.

AWS_ACCESS_KEY_ID=admin
AWS_ENDPOINT_URL=http://172.31.128.1:9000
AWS_REGION=us-east-1
AWS_SECRET_ACCESS_KEY=password
MINIO_ENDPOINT=172.31.128.1:9000
MINIO_ACCESS_KEY=admin
MINIO_SECRET_KEY=password
MINIO_SECURE=false
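If you are following along with the code download, a sketch like the one below is all it takes to load these variables and sanity-check the MinIO connection before touching the S3 Connector. I am assuming the python-dotenv package here, and the bucket name is just an example.

import os
from dotenv import load_dotenv
from minio import Minio

load_dotenv()  # reads the .env file shown above into os.environ

# The S3 Connector picks up the AWS_* variables from the environment automatically.
print(os.environ['AWS_ENDPOINT_URL'])

# The MinIO SDK is configured explicitly from the MINIO_* variables.
client = Minio(
    os.environ['MINIO_ENDPOINT'],
    access_key=os.environ['MINIO_ACCESS_KEY'],
    secret_key=os.environ['MINIO_SECRET_KEY'],
    secure=os.environ['MINIO_SECURE'].lower() == 'true',
)
print(client.bucket_exists('mnist'))  # quick connectivity check; 'mnist' is an example bucket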

Creating an Iterable-Style Dataset with the S3 Connector

To create an iterable-style dataset using the S3 Connector, you do not need to write a dataset class as we did above. The S3IterableDataset.from_prefix() function does everything for you. This function assumes you have set up the environment variables needed to connect to your S3 object store, as described in the previous section. It also requires that your objects can be found via an S3 prefix. A snippet showing how to use this function is below.

import os
import time

from torch.utils.data import DataLoader
from s3torchconnector import S3IterableDataset

# These lines run inside a helper function from the code download that builds the
# data loader and returns it along with how long the setup took; start_time is
# captured with time.perf_counter() at the top of that helper.
uri = f's3://{bucket_name}/{split}'
aws_region = os.environ['AWS_REGION']
dataset = S3IterableDataset.from_prefix(uri, region=aws_region,
                                        enable_sharding=True,
                                        transform=S3IterTransform(transform))
loader = DataLoader(dataset, batch_size=batch_size, num_workers=num_workers)
return loader, (time.perf_counter()-start_time)

Notice that the URI is an S3 path. Every object found recursively under that prefix (mnist/train in my example) is expected to be part of the training set. If you want your data loader to use more than one worker (the num_workers parameter), be sure to set the enable_sharding parameter on your dataset. The function above also takes a transform that converts each object into a tensor and determines its label. This is done via an instance of the callable class shown below.

from io import BytesIO

import torch
from PIL import Image
from s3torchconnector import S3Reader


class S3IterTransform:
    def __init__(self, transform):
        self.transform = transform

    def __call__(self, object: S3Reader) -> tuple[torch.Tensor, int]:
        # read the raw object bytes and decode them into a PIL image
        content = object.read()
        image_pil = Image.open(BytesIO(content))
        # apply the torchvision-style transform and derive the label from the object key
        image_tensor = self.transform(image_pil)
        label = int(object.key.split('/')[1])

        return (image_tensor, label)
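Putting the pieces together, the sketch below builds a loader and pulls one batch from it. It assumes the S3IterTransform class defined above, the environment variables from the .env file, and some placeholder names on my part: a bucket named mnist, a split named train, and a torchvision ToTensor transform.

import os

import torch
from torch.utils.data import DataLoader
from torchvision import transforms
from s3torchconnector import S3IterableDataset


def create_loader(bucket_name: str, split: str, batch_size: int, num_workers: int) -> DataLoader:
    uri = f's3://{bucket_name}/{split}'
    dataset = S3IterableDataset.from_prefix(
        uri,
        region=os.environ['AWS_REGION'],
        enable_sharding=(num_workers > 1),
        transform=S3IterTransform(transforms.ToTensor()),
    )
    return DataLoader(dataset, batch_size=batch_size, num_workers=num_workers)


loader = create_loader('mnist', 'train', batch_size=32, num_workers=2)
for images, labels in loader:
    print(images.shape, labels[:5])
    break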

That is all you need to do to create an iterable-style dataset using the S3 Connector for PyTorch.

Conclusion

The S3 Connector for PyTorch is easy to use, and engineers will write less data access code when using it. In this post, I showed how to configure it to connect to MinIO using environment variables. Once configured, three lines of code created an iterable-style dataset object whose samples were transformed by a simple callable class.

Next Steps

If your network is the weakest link in your training pipeline, consider creating objects that contain multiple samples, which you can even tar or zip. Unfortunately, the S3 Connector does not support this out of the box for either map-style or iterable-style datasets. In a future post, I will show how this can be done with a custom-built iterable-style dataset.

If you have any questions, be sure to reach out to us on Slack.