Multi-Node Multi-GPU Parallel Training

Multi-Node Parallel Training with PyTorch and Tensorflow

Multi-Node Parallel Training with PyTorch using torchrun

This section discussese Parallel training with PyTorch. It is completely independent of Saturn Cloud. The next section will describe how to apply these concepts to Saturn Cloud. torchrun is a utility provided by PyTorch to simplify the process of launching and managing distributed training jobs. It is commonly used with PyTorch’s distributed package to scale training across multiple GPUs or nodes. Here’s an overview of its key features and steps for usage:


Core Features of torchrun:

  1. Single Command Launch: Simplifies launching distributed training jobs.
  2. Fault Tolerance: Includes failure detection and automatic restarts (elastic training).
  3. Dynamic Scaling: Supports dynamic node addition/removal during training.

How It Works:

torchrun initializes a distributed environment where processes can communicate using backend protocols (e.g., NCCL, GLOO). Each process is assigned a unique rank and interacts with others to parallelize the workload.


Setup and Usage:

  1. Prepare Your Script:

    • Ensure the training script includes PyTorch’s torch.distributed APIs.
    • For example, wrap your model and optimizer with torch.nn.parallel.DistributedDataParallel.
  2. Install PyTorch Distributed:

    • Ensure PyTorch is installed with the necessary distributed backends (NCCL for GPU, GLOO for CPU).
  3. Run with torchrun: Use the following command:

    torchrun --nproc_per_node=<NUM_GPUS> --nnodes=<NUM_NODES> --node_rank=<NODE_RANK> --master_addr=<MASTER_IP> --master_port=<MASTER_PORT> script.py [script_args]
    
    • --nproc_per_node: Number of processes per node (e.g., number of GPUs).
    • --nnodes: Total number of nodes in the cluster.
    • --node_rank: Rank of the node (0 for master node).
    • --master_addr: IP address of the master node.
    • --master_port: Port for communication.
  4. Environment Variables:

    • Optionally set environment variables like MASTER_ADDR, MASTER_PORT, and WORLD_SIZE to manage distributed training manually.

Key Components in the Training Script:

  1. Initialize Process Group:

    import torch.distributed as dist
    dist.init_process_group(backend='nccl')  # Backend: nccl for GPU, gloo for CPU
    
  2. Model Wrapping: Wrap your model for distributed data parallelism:

    from torch.nn.parallel import DistributedDataParallel as DDP
    model = DDP(model)
    
  3. Distributed Sampler: Use torch.utils.data.distributed.DistributedSampler to split the dataset across processes:

    from torch.utils.data import DataLoader, DistributedSampler
    sampler = DistributedSampler(dataset)
    loader = DataLoader(dataset, sampler=sampler)
    
  4. Sync and Cleanup: Synchronize processes and clean up resources after training:

    dist.barrier()
    dist.destroy_process_group()
    

Advantages of Using torchrun:

  • Scalability: Easily scale across multiple GPUs and nodes.
  • Performance: Optimized for high-performance GPU-to-GPU communication.
  • Ease of Use: Simplifies boilerplate for distributed training.

Example Command:

torchrun --nproc_per_node=4 script.py --epochs 10 --batch-size 64

This command runs the script on 4 GPUs with user-defined script arguments.

By leveraging torchrun, PyTorch users can efficiently scale training workloads while managing distributed resources seamlessly.

Parallel Training with PyTorch on Saturn Cloud

Single node multi-gpu parallel PyTorch training on Saturn Cloud can be run on Saturn Cloud Python Servers interactively, or non-interactively with Saturn Cloud jobs. Multi node multi-gpu parallel PyTorch training on Saturn Cloud can be run non-interactively with Saturn Cloud Jobs. Even though you will ultimately need to use Saturn Cloud jobs, we strongly recommend that you start with single node multi-gpu parallel training interactively on a Python server because troubleshooting jobs has a much longer iteration cycle (you have to wait for machines to spin up, for containers to start, etc), where as on a single machine you can iterate quickly (as fast as you can restart a process)

Recall the previous torchrun command:

torchrun --nproc_per_node=<NUM_GPUS> --nnodes=<NUM_NODES> --node_rank=<NODE_RANK> --master_addr=<MASTER_IP> --master_port=<MASTER_PORT> script.py [script_args]
  • --nproc_per_node: NUM_GPUs - you can hardcode this value based on the instance configuration you have selected (usually 1, 2, 4 or 8 GPUs)
  • --nnodes: NUM_NODES - each Saturn Cloud job has an instance count parameter. This determines the size of your training cluster.
  • --node_rank: NODE_RANK (Rank of the node (0 for master node). Every instance in a Saturn Cloud job is numbered. You can read the rank from the environment variable SATURN_JOB_RANK
  • --master_addr: IP address of the master node. We populate an environment variable SATURN_JOB_LEADER with the DNS address of the 0th node.
  • --master_port: Port for communication - you can choose any port you want. All ports are open for your job nodes to communicate with each other.

In PyTorch you generally do not need the DNS name of the worker nodes, however if you did need to construct it for the 1st, 2nd, or Nth node, the format is ${SATURN_JOB_RUN}-N.${SATURN_INTERNAL_SUBDOMAIN}.${SATURN_NAMESPACE}.svc.cluster.local. SATURN_JOB_RUN, SATURN_INTERNAL_DOMAIN, SATURN_NAMESPACE are all populated for you when you use Saturn Cloud Jobs.

Parallel Training in TensorFlow for Multi-Node Multi-GPU Workloads

This section discusses Parallel training with Tensorflow. It is completely independent of Saturn Cloud. The next section will describe how to apply these concepts to Saturn Cloud. TensorFlow provides robust support for distributed training across multiple nodes and GPUs using its tf.distribute strategy API. Here’s an overview of the process and components involved:


Key Concepts of TensorFlow Distributed Training

  1. tf.distribute.Strategy:

    • Central API to handle distributed training across multiple devices or nodes.
    • Strategies like MultiWorkerMirroredStrategy and ParameterServerStrategy are tailored for multi-node, multi-GPU training.
  2. Data Parallelism:

    • Duplicates model weights across devices and splits the data batch among GPUs/nodes.
    • Gradients are computed independently on each replica and averaged during synchronization.

Steps for Multi-Node Multi-GPU Training

  1. Environment Setup:

    • Set environment variables like TF_CONFIG to specify the cluster configuration and node role.
      export TF_CONFIG='{
          "cluster": {
              "worker": ["worker1:port", "worker2:port"],
              "chief": ["chief:port"]
          },
          "task": {"type": "worker", "index": 0}
      }'
      
  2. Choose a Distribution Strategy:

    • MultiWorkerMirroredStrategy:

      • Synchronizes training across nodes by mirroring model replicas.
      strategy = tf.distribute.MultiWorkerMirroredStrategy()
      
    • ParameterServerStrategy:

      • Uses parameter servers to manage model weights while workers compute gradients.
      strategy = tf.distribute.ParameterServerStrategy()
      
  3. Define and Compile the Model:

    • Wrap model creation, dataset preparation, and training logic in the distribution strategy’s scope.
      with strategy.scope():
          model = build_model()
          model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')
      
  4. Prepare the Dataset:

    • Use tf.data.Dataset and shard it across replicas for efficient input handling.
      dataset = tf.data.Dataset.from_tensor_slices(data).batch(batch_size)
      
  5. Run Training:

    • Train the model as usual using model.fit.
      model.fit(dataset, epochs=10)
      

Strategies for Distributed Training

StrategyUse CaseExample
MultiWorkerMirroredStrategyMulti-GPU across multiple nodes (synchronous training).Training on 4 GPUs in 2 nodes.
ParameterServerStrategyLarge models that require parameter servers (asynchronous).Training with dedicated parameter servers for model weights.

Advantages of TensorFlow Distributed Training

  1. Scalability:

    • Scales training to large datasets and models across multiple GPUs/nodes.
  2. Flexibility:

    • Supports various training strategies (e.g., synchronous/asynchronous).
  3. Ease of Use:

    • High-level API for managing complexities like gradient synchronization and data sharding.

Example Code

import tensorflow as tf

# Define MultiWorkerMirroredStrategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Define model and dataset within the strategy scope
with strategy.scope():
    model = tf.keras.Sequential([...])
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy')

dataset = tf.data.Dataset.from_tensor_slices(...).batch(32)

# Train the model
model.fit(dataset, epochs=10)

By leveraging tf.distribute, TensorFlow makes distributed training accessible, powerful, and highly customizable for multi-node, multi-GPU workloads.