Massively Parallel Jobs for Research Workflows

Dispatching Massively Parallel Jobs to Saturn Cloud for Research Workflows

This section is a continuation of the previous section on dispatching jobs to Saturn Cloud. The previous section covered dispatching a single command to Saturn Cloud. However, there are also times when you may want to create hundreds or thousands of runs in Saturn Cloud. Creating a job per run not only makes the Saturn Cloud UI cluttered and difficult to use, it also makes it hard to retrieve results and to understand which runs have succeeded and which have failed.

In addition, with massively parallel jobs you need to expect failures. When you dispatch a single job, the probability of an intermittent failure is generally low; such failures are usually caused by hardware problems, network issues with data stores, AWS request rate limiting, and so on. However, once you start dispatching thousands (or millions) of runs, the probability of having to deal with at least a handful of failures, even under ideal conditions, goes up dramatically. Massively parallel jobs in Saturn Cloud are designed around these constraints.

This section walks through the process of batching your runs: spinning up N concurrent instances of a chosen instance type, and running J processes (runs) of your script concurrently on each instance. Batching runs on large instances is important in order to reduce the impact of job overhead. For example, if it takes 5 minutes to spin up a new machine and I only allocate 5 minutes of work to it, then half the time of the job is wasted.

We also cover:

  • saving all output (stdout, stderr, and the status code) to networked storage.
  • re-running failures locally.
  • re-submitting work without re-submitting runs that have already completed.

Step 1: Specifying runs

Runs can be specified in JSON or YAML (YAML is much slower to process if you have thousands of runs):

runs:
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-12-31 --end-dt 2024-01-02 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-01 --end-dt 2024-01-03 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-02 --end-dt 2024-01-04 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-03 --end-dt 2024-01-05 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-04 --end-dt 2024-01-06 --enrich --first-seen

The above, for example, is a batch for a backfill script that populates Intercom (our support chat tool) with information about our users so that we can better assist them with technical issues.

The runs file expects the following parameter:

  • runs: a list of all runs we want to execute. This example only has 5 runs to keep things concise, but you will typically have many more.

Each run expects a single parameter:

  • cmd: the command you would like to execute.
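
If your batch is generated programmatically (for example, one run per day of a backfill), it is usually easier to write the runs file with a short script than by hand. The following is a minimal sketch that builds a runs list like the one above; the backfill script path comes from the example, while the date range, window size, and output file name (jobs.json) are placeholders to adjust for your own batch.

import json
from datetime import date, timedelta

# Placeholder values: adjust the script, date range, and flags for your own batch.
SCRIPT = "python operations/pipelines/hosted/enrich_user_data.py"
start = date(2023, 12, 31)
n_days = 100

runs = []
for i in range(n_days):
    dt = start + timedelta(days=i)
    runs.append(
        {"cmd": f"{SCRIPT} --start-dt {dt} --end-dt {dt + timedelta(days=2)} --enrich --first-seen"}
    )

# JSON is faster to process than YAML when you have thousands of runs.
with open("jobs.json", "w") as f:
    json.dump({"runs": runs}, f, indent=2)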

Step 2: Creating your job recipe

As mentioned in the section on dispatching jobs for research workflows, we recommend getting your job to work locally in a Python workspace, and then cloning the Python workspace into a job with a command like this:

$ sc clone workspace my-workspace job my-job --command "..." --owner <org-name>/<username>

then write the recipe to a file:

$ sc get job my-job --owner <org-name>/<username> > /tmp/recipe.yaml

Here, I’m just using ... as a placeholder for the command, because in the next step (splitting/batching) we will use this recipe file as a template, and replace the command.

Step 3: Splitting runs into batches

The next step is to split your runs into batches. The batches are executed across a set of machines (--scale), with --nprocs runs executing concurrently on each machine.

$  sc split recipe.yaml jobs.yaml --nprocs 8 --scale 2 --remote-output-path file:///home/jovyan/shared/hugo/test/run-output/ --sync /home/jovyan/workspace/sctest/ --file-syncs-base-dir-url file:///home/jovyan/shared/hugo/test/sctest-base-dir

The above command does a few things:

  • creates a job using recipe.yaml as a template, designed to execute the runs listed in jobs.yaml
  • executes 8 runs at a time on each machine (--nprocs 8)
  • spins up 2 machines to execute all runs (--scale 2)
  • stores run output under file:///home/jovyan/shared/hugo/test/run-output/
  • copies the local directory /home/jovyan/workspace/sctest/ to all worker nodes
  • uses file:///home/jovyan/shared/hugo/test/sctest-base-dir as a staging area for all data copies

The resulting recipe is generated and saved to /tmp/commands/recipe.yaml. It is as follows:

schema_version: 2024.04.01
type: job
spec:
  name: sctest-run
  owner: test/hugo
  description: ''
  image: internal/saturncloud/saturn-python-pytorch:2024.08.01
  instance_type: g4dnxlarge
  environment_variables: {}
  working_directory: /home/jovyan/workspace
  start_script: |+
    ### BEGIN SATURN_CLIENT GENERATED CODE
    cp /home/jovyan/shared/hugo/test/sctest-base-dir/home/jovyan/workspace/sctest/data.tar.gz /tmp/data.tar.gz
    mkdir -p /home/jovyan/workspace/sctest/
    tar -xvzf /tmp/data.tar.gz -C /home/jovyan/workspace/sctest/
    cp /home/jovyan/shared/hugo/test/sctest-base-dir/tmp/commands/data.tar.gz /tmp/data.tar.gz
    mkdir -p /tmp/commands/
    tar -xvzf /tmp/data.tar.gz -C /tmp/commands/
    ### END SATURN_CLIENT GENERATED CODE
    pip install git+https://github.com/saturncloud/saturn-client.git@hugo/nfs-work

  token_scope: job:{self}:dask:write
  git_repositories: []
  secrets: []
  shared_folders:
  - owner: test/hugo
    path: /home/jovyan/shared/hugo/test
    name: test
  start_dind: false
  command:
  - sc batch /tmp/commands/0.json
  - sc batch /tmp/commands/1.json
  - sc batch /tmp/commands/2.json
  - sc batch /tmp/commands/3.json
  scale: 2
  use_spot_instance: false
  schedule:
state:
  id: 77341afe99df47c6a0570a95dd716668
  status: stopped

You can see that the command section has been modified to execute a list of commands (which are all sc batch). sc batch will handle the execution of each set of runs and store the output in the proper location. You can also inspect /tmp/commands/0.json, which contains the following information:

{
  "runs": [
    {
      "remote_output_path": "file:///home/jovyan/shared/hugo/test/run-output/0",
      "cmd": "python /home/jovyan/workspace/sctest/test.py 1",
      "local_results_dir": "/tmp/1e5761678f56443eb02104938c633dbd/",
      "status_code": null,
      "status": null
    },
    ...
  ],
  "remote_output_path": "file:///home/jovyan/shared/hugo/test/run-output/",
  "nprocs": 8
}
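
If you want to sanity-check a generated batch file before dispatching the job, a short script like the sketch below can load one of these files and summarize its contents. It relies only on the JSON structure shown above; the path is the one generated by sc split in this walkthrough.

import json
from collections import Counter

# One of the batch files generated by `sc split` in this example.
with open("/tmp/commands/0.json") as f:
    batch = json.load(f)

print(f"{len(batch['runs'])} runs, {batch['nprocs']} at a time")
print("output root:", batch["remote_output_path"])

# Before the batch has executed, status is null (None) for every run.
print(dict(Counter(run["status"] for run in batch["runs"])))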

Step 4: Executing the job

$ sc apply /tmp/commands/recipe.yaml --start

Step 5: Waiting for the job to complete and inspecting the results

$ sc summarize-batch /tmp/batch.yaml

This command displays status information for all runs, based on the data in remote storage. The output is as follows:

status       status_code    remote_output_path                                             cmd
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-01    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-12-31 --end-dt 2024-01-02 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-02    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-01 --end-dt 2024-01-03 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-03    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-02 --end-dt 2024-01-04 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-04    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-03 --end-dt 2024-01-05 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-05    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-04 --end-dt 2024-01-06 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-06    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-05 --end-dt 2024-01-07 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-07    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-06 --end-dt 2024-01-08 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-08    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-07 --end-dt 2024-01-09 --enrich --first-seen
completed    0              sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-09    python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-08 --end-dt 2024-01-10 --enrich --first-seen

Run output

The output for every run is stored in the remote_output_path. Specifically, it contains:

  • stdout: the standard output of your command
  • stderr: the standard error of your command
  • status_code: the unix status code of your command. 0 means it completed successfully
  • results: Optional - any result files your job has written (more on this later)

Results

The sc batch command populates an environment variable: SATURN_RUN_LOCAL_RESULTS_DIR. Anything your job writes to that directory will be copied to ${remote_output_path}/results.
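
For example, a run's script could write a result file like this (the file name and metric values here are just placeholders):

import json
import os

# Directory provided by `sc batch`; anything written here ends up in
# ${remote_output_path}/results for this run.
results_dir = os.environ["SATURN_RUN_LOCAL_RESULTS_DIR"]

with open(os.path.join(results_dir, "metrics.json"), "w") as f:
    json.dump({"rows_processed": 12345}, f)  # placeholder result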

This output makes it easy to grab any runs that failed, and re-run them locally.
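
When the remote output path is on a shared filesystem (a file:// URL, as in this example), a script like the sketch below can scan the per-run output directories and list the failures. It assumes the layout described above: one subdirectory per run under the output root, each containing status_code and stderr files.

from pathlib import Path

# Root used with --remote-output-path in this example (file:// maps to a local shared path).
output_root = Path("/home/jovyan/shared/hugo/test/run-output")

for status_file in output_root.glob("*/status_code"):
    run_dir = status_file.parent
    code = status_file.read_text().strip()
    if code != "0":
        print(f"{run_dir} exited with status {code}")
        stderr_file = run_dir / "stderr"
        if stderr_file.exists():
            print(stderr_file.read_text())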

Hardware Selection

This section of the recipe determines the hardware used for runs:

spec:
  instance_type: r6axlarge
  scale: 2

This means that we will consume at most 2 r6axlarge instances until all workloads are complete. The instance type and scale can also be passed as command-line options to sc split, using --instance-type and --scale.

$ sc options

will list all available instance sizes.

Failures

As mentioned earlier, as the number of runs increases, having to deal with failures is almost guaranteed. As a result, the batching infrastructure makes it easy to skip completed runs and retry failed ones. You could identify completed and failed runs yourself by reading all the individual status_code files, but the split command has a few options to make this easier. By default, sc split will schedule all work from your batch.json or batch.yaml file. However, if you pass --skip-completed it will automatically ignore everything that has completed successfully, and if you pass --skip-failures it will automatically skip everything that has failed.

GPUs

This batching framework will execute, for example, 40 runs, 8 at a time, on an 8-GPU machine. In doing so, you would want each process to be allocated a single GPU. The sc batch framework sets the environment variable SATURN_RUN_LOCAL_RANK. Your code can use that to select a GPU, or you can simply set:

os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['SATURN_RUN_LOCAL_RANK']
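
For example, at the top of a run's script (before any CUDA libraries initialize) you could pin the process to a single GPU. This is a sketch of the pattern described above, assuming a PyTorch workload like the image used in this recipe:

import os

# SATURN_RUN_LOCAL_RANK is set by `sc batch` and identifies this run's slot on its machine.
# Setting CUDA_VISIBLE_DEVICES before any CUDA library initializes restricts the process
# to a single GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = os.environ["SATURN_RUN_LOCAL_RANK"]

import torch  # imported after CUDA_VISIBLE_DEVICES is set

device = torch.device("cuda:0")  # the only GPU visible to this process
print("using GPU:", torch.cuda.get_device_name(device))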