Getting Started With Ray Clusters

Ray is a framework for developing and running parallel and distributed applications emphasizing ML tasks. Ray enables users to harness the power of distributed computing without much effort.

Introduction

Meet Alice, a machine learning engineer at a large tech company. Alice works on developing and training deep learning models to solve complex problems, which often require significant computational resources to train. On a single machine, the training process could take weeks, even on the strongest workstations. To speed up the process, she decides to use Ray, a cluster computing system that allows her to distribute the workload across multiple machines.

With Ray, Alice can iterate on her work with quicker turnaround times, using multiple machines in parallel to train different parts of the model simultaneously. Alice can also scale her experiments quickly by automatically adding more machines to the cluster, giving her far more computational capacity than any single workstation. This cluster allows her to train more complex models, explore more intricate algorithms, and tackle more challenging problems.

Best of all, Alice can utilize Ray to harness the power of parallel computing in her code without worrying about the underlying details of distributed computing. Overall, Alice finds that using Ray has dramatically improved her productivity and efficiency, and she can now deliver better ML models to her team and company.

Does this sound like the life you want? Keep reading!


Ray and the Ray AI Runtime (AIR)

Ray is a fast and flexible open-source framework for distributed computing, whether you're using it for ML or non-ML tasks. On top of the core framework, the Ray team has built Ray AIR, a suite of end-to-end ML tools, all accelerated by parallel computing.

Ray AIR includes:

  • Datasets: Ray AIR provides a variety of distributed data structures, allowing a cluster to handle more data than a single machine and to parallelize data processing operations, such as data loading and preprocessing, across multiple machines.

  • Training: Developers can run AI and ML computations in parallel across multiple machines and GPUs, providing fast and efficient execution of large-scale workloads. Additionally, Ray AIR provides many built-in algorithms and libraries for ML, including support for popular deep learning frameworks such as TensorFlow and PyTorch.

  • Tuning: Ray AIR supports various hyperparameter tuning methods, including grid search, random search, and more advanced techniques like Bayesian optimization. Ray AIR does all the heavy lifting once developers define the hyperparameter search space and tuning algorithm (see the sketch after this list).

  • Scoring: The Ray AI Runtime includes tools for real-time monitoring and debugging of AI and ML workloads, metrics visualization, and performance profiling. Developers can efficiently analyze performance bottlenecks and the computation flow of their models.

  • Serving: Ray AIR provides flexible, distributed model serving that scales regardless of project size, and it includes model versioning, tracking, and monitoring tools for model uptime.

  • RL: On top of the mentioned features, Ray AIR provides tools for managing sophisticated RL reward functions, integrating with environments, and receiving reward feedback, making it easier to train RL models and deploy them in real-world scenarios.
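
To make the tuning bullet concrete, here is a minimal, hypothetical sketch of what a hyperparameter search with Ray Tune can look like. The objective function and search space are invented for illustration; only the Tuner, param_space, and TuneConfig pieces come from Ray's API.

# tune-sketch.py (hypothetical example)
from ray import tune
from ray.air import session


def objective(config):
    # Stand-in for a real training run; the reported metric
    # is what Tune uses to compare trials.
    score = (config["lr"] - 0.01) ** 2 + config["batch_size"] / 1000
    session.report({"score": score})


tuner = tune.Tuner(
    objective,
    param_space={
        "lr": tune.grid_search([0.001, 0.01, 0.1]),
        "batch_size": tune.choice([16, 32, 64]),
    },
    tune_config=tune.TuneConfig(metric="score", mode="min"),
)
results = tuner.fit()
print(results.get_best_result().config)

Each trial is scheduled onto the cluster by Ray, so the search spreads across whatever workers are available without extra parallelization code from you.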

I’m Sold. How do I use Ray?

Today, we'll show you how to send jobs from your local machine to AWS using Ray. To do this, we'll walk through how to define and set up a Ray cluster, and we'll cover some commands that are useful for managing it. Let's get started!

Prerequisites & Essential Ray Concepts

Before we get started, make sure you have the following:

  • The AWS CLI is installed and configured with your login credentials (a quick way to verify this is sketched right after this list).

  • Here’s a guide on how to configure your AWS CLI.

  • Note: By default, Ray nodes in a Ray AWS cluster have full EC2 and S3 permissions (i.e., arn:aws:iam::aws:policy/AmazonEC2FullAccess and arn:aws:iam::aws:policy/AmazonS3FullAccess). The defaults are sufficient for trying out Ray clusters, but you may want to change Ray nodes' permissions for various reasons (e.g., to reduce the permissions for security reasons).

  • If you wish to minimize these permissions, refer to this discussion here.

  • The AWS user has IAM permissions to create EC2 instances.

  • Python 3.x is installed on your machine (any OS works fine for the host machine).

  • The YAML file included in this section to create the Ray cluster.
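
If you want to double-check that your credentials are being picked up before launching anything, you can run a short script with boto3, the AWS SDK that Ray's cluster launcher uses under the hood. This is a hypothetical helper (check_aws.py is not part of the demo files):

# check_aws.py (hypothetical helper; assumes boto3 is installed)
import boto3

# If your AWS credentials are configured correctly, this prints the
# account and IAM identity that Ray will launch instances under.
identity = boto3.client("sts").get_caller_identity()
print(identity["Account"], identity["Arn"])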

What’s a Ray Cluster?

A Ray cluster is a set of machines that work together to perform computations in parallel. The cluster is managed by a central coordinator called the head node, which is responsible for distributing tasks to the worker nodes, monitoring the state of the cluster, and aggregating results. Each worker node runs its own copy of the application and communicates with the head node to receive tasks and report results.

Developers can easily create Ray clusters on AWS through the Ray CLI. Each cluster is defined by a YAML file, which lets you extend or limit the cluster's resources: the instance types to launch, the number of worker nodes, the region and availability zones of those nodes, and how aggressively to spawn new instances.

Understanding the Ray YAML

Here is the YAML we’ll use for our demo today. We will be referring to this file as demo.yaml:

# A unique identifier for the head node and workers of this cluster.
# EC2 instances will open as ray-${cluster_name}-head or
# ray-${cluster_name}-worker
cluster_name: demo


# The maximum number of worker nodes to launch in addition to the head node.
# This takes precedence over min_workers, which defaults to 0.
max_workers: 5


# Cloud-provider-specific configuration.
provider:
   type: aws
   region: us-east-1


   # You can list the availability zones (within the region above)
   # that nodes may launch in here.
   # availability_zone: us-east-1a,us-east-1b


   cache_stopped_nodes: False


# Define all the node types in your cluster here.
# For our demo, we'll define two node types (the names can be arbitrary):
# a head node and a worker node.
available_node_types:
   head_node:
      node_config:
         InstanceType: t2.xlarge


         # Choose the instance image you want.
         # You can find these IDs when you attempt to
         # make a new AWS instance on the AWS Console
         ImageId: ami-0557a15b87f6559cf
         
         # Define disk space of instance
         BlockDeviceMappings:
            - DeviceName: /dev/sda1
              Ebs:
                  VolumeSize: 150


   worker_nodes:
      node_config:
         InstanceType: t2.xlarge
         ImageId: ami-0557a15b87f6559cf
         BlockDeviceMappings:
            - DeviceName: /dev/sda1
              Ebs:
                  VolumeSize: 150


# Define the name of the head node from above here.
head_node_type: head_node


# How Ray will authenticate with newly launched nodes.
auth:
   ssh_user: ubuntu
   # By default Ray creates a new private keypair,
   # but you can also use your own.
   # If you do so, make sure to also set "KeyName" in the head and worker node
   # configurations above.
   # ssh_private_key: /path/to/your/key.pem


# These commands are run every time any new node is created.
setup_commands:
   # The sleep is a workaround for ssh timing out on freshly launched nodes
   - sleep 4
   - sudo apt update
   - sudo apt install -y python3-pip python-is-python3
   - pip install ray[default] boto3 torch

The above YAML is a basic Ray cluster configuration. We don't utilize autoscaling, conda, or Docker environments in this setup. If you intend to use any of these features, the Ray cluster configuration documentation covers the additional fields you can add to your YAML.

Cluster Job Examples

Before we begin our demo, let’s look at some Ray-specific code pieces to understand what’s happening.

Test.py

Here’s a beginner example. Simply import and initialize the library, decorate the functions you wish to parallelize, and collect the results.

One critique of using decorators with Ray is that a decorated function is shipped to the worker nodes as-is: Ray parallelizes calls to the function, not the work inside it. Decorators are fine for functions that are already easy to split into independent calls, but in ML applications there are many more aspects to optimize. Some key areas to parallelize are data processing, training, and serving. While it's possible to parallelize these aspects yourself and wrap them in decorators, that burden is placed upon the user. Our last example will show how Ray can make this easier for ML models.

# test.py
import ray


# `address='auto'` tells Ray to connect to the existing
# cluster's head node instead of starting a local instance.
ray.init(address='auto')


# Functions can be decorated to tell Ray what function
# will be distributed for compute.
# Decorators work perfectly for simple functions.
@ray.remote
def f(x):
    return x * x


# f.remote() returns object references immediately;
# ray.get() blocks until all results are ready.
futures = [f.remote(i) for i in range(200)]
results = ray.get(futures)
print(results)

Test2.py

In this test, we naively check whether each number in a given range is prime. When you run this compute-intensive example, you'll notice that the head node begins to provision worker nodes if none (or not enough) are present.

# test2.py
import ray
import time

ray.init(address='auto')


# Naive primality check; each call runs as a separate Ray task
# somewhere on the cluster.
@ray.remote
def isprime(x):
    if x > 1:
        for i in range(2, x):
            if x % i == 0:
                return 0
        return x
    return 0


def main():
    lower = 9000000
    upper = 9010000
    start_time = time.time()

    # Launch one remote task per candidate number; this returns
    # object references immediately without blocking.
    objects = [isprime.remote(num) for num in range(lower, upper + 1)]

    # Block until every task has finished and collect the results.
    objs = ray.get(objects)

    primes = [x for x in objs if x > 0]
    print(len(primes), primes[0], primes[-1])
    print("Time Elapsed:", (time.time() - start_time))


if __name__ == "__main__":
    main()

ML Parallelization Example

Lastly, we have an ML example. Here, we define a simple neural network with arbitrary data. Compared to our previous examples using Ray's decorator, notice how TorchTrainer() is used instead. Ray includes similar Trainer workflows for other popular deep learning frameworks, such as TensorFlow. Ray handles the parallelization automatically through these Trainer() calls, leaving less for the user to wire up and debug.

# ray-pytorch-sample.py
import torch
import torch.nn as nn


import ray
from ray import train
from ray.air import session, Checkpoint
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig


input_size = 1
layer_size = 15
output_size = 1
num_epochs = 3


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.layer1 = nn.Linear(input_size, layer_size)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(layer_size, output_size)


    def forward(self, input):
        return self.layer2(self.relu(self.layer1(input)))


def train_loop_per_worker():
    dataset_shard = session.get_dataset_shard("train")
    model = NeuralNetwork()
    loss_fn = nn.MSELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1)


    model = train.torch.prepare_model(model)


    for epoch in range(num_epochs):
        for batches in dataset_shard.iter_torch_batches(batch_size=32, dtypes=torch.float):
            inputs, labels = torch.unsqueeze(batches["x"], 1), batches["y"]
            output = model(inputs)
            loss = loss_fn(output, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            print(f"epoch: {epoch}, loss: {loss.item()}")


        session.report(
            {},
            checkpoint=Checkpoint.from_dict(
                dict(epoch=epoch, model=model.state_dict())
            ),
        )


train_dataset = ray.data.from_items(
    [{"x": x, "y": 2 * x + 1} for x in range(200)]
)
scaling_config = ScalingConfig(num_workers=5)


# If using GPUs, use the below scaling config instead.
# scaling_config = ScalingConfig(num_workers=3, use_gpu=True)
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    scaling_config=scaling_config,
    datasets={"train": train_dataset})
result = trainer.fit()
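
Once trainer.fit() returns, the checkpoint reported each epoch can be pulled back out of the result. The following is a minimal sketch, not part of the original script; it assumes the names defined in ray-pytorch-sample.py above (result and NeuralNetwork) and the dict-style checkpoint used there:

# inspect_result.py (hypothetical follow-up to ray-pytorch-sample.py)
checkpoint_data = result.checkpoint.to_dict()
print("last epoch:", checkpoint_data["epoch"])

state_dict = checkpoint_data["model"]
# Depending on the Ray version, prepare_model may have wrapped the model
# in DistributedDataParallel, so keys can carry a "module." prefix.
state_dict = {
    (k[len("module."):] if k.startswith("module.") else k): v
    for k, v in state_dict.items()
}

model = NeuralNetwork()
model.load_state_dict(state_dict)
model.eval()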

Demo: Submitting a Job to the Ray Cluster

Now that we’ve seen how to define our YAML and some examples to run, we’ll install Ray and create our cluster using the YAML above. This demo focuses on setting up a flow where you can continue to work on your local machine and utilize remote compute clusters in the background. For more information regarding model serving and using additional services, refer to the additional resource links below.

Remember, you should have the AWS CLI configured and authenticated with your access key.

  1. Install Ray: pip install ray[default]

  2. Take the YAML file above to use for this demo. As mentioned before, we’ll call this file demo.yaml.

  3. Start the Ray cluster by entering the command: ray up demo.yaml. Likewise, to shut down a cluster, simply type ray down demo.yaml.

From here, Ray will create the head node, set up permissions and SSH access, and run the list of setup commands at the end of the YAML to provision the instance. You may notice that no worker nodes are created initially. Once jobs are submitted to the head node, worker nodes will spawn based on how much compute is needed.

  4. Let's open up the dashboard to observe our cluster live before submitting a job: ray dashboard demo.yaml. You can now monitor the dashboard by going to http://127.0.0.1:8265.

  5. Submit a job to the cluster and observe the resources of each worker node from the live dashboard (we'll submit test2.py in this demo): ray submit demo.yaml test2.py.
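
To see what the cluster looks like from the inside, you can also submit a tiny script that prints whatever resources and nodes the head node currently knows about. check_cluster.py below is a hypothetical helper, submitted the same way (ray submit demo.yaml check_cluster.py):

# check_cluster.py (hypothetical helper script)
import ray

ray.init(address='auto')

# Total CPUs, GPUs, and memory currently registered with the head node.
print(ray.cluster_resources())

# One entry per node that has joined the cluster so far.
for node in ray.nodes():
    print(node["NodeManagerAddress"], "alive:", node["Alive"])

Running it before and after submitting test2.py is a quick way to watch worker nodes join as the cluster scales up.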

Other Helpful Commands

  • SSH into the head node:
    • ray attach demo.yaml
  • If you’ve updated your YAML and need to update the entire cluster’s environment, simply type in the command again:
    • ray up demo.yaml
  • If you need to send a shell command to the cluster:
    • ray exec demo.yaml 'your command here'
  • If you need to send additional helper files to the cluster:
    • ray rsync-up demo.yaml /path/of/source/file /cluster/path/file
  • If you need to download files from the cluster:
    • ray rsync-down demo.yaml /cluster/path/file /local/machine/path
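
The ray CLI commands above are all you need for this workflow, but if you'd prefer to drive job submission from Python (for example, from a script or CI pipeline), ray[default] also ships a job submission client. Here's a hedged sketch, assuming the dashboard from step 4 is still forwarded to localhost:

# submit_job.py (hypothetical helper, run on your local machine)
from ray.job_submission import JobSubmissionClient

# Talks to the dashboard we forwarded locally with `ray dashboard demo.yaml`.
client = JobSubmissionClient("http://127.0.0.1:8265")

job_id = client.submit_job(
    entrypoint="python test2.py",
    # Ships the current directory (including test2.py) to the cluster.
    runtime_env={"working_dir": "./"},
)
print("submitted:", job_id)
print("status:", client.get_job_status(job_id))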

Resources

Be sure to check out our post on how to set up MLFlow here!

About Saturn Cloud

Saturn Cloud is your all-in-one solution for data science & ML development, deployment, and data pipelines in the cloud. Spin up a notebook with 4TB of RAM, add a GPU, connect to a distributed cluster of workers, and more. Request a demo today to learn more.