# blob: 47a6cc94fdff17610fb88e82192ae74684d52702 [file] [log] [blame]
# File src/library/parallel/R/snowSOCK.R
# Part of the R package, https://www.R-project.org
#
# Copyright (C) 1995-2019 The R Core Team
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# A copy of the GNU General Public License is available at
# https://www.R-project.org/Licenses/
## Derived from snow 0.3-6 by Luke Tierney
## Uses solely Rscript, and a function in the package rather than scripts.
## Start one PSOCK worker for 'machine' and return the master-side node object.
##
## The worker is an Rscript process evaluating parallel:::.slaveRSOCK(); its
## settings (MASTER, PORT, OUT, SETUPTIMEOUT, TIMEOUT, XDR) are appended to
## the command line as NAME=value arguments.  Unless the 'manual' option is
## set, the command is launched from here (through 'rshcmd' when the machine
## is not "localhost"); with 'manual' it is only printed for the user to run.
## In both cases a server socket is then opened on 'port' for the worker.
##
## Arguments:
##   machine  host name, or a list of node-specific options whose 'host'
##            element is the host name.
##   ...      option values merged into 'options'.
##   options  cluster options to start from.
##   rank     value recorded as the node's 'rank'.
##
## Value: list(con, host, rank) with class "SOCKnode" when the 'useXDR'
## option is true and "SOCK0node" otherwise.
newPSOCKnode <- function(machine = "localhost", ...,
                         options = defaultClusterOptions, rank)
{
    options <- addClusterOptions(options, list(...))
    if (is.list(machine)) {
        ## per-node settings override the cluster-wide ones
        options <- addClusterOptions(options, machine)
        machine <- machine$host
    }
    outfile <- getClusterOption("outfile", options)
    ## a local worker always connects back to "localhost"
    master <- if (machine == "localhost") "localhost"
              else getClusterOption("master", options)
    port <- getClusterOption("port", options)
    setup_timeout <- getClusterOption("setup_timeout", options)
    manual <- getClusterOption("manual", options)
    timeout <- getClusterOption("timeout", options)
    methods <- getClusterOption("methods", options)
    useXDR <- getClusterOption("useXDR", options)

    ## build the local command for starting the worker
    env <- paste0("MASTER=", master,
                  " PORT=", port,
                  " OUT=", shQuote(outfile),
                  " SETUPTIMEOUT=", setup_timeout,
                  " TIMEOUT=", timeout,
                  " XDR=", useXDR)
    arg <- "parallel:::.slaveRSOCK()"
    rscript <- if (getClusterOption("homogeneous", options)) {
        shQuote(getClusterOption("rscript", options))
    } else "Rscript"
    rscript_args <- getClusterOption("rscript_args", options)
    if(methods) rscript_args <-c("--default-packages=datasets,utils,grDevices,graphics,stats,methods", rscript_args)

    ## in principle we should quote these,
    ## but the current possible values do not need quoting
    cmd <- if(length(rscript_args))
        paste(rscript, paste(rscript_args, collapse = " "),
              "-e", shQuote(arg), env)
    else paste(rscript, "-e", shQuote(arg), env)

    ## We do redirection of connections at R level once the process is
    ## running.  We could instead do it at C level here, at least on
    ## a Unix-alike.
    renice <- getClusterOption("renice", options)
    if(!is.na(renice) && renice) ## ignore 0
        ## Use the POSIX spelling 'nice -n <increment>'.  The former
        ## 'nice +<increment>' is csh built-in syntax: the nice utility
        ## reached through sh takes '+<increment>' to be the program to
        ## run, so the worker was never started.
        cmd <- sprintf("nice -n %d %s", as.integer(renice), cmd)

    if (manual) {
        cat("Manually start worker on", machine, "with\n ", cmd, "\n")
        utils::flush.console()
    } else {
        ## add the remote shell command if needed
        if (machine != "localhost") {
            ## This assumes an ssh-like command
            rshcmd <- getClusterOption("rshcmd", options)
            user <- getClusterOption("user", options)
            ## this assumes that rshcmd will use a shell, and that it is
            ## the same shell as on the master.
            cmd <- shQuote(cmd)
            cmd <- paste(rshcmd, "-l", user, machine, cmd)
        }

        if (.Platform$OS.type == "windows") {
            ## snow said:
            ## On Windows using input = something seems needed to
            ## disconnect standard input of an ssh process when run
            ## from Rterm (at least using putty's plink).  In
            ## principle this could also be used for supplying a
            ## password, but that is probably a bad idea.  So, for now
            ## at least, on Windows password-less authentication is
            ## necessary.
            ##
            ## (Not clear if that is the current behaviour: works for me)
            system(cmd, wait = FALSE, input = "")
        }
        else system(cmd, wait = FALSE)
    }

    ## The worker has been started (or requested) above; open the
    ## master-side end of the connection it will use.
    con <- socketConnection("localhost", port = port, server = TRUE,
                            blocking = TRUE, open = "a+b", timeout = timeout)
    structure(list(con = con, host = machine, rank = rank),
              class = if(useXDR) "SOCKnode" else "SOCK0node")
}
## Close the socket connection held by a worker node (both node classes).
closeNode.SOCKnode <- closeNode.SOCK0node <- function(node)
{
    sock <- node$con
    close(sock)
}
## Serialize 'data' onto the node's connection using serialize()'s defaults.
sendData.SOCKnode <- function(node, data)
{
    sock <- node$con
    serialize(data, sock)
}
## Serialize 'data' onto the node's connection with XDR encoding turned off.
sendData.SOCK0node <- function(node, data)
{
    sock <- node$con
    serialize(data, sock, xdr = FALSE)
}
## Read one serialized object from the node's connection (both node classes).
recvData.SOCKnode <- recvData.SOCK0node <- function(node)
{
    sock <- node$con
    unserialize(sock)
}
## Receive one object from whichever node of the cluster has data available.
##
## Value: list(node = <index of the node read from>, value = <the object>).
recvOneData.SOCKcluster <- function(cl)
{
    conns <- lapply(cl, function(node) node$con)
    ## Ask socketSelect() again for as long as it returns an empty result.
    status <- socketSelect(conns)
    while (length(status) == 0) status <- socketSelect(conns)
    ## which.max() picks the first connection flagged in 'status'
    ## (may need rotation or some such for fairness).
    first <- which.max(status)
    list(node = first, value = unserialize(conns[[first]]))
}
## Create a socket cluster with one worker per element of 'names'.
##
## Arguments:
##   names  either a number (only the first element is used) giving how many
##          workers to run on "localhost", or a vector/list with one entry
##          per worker that is handed to newPSOCKnode().
##   ...    cluster options merged into defaultClusterOptions.
##
## Value: a list of node objects with class c("SOCKcluster", "cluster").
##
## If setting up a node fails (or is interrupted), the connections of the
## nodes created so far are closed before the error propagates, so that a
## partially built cluster does not leave open sockets behind.
makePSOCKcluster <- function(names, ...)
{
    if (is.numeric(names)) {
        names <- as.integer(names[1L])
        if(is.na(names) || names < 1L) stop("numeric 'names' must be >= 1")
        names <- rep('localhost', names)
    }
    .check_ncores(length(names))
    options <- addClusterOptions(defaultClusterOptions, list(...))
    cl <- vector("list", length(names))

    ## Cleanup for the failure path only: 'complete' is switched to TRUE
    ## once every node has been created.
    complete <- FALSE
    on.exit(if (!complete) {
        for (node in cl)
            if (!is.null(node)) try(close(node$con), silent = TRUE)
    }, add = TRUE)

    for (i in seq_along(cl))
        cl[[i]] <- newPSOCKnode(names[[i]], options = options, rank = i)
    complete <- TRUE

    class(cl) <- c("SOCKcluster", "cluster")
    cl
}
## Print method for a socket cluster: one line giving the number of nodes
## and the distinct hosts they are on.  Returns 'x' invisibly.
print.SOCKcluster <- function(x, ...)
{
    hostNames <- unique(sapply(x, "[[", "host"))
    ## singular/plural wording is chosen by the number of distinct hosts
    template <- ngettext(length(hostNames),
                         "socket cluster with %d nodes on host %s",
                         "socket cluster with %d nodes on hosts %s")
    hostList <- paste(sQuote(hostNames), collapse = ", ")
    cat(sprintf(template, length(x), hostList), "\n", sep = "")
    invisible(x)
}
## Print method for a single socket node: asks the worker for its process id
## and reports it together with the node's host.  Returns 'x' invisibly.
print.SOCKnode <- print.SOCK0node <- function(x, ...)
{
    ## have the worker evaluate Sys.getpid() and collect the answer
    sendCall(x, eval, list(quote(Sys.getpid())))
    workerPid <- recvResult(x)
    cat(gettextf("node of a socket cluster on host %s with pid %d",
                 sQuote(x[["host"]]), workerPid),
        "\n", sep = "")
    invisible(x)
}
## Entry point evaluated in each worker process: read the NAME=value settings
## from the command line, redirect output, connect back to the master and
## hand the resulting node to slaveLoop().
.slaveRSOCK <- function()
{
    ## Open the client socket to the master and wrap it as a node object of
    ## class "SOCKnode" (useXDR true) or "SOCK0node".  Failed attempts are
    ## repeated, with a pause that grows by 50% each time, until more than
    ## 'setup_timeout' seconds have passed; the last error is then re-raised.
    makeSOCKmaster <- function(master, port, setup_timeout, timeout, useXDR)
    {
        port <- as.integer(port)
        timeout <- as.integer(timeout)
        stopifnot(setup_timeout >= 0)

        ## Retry scheme parameters (do these need to be customizable?)
        pause <- 0.1   # initial delay, in seconds, before retrying
        growth <- 1.5  # factor applied to the delay after each retry

        ## Retry multiple times in case the master is not yet ready
        started <- Sys.time()
        repeat {
            attempt <- tryCatch(
                socketConnection(master, port = port, blocking = TRUE,
                                 open = "a+b", timeout = timeout),
                error = identity)
            if (inherits(attempt, "connection")) break
            elapsed <- difftime(Sys.time(), started, units = "secs")
            if (elapsed > setup_timeout) break
            Sys.sleep(pause)
            pause <- growth * pause
        }
        if (inherits(attempt, "error")) stop(attempt)

        nodeClass <- if (useXDR) "SOCKnode" else "SOCK0node"
        structure(list(con = attempt), class = nodeClass)
    }

    ## Defaults, in case run manually without args.
    master <- "localhost"                    # hostname of master process
    port <- NA_integer_                      # must come from the command line
    outfile <- Sys.getenv("R_SNOW_OUTFILE")  # defaults to ""
    setup_timeout <- 120                     # retry setup for 2 minutes
    timeout <- 2592000L                      # 30 days, in seconds
    useXDR <- TRUE                           # binary serialization

    ## Each argument is split at its first "="; unrecognised names are
    ## silently ignored.
    for (arg in commandArgs(TRUE)) {
        eq <- regexpr("=", arg)
        key <- substr(arg, 1L, eq - 1L)
        val <- substr(arg, eq + 1L, nchar(arg))
        if (key == "MASTER") master <- val
        else if (key == "PORT") port <- val
        else if (key == "OUT") outfile <- val
        else if (key == "SETUPTIMEOUT") setup_timeout <- as.numeric(val)
        else if (key == "TIMEOUT") timeout <- val
        else if (key == "XDR") useXDR <- as.logical(val)
    }
    if (is.na(port)) stop("PORT must be specified")

    ## We should not need to attach parallel, as we are running in the namespace.
    sinkWorkerOutput(outfile)
    cat(sprintf("starting worker pid=%d on %s at %s\n",
                Sys.getpid(), paste(master, port, sep = ":"),
                format(Sys.time(), "%H:%M:%OS3")))
    slaveLoop(makeSOCKmaster(master, port, setup_timeout, timeout, useXDR))
}