Multi-Node Multi-GPU Parallel Training
Multi-Node Parallel Training with PyTorch using torchrun
This section discusses parallel training with PyTorch. It is completely independent of Saturn Cloud; the next section will describe how to apply these concepts to Saturn Cloud. `torchrun` is a utility provided by PyTorch to simplify the process of launching and managing distributed training jobs. It is commonly used with PyTorch’s distributed package to scale training across multiple GPUs or nodes. Here’s an overview of its key features and steps for usage:
Core Features of `torchrun`:
- Single Command Launch: Simplifies launching distributed training jobs.
- Fault Tolerance: Includes failure detection and automatic restarts (elastic training).
- Dynamic Scaling: Supports dynamic node addition/removal during training.
How It Works:
`torchrun` initializes a distributed environment in which processes communicate over a backend protocol (e.g., NCCL or Gloo). Each process is assigned a unique rank and coordinates with the others to parallelize the workload.
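For example, a process launched by `torchrun` can read the rank information that the launcher injects into its environment. The following is a minimal sketch, not a complete training script:

```python
import os

import torch
import torch.distributed as dist

# torchrun sets RANK, LOCAL_RANK, and WORLD_SIZE for every process it launches.
local_rank = int(os.environ["LOCAL_RANK"])   # GPU index on this node
global_rank = int(os.environ["RANK"])        # unique rank across the whole cluster
world_size = int(os.environ["WORLD_SIZE"])   # total number of processes

torch.cuda.set_device(local_rank)            # bind this process to one GPU
dist.init_process_group(backend="nccl")      # join the distributed process group
print(f"rank {global_rank} of {world_size} is using GPU {local_rank}")
```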
Setup and Usage:
Prepare Your Script:
- Ensure the training script uses PyTorch’s `torch.distributed` APIs.
- For example, wrap your model with `torch.nn.parallel.DistributedDataParallel` (the optimizer is then created from the wrapped model’s parameters).
Install PyTorch Distributed:
- Ensure PyTorch is installed with the necessary distributed backends (NCCL for GPU, Gloo for CPU).
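A quick way to confirm that a given PyTorch build includes these backends is a small sanity-check script, sketched below:

```python
import torch
import torch.distributed as dist

# Sanity check: confirm the distributed backends compiled into this PyTorch build.
print("CUDA available:", torch.cuda.is_available())
print("NCCL backend available:", dist.is_nccl_available())
print("Gloo backend available:", dist.is_gloo_available())
```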
Run with `torchrun`: Use the following command:

```bash
torchrun --nproc_per_node=<NUM_GPUS> --nnodes=<NUM_NODES> --node_rank=<NODE_RANK> --master_addr=<MASTER_IP> --master_port=<MASTER_PORT> script.py [script_args]
```

- `--nproc_per_node`: Number of processes per node (typically the number of GPUs).
- `--nnodes`: Total number of nodes in the cluster.
- `--node_rank`: Rank of the node (0 for the master node).
- `--master_addr`: IP address of the master node.
- `--master_port`: Port used for communication.
Environment Variables:
- Optionally set environment variables like `MASTER_ADDR`, `MASTER_PORT`, and `WORLD_SIZE` to manage distributed training manually.
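For illustration, a minimal sketch of the manual setup (with a hypothetical master address and port) might look like this:

```python
import os

import torch.distributed as dist

# Manual setup sketch: without torchrun, every process must see these variables
# before init_process_group(), which reads them via the default env:// method.
os.environ.setdefault("MASTER_ADDR", "10.0.0.1")  # hypothetical master node IP
os.environ.setdefault("MASTER_PORT", "29500")     # hypothetical open port
os.environ.setdefault("WORLD_SIZE", "8")          # total number of processes
os.environ.setdefault("RANK", "0")                # this process's global rank

dist.init_process_group(backend="nccl")
```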
Key Components in the Training Script:
Initialize Process Group:

```python
import torch.distributed as dist

dist.init_process_group(backend='nccl')  # backend: 'nccl' for GPU, 'gloo' for CPU
```
Model Wrapping: Wrap your model for distributed data parallelism:

```python
from torch.nn.parallel import DistributedDataParallel as DDP

model = DDP(model)
```
Distributed Sampler: Use `torch.utils.data.distributed.DistributedSampler` to split the dataset across processes:

```python
from torch.utils.data import DataLoader, DistributedSampler

sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, sampler=sampler)
```
Sync and Cleanup: Synchronize processes and clean up resources after training:

```python
dist.barrier()
dist.destroy_process_group()
```
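Putting these components together, a minimal end-to-end script might look like the sketch below; the dataset and model here are toy placeholders, and the script is meant to be launched with `torchrun` as shown above:

```python
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

def main():
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend="nccl")

    # Toy dataset and model for illustration; replace with your own.
    dataset = TensorDataset(torch.randn(1024, 16), torch.randn(1024, 1))
    model = DDP(torch.nn.Linear(16, 1).cuda(local_rank), device_ids=[local_rank])
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    sampler = DistributedSampler(dataset)
    loader = DataLoader(dataset, batch_size=32, sampler=sampler)

    for epoch in range(3):
        sampler.set_epoch(epoch)  # reshuffle data differently each epoch
        for x, y in loader:
            x, y = x.cuda(local_rank), y.cuda(local_rank)
            optimizer.zero_grad()
            loss = torch.nn.functional.mse_loss(model(x), y)
            loss.backward()       # DDP averages gradients across all ranks
            optimizer.step()

    dist.barrier()
    dist.destroy_process_group()

if __name__ == "__main__":
    main()
```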
Advantages of Using `torchrun`:
- Scalability: Easily scale across multiple GPUs and nodes.
- Performance: Optimized for high-performance GPU-to-GPU communication.
- Ease of Use: Simplifies boilerplate for distributed training.
Example Command:
```bash
torchrun --nproc_per_node=4 script.py --epochs 10 --batch-size 64
```

This command runs the script on a single node with 4 GPUs and passes the user-defined arguments through to the script.
By leveraging `torchrun`, PyTorch users can efficiently scale training workloads while managing distributed resources seamlessly.
Parallel Training with PyTorch on Saturn Cloud
Single-node multi-GPU parallel PyTorch training on Saturn Cloud can be run interactively on Saturn Cloud Python Servers, or non-interactively with Saturn Cloud Jobs. Multi-node multi-GPU parallel PyTorch training on Saturn Cloud can be run non-interactively with Saturn Cloud Jobs. Even though you will ultimately need to use Saturn Cloud Jobs, we strongly recommend that you start with single-node multi-GPU parallel training interactively on a Python Server, because troubleshooting jobs has a much longer iteration cycle (you have to wait for machines to spin up, for containers to start, etc.), whereas on a single machine you can iterate quickly (as fast as you can restart a process).
Recall the previous `torchrun` command:

```bash
torchrun --nproc_per_node=<NUM_GPUS> --nnodes=<NUM_NODES> --node_rank=<NODE_RANK> --master_addr=<MASTER_IP> --master_port=<MASTER_PORT> script.py [script_args]
```
- `--nproc_per_node`: NUM_GPUS - you can hardcode this value based on the instance configuration you have selected (usually 1, 2, 4, or 8 GPUs).
- `--nnodes`: NUM_NODES - each Saturn Cloud job has an instance count parameter, which determines the size of your training cluster.
- `--node_rank`: NODE_RANK - the rank of the node (0 for the master node). Every instance in a Saturn Cloud job is numbered; you can read the rank from the environment variable `SATURN_JOB_RANK`.
- `--master_addr`: IP address of the master node. We populate an environment variable `SATURN_JOB_LEADER` with the DNS address of the 0th node.
- `--master_port`: Port for communication - you can choose any port you want; all ports are open for your job nodes to communicate with each other.
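A minimal launcher sketch, assuming a hypothetical 2-node job with 4 GPUs per instance and a training script named `train.py`, might wire these values together like this:

```python
import os
import subprocess

# Hypothetical launcher: builds a torchrun command from the environment
# variables Saturn Cloud populates on every job instance.
cmd = [
    "torchrun",
    "--nproc_per_node=4",                                # GPUs per instance (assumed)
    "--nnodes=2",                                        # job instance count (assumed)
    f"--node_rank={os.environ['SATURN_JOB_RANK']}",      # this instance's rank
    f"--master_addr={os.environ['SATURN_JOB_LEADER']}",  # DNS of the 0th node
    "--master_port=29500",                               # any open port works
    "train.py",                                          # your training script
]
subprocess.run(cmd, check=True)
```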
In PyTorch you generally do not need the DNS names of the worker nodes; however, if you did need to construct the name for the 1st, 2nd, or Nth node, the format is `${SATURN_JOB_RUN}-N.${SATURN_INTERNAL_SUBDOMAIN}.${SATURN_NAMESPACE}.svc.cluster.local`. `SATURN_JOB_RUN`, `SATURN_INTERNAL_SUBDOMAIN`, and `SATURN_NAMESPACE` are all populated for you when you use Saturn Cloud Jobs.
Parallel Training in TensorFlow for Multi-Node Multi-GPU Workloads
This section discusses parallel training with TensorFlow. It is completely independent of Saturn Cloud; the next section will describe how to apply these concepts to Saturn Cloud. TensorFlow provides robust support for distributed training across multiple nodes and GPUs through its `tf.distribute` strategy API. Here’s an overview of the process and components involved:
Key Concepts of TensorFlow Distributed Training
`tf.distribute.Strategy`:
- Central API to handle distributed training across multiple devices or nodes.
- Strategies like `MultiWorkerMirroredStrategy` and `ParameterServerStrategy` are tailored for multi-node, multi-GPU training.
Data Parallelism:
- Duplicates model weights across devices and splits the data batch among GPUs/nodes.
- Gradients are computed independently on each replica and averaged during synchronization.
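As a small illustration of this, the global batch size is typically scaled by the number of replicas a strategy manages; the sketch below uses single-node `MirroredStrategy` for simplicity:

```python
import tensorflow as tf

# Each replica receives an equal slice of the global batch, so the global
# batch size is usually the per-replica batch size times the replica count.
strategy = tf.distribute.MirroredStrategy()  # single-node multi-GPU example
per_replica_batch_size = 64
global_batch_size = per_replica_batch_size * strategy.num_replicas_in_sync
print("replicas:", strategy.num_replicas_in_sync, "global batch:", global_batch_size)
```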
Steps for Multi-Node Multi-GPU Training
Environment Setup:
- Set environment variables like `TF_CONFIG` to specify the cluster configuration and this node’s role.

```bash
export TF_CONFIG='{"cluster": {"worker": ["worker1:port", "worker2:port"], "chief": ["chief:port"]}, "task": {"type": "worker", "index": 0}}'
```
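Equivalently, `TF_CONFIG` can be constructed in Python before the strategy is created; the hostnames and ports below are placeholders for your own cluster:

```python
import json
import os

# Placeholder cluster description; set this before creating the strategy.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["worker0.example.com:12345", "worker1.example.com:12345"],
    },
    "task": {"type": "worker", "index": 0},  # this node's role and rank
})
```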
Choose a Distribution Strategy:
`MultiWorkerMirroredStrategy`:
- Synchronizes training across nodes by mirroring model replicas.

```python
strategy = tf.distribute.MultiWorkerMirroredStrategy()
```
`ParameterServerStrategy`:
- Uses parameter servers to manage model weights while workers compute gradients.

```python
strategy = tf.distribute.ParameterServerStrategy(
    tf.distribute.cluster_resolver.TFConfigClusterResolver()  # reads TF_CONFIG
)
```
Define and Compile the Model:
- Wrap model creation and compilation in the distribution strategy’s scope.

```python
with strategy.scope():
    model = build_model()  # build_model() is your own model-construction function
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
```
Prepare the Dataset:
- Use `tf.data.Dataset` and shard it across replicas for efficient input handling.

```python
dataset = tf.data.Dataset.from_tensor_slices(data).batch(batch_size)
```
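If you want to control sharding explicitly, `tf.data` exposes a sharding policy via dataset options; the sketch below uses toy NumPy arrays as stand-ins for real data:

```python
import numpy as np
import tensorflow as tf

# Toy data as a stand-in for a real input pipeline.
features = np.random.rand(1024, 16).astype("float32")
labels = np.random.randint(0, 10, size=(1024,))

# Shard by data so each worker processes a distinct slice of every batch.
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA

dataset = (
    tf.data.Dataset.from_tensor_slices((features, labels))
    .batch(64)
    .with_options(options)
)
```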
Run Training:
- Train the model as usual using `model.fit`.

```python
model.fit(dataset, epochs=10)
```
Strategies for Distributed Training
| Strategy | Use Case | Example |
|---|---|---|
| `MultiWorkerMirroredStrategy` | Multi-GPU training across multiple nodes (synchronous). | Training on 4 GPUs across 2 nodes. |
| `ParameterServerStrategy` | Large models that require parameter servers (asynchronous). | Training with dedicated parameter servers for model weights. |
Advantages of TensorFlow Distributed Training
- Scalability: Scales training to large datasets and models across multiple GPUs/nodes.
- Flexibility: Supports various training strategies (e.g., synchronous and asynchronous).
- Ease of Use: High-level API manages complexities like gradient synchronization and data sharding.
Example Code
```python
import tensorflow as tf

# Define MultiWorkerMirroredStrategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Define model and dataset within the strategy scope
with strategy.scope():
    model = tf.keras.Sequential([...])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

dataset = tf.data.Dataset.from_tensor_slices(...).batch(32)

# Train the model
model.fit(dataset, epochs=10)
```
By leveraging `tf.distribute`, TensorFlow makes distributed training accessible, powerful, and highly customizable for multi-node, multi-GPU workloads.