Massively Parallel Jobs for Research Workflows
This section is a continuation of the previous section on dispatching jobs to Saturn Cloud, which covered dispatching a single command. However, there are also times when you may want to create hundreds or thousands of runs in Saturn Cloud. Creating a job per run not only makes the Saturn Cloud UI cluttered and difficult to use, it also makes it hard to retrieve results and to understand which runs have succeeded and which have failed.
In addition, with massively parallel jobs you need to expect failures. When you dispatch a single job, the probability of an intermittent failure is generally low - these are usually caused by hardware failures, network issues with data stores, AWS request rate limiting, and so on. However, once you start dispatching thousands or millions of runs, the probability of having to deal with at least a handful of failures goes up dramatically, even under ideal conditions. Massively parallel jobs in Saturn Cloud are designed around these constraints.
This section will go through the process of batching your runs - more precisely, spinning up N concurrent instances of a given instance type and running J processes (runs) of your script concurrently on each instance. Batching runs on large instances is important in order to reduce the impact of job overhead. For example, if it takes 5 minutes to spin up a new machine and you only allocate 5 minutes of work to it, then half the time of the job is wasted.
We also cover:
- saving all output (stdout, stderr, and status code) to networked storage.
- re-running failures locally.
- re-submitting work without re-submitting runs that have already completed.
Note:
This article makes use of the Saturn Cloud CLI, which you can install with pip install saturn-client. In order to use saturn-client, you need to have the following environment variables set:
- SATURN_BASE_URL (which will look something like https://app.community.saturnenterprise.io)
- SATURN_TOKEN (make sure your token is unscoped)
Both of these are automatically populated in all Saturn Cloud resources.
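If you are using the CLI outside of Saturn Cloud (for example, from your laptop), you would set them yourself; the token value below is just a placeholder:
$ pip install saturn-client
$ export SATURN_BASE_URL=https://app.community.saturnenterprise.io
$ export SATURN_TOKEN=<your-unscoped-token>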
Step 1: Specifying runs
Runs can be specified in JSON or YAML (YAML is much slower if you have thousands of runs):
runs:
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-12-31 --end-dt 2024-01-02 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-01 --end-dt 2024-01-03 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-02 --end-dt 2024-01-04 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-03 --end-dt 2024-01-05 --enrich --first-seen
- cmd: python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-04 --end-dt 2024-01-06 --enrich --first-seen
For example, the above is a batch for a backfill script that populates Intercom (our support chat tool) with information about our users so that we can better assist them with technical issues.
We expect the following parameter:
- runs: a list of all runs we want to execute. This example only has 5 runs to keep things concise, but in practice you will have many more.
Each run expects a single parameter:
- cmd: the command you would like to execute.
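In practice you will usually generate a runs file like this rather than writing it by hand. Here is a minimal sketch in Python that produces the backfill runs above; the output filename jobs.json is just an illustration:
import json
from datetime import date, timedelta

# One run per day, each covering a two-day window, matching the example above.
start = date(2023, 12, 31)
runs = []
for i in range(5):
    start_dt = start + timedelta(days=i)
    end_dt = start_dt + timedelta(days=2)
    runs.append({
        "cmd": (
            "python operations/pipelines/hosted/enrich_user_data.py "
            f"--start-dt {start_dt} --end-dt {end_dt} --enrich --first-seen"
        )
    })

# JSON is the better choice once the list grows to thousands of runs.
with open("jobs.json", "w") as f:
    json.dump({"runs": runs}, f, indent=2)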
Step 2: Creating your job recipe
As mentioned in the section on dispatching jobs for research workflows, we recommend getting your job to work locally in a Python workspace; after that you can clone the Python workspace into a job with a command like this:
$ sc clone workspace my-workspace job my-job --command "..." --owner <org-name>/<username>
then write the recipe to a file:
$ sc get job my-job --owner <org-name>/<username> > /tmp/recipe.yaml
Here, I’m just using ... as a placeholder for the command, because in the next step (splitting/batching) we will use this recipe file as a template and replace the command.
Step 3: Splitting runs into batches
The next step is to split your runs into batches. These batches will be executed across the machines in the job, with nprocs runs executing concurrently on each machine.
$ sc split recipe.yaml jobs.yaml --nprocs 8 --scale 2 --remote-output-path file:///home/jovyan/shared/hugo/test/run-output/ --sync /home/jovyan/workspace/sctest/ --file-syncs-base-dir-url file:///home/jovyan/shared/hugo/test/sctest-base-dir
The above command does a few things:
- creates a job using recipe.yaml as a template, but one that is designed to execute the runs listed in jobs.yaml
- runs 8 runs at a time on each machine
- spins up 2 machines to execute all runs
- stores job output in file:///home/jovyan/shared/hugo/test/run-output/
- copies the local directory /home/jovyan/workspace/sctest to all worker nodes
- uses file:///home/jovyan/shared/hugo/test/sctest-base-dir as a staging ground for all data copies
The resulting recipe is generated and saved to /tmp/commands/recipe.yaml. It is as follows:
schema_version: 2024.04.01
type: job
spec:
  name: sctest-run
  owner: test/hugo
  description: ''
  image: internal/saturncloud/saturn-python-pytorch:2024.08.01
  instance_type: g4dnxlarge
  environment_variables: {}
  working_directory: /home/jovyan/workspace
  start_script: |+
    ### BEGIN SATURN_CLIENT GENERATED CODE
    cp /home/jovyan/shared/hugo/test/sctest-base-dir/home/jovyan/workspace/sctest/data.tar.gz /tmp/data.tar.gz
    mkdir -p /home/jovyan/workspace/sctest/
    tar -xvzf /tmp/data.tar.gz -C /home/jovyan/workspace/sctest/
    cp /home/jovyan/shared/hugo/test/sctest-base-dir/tmp/commands/data.tar.gz /tmp/data.tar.gz
    mkdir -p /tmp/commands/
    tar -xvzf /tmp/data.tar.gz -C /tmp/commands/
    ### END SATURN_CLIENT GENERATED CODE
    pip install git+https://github.com/saturncloud/saturn-client.git@hugo/nfs-work
  token_scope: job:{self}:dask:write
  git_repositories: []
  secrets: []
  shared_folders:
    - owner: test/hugo
      path: /home/jovyan/shared/hugo/test
      name: test
  start_dind: false
  command:
    - sc batch /tmp/commands/0.json
    - sc batch /tmp/commands/1.json
    - sc batch /tmp/commands/2.json
    - sc batch /tmp/commands/3.json
  scale: 2
  use_spot_instance: false
  schedule:
state:
  id: 77341afe99df47c6a0570a95dd716668
  status: stopped
You can see that the command section has been modified to execute a list of commands (which are all sc batch). sc batch will handle the execution of each set of runs and store the output in the proper location. You can also inspect /tmp/commands/0.json, which contains the following information:
{
  "runs": [
    {
      "remote_output_path": "file:///home/jovyan/shared/hugo/test/run-output/0",
      "cmd": "python /home/jovyan/workspace/sctest/test.py 1",
      "local_results_dir": "/tmp/1e5761678f56443eb02104938c633dbd/",
      "status_code": null,
      "status": null
    },
    ...
  ],
  "remote_output_path": "file:///home/jovyan/shared/hugo/test/run-output/",
  "nprocs": 8
}
Step 4: Executing the job
$ sc apply /tmp/commands/recipe.yaml --start
Step 5: Waiting for the job to complete and inspecting the results
$ sc summarize-batch /tmp/batch.yaml
This displays status information for every run, based on what is stored in remote storage. The output is as follows:
status status_code remote_output_path cmd
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-01 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2023-12-31 --end-dt 2024-01-02 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-02 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-01 --end-dt 2024-01-03 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-03 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-02 --end-dt 2024-01-04 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-04 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-03 --end-dt 2024-01-05 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-05 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-04 --end-dt 2024-01-06 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-06 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-05 --end-dt 2024-01-07 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-07 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-06 --end-dt 2024-01-08 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-08 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-07 --end-dt 2024-01-09 --enrich --first-seen
completed 0 sfs://internal/hugo/intercom-backfill-2024-03-24/2024-01-09 python operations/pipelines/hosted/enrich_user_data.py --start-dt 2024-01-08 --end-dt 2024-01-10 --enrich --first-seen
Run output
The output for every run is stored in the remote_output_path, specifically:
- stdout: the standard output of your command
- stderr: the standard error of your command
- status_code: the Unix status code of your command; 0 means it completed successfully
- results: optional - any result files your job has written (more on this later)
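Concretely, with the file:// output path used in Step 3, each run gets its own directory in remote storage, roughly like this (a sketch; the run subdirectory names come from the generated batch files):
/home/jovyan/shared/hugo/test/run-output/0/
    stdout         <- standard output of the run
    stderr         <- standard error of the run
    status_code    <- exit code; 0 on success
    results/       <- optional files written by the run (see Results below)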
Results
The sc batch command will populate an environment variable, SATURN_RUN_LOCAL_RESULTS_DIR. Anything your job writes to that directory will be copied to ${remote_output_path}/results.
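For example, a run script could save a result file like this (a minimal sketch; the filename and payload are just illustrations):
import json
import os

# sc batch sets SATURN_RUN_LOCAL_RESULTS_DIR for each run. Anything written
# here is copied to ${remote_output_path}/results.
results_dir = os.environ["SATURN_RUN_LOCAL_RESULTS_DIR"]

metrics = {"rows_processed": 1234}  # hypothetical result payload
with open(os.path.join(results_dir, "metrics.json"), "w") as f:
    json.dump(metrics, f)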
This output makes it easy to grab any runs that failed, and re-run them locally.
Hardware Selections
This section of the recipe determines the hardware used for runs:
spec:
  instance_type: r6axlarge
  scale: 2
This means that we will consume at most 2 r6axlarge instances until all workloads are complete. The instance type and scale can also be passed as command line options to sc split, using the --instance-type and --scale options. Running
$ sc options
will list all available instance sizes.
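For example, the hardware above could be requested directly when splitting, combining the hardware options with the flags from Step 3 (the sync flags are omitted here for brevity):
$ sc split recipe.yaml jobs.yaml --nprocs 8 --instance-type r6axlarge --scale 2 --remote-output-path file:///home/jovyan/shared/hugo/test/run-output/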
Failures
As mentioned earlier, as the number of runs increases, having to deal with failures is almost guaranteed. As a result, the batching infrastructure makes it easy to skip completed runs and re-try failed runs. You could identify completed and failed runs yourself by reading all the individual status_code files, but the split command has a few options to make this easier. By default sc split will schedule all work from your batch.json or batch.yaml file. However, if you pass --skip-completed it will automatically ignore everything that has completed successfully, and if you pass --skip-failures it will automatically skip everything that has failed.
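For example, to re-submit only the work that has not yet completed successfully, you could regenerate the batch job with --skip-completed and apply it again (the remaining flags are the same as in Step 3 and are omitted here for brevity):
$ sc split recipe.yaml jobs.yaml --nprocs 8 --scale 2 --skip-completed --remote-output-path file:///home/jovyan/shared/hugo/test/run-output/
$ sc apply /tmp/commands/recipe.yaml --start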
GPUs
This batching framework will execute, for example, 40 runs, 8 at a time, on an 8-GPU machine. In doing so, you would want each process to be allocated a single GPU. The sc batch framework will set the environment variable SATURN_RUN_LOCAL_RANK. Your code can use that to select the GPU, or you could just set
os.environ['CUDA_VISIBLE_DEVICES'] = os.environ['SATURN_RUN_LOCAL_RANK']
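Putting that together, a run script could pin itself to one GPU like this (a minimal sketch, assuming SATURN_RUN_LOCAL_RANK ranges over 0 through nprocs-1 as the name suggests; the key detail is that CUDA_VISIBLE_DEVICES must be set before any CUDA-backed library is initialized):
import os

# Each concurrent run on a machine gets a distinct SATURN_RUN_LOCAL_RANK,
# so using it for CUDA_VISIBLE_DEVICES gives every run its own GPU.
os.environ["CUDA_VISIBLE_DEVICES"] = os.environ["SATURN_RUN_LOCAL_RANK"]

import torch  # imported after CUDA_VISIBLE_DEVICES is set

# This process now sees exactly one GPU (cuda:0), or falls back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"local rank {os.environ['SATURN_RUN_LOCAL_RANK']} using {device}")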