Deep Dive on TensorFlow training with Amazon SageMaker and Amazon S3

Julien Simon
Jun 22, 2020

This is a guest post by Chaim Rand, Machine Learning Algorithm Developer at Mobileye. You can also read part 1 and part 3 for more!

In a previous post, I told you the story of how my team at Mobileye (officially known as Mobileye, an Intel company) transitioned to using the Amazon SageMaker service to train our TensorFlow neural networks in the cloud. In particular, I told you about how one could use SageMaker Pipe Mode to stream training data directly from Amazon S3 storage to training instances, and how this leads to reductions in both training time and cost.

The easiest way to adopt Pipe Mode is to use PipeModeDataset, a SageMaker implementation of the TensorFlow Dataset interface, which hides all the low-level pipe management from you. Using PipeModeDataset requires reformatting your training data into one of the supported file formats (text records, TFRecord and Protobuf). We chose the TFRecord format. See the post to learn more about how we converted our data, as well as additional challenges we faced and how we overcame them.

In this post I want to discuss some alternatives to using PipeModeDataset. Of course, if you have a small dataset (e.g. a couple of hundred MB or a dataset that can easily fit on your training instance’s default EBS storage), you have the option of using File Mode, which downloads all your data to your local training instance one time before starting training. But recall that we are training with mountains of data, often more than 100 TB of data!

Motivation

You are, no doubt, wondering why anyone would need to consider alternatives to PipeModeDataset. Allow me to present a few scenarios.

Increased control

As it stands today, when you choose to use Pipe Mode, you are handing over some of the control over your training flow to SageMaker. There may be situations when you want to have complete control. You may want to control precisely what files are being fed into your pipeline and in what order. You may want to control the mechanism being used to shuffle the files before each epoch. You might be facing some issues when using Pipe Mode, and want to use a method with greater visibility into the inner workings and better ability to debug. We will describe a way to do this using the built-in TensorFlow TFRecordDataset.

Dynamic boosting

In my previous post, I shared how we use boosting in our training to artificially increase the representation of certain subclasses of data (e.g. pink cars), and how we implemented this when training with Pipe Mode. Our solution was to configure our pipes with a SageMaker manifest file, pointing at TFRecord files for different classes, and weighted according to our desired boosting weights.

In some cases, this might not be sufficient. We may need to perform dynamic boosting, in which we continuously update the weights of each class during training. SageMaker does not (yet?) support modifying the manifest file during training. We will show two ways to implement this.

Complex data processing

Suppose that you need to apply heavy processing on the input data pipeline. This processing could of course be implemented using TensorFlow operations directly on the TFRecord files, but there is no arguing that many operations can be implemented much more easily in Python using NumPy, scikit-learn, OpenCV and other libraries.

Of course, you can wrap Python code using tf.py_func, but this can easily become your bottleneck due to a combination of the Python Global Interpreter Lock (GIL) and the manner in which TensorFlow parallelizes the py_func in the dataset map call. If you have not heard of the Python GIL, count your blessings. Suffice it to say, if you are trying to optimize your input data pipeline, you should try to avoid py_func.

It would be preferable to stream your data in a Python-friendly format, such as pickle or NumPy files, perform the heavy processing in Python (using multiple processes to bypass the GIL limitations), and only then hand it over to TensorFlow. We will show how to do this by reading directly from the SageMaker pipe and wrapping the result with tf.data.Dataset.from_generator.

Using other storage services

There may be reasons (though I can’t think of any good ones) why converting your data to one of the required data formats may be infeasible. Or reasons why you require random access to any of your records at any point in time. You want to be able to train as if you had all of your data stored locally, in whatever format. Such scenarios call for the use of Amazon FSx, as we describe below.

Measuring Throughput

Surely, I have convinced you that a discussion on alternatives to Pipe Mode is warranted. Before we jump into some of the options at your disposal, a few words about throughput.

For us, one of the primary goals is to maximize the throughput of our training, that is, the number of training steps per second. Each of the options we will present has its pros and cons, but the key indicator by which we will compare them is how they impact throughput, i.e. at what rate we can pull the data from S3 and feed it into the training pipeline. If the input pipeline is not (and will not be) your bottleneck, or if you have all the time (and money) in the world to train, this might not be of concern to you. But it most certainly is to us.

One easy way to isolate and test the throughput over the network is to simply iterate over your dataset without actually performing any training, as shown in the example below (written in TensorFlow 1.14). You can compare this to your training throughput to assess whether your bottleneck is IO.

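import time
import tensorflow as tf  # written for TensorFlow 1.14

# ds is the tf.data.Dataset whose throughput we want to measure
iterator = ds.make_one_shot_iterator()
n = iterator.get_next()
count = 0
with tf.Session() as sess:
    stime = time.time()
    while count < 10000:
        sess.run(n)
        count = count + 1
        # report the rate once every 100 steps
        if count % 100 == 0:
            etime = time.time()
            print("step: " + str(count) + " step/sec: {}".format(
                float(100) / (etime - stime)))
            stime = time.time()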
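
The same measurement can be applied to any Python iterable, which is handy for the generator-based options discussed later. Here is a minimal sketch that does not depend on TensorFlow; `measure_throughput` and its parameters are names I made up for illustration, not part of any library.

```python
import time

def measure_throughput(iterable, max_items=10000, report_every=100):
    """Iterate without training; return (items consumed, average items/sec)."""
    count = 0
    start = time.perf_counter()
    window_start = start
    for _ in iterable:
        count += 1
        if count % report_every == 0:
            now = time.perf_counter()
            # max() guards against a zero-length window on very fast iterables
            rate = report_every / max(now - window_start, 1e-9)
            print("step: {} step/sec: {:.1f}".format(count, rate))
            window_start = now
        if count >= max_items:
            break
    elapsed = max(time.perf_counter() - start, 1e-9)
    return count, count / elapsed
```

Calling `measure_throughput(my_generator())` on a generator, before wrapping it in a dataset, gives a quick indication of whether the Python side is keeping up.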

Another consideration should be the network bandwidth of your training instance. AWS has published a table of the maximum expected network performance per instance type. You can use this table to assess whether you are getting the most bang for your buck. For example, if you are streaming 10 gigabits of data per second on an ml.p3.8xlarge, you will know that you can’t improve on that.
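
To turn the instance bandwidth into an upper bound on training throughput, divide it by the size of a batch on the wire. The sketch below does this arithmetic. The record size and batch size are invented numbers for illustration only, and the bound ignores protocol overhead and any compression.

```python
def max_batches_per_second(bandwidth_gbps, record_bytes, batch_size):
    """Upper bound on batches/sec imposed by the network alone."""
    bytes_per_second = bandwidth_gbps * 1e9 / 8  # gigabits -> bytes
    return bytes_per_second / (record_bytes * batch_size)

# Hypothetical workload: 500 KB records, 64 records per batch, 10 Gbps link.
ceiling = max_batches_per_second(10, 500000, 64)
print("network-bound ceiling: {:.1f} batches/sec".format(ceiling))
```

With these made-up numbers the ceiling is about 39 batches per second. If your measured rate is already close to the ceiling for your own record and batch sizes, further tuning of the input pipeline will not help.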

However, there are some additional things to keep in mind. In particular, just because your instance has a certain bandwidth does not mean that you will be utilizing the full bandwidth when streaming from S3, EC2, etc. This might depend on a host of other things, such as the network bandwidth of the source, whether Elastic Network Adapter (ENA) is enabled, and to what degree the streaming functionality has been optimized (using multi-part downloading and other black magic).

The advantage of Pipe Mode is that you don’t need to worry about all this. You can sleep easy knowing that Amazon SageMaker has highly optimized Pipe Mode and PipeModeDataset, and that you are most likely utilizing your bandwidth to its maximum. But, as you have already been convinced, there are situations where other options should be considered. We will present four alternatives:

  • TFRecordDatasets pointing directly to files in S3,
  • Parsing the raw SageMaker input pipe,
  • Downloading data files directly from Amazon FSx for Lustre,
  • Downloading data files directly from S3.

TFRecordDataset

TFRecordDataset is a dataset implemented by TensorFlow that takes a list or a dataset of TFRecord file paths as input, and iterates over the TFRecords in each of the files.

The good news is that the file paths can be S3 paths. Furthermore, the TFRecordDataset constructor has two control parameters, buffer_size and num_parallel_reads, that can be used to optimize the IO throughput when reading from a remote location. Here is a simple example of how one can initialize a TFRecordDataset with the list of files from a SageMaker manifest file.

import json
import tensorflow as tf

def get_list_from_manifest(manifest_file):
    with open(manifest_file) as f:
        manifest = json.load(f)
    # the first entry holds the common prefix, the rest are relative paths
    prefix = manifest[0]['prefix']
    return [prefix.rstrip('/') + '/' + path for path in manifest[1:]]

tf_record_filepaths = get_list_from_manifest(manifest_file)
ds = tf.data.TFRecordDataset(tf_record_filepaths,
                             num_parallel_reads=10,
                             buffer_size=100000000)

Note the settings for buffer_size and num_parallel_reads. With this example, we were able to reach comparable throughput performance to that of PipeModeDataset on an ml.p3.8xlarge instance (see the evaluation below).

There are a number of significant advantages to this method compared to PipeModeDataset.

Greater control

As mentioned above, PipeModeDataset is a bit of a black box. You provide a list of files, and rely on SageMaker to shuffle and stream these with little visibility (to date) into the internal workings, including what file is being streamed at any given moment. You are not able to change the list of files during training, and you cannot implement dynamic boosting.

With TFRecordDataset, you can have greater control and greater visibility. This is demonstrated in the following pseudo-example showing how to implement dynamic boosting with two classes, while implementing our own shuffling and printing each file being streamed. This can be further optimized, and extended to a greater number of classes.

import random
import numpy as np
import tensorflow as tf

class MyIter(object):
    def __init__(self, class1_list, class2_list):
        self.class1_list = class1_list
        self.class1_list_len = len(class1_list)
        self.class1_index = 0
        self.class2_list = class2_list
        self.class2_list_len = len(class2_list)
        self.class2_index = 0
        self.boost_weight = 0.5
        self.cont = True

    # this can be used to update weight mid-training
    def update_boost_weight(self, weight):
        self.boost_weight = weight

    def halt(self):
        self.cont = False

    def from_class1(self):
        # reshuffle at the start of every pass over the list
        if self.class1_index == 0:
            random.shuffle(self.class1_list)
        file_path = self.class1_list[self.class1_index]
        self.class1_index = (self.class1_index + 1) % self.class1_list_len
        return file_path

    def from_class2(self):
        if self.class2_index == 0:
            random.shuffle(self.class2_list)
        file_path = self.class2_list[self.class2_index]
        self.class2_index = (self.class2_index + 1) % self.class2_list_len
        return file_path

    def generator(self):
        while self.cont:
            # draws 0 with probability boost_weight, 1 otherwise,
            # so class 2 is fed with probability boost_weight
            class_type = np.random.choice(
                2, 1, p=[self.boost_weight, 1 - self.boost_weight])[0]
            next_file = (self.from_class1() if class_type == 1
                         else self.from_class2())
            print("feeding file " + next_file)
            yield next_file

# list1 and list2 hold the TFRecord file paths of the two classes
myiter = MyIter(list1, list2)
filepaths = tf.data.Dataset.from_generator(
    myiter.generator,
    output_types=tf.string,
    output_shapes=())
ds = tf.data.TFRecordDataset(
    filepaths,
    num_parallel_reads=10,
    buffer_size=100000000)

Debugging

TFRecordDataset offers advantages over PipeModeDataset when it comes to debugging. Other than the obvious advantages of debugging when you have greater visibility (e.g. identifying a corrupted TFRecord file), TFRecordDataset also supports local mode, unlike PipeModeDataset. This means that you can test your full pipeline, including the input pipeline, locally before uploading to a SageMaker instance.

Channel limit

In contrast to SageMaker Pipe Mode, which has a limit of 20 channels per device, there is no (documented) limit on the number of TFRecordDatasets you can create. Of course, each dataset requires system resources, so eventually you will inevitably run up against some limit. To be honest, I did not check whether this limit is greater than 20. It probably depends on the system you are running on.

TFRecordDataset or PipeModeDataset?

As discussed, TFRecordDataset is very flexible. Still, PipeModeDataset has a number of advantages.

  1. When you use TFRecordDataset, you are inevitably using system resources (memory and CPU) to download and buffer your data. If you are CPU or memory bound, that will hurt. When you use Pipe Mode, SageMaker provides these resources for you.
  2. Reaching comparable throughput performance with TFRecordDataset to that of pipe mode is no piece of cake. You will need to find the optimal control settings (e.g. buffer_size and num_parallel_reads) and even then there are no guarantees, as we explained above.
  3. PipeModeDataset includes an API for distributing (sharding) the data across multiple instances. If you use TFRecordDataset you would need to program this explicitly.
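
To give a sense of what programming the sharding explicitly might look like, here is a minimal sketch that splits a file list across hosts by striding. The function name, the bucket name and the way `host_index` is obtained are all assumptions for illustration. How each instance learns its own index depends on your training setup.

```python
def shard_files(file_list, num_hosts, host_index):
    """Return the slice of file_list that this host should read."""
    if not 0 <= host_index < num_hosts:
        raise ValueError("host_index must be in [0, num_hosts)")
    # Sort first so that every host derives its shard from the same ordering.
    return sorted(file_list)[host_index::num_hosts]

# Hypothetical file names, 10 files split over 4 hosts.
files = ["s3://my-bucket/data/part-{:03d}.tfrecord".format(i)
         for i in range(10)]
shards = [shard_files(files, 4, i) for i in range(4)]
```

Each host then passes only its own shard to TFRecordDataset. If you prefer to shard at the dataset level, tf.data.Dataset.shard(num_shards, index) is an alternative, although sharding the file list avoids reading data that is later discarded.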

Parsing the Raw Pipe

While PipeModeDataset supports a limited set of formats, you can stream anything you want over SageMaker pipes. For example, you could stream Python pickle files. Now, you probably won’t want to stream small pickle files, as this will have a negative impact on the pipe throughput. Rather, you will want to concatenate many pickle files together into larger files of roughly 100 MB each. You can adopt the format of TFRecord files, which is a concatenation of records of the format:

  • Length — 8 bytes
  • Length CRC — 4 bytes
  • Data — ‘Length’ bytes
  • Data CRC — 4 bytes

In the example below, we are using this technique to create a dataset from a generator that manually parses our TFRecord files. This code snippet can be easily modified to parse files with concatenated pickle files, which you can follow up with your heavy Python processing (using multiprocessing to work around the GIL).

import struct
import tensorflow as tf

# default directory in which SageMaker creates the pipe for each channel
data_dir = '/opt/ml/input/data'

def get_pipe_generator(channel_name, num_epochs=1):
    def generator():
        for epoch in range(num_epochs):
            # SageMaker opens a new FIFO per epoch: <channel>_<epoch>
            fifo_path = '{0}/{1}_{2}'.format(data_dir, channel_name, epoch)
            with open(fifo_path, 'rb', buffering=0) as fifo:
                while True:
                    recordLen = fifo.read(8)
                    if not recordLen:
                        break  # the pipe is exhausted for this epoch
                    # TFRecord fields are little-endian
                    recordLen = struct.unpack('<Q', recordLen)[0]
                    # TODO: don't be lazy!! Check the crc
                    crcCheck = struct.unpack('<I', fifo.read(4))[0]
                    # TODO: verify that you read recordLen bytes
                    rec = fifo.read(recordLen)
                    # TODO: don't be lazy!! Check the crc
                    crcCheck = struct.unpack('<I', fifo.read(4))[0]
                    yield rec
    return generator

ds = tf.data.Dataset.from_generator(
    get_pipe_generator('train'),
    output_types=tf.string,
    output_shapes=())
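
The same framing works for pickled records. Below is a minimal sketch of a writer and a matching reader, including the CRC checks and the short-read handling that the TODO comments above ask for. One loud assumption: it uses zlib.crc32 as a stand-in checksum, whereas real TFRecord files use a masked CRC32C, so files written this way are not valid TFRecord files. All function names are illustrative.

```python
import io
import pickle
import struct
import zlib

def write_record(stream, obj):
    """Append one pickled object as length / CRC / data / CRC."""
    data = pickle.dumps(obj)
    header = struct.pack('<Q', len(data))
    stream.write(header)
    stream.write(struct.pack('<I', zlib.crc32(header)))
    stream.write(data)
    stream.write(struct.pack('<I', zlib.crc32(data)))

def read_exact(stream, n):
    """Read up to n bytes, looping because pipes may return short reads."""
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = stream.read(remaining)
        if not chunk:
            break  # end of stream
        chunks.append(chunk)
        remaining -= len(chunk)
    return b''.join(chunks)

def read_records(stream):
    """Yield unpickled objects until the stream is exhausted."""
    while True:
        header = read_exact(stream, 8)
        if not header:
            return
        if len(header) < 8:
            raise IOError("truncated record header")
        (length,) = struct.unpack('<Q', header)
        (header_crc,) = struct.unpack('<I', read_exact(stream, 4))
        if header_crc != zlib.crc32(header):
            raise IOError("corrupt record length")
        data = read_exact(stream, length)
        (data_crc,) = struct.unpack('<I', read_exact(stream, 4))
        if len(data) != length or data_crc != zlib.crc32(data):
            raise IOError("corrupt record data")
        yield pickle.loads(data)

# Round trip through an in-memory buffer standing in for a file or pipe.
buf = io.BytesIO()
for sample in [{"id": 1, "label": "pink car"}, {"id": 2, "label": "truck"}]:
    write_record(buf, sample)
buf.seek(0)
samples = list(read_records(buf))
```

The reader accepts any object with a read method, so it can be pointed at the FIFO opened in the generator above in place of the in-memory buffer.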

Working with raw pipes also offers you added flexibility. For example, suppose your data is divided into two separate classes streaming over two separate pipes. You could control the rate at which you sample data from each pipe. You can probably see where I am going with this… dynamic boosting!!
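
Here is a minimal sketch of that idea: an iterator that draws from two record streams according to a weight that can be changed at any time. The class name and its arguments are hypothetical. In practice the two sources could be generators created with get_pipe_generator on two separate channels, while the example uses endless dummy streams.

```python
import itertools
import random

class BoostedMixer(object):
    """Draw from two record iterators with an adjustable mixing weight."""

    def __init__(self, boosted, regular, boost_weight=0.5, seed=None):
        self.boosted = iter(boosted)
        self.regular = iter(regular)
        self.boost_weight = boost_weight  # probability of picking `boosted`
        self._rng = random.Random(seed)

    def update_boost_weight(self, weight):
        # Can be called between draws, e.g. once per evaluation round.
        self.boost_weight = weight

    def __iter__(self):
        return self

    def __next__(self):
        pick_boosted = self._rng.random() < self.boost_weight
        source = self.boosted if pick_boosted else self.regular
        # StopIteration from either source ends the mixed stream.
        return next(source)

# Dummy streams standing in for two SageMaker pipes.
mixer = BoostedMixer(itertools.repeat("pink"), itertools.repeat("other"),
                     boost_weight=1.0)
first = [next(mixer) for _ in range(5)]
mixer.update_boost_weight(0.0)
second = [next(mixer) for _ in range(5)]
```

With a weight of 1.0 every draw comes from the boosted stream, and after the update to 0.0 every draw comes from the other one. Intermediate weights give a random mix in that proportion.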

Downloading and Parsing Data Files

If you want both the visibility granted by working directly with S3 (as with the TFRecordDataset option), as well as the flexibility of working with a Python format such as pickle files, you will probably want to download and parse your data files on your own.

Here, we show an example in which we use multiple parallel threads to download and parse our TFRecord files. The records are fed into a shared queue, which can be fed into data processor threads, or directly into a dataset (using tf.data.Dataset.from_generator). Once again, this can easily be modified to work with other formats.

import struct, io
import boto3
from boto3.s3.transfer import TransferConfig
from threading import Thread
from queue import Queue as Q

def file_parser(index, file_list, batch_queue):
    # create a separate session in each thread
    session = boto3.session.Session()
    s3 = session.resource('s3')
    tc = TransferConfig(
        max_concurrency=10,
        multipart_chunksize=8388608,
        num_download_attempts=5,
        max_io_queue=100,
        io_chunksize=262144)
    for s3_path in file_list:
        bucket, key = s3_path.replace('s3://', '').split('/', 1)
        # download the whole file into memory, then parse it
        ioStream = io.BytesIO()
        s3.meta.client.download_fileobj(bucket, key, ioStream, Config=tc)
        ioStream.seek(0)
        recordLen = ioStream.read(8)
        while recordLen:
            recordLen = struct.unpack('<Q', recordLen)[0]
            # TODO: don't be lazy!! Check the crc
            crcCheck = struct.unpack('<I', ioStream.read(4))[0]
            # TODO: verify that you read recordLen bytes
            rec = ioStream.read(recordLen)
            # TODO: don't be lazy!! Check the crc
            crcCheck = struct.unpack('<I', ioStream.read(4))[0]
            batch_queue.put(rec, block=True)
            recordLen = ioStream.read(8)

# files is the list of S3 paths of the TFRecord files
q = Q(maxsize=10)
num_threads = 40
for i in range(num_threads):
    # each thread handles every num_threads-th file
    w = Thread(target=file_parser, args=[i, files[i::num_threads], q])
    w.start()

The implementation above is somewhat naive. Reaching comparable throughput to that of PipeModeDataset will likely be very challenging. There are a number of controls you can use for tuning, including num_threads, max_concurrency, multipart_chunksize, and more.
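
One detail the example leaves open is how the consumer of the queue knows when all the threads have run out of files. A common pattern, sketched below under my own assumptions, is for each worker to push a sentinel object when it finishes and for the generator to stop once it has seen one sentinel per worker. The `worker` function is a stand-in for file_parser that simply pushes the items it is given, and all names are illustrative.

```python
import threading
from queue import Queue

_DONE = object()  # sentinel pushed by each worker when it has no more items

def worker(items, out_queue):
    """Stand-in for file_parser: push each item, then signal completion."""
    for item in items:
        out_queue.put(item, block=True)
    out_queue.put(_DONE)

def queue_generator(out_queue, num_workers):
    """Yield items from the queue until every worker has signalled completion."""
    finished = 0
    while finished < num_workers:
        item = out_queue.get(block=True)
        if item is _DONE:
            finished += 1
        else:
            yield item

def run_demo(num_workers=4, num_items=20):
    q = Queue(maxsize=10)
    items = list(range(num_items))
    threads = [threading.Thread(target=worker,
                                args=(items[i::num_workers], q))
               for i in range(num_workers)]
    for t in threads:
        t.daemon = True
        t.start()
    # Arrival order depends on thread scheduling, so sort before returning.
    return sorted(queue_generator(q, num_workers))
```

A generator like this can be handed to tf.data.Dataset.from_generator by wrapping it in a callable, for example a lambda that calls queue_generator with the shared queue and the number of threads.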

Using Amazon FSx for Lustre

Training on SageMaker using Amazon FSx is a good solution for people who want to have traditional, file-system style access to their data, but cannot, or don’t want to, download their datasets to local EBS storage. Using FSx requires appropriate configuration as described here.

When you choose the storage capacity of your file system, make note of the throughput capacity, as this will impact the speed at which you will be able to access your data. Typically, the larger the storage capacity, the larger the throughput capacity. Unfortunately, the larger the storage capacity, the higher the cost of FSx. One advantage, however, is that the same file system can be used across multiple SageMaker jobs, which can reduce the cost per-session if you have many training sessions that access the same data.

One thing to keep in mind is that according to the documentation, the files are only copied from S3 to FSx the first time they are accessed. This means that the throughput measured on your first epoch might be measurably lower than the throughput on subsequent epochs. There is also an option to preload the data.

Bottom line, whether or not FSx is the right solution for you depends on a number of factors, including dataset size, FSx cost, the number of training sessions, and how badly you need file-system style access to your data.

Evaluation

In the chart below we compare the throughput, measured in batches per second, for five different methods of pulling the training data from S3:

1. Using PipeModeDataset (‘pipe’)

2. Using TFRecordDataset pointing to file objects in S3 (‘tfr’)

3. Reading and parsing the TFRecord files from raw SageMaker pipe, and wrapping these with tf.data.Dataset.from_generator (‘pipe gen’)

4. Downloading the TFRecord files directly from S3, reading and parsing them, and wrapping them with tf.data.Dataset.from_generator (‘file gen’)

5. Using TFRecordDataset pointing to the files in FSx (‘fsx’). Please note that preloading was not used in this case.

All the trials were run on ml.p3.8xlarge (10 Gigabit per second network performance).

While the absolute values of the throughput should not mean anything to you (as they are dependent on our own data record size, batch size, etc.), the relative values clearly show the throughput of the tfr and pipe gen options to be comparable to PipeModeDataset. Note that the performance of the TFRecordDataset is choppier than that of the other options. If the throughput of the end-to-end training pipeline is sensitive to such choppiness, this may be something to consider.

The file gen trial timed out after only 7500 iterations… but I think you get the gist… Recall that this was a naive implementation and that it can most certainly be improved (as shown by the performance of TFRecordDataset).

The fsx throughput starts high, and then drops suddenly before timing out. The reason is that for the first 9600 iterations we are accessing files that have already been copied to the file system, after which we begin to read files that are being accessed for the first time. This means that FSx is only now copying them from S3. Remember, this latency can be avoided by preloading all of your data.

Summary

Amazon SageMaker Pipe Mode is a great default choice. It is relatively easy to set up, maximizes network bandwidth utilization and hides all the nitty-gritty. But, for those sticky situations such as those mentioned above, it’s good to know that there are other options, isn’t it?
