Horovod – Distributed Deep Learning with TensorFlow and PyTorch


Distributed deep learning has become a key solution for accelerating the training of large-scale models by leveraging the parallel processing capabilities of multiple GPUs. Two of the most popular deep learning frameworks, TensorFlow and PyTorch, have revolutionized the way models are developed and deployed. However, distributing the learning process across multiple GPUs and nodes presents significant challenges. 


Horovod is an open-source distributed deep learning framework originally developed by Uber Technologies that simplifies the process of scaling deep learning models across multiple GPUs and nodes. It provides support for both TensorFlow and PyTorch, making it a versatile tool for data scientists and engineers. 


Horovod Integration with TensorFlow


TensorFlow is widely used for training complex neural networks due to its flexibility and extensive functionality. However, when you’re dealing with huge datasets or very large models, it’s often necessary to spread training across multiple machines and GPUs to gain computing power and reduce training time. Horovod simplifies this by integrating directly with TensorFlow, enabling efficient parallel training of models.


To begin integrating Horovod with TensorFlow, ensure that both TensorFlow and Horovod are installed in your environment. You can install Horovod using pip with the following command:


```bash
pip install horovod
```


Start by importing Horovod into your TensorFlow training script. The initialization function, `hvd.init()`, sets up the necessary communication structure between the various GPUs and nodes. This initialization prepares the environment for distributed learning.
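After `hvd.init()`, each process can query `hvd.size()` (the total number of processes), `hvd.rank()` (its global index) and `hvd.local_rank()` (its index on the current machine). The pure-Python sketch below shows how these numbers relate when every host runs one process per GPU. It assumes the launcher hands out ranks host by host, which is a common default but not guaranteed, and `rank_layout` is a hypothetical helper that does not call Horovod.

```python
from typing import Dict, List, Tuple


def rank_layout(gpus_per_host: List[int]) -> Dict[int, Tuple[int, int]]:
    """Map each global rank to (host index, local rank).

    Hypothetical helper: assumes ranks are assigned host by host,
    with one process per GPU.
    """
    layout = {}
    rank = 0
    for host, n_gpus in enumerate(gpus_per_host):
        for local_rank in range(n_gpus):
            layout[rank] = (host, local_rank)
            rank += 1
    return layout


# Two hosts with 4 GPUs each, so the job size is 8.
layout = rank_layout([4, 4])
size = len(layout)
for rank, (host, local_rank) in layout.items():
    # Each process pins the GPU whose index equals its local rank,
    # which is what gpus[hvd.local_rank()] does in the TensorFlow example.
    print(f"rank {rank}/{size}: host {host}, GPU {local_rank}")
```

Using the local rank rather than the global rank as the GPU index is what keeps two processes on the same host from sharing a device.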


Here is a detailed example to illustrate this integration:


```python
import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Model definition
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Scale the base learning rate by the number of workers
initial_lr = 0.001
optimizer = tf.optimizers.Adam(learning_rate=initial_lr * hvd.size())

# Add Horovod Distributed Optimizer (averages gradients across workers)
optimizer = hvd.DistributedOptimizer(optimizer)

# Compile the model (integer labels, so use the sparse loss)
model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Print metrics only on rank 0 so the log is not repeated by every worker
class PrintMetrics(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        if hvd.rank() == 0:
            print('Epoch: {} - loss: {:.4f} - accuracy: {:.4f}'.format(
                epoch, logs['loss'], logs['accuracy']))

callbacks = [
    # Broadcast initial variables from rank 0 to all other processes
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    # Average metrics across workers at the end of each epoch
    hvd.callbacks.MetricAverageCallback(),
    # Keep the learning rate at the value scaled by the number of GPUs
    tf.keras.callbacks.LearningRateScheduler(lambda epoch: initial_lr * hvd.size()),
    PrintMetrics(),
]

# Training data: flatten the 28x28 images into 784-dimensional float vectors
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
train_images = train_images.reshape(-1, 784).astype('float32') / 255.0
test_images = test_images.reshape(-1, 784).astype('float32') / 255.0

# Horovod: give each worker its own shard of the training data
# (shard before shuffling so the shards do not overlap)
train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels))
train_dataset = train_dataset.shard(hvd.size(), hvd.rank()).shuffle(10000).batch(128)
test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(128)

# Train the model (Keras progress output only on rank 0)
model.fit(train_dataset,
          epochs=5,
          callbacks=callbacks,
          validation_data=test_dataset,
          verbose=1 if hvd.rank() == 0 else 0)
```


In this scenario, Horovod takes several key steps to enable distributed learning:


  1. Initialization: `hvd.init()` initializes Horovod and the communication between the worker processes, which every later synchronization step relies on.
  2. GPU Control: Each local process is pinned to a single GPU, selected by its local rank, and memory growth is enabled so that TensorFlow allocates GPU memory on demand instead of reserving all of it up front.
  3. Model Compilation: Horovod wraps the optimizer in its `DistributedOptimizer`, which averages the gradients across all workers before every update. Every GPU therefore applies the same update, which keeps the model replicas consistent.
  4. Broadcast: The `hvd.callbacks.BroadcastGlobalVariablesCallback(0)` callback copies the initial variables from rank 0 to all other processes. This step is crucial: without it, each worker would start from its own random initialization and the replicas would not represent the same model.
  5. Learning Rate Scaling: The base learning rate is multiplied by `hvd.size()`, because each synchronous step processes `hvd.size()` batches in total. The `LearningRateScheduler` callback keeps the learning rate at this scaled value in every epoch.
  6. Dataset Sharding: `tf.data.Dataset.shard(hvd.size(), hvd.rank())` gives each worker a separate subset of the training data. Sharding should happen before shuffling so that the subsets do not overlap.
  7. Training: `model.fit` trains the model on the sharded dataset with the callbacks above; because gradients are averaged at every step, the updates stay synchronous across workers.


With these steps, Horovod ensures that TensorFlow can efficiently scale its learning process, making large-scale deep-learning tasks much easier.
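Steps 3 to 6 can be checked without any GPUs. The pure-Python simulation below is a minimal sketch, not Horovod or TensorFlow code: it assumes four simulated workers fitting the toy model y = 3x with a single weight, and all helper names (`shard`, `local_gradient`, `allreduce_mean`) are hypothetical. It shards the data with the same index rule as `tf.data.Dataset.shard`, broadcasts rank 0's initial weight, averages the per-worker gradients the way `DistributedOptimizer` does, and scales the learning rate by the number of workers.

```python
import random
from typing import List, Tuple

Sample = Tuple[float, float]


def shard(data: List[Sample], size: int, rank: int) -> List[Sample]:
    # Same selection rule as tf.data.Dataset.shard(size, rank):
    # keep the elements whose index % size == rank.
    return data[rank::size]


def local_gradient(w: float, batch: List[Sample]) -> float:
    # Derivative of mean((w * x - y) ** 2) over one worker's batch.
    return sum(2 * (w * x - y) * x for x, y in batch) / len(batch)


def allreduce_mean(values: List[float]) -> float:
    # Stand-in for the gradient averaging done by DistributedOptimizer:
    # every worker ends up with the same averaged value.
    return sum(values) / len(values)


size = 4                     # number of simulated workers (hvd.size())
base_lr = 0.05               # learning rate tuned for a single worker
lr = base_lr * size          # scaled like initial_lr * hvd.size()
data = [(i / 10, 3 * i / 10) for i in range(1, 17)]  # 16 samples of y = 3x

# Every worker starts from its own random weight...
weights = [random.Random(rank).uniform(-1, 1) for rank in range(size)]
# ...so rank 0's value is broadcast, like BroadcastGlobalVariablesCallback(0).
weights = [weights[0]] * size

shards = [shard(data, size, rank) for rank in range(size)]
for step in range(30):
    grads = [local_gradient(weights[r], shards[r]) for r in range(size)]
    g = allreduce_mean(grads)              # identical on every worker
    weights = [w - lr * g for w in weights]

print(weights)  # four identical values, all close to 3.0
```

Because every shard has the same size here, the averaged gradient equals the gradient over the full dataset, and because every replica applies that same update to the same broadcast starting weight, the replicas never drift apart. Removing the broadcast line would leave the replicas separated by their initial differences for the whole run.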


Horovod Integration with PyTorch


PyTorch has gained immense popularity due to its dynamic computation graphs and ease of use. However, scaling PyTorch models to multiple GPUs and nodes requires an efficient parallel training strategy. Horovod supports PyTorch and provides a simple mechanism to distribute training across multiple GPUs and nodes.


First, make sure Horovod is installed in your PyTorch environment. You can do this with pip:


```bash
pip install horovod[pytorch]
```


Then import Horovod into your PyTorch training script and initialize it. Initialization establishes the communication environment needed to synchronize operations between multiple GPUs and nodes.
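The operation this communication layer carries most often during training is allreduce: every worker contributes a gradient tensor and every worker receives the element-wise average. As a rough model of how that can be done efficiently, here is a single-process, pure-Python sketch of ring allreduce, the approach Horovod was originally built on. It is an illustration only: `ring_allreduce` and `chunk_bounds` are hypothetical helpers, plain lists stand in for tensors, and the backend Horovod actually uses (NCCL, MPI or Gloo) may choose a different algorithm.

```python
from typing import List


def chunk_bounds(length: int, n: int) -> List[range]:
    # Split positions 0..length-1 into n contiguous, nearly equal chunks.
    base, extra = divmod(length, n)
    bounds, start = [], 0
    for c in range(n):
        end = start + base + (1 if c < extra else 0)
        bounds.append(range(start, end))
        start = end
    return bounds


def ring_allreduce(tensors: List[List[float]]) -> List[List[float]]:
    """Average equal-length vectors held by n simulated workers in a ring."""
    n = len(tensors)
    bufs = [list(t) for t in tensors]  # each worker's local buffer
    chunks = chunk_bounds(len(bufs[0]), n)

    def ring_pass(step_chunk, combine):
        # Snapshot every outgoing chunk first, so all sends in one step
        # happen "at the same time", then deliver each to the next worker.
        msgs = [(step_chunk(i), [bufs[i][k] for k in chunks[step_chunk(i)]])
                for i in range(n)]
        for i, (c, vals) in enumerate(msgs):
            dst = (i + 1) % n
            for k, v in zip(chunks[c], vals):
                bufs[dst][k] = combine(bufs[dst][k], v)

    # Phase 1, reduce-scatter: after n - 1 steps, worker i holds the
    # complete sum of chunk (i + 1) % n.
    for step in range(n - 1):
        ring_pass(lambda i: (i - step) % n, lambda old, new: old + new)

    # Phase 2, allgather: the completed chunks travel around the ring
    # and overwrite the partial sums.
    for step in range(n - 1):
        ring_pass(lambda i: (i + 1 - step) % n, lambda old, new: new)

    # Horovod averages gradients by default, so divide the sums by n.
    return [[v / n for v in buf] for buf in bufs]


# Three workers, each holding a 4-element gradient.
grads = [[1.0, 2.0, 3.0, 4.0],
         [4.0, 5.0, 6.0, 7.0],
         [7.0, 8.0, 9.0, 10.0]]
result = ring_allreduce(grads)
print(result[0])  # [4.0, 5.0, 6.0, 7.0]
```

Each worker sends 2(n - 1) messages of roughly 1/n of the tensor, so its traffic stays close to twice the tensor size no matter how many workers join. That property is what lets ring allreduce scale without a central parameter server.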


Here’s a detailed example to demonstrate Horovod’s integration with PyTorch:


```python
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import horovod.torch as hvd
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, DistributedSampler

# Initialize Horovod
hvd.init()

# Pin GPU to local rank
torch.cuda.set_device(hvd.local_rank())

# Horovod: limit the number of CPU threads to be used per worker
torch.set_num_threads(1)

# Define the neural network model
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc1 = nn.Linear(784, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = torch.flatten(x, 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return torch.log_softmax(x, dim=1)

# Instantiate the model and move it to GPU
model = SimpleModel().cuda()

# Define the optimizer and learning rate
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Wrap the optimizer with Horovod DistributedOptimizer
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

# Horovod: broadcast initial parameters and optimizer state from rank 0 to all other processes
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Define the loss function
criterion = nn.NLLLoss()

# Load the dataset
train_dataset = datasets.MNIST(root='.', train=True, download=True, transform=transforms.ToTensor())
test_dataset = datasets.MNIST(root='.', train=False, download=True, transform=transforms.ToTensor())

# Use DistributedSampler to partition the training set for multi-GPU training
train_sampler = DistributedSampler(train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = DataLoader(train_dataset, batch_size=64, sampler=train_sampler, num_workers=4)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

# Training function
def train(epoch):
    model.train()
    # Reshuffle the partition differently in every epoch
    train_sampler.set_epoch(epoch)
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.cuda(), target.cuda()
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()   # Horovod starts averaging gradients as they are computed
        optimizer.step()  # waits for the averaged gradients, then updates the weights
        if batch_idx % 100 == 0 and hvd.rank() == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_sampler),
                100. * batch_idx / len(train_loader), loss.item()))

# Test function
def test():
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.cuda(), target.cuda()
            output = model(data)
            # Sum (not average) the batch loss so dividing by the dataset size gives the mean
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_loss /= len(test_loader.dataset)
    if hvd.rank() == 0:
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))

# Training loop
for epoch in range(1, 6):
    train(epoch)
    test()
```

In this scenario, Horovod facilitates distributed learning through several key steps:


  1. Initialization: `hvd.init()` initializes the Horovod environment and sets up the communication channels between the worker processes.
  2. GPU Management: `torch.cuda.set_device(hvd.local_rank())` binds each process to its own GPU, so no two processes on a host share a device.
  3. Model Definition: A simple fully connected network is defined and moved to the GPU with `.cuda()`. Every process holds its own replica of this model, and the replicas are trained in parallel.
  4. Optimizer Wrapping: The standard PyTorch optimizer (here Adam) is wrapped in Horovod’s `DistributedOptimizer`, which averages gradients across all workers so that every replica applies the same update.
  5. Broadcast Parameters: `hvd.broadcast_parameters(model.state_dict(), root_rank=0)` copies the initial model parameters from rank 0 to all other processes, so every GPU starts training from the same model state.
  6. Dataset Sharding: PyTorch’s `DistributedSampler`, configured with `hvd.size()` and `hvd.rank()`, splits the training set so that each GPU processes a different subset in every epoch.
  7. Training function: `model.train()` puts the model into training mode. For each batch, `optimizer.zero_grad()` clears the old gradients, the forward pass and loss are computed, and `loss.backward()` computes the new gradients. Horovod starts averaging each gradient across workers as soon as it is ready, and `optimizer.step()` waits for the averaging to finish before updating the weights.
  8. Test function: `test()` evaluates the model on the test dataset and reports the average loss and accuracy, which helps monitor the model’s performance during training.
  9. Training loop: The main loop calls `train` and then `test` for each of the five epochs, so training progress and test metrics are reported after every epoch.


By following these steps, Horovod scales PyTorch training across multiple GPUs and nodes with only a few changes to a single-GPU script. Each worker processes a different part of the data in parallel, which reduces the wall-clock time needed per epoch.
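Step 6 depends on how `DistributedSampler` divides the indices. The following pure-Python sketch imitates its default behavior (`shuffle=True`, `drop_last=False`): every rank shuffles with the same seed plus the current epoch, the index list is padded by repetition until it divides evenly, and rank `r` takes every `num_replicas`-th index starting at `r`. It is an approximation under stated assumptions: `sampler_indices` is a hypothetical helper, and Python's `random` module replaces `torch.randperm`, so the concrete permutations differ from PyTorch's.

```python
import math
import random
from typing import List


def sampler_indices(dataset_len: int, num_replicas: int, rank: int,
                    epoch: int = 0, seed: int = 0,
                    shuffle: bool = True) -> List[int]:
    """Hypothetical imitation of DistributedSampler with drop_last=False."""
    indices = list(range(dataset_len))
    if shuffle:
        # All ranks use the same seed, so they agree on one permutation;
        # the epoch term (set via set_epoch) changes it from epoch to epoch.
        random.Random(seed + epoch).shuffle(indices)
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    # Pad by repeating indices so every rank receives num_samples indices.
    padding = total_size - len(indices)
    indices += (indices * math.ceil(padding / len(indices)))[:padding]
    # Rank r takes every num_replicas-th index, starting at position r.
    return indices[rank:total_size:num_replicas]


# 10 samples on 4 workers: ceil(10 / 4) = 3 indices each, so two
# indices are repeated to fill the 12 slots.
for r in range(4):
    print(r, sampler_indices(10, 4, r, shuffle=False))
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]
```

Since the permutation is seeded with `seed + epoch`, calling `train_sampler.set_epoch(epoch)` at the start of each epoch is what gives every epoch a fresh split; without it, each worker sees the same samples in the same order every epoch. The padding also means a few samples are seen twice per epoch when the dataset size is not a multiple of the worker count.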