This is the command pegasus-mpi-cluster that can be run in the OnWorks free hosting provider using one of our multiple free online workstations such as Ubuntu Online, Fedora Online, Windows online emulator or MAC OS online emulator
PROGRAM:
NAME
pegasus-mpi-cluster - a tool for running computational workflows expressed as DAGs
(Directed Acyclic Graphs) on computational clusters using MPI.
SYNOPSIS
pegasus-mpi-cluster [options] workflow.dag
DESCRIPTION
pegasus-mpi-cluster is a tool used to run HTC (High Throughput Computing) scientific
workflows on systems designed for HPC (High Performance Computing). Many HPC systems have
custom architectures that are optimized for tightly-coupled, parallel applications. These
systems commonly have exotic, low-latency networks that are designed for passing short
messages very quickly between compute nodes. Many of these networks are so highly
optimized that the compute nodes do not even support a TCP/IP stack. This makes it
impossible to run HTC applications using software that was designed for commodity
clusters, such as Condor.
pegasus-mpi-cluster was developed to enable loosely-coupled HTC applications such as
scientific workflows to take advantage of HPC systems. In order to get around the network
issues outlined above, pegasus-mpi-cluster uses MPI (Message Passing Interface), a
commonly used API for writing SPMD (Single Process, Multiple Data) parallel applications.
Most HPC systems have an MPI implementation that works on whatever exotic network
architecture the system uses.
An pegasus-mpi-cluster job consists of a single master process (this process is rank 0 in
MPI parlance) and several worker processes. The master process manages the workflow and
assigns workflow tasks to workers for execution. The workers execute the tasks and return
the results to the master. Any output written to stdout or stderr by the tasks is captured
(see TASK STDIO).
pegasus-mpi-cluster applications are expressed as DAGs (Directed Acyclic Graphs) (see DAG
FILES). Each node in the graph represents a task, and the edges represent dependencies
between the tasks that constrain the order in which the tasks are executed. Each task is a
program and a set of parameters that need to be run (i.e. a command and some optional
arguments). The dependencies typically represent data flow dependencies in the
application, where the output files produced by one task are needed as inputs for another.
If an error occurs while executing a DAG that causes the workflow to stop, it can be
restarted using a rescue file, which records the progress of the workflow (see RESCUE
FILES). This enables pegasus-mpi-cluster to pick up running the workflow where it stopped.
pegasus-mpi-cluster was designed to work either as a standalone tool or as a complement to
the Pegasus Workflow Managment System (WMS). For more information about using PMC with
Pegasus see the section on PMC AND PEGASUS.
pegasus-mpi-cluster allows applications expressed as a DAG to be executed in parallel on a
large number of compute nodes. It is designed to be simple, lightweight and robust.
OPTIONS
-h, --help
Print help message
-V, --version
Print version information
-v, --verbose
Increase logging verbosity. Adding multiple -v increases the level more. The default
log level is INFO. (see LOGGING)
-q, --quiet
Decrease logging verbosity. Adding multiple -q decreases the level more. The default
log level is INFO. (see LOGGING)
-s, --skip-rescue
Ignore the rescue file for workflow.dag if it exists. Note that pegasus-mpi-cluster
will still create a new rescue file for the current run. The default behavior is to
use the rescue file if one is found. (see RESCUE FILES)
-o path, --stdout path
Path to file for task stdout. (see TASK STDIO and --per-task-stdio)
-e path, --stderr path
Path to file for task stderr. (see TASK STDIO and --per-task-stdio)
-m M, --max-failures M
Stop submitting new tasks after M tasks have failed. Once M has been reached,
pegasus-mpi-cluster will finish running any tasks that have been started, but will not
start any more tasks. This option is used to prevent pegasus-mpi-cluster from
continuing to run a workflow that is suffering from a systematic error, such as a
missing binary or an invalid path. The default for M is 0, which means unlimited
failures are allowed.
-t T, --tries T
Attempt to run each task T times before marking the task as failed. Note that the T
tries do not count as failures for the purposes of the -m option. A task is only
considered failed if it is tried T times and all T attempts result in a non-zero
exitcode. The value of T must be at least 1. The default is 1.
-n, --nolock
Do not lock DAGFILE. By default, pegasus-mpi-cluster will attempt to acquire an
exclusive lock on DAGFILE to prevent multiple MPI jobs from running the same DAG at
the same time. If this option is specified, then the lock will not be acquired.
-r, --rescue path
Path to rescue log. If the file exists, and -s is not specified, then the log will be
used to recover the state of the workflow. The file is truncated after it is read and
a new rescue log is created in its place. The default is to append .rescue to the DAG
file name. (see RESCUE FILES)
--host-script path
Path to a script or executable to launch on each unique host that pegasus-mpi-cluster
is running on. This path can also be set using the PMC_HOST_SCRIPT environment
variable. (see HOST SCRIPTS)
--host-memory size
Amount of memory available on each host in MB. The default is to determine the amount
of physical RAM automatically. This value can also be set using the PMC_HOST_MEMORY
environment variable. (see RESOURCE-BASED SCHEDULING)
--host-cpus cpus
Number of CPUs available on each host. The default is to determine the number of CPU
cores automatically. This value can also be set using the PMC_HOST_CPUS environment
variable. (see RESOURCE-BASED SCHEDULING)
--strict-limits
This enables strict memory usage limits for tasks. When this option is specified, and
a task tries to allocate more memory than was requested in the DAG, the memory
allocation operation will fail.
--max-wall-time minutes
This is the maximum number of minutes that pegasus-mpi-cluster will allow the workflow
to run. When this time expires pegasus-mpi-cluster will abort the workflow and merge
all of the stdout/stderr files of the workers. The value is in minutes, and the
default is unlimited wall time. This option was added so that the output of a workflow
will be recorded even if the workflow exceeds the max wall time of its batch job. This
value can also be set using the PMC_MAX_WALL_TIME environment variable.
--per-task-stdio
This causes PMC to generate a .out.XXX and a .err.XXX file for each task instead of
writing task stdout/stderr to --stdout and --stderr. The name of the files are
"TASKNAME.out.XXX" and "TASKNAME.err.XXX", where "TASKNAME" is the name of the task
from the DAG and "XXX" is a sequence number that is incremented each time the task is
tried. This option overrides the values for --stdout and --stderr. This argument is
used by Pegasus when workflows are planned in PMC-only mode to facilitate debugging
and monitoring.
--jobstate-log
This option causes PMC to generate a jobstate.log file for the workflow. The file is
named "jobstate.log" and is placed in the same directory where the DAG file is
located. If the file already exists, then PMC appends new lines to the existing file.
This option is used by Pegasus when workflows are planned in PMC-only mode to
facilitate monitoring.
--monitord-hack
This option causes PMC to generate a .dagman.out file for the workflow. This file
mimics the contents of the .dagman.out file generated by Condor DAGMan. The point of
this option is to trick monitord into thinking that it is dealing with DAGMan so that
it will generate the appropriate events to populate the STAMPEDE database for
monitoring purposes. The file is named "DAG.dagman.out" where "DAG" is the path to the
PMC DAG file.
--no-resource-log
Do not generate a workflow.dag.resource file for the workflow.
--no-sleep-on-recv
Do not use polling with sleep() to implement message receive. (see Known Issues: CPU
Usage)
--maxfds
Set the maximum number of file descriptors that can be left open by the master for I/O
forwarding. By default this value is set automatically based on the value of
getrlimit(RLIMIT_NOFILE). The value must be at least 1, and cannot be more than
RLIMIT_NOFILE.
--keep-affinity
By default PMC attempts to reset the CPU and memory affinity. This is to ensure that
all available CPUs and memory can be used by PMC tasks on systems that are not
configured properly. This flag tells PMC to keep the affinity settings inherited from
its parent. Note that the memory policy can only be cleared if PMC was compiled with
libnuma. CPU affinity is cleared using sched_setaffinity(), and memory policy is
cleared with set_mempolicy().
DAG FILES
pegasus-mpi-cluster workflows are expressed using a simple text-based format similar to
that used by Condor DAGMan. There are only two record types allowed in a DAG file: TASK
and EDGE. Any blank lines in the DAG (lines with all whitespace characters) are ignored,
as are any lines beginning with # (note that # can only appear at the beginning of a line,
not in the middle).
The format of a TASK record is:
"TASK" id [options...] executable [arguments...]
Where id is the ID of the task, options is a list of task options, executable is the path
to the executable or script to run, and arguments... is a space-separated list of
arguments to pass to the task. An example is:
TASK t01 -m 10 -c 2 /bin/program -a -b
This example specifies a task t01 that requires 10 MB memory and 2 CPUs to run
/bin/program with the arguments -a and -b. The available task options are:
-m M, --request-memory M
The amount of memory required by the task in MB. The default is 0, which means memory
is not considered for this task. This option can be set for a job in the DAX by
specifying the pegasus::pmc_request_memory profile. (see RESOURCE-BASED SCHEDULING)
-c N, --request-cpus N
The number of CPUs required by the task. The default is 1, which implies that the
number of slots on a host should be less than or equal to the number of physical CPUs
in order for all the slots to be used. This option can be set for a job in the DAX by
specifying the pegasus::pmc_request_cpus profile. (see RESOURCE-BASED SCHEDULING)
-t T, --tries T
The number of times to try to execute the task before failing permanently. This is the
task-level equivalent of the --tries command-line option.
-p P, --priority P
The priority of the task. P should be an integer. Larger values have higher priority.
The default is 0. Priorities are simply hints and are not strict—if a task cannot be
matched to an available slot (e.g. due to resource availability), but a lower-priority
task can, then the task will be deferred and the lower priority task will be executed.
This option can be set for a job in the DAX by specifying the pegasus::pmc_priority
profile.
-f VAR=FILE, --pipe-forward VAR=FILE
Forward I/O to file FILE using pipes to communicate with the task. The environment
variable VAR will be set to the value of a file descriptor for a pipe to which the
task can write to get data into FILE. For example, if a task specifies: -f
FOO=/tmp/foo then the environment variable FOO for the task will be set to a number
(e.g. 3) that represents the file /tmp/foo. In order to specify this argument in a
Pegasus DAX you need to set the pegasus::pmc_arguments profile (note that the value of
pmc_arguments must contain the "-f" part of the argument, so a valid value would be:
<profile namespace="pegasus" key="pmc_arguments">-f A=/tmp/a </profile>). (see I/O
FORWARDING)
-F SRC=DEST, --file-forward SRC=DEST
Forward I/O to the file DEST from the file SRC. When the task finishes, the worker
will read the data from SRC and send it to the master where it will be written to the
file DEST. After SRC is read it is deleted. In order to specify this argument in a
Pegasus DAX you need to set the pegasus::pmc_arguments profile. (see I/O FORWARDING)
The format of an EDGE record is:
"EDGE" parent child
Where parent is the ID of the parent task, and child is the ID of the child task. An
example EDGE record is:
EDGE t01 t02
A simple diamond-shaped workflow would look like this:
# diamond.dag
TASK A /bin/echo "I am A"
TASK B /bin/echo "I am B"
TASK C /bin/echo "I am C"
TASK D /bin/echo "I am D"
EDGE A B
EDGE A C
EDGE B D
EDGE C D
RESCUE FILES
Many different types of errors can occur when running a DAG. One or more of the tasks may
fail, the MPI job may run out of wall time, pegasus-mpi-cluster may segfault (we hope
not), the system may crash, etc. In order to ensure that the DAG does not need to be
restarted from the beginning after an error, pegasus-mpi-cluster generates a rescue file
for each workflow.
The rescue file is a simple text file that lists all of the tasks in the workflow that
have finished successfully. This file is updated each time a task finishes, and is flushed
periodically so that if the work- flow fails and the user restarts it, pegasus-mpi-cluster
can determine which tasks still need to be executed. As such, the rescue file is a sort-of
transaction log for the workflow.
The rescue file contains zero or more DONE records. The format of these records is:
"DONE" *taskid*
Where taskid is the ID of the task that finished successfully.
By default, rescue files are named DAGNAME.rescue where DAGNAME is the path to the input
DAG file. The file name can be changed by specifying the -r argument.
PMC AND PEGASUS
Using PMC for Pegasus Task Clustering
PMC can be used as the wrapper for executing clustered jobs in Pegasus. In this mode
Pegasus groups several tasks together and submits them as a single clustered job to a
remote system. PMC then executes the individual tasks in the cluster and returns the
results.
PMC can be specified as the task manager for clustered jobs in Pegasus in three ways:
1. Globally in the properties file
The user can set a property in the properties file that results in all the clustered
jobs of the workflow being executed by PMC. In the Pegasus properties file specify:
#PEGASUS PROPERTIES FILE
pegasus.clusterer.job.aggregator=mpiexec
In the above example, all the clustered jobs on all remote sites will be launched via
PMC as long as the property value is not overridden in the site catalog.
2. By setting the profile key "job.aggregator" in the site catalog:
<site handle="siteX" arch="x86" os="LINUX">
...
<profile namespace="pegasus" key="job.aggregator">mpiexec</profile>
</site>
In the above example, all the clustered jobs on a siteX are going to be executed via
PMC as long as the value is not overridden in the transformation catalog.
3. By setting the profile key "job.aggregator" in the transformation catalog:
tr B {
site siteX {
pfn "/path/to/mytask"
arch "x86"
os "linux"
type "INSTALLED"
profile pegasus "clusters.size" "3"
profile pegasus "job.aggregator" "mpiexec"
}
}
In the above example, all the clustered jobs for transformation B on siteX will be
executed via PMC.
It is usually necessary to have a pegasus::mpiexec entry in your transformation catalog
that specifies a) the path to PMC on the remote site and b) the relevant globus profiles
such as xcount, host_xcount and maxwalltime to control size of the MPI job. That entry
would look like this:
tr pegasus::mpiexec {
site siteX {
pfn "/path/to/pegasus-mpi-cluster"
arch "x86"
os "linux"
type "INSTALLED"
profile globus "maxwalltime" "240"
profile globus "host_xcount" "1"
profile globus "xcount" "32"
}
}
If this transformation catalog entry is not specified, Pegasus will attempt create a
default path on the basis of the environment profile PEGASUS_HOME specified in the site
catalog for the remote site.
PMC can be used with both horizontal and label-based clustering in Pegasus, but we
recommend using label-based clustering so that entire sub-graphs of a Pegasus DAX can be
clustered into a single PMC job, instead of only a single level of the workflow.
Pegasus Profiles for PMC
There are several Pegasus profiles that map to PMC task options:
pmc_request_memory
This profile is used to set the --request-memory task option and is usually specified
in the DAX or transformation catalog.
pmc_request_cpus
This key is used to set the --request-cpus task option and is usually specified in the
DAX or transformation catalog.
pmc_priority
This key is used to set the --priority task option and is usually specified in the
DAX.
These profiles are used by Pegasus when generating PMC’s input DAG when PMC is used as the
task manager for clustered jobs in Pegasus.
The profiles can be specified in the DAX like this:
<job id="ID0000001" name="mytask">
<arguments>-a 1 -b 2 -c 3</arguments>
...
<profile namespace="pegasus" key="pmc_request_memory">1024</profile>
<profile namespace="pegasus" key="pmc_request_cpus">4</profile>
<profile namespace="pegasus" key="pmc_priority">10</profile>
</job>
This example specifies a PMC task that requires 1GB of memory and 4 cores, and has a
priority of 10. It produces a task in the PMC DAG that looks like this:
TASK mytask_ID00000001 -m 1024 -c 4 -p 10 /path/to/mytask -a 1 -b 2 -c 3
Using PMC for the Entire Pegasus DAX
Pegasus can also be configured to run the entire workflow as a single PMC job. In this
mode Pegasus will generate a single PMC DAG for the entire workflow as well as a PBS
script that can be used to submit the workflow.
In contrast to using PMC as a task clustering tool, in this mode there are no jobs in the
workflow executed without PMC. The entire workflow, including auxilliary jobs such as
directory creation and file transfers, is managed by PMC. If Pegasus is configured in this
mode, then DAGMan and Condor are not required.
To run in PMC-only mode, set the property "pegasus.code.generator" to "PMC" in the Pegasus
properties file:
pegasus.code.generator=PMC
In order to submit the resulting PBS job you may need to make changes to the .pbs file
generated by Pegasus to get it to work with your cluster. This mode is experimental and
has not been used extensively.
LOGGING
By default, all logging messages are printed to stderr. If you turn up the logging using
-v then you may end up with a lot of stderr being forwarded from the workers to the
master.
The log levels in order of severity are: FATAL, ERROR, WARN, INFO, DEBUG, and TRACE.
The default logging level is INFO. The logging levels can be increased with -v and
decreased with -q.
TASK STDIO
By default the stdout and stderr of tasks will be redirected to the master’s stdout and
stderr. You can change the path of these files with the -o and -e arguments. You can also
enable per-task stdio files using the --per-task-stdio argument. Note that if per-task
stdio files are not used then the stdio of all workers will be merged into one out and one
err file by the master at the end, so I/O from different workers will not be interleaved,
but I/O from each worker will appear in the order that it was generated. Also note that,
if the job fails for any reason, the outputs will not be merged, but instead there will be
one file for each worker named DAGFILE.out.X and DAGFILE.err.X, where DAGFILE is the path
to the input DAG, and X is the worker’s rank.
HOST SCRIPTS
A host script is a shell script or executable that pegasus-mpi-cluster launches on each
unique host on which it is running. They can be used to start auxilliary services, such as
memcached, that the tasks in a workflow require.
Host scripts are specified using either the --host-script argument or the PMC_HOST_SCRIPT
environment variable.
The host script is started when pegasus-mpi-cluster starts and must exit with an exitcode
of 0 before any tasks can be executed. If it the host script returns a non-zero exitcode,
then the workflow is aborted. The host script is given 60 seconds to do any setup that is
required. If it doesn’t exit in 60 seconds then a SIGALRM signal is delivered to the
process, which, if not handled, will cause the process to terminate.
When the workflow finishes, pegasus-mpi-cluster will deliver a SIGTERM signal to the host
script’s process group. Any child processes left running by the host script will receive
this signal unless they created their own process group. If there were any processes left
to receive this signal, then they will be given a few seconds to exit, then they will be
sent SIGKILL. This is the mechanism by which processes started by the host script can be
informed of the termination of the workflow.
RESOURCE-BASED SCHEDULING
High-performance computing resources often have a low ratio of memory to CPUs. At the same
time, workflow tasks often have high memory requirements. Often, the memory requirements
of a workflow task exceed the amount of memory available to each CPU on a given host. As a
result, it may be necessary to disable some CPUs in order to free up enough memory to run
the tasks. Similarly, many codes have support for multicore hosts. In that case it is
necessary for efficiency to ensure that the number of cores required by the tasks running
on a host do not exceed the number of cores available on that host.
In order to make this process more efficient, pegasus-mpi-cluster supports resource-based
scheduling. In resource-based scheduling the tasks in the workflow can specify how much
memory and how many CPUs they require, and pegasus-mpi-cluster will schedule them so that
the tasks running on a given host do not exceed the amount of physical memory and CPUs
available. This enables pegasus-mpi-cluster to take advantage of all the CPUs available
when the tasks' memory requirement is low, but also disable some CPUs when the tasks'
memory requirement is higher. It also enables workflows with a mixture of single core and
multi-core tasks to be executed on a heterogenous pool.
If there are no hosts available that have enough memory and CPUs to execute one of the
tasks in a workflow, then the workflow is aborted.
Memory
Users can specify both the amount of memory required per task, and the amount of memory
available per host. If the amount of memory required by any task exceeds the available
memory of all the hosts, then the workflow will be aborted. By default, the host memory is
determined automatically, however the user can specify --host-memory to "lie" to
pegasus-mpi-cluster. The amount of memory required for each task is specified in the DAG
using the -m/--request-memory argument (see DAG Files).
CPUs
Users can specify the number of CPUs required per task, and the total number of CPUs
available on each host. If the number of CPUs required by a task exceeds the available
CPUs on all hosts, then the workflow will be aborted. By default, the number of CPUs on a
host is determined automatically, but the user can specify --host-cpus to over- or
under-subscribe the host. The number of CPUs required for each task is specified in the
DAG using the -c/--request-cpus argument (see DAG Files).
I/O FORWARDING
In workflows that have lots of small tasks it is common for the I/O written by those tasks
to be very small. For example, a workflow may have 10,000 tasks that each write a few KB
of data. Typically each task writes to its own file, resulting in 10,000 files. This I/O
pattern is very inefficient on many parallel file systems because it requires the file
system to handle a large number of metadata operations, which are a bottleneck in many
parallel file systems.
One way to handle this problem is to have all 10,000 tasks write to a single file. The
problem with this approach is that it requires those tasks to synchronize their access to
the file using POSIX locks or some other mutual exclusion mechanism. Otherwise, the writes
from different tasks may be interleaved in arbitrary order, resulting in unusable data.
In order to address this use case PMC implements a feature that we call "I/O Forwarding".
I/O forwarding enables each task in a PMC job to write data to an arbitrary number of
shared files in a safe way. It does this by having PMC worker processes collect data
written by the task and send it over over the high-speed network using MPI messaging to
the PMC master process, where it is written to the output file. By having one process (the
PMC master process) write to the file all of the I/O from many parallel tasks can be
synchronized and written out to the files safely.
There are two different ways to use I/O forwarding in PMC: pipes and files. Pipes are more
efficient, but files are easier to use.
I/O forwarding using pipes
I/O forwarding with pipes works by having PMC worker processes collect data from each task
using UNIX pipes. This approach is more efficient than the file-based approach, but it
requires the code of the task to be changed so that the task writes to the pipe instead of
a regular file.
In order to use I/O forwarding a PMC task just needs to specify the -f/--pipe-forward
argument to specify the name of the file to forward data to, and the name of an
environment variable through which the PMC worker process can inform it of the file
descriptor for the pipe.
For example, if there is a task "mytask" that needs to forward data to two files:
"myfile.a" and "myfile.b", it would look like this:
TASK mytask -f A=/tmp/myfile.a -f B=/tmp/myfile.b /bin/mytask
When the /bin/mytask process starts it will have two variables in its environment: "A=3"
and "B=4", for example. The value of these variables is the file descriptor number of the
corresponding files. In this case, if the task wants to write to "/tmp/myfile.a", it gets
the value of environment variable "A", and calls write() on that descriptor number. In C
the code for that looks like this:
char *A = getenv("A");
int fd = atoi(A);
char *message = "Hello, World\n";
write(fd, message, strlen(message));
In some programming languages it is not possible to write to a file descriptor directly.
Fortran, for example, refers to files by unit number instead of using file descriptors. In
these languages you can either link C I/O functions into your binary and call them from
routines written in the other language, or you can open a special file in the Linux /proc
file system to get another handle to the pipe you want to access. For the latter, the file
you should open is "/proc/self/fd/NUMBER" where NUMBER is the file descriptor number you
got from the environment variable. For the example above, the pipe for myfile.a
(environment variable A) is "/proc/self/fd/3".
If you are using pegasus-kickstart, which is probably the case if you are using PMC for a
Pegasus workflow, then there’s a trick you can do to avoid modifying your code. You use
the /proc file system, as described above, but you let pegasus-kickstart handle the path
construction. For example, if your application has an argument, -o, that allows you to
specify the output file then you can write your task like this:
TASK mytask -f A=/tmp/myfile.a /bin/pegasus-kickstart /bin/mytask -o /proc/self/fd/$A
In this case, pegasus-kickstart will replace the $A in your application arguments with the
file descriptor number you want. Your code can open that path normally, write to it, and
then close it as if it were a regular file.
I/O forwarding using files
I/O forwarding with files works by having tasks write out data in files on the local disk.
The PMC worker process reads these files and forwards the data to the master where it can
be written to the desired output file. This approach may be much less efficient than using
pipes because it involves the file system, which has more overhead than a pipe.
File forwarding can be enabled by giving the -F/--file-forward argument to a task.
Here’s an example:
TASK mytask -F /tmp/foo.0=/scratch/foo /bin/mytask -o /tmp/foo.0
In this case, the worker process will expect to find the file /tmp/foo.0 when mytask exits
successfully. It reads the data from that file and sends it to the master to be written to
the end of /scratch/foo. After /tmp/foo.0 is read it will be deleted by the worker
process.
This approach works best on systems where the local disk is a RAM file system such as Cray
XT machines. Alternatively, the task can use /dev/shm on a regular Linux cluster. It might
also work relatively efficiently on a local disk if the file system cache is able to
absorb all of the reads and writes.
I/O forwarding caveats
When using I/O forwarding it is important to consider a few caveats.
First, if the PMC job fails for any reason (including when the workflow is aborted for
violating --max-wall-time), then the files containing forwarded I/O may be corrupted. They
can include partial records, meaning that only part of the I/O from one or more tasks was
written, and they can include duplicate records, meaning that the I/O was written, but the
PMC job failed before the task could be marked as successful, and the workflow was
restarted later. We make no guarantees about the contents of the data files in this case.
It is up to the code that reads the files to a) detect and b) recover from such problems.
To eliminate duplicates the records should include a unique identifier, and to eliminate
partials the records should include a checksum.
Second, you should not use I/O forwarding if your task is going to write a lot of data to
the file. Because the PMC worker is reading data off the pipe/file into memory and sending
it in an MPI message, if you write too much, then the worker process will run the system
out of memory. Also, all the data needs to fit in a single MPI message. In pipe forwarding
there is no hard limit on the size, but in file forwarding the limit is 1MB. We haven’t
benchmarked the performance on large I/O, but anything larger than about 1 MB is probably
too much. At any rate, if your data is larger than 1MB, then I/O forwarding probably won’t
have much of a performance benefit anyway.
Third, the I/O is not written to the file if the task returns a non-zero exitcode. We
assume that if the task failed that you don’t want the data it produced.
Fourth, the data from different tasks is not interleaved. All of the data written by a
given task will appear sequentially in the output file. Note that you can still get
partial records, however, if any data from a task appears it will never be split among
non-adjacent ranges in the output file. If you have 3 tasks that write: "I am a task" you
can get:
I am a taskI am a taskI am a task
and:
I am a taskI amI am a task
but not:
I am a taskI amI am a task a task
Fifth, data from different tasks appears in arbitrary order in the output file. It depends
on what order the tasks were executed by PMC, which may be arbitrary if there are no
dependencies between the tasks. The data that is written should contain enough information
that you are able to determine which task produced it if you require that. PMC does not
add any headers or trailers to the data.
Sixth, a task will only be marked as successful if all of its I/O was successfully
written. If the workflow completed successfully, then the I/O is guaranteed to have been
written.
Seventh, if the master is not able to write to the output file for any reason (e.g. the
master tries to write the I/O to the destination file, but the write() call returns an
error) then the task is marked as failed even if the task produced a non-zero exitcode. In
other words, you may get a non-zero kickstart record even when PMC marks the task failed.
Eighth, the pipes are write-only. If you need to read and write data from the file you
should use file forwarding and not pipe forwarding.
Ninth, all files are opened by the master in append mode. This is so that, if the workflow
fails and has to be restarted, or if a task fails and is retried, the data that was
written previously is not lost. PMC never truncates the files. This is one of the reasons
why you can have partial records and duplicate records in the output file.
Finally, in file forwarding the output file is removed when the task exits. You cannot
rely on the file to be there when the next task runs even if you write it to a shared file
system.
MISC
Resource Utilization
At the end of the workflow run, the master will report the resource utilization of the
job. This is done by adding up the total runtimes of all the tasks executed (including
failed tasks) and dividing by the total wall time of the job times N, where N is both the
total number of processes including the master, and the total number of workers. These two
resource utilization values are provided so that users can get an idea about how
efficiently they are making use of the resources they allocated. Low resource utilization
values suggest that the user should use fewer cores, and longer wall time, on future runs,
while high resource utilization values suggest that the user could use more cores for
future runs and get a shorter wall time.
KNOWN ISSUES
fork() and exec()
In order for the worker processes to start tasks on the compute node the compute nodes
must support the fork() and exec() system calls. If your target machine runs a
stripped-down OS on the compute nodes that does not support these system calls, then
pegasus-mpi-cluster will not work.
CPU Usage
Many MPI implementations are optimized so that message sends and receives do busy waiting
(i.e. they spin/poll on a message send or receive instead of sleeping). The reasoning is
that sleeping adds overhead and, since many HPC systems use space sharing on dedicated
hardware, there are no other processes competing, so spinning instead of sleeping can
produce better performance. On those implementations MPI processes will run at 100% CPU
usage even when they are just waiting for a message. This is a big problem for multicore
tasks in pegasus-mpi-cluster because idle slots consume CPU resources. In order to solve
this problem pegasus-mpi-cluster processes sleep for a short period between checks for
waiting messages. This reduces the load significantly, but causes a short delay in
receiving messages. If you are using an MPI implementation that sleeps on message send and
receive instead of doing busy waiting, then you can disable the sleep by specifying the
--no-sleep-on-recv option. Note that the master will always sleep if --max-wall-time is
specified because there is no way to interrupt or otherwise timeout a blocking call in MPI
(e.g. SIGALRM does not cause MPI_Recv to return EINTR).
ENVIRONMENT VARIABLES
The environment variables below are aliases for command-line options. If the environment
variable is present, then it is used as the default for the associated option. If both are
present, then the command-line option is used.
PMC_HOST_SCRIPT
Alias for the --host-script option.
PMC_HOST_MEMORY
Alias for the --host-memory option.
PMC_HOST_CPUS
Alias for the --host-cpus option.
PMC_MAX_WALL_TIME
Alias for the --max-wall-time option.
Use pegasus-mpi-cluster online using onworks.net services