Title: | PEcAn Model Execution Utilities |
---|---|
Description: | This package contains utilities for communicating with and executing code on local and remote hosts. In particular, it has PEcAn-specific utilities for starting ecosystem model runs. |
Authors: | David LeBauer [aut], Rob Kooper [aut, cre], Shawn Serbin [aut], Alexey Shiklomanov [aut], Shashank Singh [aut], Chris Black [aut], University of Illinois, NCSA [cph] |
Maintainer: | Rob Kooper <[email protected]> |
License: | BSD_3_clause + file LICENSE |
Version: | 1.8.0.9000 |
Built: | 2025-02-04 23:21:45 UTC |
Source: | https://github.com/PecanProject/pecan |
Check if model run was successful
check_model_run(out, stop.on.error = TRUE)
out |
Output from model execution, as a character. |
stop.on.error |
Throw error if any of the runs fails. Default TRUE. |
TRUE if the model run succeeded. If the model run failed, throws an error if stop.on.error is TRUE; otherwise returns FALSE.
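The return behaviour described above can be sketched in base R. This is a hypothetical stand-in, not the package's implementation: the function name `check_run_sketch` and the failure marker `"ERROR IN MODEL RUN"` are assumptions made for illustration.

```r
# Hypothetical sketch; the marker string is an assumption, not taken from the docs.
check_run_sketch <- function(out, stop.on.error = TRUE,
                             marker = "ERROR IN MODEL RUN") {
  # A run counts as failed if any output line contains the marker.
  failed <- any(grepl(marker, out, fixed = TRUE))
  if (!failed) {
    return(TRUE)
  }
  msg <- paste(c("Model run aborted with the following output:", out),
               collapse = "\n")
  if (stop.on.error) {
    stop(msg, call. = FALSE)
  }
  FALSE
}

ok  <- check_run_sketch(c("starting", "finished"))
bad <- check_run_sketch(c("starting", "ERROR IN MODEL RUN"),
                        stop.on.error = FALSE)
```

With `stop.on.error = FALSE` the caller can collect failures across many runs instead of aborting at the first one.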
Returns the fully qualified hostname. This can differ from Sys.info()['nodename'], which may return only the host part without the domain. For example, on the machine pecan.ncsa.illinois.edu, fqdn() returns the full name pecan.ncsa.illinois.edu, whereas the hostname alone is just pecan.
fqdn()
fully qualified hostname
Rob Kooper
fqdn()
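To make the distinction between the fully qualified name and the bare hostname concrete, here is a small base R sketch. The helper `short_hostname` is hypothetical and only splits a name that is already known; it does not query the system.

```r
# Hypothetical helper: reduce a fully qualified name to its host part.
short_hostname <- function(fqdn_value) {
  strsplit(fqdn_value, ".", fixed = TRUE)[[1]][1]
}

full  <- "pecan.ncsa.illinois.edu"
short <- short_hostname(full)  # "pecan"
```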
Check if host is local
is.localhost(host)
host |
the hostname to be checked |
Checks whether the given hostname refers to the local machine. Returns TRUE if the value is either localhost or the same as the result of fqdn().
TRUE if the host is the local host, FALSE otherwise
Rob Kooper
is.localhost(fqdn())
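The comparison described above can be sketched as follows. The function `is_local_sketch` is hypothetical; it takes the local fully qualified name as an argument so that the example is deterministic, and accepting a host list with a `name` element is an assumption based on the host structures used elsewhere in this package.

```r
# Hypothetical sketch of the localhost test; not the package's implementation.
is_local_sketch <- function(host, local_fqdn) {
  # Assumption: a host may be given as a list with a `name` element.
  if (is.list(host)) {
    host <- host$name
  }
  identical(host, "localhost") || identical(host, local_fqdn)
}

me <- "pecan.ncsa.illinois.edu"
is_local_sketch("localhost", me)          # TRUE
is_local_sketch(list(name = me), me)      # TRUE
is_local_sketch("geo.bu.edu", me)         # FALSE
```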
Kill tunnel to remote machine
kill.tunnel(settings, exe = TRUE, data = TRUE)
settings |
PEcAn settings list |
exe |
Kill tunnel to executable? |
data |
Kill tunnel to data? |
Rob Kooper
Merge multiple job.sh files into one larger file.
merge_job_files(settings, jobs_per_file = 10, outdir = NULL)
settings |
PEcAn.settings object with host section. |
jobs_per_file |
the number of job files to merge into each combined file. |
outdir |
output directory of merged job files. |
vector of the newly created filenames
Dongchen Zhang
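As an illustration of how jobs_per_file partitions a set of job scripts, the following base R sketch groups file paths into chunks. The helper `group_jobs` and the example paths are hypothetical; the sketch only shows the grouping step, not how the merged files are written.

```r
# Hypothetical helper: split job file paths into groups of at most jobs_per_file.
group_jobs <- function(job_files, jobs_per_file = 10) {
  group_id <- ceiling(seq_along(job_files) / jobs_per_file)
  unname(split(job_files, group_id))
}

# 25 made-up job.sh paths yield three groups.
job_files <- file.path("run", sprintf("%03d", 1:25), "job.sh")
groups <- group_jobs(job_files, jobs_per_file = 10)
lengths(groups)  # 10 10 5
```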
Open an SSH tunnel, prompting for passwords as needed
open_tunnel( remote_host, user = NULL, password = NULL, tunnel_dir = "~/.pecan/tunnel/", wait.time = 15, tunnel_script = "~/pecan/web/sshtunnel.sh" )
remote_host |
name of remote server to connect to (e.g. geo.bu.edu) |
user |
username on remote_host |
password |
password on remote_host |
tunnel_dir |
directory to store tunnel file in, typically from settings$host |
wait.time |
how long to give system to connect before deleting password (seconds) |
tunnel_script |
Path to sshtunnel.sh script file for opening tunnel |
numeric giving ssh PID if configured, otherwise logical with TRUE = success
Get Job ID from qsub output
qsub_get_jobid(out, qsub.jobid, stop.on.error)
out |
Output from model execution, as a character. |
qsub.jobid |
(character) Regular expression string for extracting job ID from qsub output.
Usually from |
stop.on.error |
Throw error if any of the runs fails. Default TRUE. |
Job ID, as a string
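Extracting a job ID with a regular expression can be sketched in base R using a capture group. Everything specific here is an assumption for illustration: the function name `extract_jobid_sketch`, the sample scheduler output, the pattern, and the choice to return NA when nothing matches and stop.on.error is FALSE.

```r
# Hypothetical sketch; not the package's implementation.
extract_jobid_sketch <- function(out, qsub.jobid, stop.on.error = TRUE) {
  if (!grepl(qsub.jobid, out)) {
    if (stop.on.error) {
      stop("Could not find a job ID in: ", out, call. = FALSE)
    }
    return(NA_character_)  # assumption: signal "no match" with NA
  }
  # Replace the whole match with the first capture group.
  sub(qsub.jobid, "\\1", out)
}

out <- 'Your job 12345 ("job.sh") has been submitted'  # made-up scheduler output
jobid <- extract_jobid_sketch(out, "Your job ([0-9]+) .*")
```

The pattern must contain exactly the part of the output to keep inside its first pair of parentheses.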
qsub_parallel
qsub_parallel( settings, files = NULL, prefix = "sipnet.out", sleep = 10, hybrid = TRUE )
settings |
pecan settings object |
files |
optional job.sh file paths; if given, jobs are submitted based on these files. |
prefix |
used for detecting if jobs are completed or not. |
sleep |
time (in seconds) to wait between checks for job completion. |
hybrid |
A Boolean argument that decides how job completion is detected. If TRUE, both the output files and the job IDs on the server are checked. If FALSE, only the job IDs on the server are checked. |
Dongchen Zhang
## Not run:
qsub_parallel(settings)
## End(Not run)
Check if qsub run finished
qsub_run_finished(run, host, qstat)
run |
run ID, as an integer |
host |
host structure to execute command on |
qstat |
(string) qstat command for checking job status |
TRUE if run is marked as DONE, otherwise FALSE.
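A status check of this kind can be sketched with an injected executor so that it runs without a scheduler. The details are assumptions for illustration: the function `run_finished_sketch`, the `@JOBID@` placeholder in the qstat string, the example qstat command, and the rule that the last output line ending in DONE means the run has finished.

```r
# Hypothetical sketch; `execute` stands in for running the command on the host.
run_finished_sketch <- function(run, qstat, execute) {
  # Assumption: the qstat string carries a @JOBID@ placeholder.
  cmd <- gsub("@JOBID@", as.character(run), qstat, fixed = TRUE)
  out <- execute(cmd)
  length(out) > 0 && endsWith(trimws(out[length(out)]), "DONE")
}

qstat <- "qstat -j @JOBID@ || echo DONE"  # assumed example command

# Fake executors that return canned output instead of calling a scheduler.
finished_executor <- function(cmd) "DONE"
running_executor  <- function(cmd) "job 42 is running"

done          <- run_finished_sketch(42L, qstat, finished_executor)
still_running <- run_finished_sketch(42L, qstat, running_executor)
```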
This first checks whether the queue already exists in RabbitMQ and creates it if it does not. Returns TRUE if the queue exists or was created, and FALSE otherwise.
rabbitmq_create_queue( url, auth, vhost, queue, auto_delete = FALSE, durable = TRUE )
url |
parsed RabbitMQ URL. |
auth |
the httr authentication object to use. |
vhost |
the vhost where to create the queue. |
queue |
the queue that should be checked/created. |
auto_delete |
should the queue be deleted afterwards (FALSE is default) |
durable |
should the messages persist after a server restart (TRUE is default) |
TRUE if the queue now exists, FALSE otherwise.
Rob Kooper
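For orientation, the RabbitMQ management HTTP API addresses a queue at `/api/queues/{vhost}/{queue}`, with the vhost percent-encoded. The sketch below builds such an endpoint in base R; the helper `queue_endpoint_sketch`, the base URL, and the queue name are hypothetical, and no request is sent.

```r
# Hypothetical helper: build the management API endpoint for a queue.
queue_endpoint_sketch <- function(base_url, vhost, queue) {
  paste0(
    sub("/+$", "", base_url),                  # drop trailing slashes
    "/api/queues/",
    utils::URLencode(vhost, reserved = TRUE),  # "/" becomes "%2F"
    "/",
    utils::URLencode(queue, reserved = TRUE)
  )
}

endpoint <- queue_endpoint_sketch("http://localhost:15672/", "/", "pecan")
# "http://localhost:15672/api/queues/%2F/pecan"
```

Encoding the vhost matters because the default vhost is named `/`, which would otherwise be read as a path separator.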
This gets a message from RabbitMQ. If the queue does not exist, it will be created. The message is parsed from JSON before being returned.
rabbitmq_get_message(uri, queue, count = 1, prefix = "", port = 15672)
uri |
RabbitMQ URI or URL to rest endpoint |
queue |
the queue the message is received from. |
count |
the number of messages to retrieve from the queue. |
prefix |
prefix for the rabbitmq api endpoint, default is for no prefix. |
port |
port for the management interface, the default is 15672. |
NA if no message was retrieved, or a list of the message payloads.
Alexey Shiklomanov, Rob Kooper
This will parse the uri into smaller pieces that can be used to talk to the rest endpoint for RabbitMQ.
rabbitmq_parse_uri(uri, prefix = "", port = 15672)
uri |
the amqp URI |
prefix |
the prefix that the RabbitMQ management interface uses |
port |
the port for the RabbitMQ management interface |
a list that contains the URL of the management interface, the username, the password, and the vhost.
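Splitting an amqp URI into the pieces listed above can be sketched with a base R regular expression. This is an illustrative sketch, not the package's parser: the function `parse_amqp_sketch`, the mapping of amqps to https, and the example URI are assumptions, and the sketch requires both a username and a password in the URI.

```r
# Hypothetical sketch of amqp URI parsing.
parse_amqp_sketch <- function(uri, prefix = "", port = 15672) {
  pattern <- "^(amqps?)://([^:@/]+):([^@/]*)@([^:/]+)(:([0-9]+))?(/(.*))?$"
  m <- regmatches(uri, regexec(pattern, uri))[[1]]
  if (length(m) == 0) {
    stop("Not a recognised amqp URI: ", uri, call. = FALSE)
  }
  # m[1] is the full match; capture groups follow in order.
  scheme <- if (m[2] == "amqps") "https" else "http"  # assumption
  list(
    url      = paste0(scheme, "://", m[5], ":", port, "/", prefix),
    username = m[3],
    password = m[4],
    vhost    = m[9]  # kept as written in the URI, e.g. "%2F"
  )
}

parts <- parse_amqp_sketch("amqp://guest:guest@rabbitmq:5672/%2F")
parts$url    # "http://rabbitmq:15672/"
parts$vhost  # "%2F"
```

Note that the AMQP port in the URI (5672 here) is ignored for the URL, because the management interface listens on its own port.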
This submits a message to RabbitMQ. If the queue does not exist, it will be created. The message is converted to JSON before it is submitted.
rabbitmq_post_message(uri, queue, message, prefix = "", port = 15672)
uri |
RabbitMQ URI or URL to rest endpoint |
queue |
the queue the message is submitted to |
message |
the message to submit; it will be converted to JSON. |
prefix |
prefix for the rabbitmq api endpoint, default is for no prefix. |
port |
port for the management interface, the default is 15672. |
the result of the post if the message was sent, or NA if it failed.
Alexey Shiklomanov, Rob Kooper
Sends a message to the RabbitMQ REST endpoint. The resulting status code is checked and a message is printed if something goes wrong.
rabbitmq_send_message(url, auth, body, action = "POST", silent = FALSE)
url |
the full endpoint rest url |
auth |
authentication for rabbitmq in httr:auth |
body |
the actual body to send, this is a rabbitmq message. |
action |
the rest action to perform |
silent |
boolean to indicate if logging should be performed. |
NA if the message failed; otherwise the resulting message or, if none is available, an empty string "".
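The return convention above (NA on failure, the response content on success, an empty string when there is no content) can be sketched as a pure function over a status code. The function `handle_response_sketch` is hypothetical, and treating every 2xx status as success is an assumption.

```r
# Hypothetical sketch of the status handling; no HTTP request is made.
handle_response_sketch <- function(status, content = NULL, silent = FALSE) {
  if (status >= 200 && status < 300) {  # assumption: any 2xx is success
    if (is.null(content)) "" else content
  } else {
    if (!silent) {
      message("RabbitMQ request failed with HTTP status ", status)
    }
    NA
  }
}

created  <- handle_response_sketch(204)                 # no content: ""
payload  <- handle_response_sketch(200, "routed")       # content returned
rejected <- handle_response_sketch(401, silent = TRUE)  # NA
```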
Copy file/dir from remote server to local server
remote.copy.from( host, src, dst, options = NULL, delete = FALSE, stderr = FALSE )
host |
list with server, user and optionally tunnel to use. |
src |
remote file/dir to copy |
dst |
local file/dir to copy to |
options |
to be passed to rsync command, if nothing is specified everything will be rsynced |
delete |
in case of local dir should all non-existent files be removed |
stderr |
should stderr be returned |
Copies the file/dir from the remote server to the local server. If the dst is a folder it will copy the file into that folder.
output of command executed
Rob Kooper
## Not run:
host <- list(name='geo.bu.edu', user='kooper', tunnel='/tmp/geo.tunnel')
remote.copy.from(host, '/tmp/kooper', '/tmp/geo.tmp', delete=TRUE)
## End(Not run)
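To show how a host list might translate into an rsync call, the sketch below assembles an argument vector without running anything. It is not the package's implementation: the helper `rsync_args_sketch`, the default `-az` flags, and the use of `ssh -o ControlPath=...` for the tunnel are assumptions chosen for illustration.

```r
# Hypothetical sketch: build rsync arguments for copying from a remote host.
rsync_args_sketch <- function(host, src, dst, options = NULL, delete = FALSE) {
  args <- c("-az", options)  # assumption: archive mode with compression
  if (delete) {
    args <- c(args, "--delete")
  }
  if (!is.null(host$tunnel)) {
    # Reuse an existing SSH control socket as the transport.
    args <- c(args, "-e", shQuote(paste0("ssh -o ControlPath=", host$tunnel)))
  }
  remote <- paste0(host$name, ":", src)
  if (!is.null(host$user)) {
    remote <- paste0(host$user, "@", remote)
  }
  c(args, remote, dst)
}

host <- list(name = "geo.bu.edu", user = "kooper", tunnel = "/tmp/geo.tunnel")
args <- rsync_args_sketch(host, "/tmp/kooper", "/tmp/geo.tmp", delete = TRUE)
```

The resulting vector could be passed to `system2("rsync", args)`; building it separately keeps the construction easy to inspect and test.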
Copies the file/dir to the remote server from the local server. If the dst is a folder it will copy the file into that folder.
remote.copy.to(host, src, dst, options = NULL, delete = FALSE, stderr = FALSE)
host |
host structure to execute command on |
src |
local file/dir to copy |
dst |
remote file/dir to copy to |
options |
additional arguments to be passed to rsync command |
delete |
in case of local dir should all non-existent files be removed |
stderr |
should stderr be returned as well. |
output of command executed
Rob Kooper
## Not run:
host <- list(name='geo.bu.edu', user='kooper', tunnel='/tmp/geo.tunnel')
remote.copy.to(host, '/tmp/kooper', '/tmp/kooper', delete=TRUE)
## End(Not run)
Execute command remotely
remote.execute.cmd(host, cmd, args = character(), stderr = FALSE)
host |
host structure to execute command on |
cmd |
the system command to be invoked, as a character string. |
args |
a character vector of arguments to command. |
stderr |
should stderr be returned as well. |
Executes the given command on the remote host using ssh. If the user is set, the command logs in as that user. If the given host is the local machine, the command is executed locally without ssh.
the captured output of the command (both stdout and stderr)
Rob Kooper
## Not run:
host <- list(name='geo.bu.edu', user='kooper', tunnel='/tmp/geo.tunnel')
print(remote.execute.cmd(host, 'ls', c('-l', '/'), stderr=TRUE))
## End(Not run)
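The local-versus-remote dispatch described above can be sketched as a function that decides which program and arguments to run. The helper `build_command_sketch` is hypothetical, as are the choices to pass `-T` (no pseudo-terminal) and `-S` (control socket) to ssh; nothing is executed.

```r
# Hypothetical sketch: choose between local execution and ssh.
build_command_sketch <- function(host, cmd, args = character(),
                                 local_names = "localhost") {
  if (host$name %in% local_names) {
    # Local machine: run the command directly.
    return(list(command = cmd, args = args))
  }
  target <- if (is.null(host$user)) {
    host$name
  } else {
    paste0(host$user, "@", host$name)
  }
  ssh_args <- "-T"
  if (!is.null(host$tunnel)) {
    ssh_args <- c(ssh_args, "-S", host$tunnel)
  }
  list(command = "ssh", args = c(ssh_args, target, cmd, args))
}

local_call  <- build_command_sketch(list(name = "localhost"), "ls", c("-l", "/"))
remote_call <- build_command_sketch(
  list(name = "geo.bu.edu", user = "kooper", tunnel = "/tmp/geo.tunnel"),
  "ls", c("-l", "/")
)
```

Either result could then be run with `system2(call$command, call$args, stdout = TRUE)`.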
Execute command remotely
remote.execute.R( script, host = "localhost", user = NA, verbose = FALSE, R = "R", scratchdir = tempdir() )
script |
the script to be invoked, as a list of commands. |
host |
settings host list |
user |
the username to use for remote login |
verbose |
should the output be printed to the console |
R |
Path to the R executable or binary file. |
scratchdir |
Path to the scratch directory for temporary files during remote execution. |
Executes the given command on the remote host using ssh. If the user is set, the command logs in as that user. If the given host is the local machine, the command is executed locally without ssh.
the captured output of the command (both stdout and stderr)
Rob Kooper
## Not run:
remote.execute.R('list.files()', host='localhost', verbose=FALSE)
## End(Not run)
Setup model launcher script and job list
setup_modellauncher(run, rundir, host_rundir, mpirun, binary)
run |
(numeric) run ID, as an integer |
rundir |
Local run directory. Usually from |
host_rundir |
Remote host run directory. Usually from |
mpirun |
MPI info, usually from |
binary |
Binary info, usually from |
Start qsub runs
start_qsub( run, qsub_string, rundir, host, host_rundir, host_outdir, stdout_log, stderr_log, job_script, qsub_extra = NULL )
run |
(numeric) run ID, as an integer |
qsub_string |
qsub command string, with arguments. Usually from |
rundir |
Local run directory. Usually from |
host |
Remote host, as a list or character. Usually from |
host_rundir |
Remote host run directory. Usually from |
host_outdir |
Remote host output directory. Usually from |
stdout_log |
Logfile for redirecting |
stderr_log |
Logfile for redirecting |
job_script |
Base name (no path) of script to run. Usually either |
qsub_extra |
Extra |
Output of qsub command, as a character. This output can be parsed to determine whether the submission succeeded.
Start model execution using rabbitmq
start_rabbitmq(folder, rabbitmq_uri, rabbitmq_queue)
folder |
Directory containing jobs to be started |
rabbitmq_uri |
RabbitMQ uri where messages should be posted |
rabbitmq_queue |
Queue to which messages are submitted |
Output of execution command, as a character (see rabbitmq_post_message()).
Start model execution in serial mode
start_serial(run, host, rundir, host_rundir, job_script)
run |
(numeric) run ID, as an integer |
host |
Remote host, as a list or character. Usually from |
rundir |
Local run directory. Usually from |
host_rundir |
Remote host run directory. Usually from |
job_script |
Base name (no path) of script to run. Usually either |
Output of execution command, as a character (see remote.execute.cmd()).
DEFUNCT: This function has been moved to PEcAn.workflow::start_model_runs; please use that instead.
## S3 method for class 'model.runs'
start(settings, write = TRUE, stop.on.error = TRUE)
runModule.start.model.runs(settings, stop.on.error = TRUE)
settings |
pecan settings object |
write |
(logical) Whether or not to write to the database. Default TRUE. |
stop.on.error |
Throw error if any of the runs fails. Default TRUE. |
Shawn Serbin, Rob Kooper, David LeBauer, Alexey Shiklomanov
## Not run:
start.model.runs(settings)
## End(Not run)
Test remote execution
test_remote(host, stderr = TRUE, ...)
host |
host structure to execute command on |
stderr |
should stderr be returned as well. |
... |
additional arguments. |
TRUE if remote execution is successful. If unsuccessful, the result depends on the value of stderr: if stderr = TRUE (default), this function throws an error; if stderr = FALSE, it prints a logger error and returns FALSE.
# Localhost execution should always work
good_host <- list(name = "localhost")
test_remote(good_host)

bad_host <- list(name = "bigbadwolf")
if (!test_remote(bad_host, stderr = FALSE)) {
  print("Big Bad Wolf is a bad host.")
}