Update - Feb 20, 2023
You can achieve this using the `parabar` package. Disclaimer: I am the author of the package.
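If the package is not yet installed, you can first grab it from CRAN:
# Install the package.
install.packages("parabar")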
You can use the package in an interactive R session as follows.
# Load the package.
library(parabar)
# Define a task to run in parallel.
task <- function(x) {
# Sleep a bit.
Sys.sleep(0.01)
# Return the result of a computation.
return(x + 1)
}
# Start a backend that supports progress tracking (i.e., `async`).
backend <- start_backend(cores = 4, cluster_type = "psock", backend_type = "async")
# Configure the bar if necessary, or change the bar engine.
configure_bar(
format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)
# Run the task.
results <- par_sapply(backend, 1:1000, task)
# Update the progress bar options.
configure_bar(
format = "[:bar] :percent"
)
# Run the task again.
results <- par_sapply(backend, 1:1000, task)
# Stop the backend.
stop_backend(backend)
If you need more flexibility (e.g., when building an R package), there is also a lower-level API based on `R6` classes.
# Create a specification object.
specification <- Specification$new()
# Set the number of cores.
specification$set_cores(cores = 4)
# Set the cluster type.
specification$set_type(type = "psock")
# Create a progress tracking context.
context <- ProgressDecorator$new()
# Get a backend that supports progress-tracking.
backend <- AsyncBackend$new()
# Register the backend with the context.
context$set_backend(backend)
# Start the backend.
context$start(specification)
# Get a modern bar instance.
bar <- ModernBar$new()
# Register the bar with the context.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
show_after = 0,
format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)
# Run a task in parallel (i.e., approx. 3.125 seconds).
context$sapply(x = 1:1000, fun = task)
# Get the task output.
output <- backend$get_output()
# Close the backend.
context$stop()
Here is a proposed workflow, which perhaps fits Steve Weston's characterization
as a valiant effort. Yet, with some overhead, it accomplishes what I am
primarily interested in: (1) a cross-platform solution, (2) not tampering
with low-level parallel implementation details, and (3) being parsimonious
with the dependencies used.
In a nutshell, the code below does the following:
- The function `prepare_file_for_logging` creates a temporary file (i.e., at an
  OS-specific location) that is later used to report and track the progress of
  the parallel task execution.
- The function `par_sapply_with_progress` starts an R session in the background
  (i.e., without blocking the main session).
  - In this background session, it sets up a cluster (i.e., either PSOCK or
    FORK) and runs the task in parallel via `parallel::parSapply`.
  - During each task run, the workers (i.e., the cluster nodes) report the
    progress to the temporary file (i.e., strictly in the form of a new line,
    to avoid race conditions).
- Back in the main process, the function `track_progress` monitors the
  temporary file and displays and updates a progress bar based on its contents.
- The main process remains blocked until the progress bar completes and the
  background process has terminated.
The libraries used are `parallel` and `callr`, plus a few functions from
`base` and `utils`. For the sake of clarity, the code below is explicitly
commented.
Usage
# Load libraries.
library(parallel)
library(callr)
# How many times to run?
runs <- 40
# Prepare file for logging the progress.
file_name <- prepare_file_for_logging()
# Run the task in parallel without blocking the main process.
process <- par_sapply_with_progress(
# Cluster specifications.
cores = 4,
type = "PSOCK",
# Where to write the progress.
file_name = file_name,
# Task specifications (i.e., just like in `parallel::parSapply`).
x = 1:runs,
fun = function(x, y) {
# Wait a little.
Sys.sleep(0.5)
# Do something useful.
return(x * y)
},
args = list(
y = 10
)
)
# Monitor the progress (i.e., blocking the main process until completion).
track_progress(
process = process,
iterations = runs,
file_name = file_name,
cleanup = TRUE
)
# Show the results.
print(process$get_result())
# |=====================================================================| 100%
# [1] 10 20 30 40 50 60 70 80 90 100 110 120 130 140 150 160 170 180
# [19] 190 200 210 220 230 240 250 260 270 280 290 300 310 320 330 340 350 360
# [37] 370 380 390 400
Implementation
The function for preparing a temporary file
# Create and get temporary file name.
prepare_file_for_logging <- function(file_name) {
# If the file name is missing.
if(missing(file_name)) {
# Get a temporary file name (i.e., OS specific).
file_name <- tempfile()
}
# Create the actual file to avoid race conditions.
file_created <- file.create(file_name)
# Stop if the file could not be created.
stopifnot("Failed to create file." = file_created)
return(file_name)
}
The function for running the task in parallel
# Run task in parallel and log the progress.
par_sapply_with_progress <- function(cores, type, file_name, x, fun, args) {
# Decorate the task function to enable progress tracking.
get_decorated_task <- function(task) {
# Evaluate symbol.
force(task)
# Create wrapper.
return(function(x, file_name, ...) {
# Update the progress on exit.
on.exit({
# Write processed element to file.
cat("\n", file = file_name, append = TRUE)
})
return(task(x, ...))
})
}
# Get the decorated task.
fun_decorated <- get_decorated_task(fun)
# Start a background process.
background_process <- callr::r_bg(function(cores, type, file_name, x, fun_decorated, args) {
# Make cluster.
cluster <- parallel::makeCluster(cores, type = type)
# Close the cluster on exit.
on.exit({
# Stop the cluster.
parallel::stopCluster(cluster)
})
# Output.
output <- do.call(parallel::parSapply, c(
list(cluster, x, fun_decorated, file_name), args
))
# Return the results to the background process.
return(output)
}, args = list(cores, type, file_name, x, fun_decorated, args))
# Return the background process `R6` object.
return(background_process)
}
The function for tracking the progress
# Track progress and keep the main process busy.
track_progress <- function(process, iterations, file_name, cleanup = TRUE) {
if (cleanup) {
on.exit({
# Remove the file (i.e., just in case).
unlink(file_name)
})
}
# Create a progress bar.
bar <- txtProgressBar(min = 0, max = iterations, initial = NA, style = 3)
# Record the number of processed iterations (i.e., runs).
n_tasks_processed <- 0
# While the process is alive.
while(n_tasks_processed < iterations) {
# Get the number of tasks processed.
n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))
# If the process that started the workers is finished.
if(!process$is_alive()) {
# Indicate that all tasks have been processed.
n_tasks_processed <- iterations
}
# Update the progress bar.
setTxtProgressBar(bar, n_tasks_processed)
}
# Close the progress bar.
close(bar)
# Wait for the process to close.
process$wait()
}
Things to consider
Concerning logging and reading the progress from the temporary file, there are
two things I can think of for reducing the overhead; both are sketched after
this list:
- First, one can consider reducing the granularity of the reporting (i.e.,
perhaps it is not necessary to log the progress after each task run, but, say,
after every fifth one or so).
- Second, one may also consider reducing the update frequency of the progress
bar. Currently, `track_progress` continuously scans the temporary file and
updates the progress bar. However, this is likely not necessary. Perhaps a
better way is to set a timeout between subsequent file scans and progress bar
updates.
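To make both ideas concrete, here is a minimal sketch: a chunked variant of the task decorator (meant to replace `get_decorated_task` inside `par_sapply_with_progress`) and a throttled variant of `track_progress`. The `*_chunked` and `*_throttled` names, the chunk size `every`, and the polling `timeout` are arbitrary choices for illustration, and the same `every` value must be used in both places.
# Decorate the task to log the progress only after every `every`-th completed task.
# Note: hypothetical variant of `get_decorated_task`; names and defaults are illustrative.
get_decorated_task_chunked <- function(task, every = 5) {
    # Evaluate the symbols.
    force(task)
    force(every)
    # Worker-local counter (i.e., each cluster node receives its own copy of this closure).
    count <- 0
    # Create the wrapper.
    return(function(x, file_name, ...) {
        # Update the progress on exit.
        on.exit({
            # Increment the worker-local counter.
            count <<- count + 1
            # Write to the file only after every `every`-th completed task.
            if (count %% every == 0) {
                cat("\n", file = file_name, append = TRUE)
            }
        })
        return(task(x, ...))
    })
}
# Track the progress, pausing between subsequent file scans and bar updates.
# Note: hypothetical variant of `track_progress`.
track_progress_throttled <- function(process, iterations, file_name, every = 5, timeout = 0.25, cleanup = TRUE) {
    if (cleanup) {
        on.exit({
            # Remove the file (i.e., just in case).
            unlink(file_name)
        })
    }
    # Create a progress bar.
    bar <- txtProgressBar(min = 0, max = iterations, initial = NA, style = 3)
    # Record the number of processed iterations (i.e., runs).
    n_tasks_processed <- 0
    # While there are still tasks to process.
    while (n_tasks_processed < iterations) {
        # Wait a bit before scanning the file again.
        Sys.sleep(timeout)
        # Each logged line now stands for `every` completed tasks.
        n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE)) * every
        # If the process that started the workers is finished, all tasks are done.
        if (!process$is_alive()) {
            n_tasks_processed <- iterations
        }
        # Update the progress bar (i.e., capped at the total number of tasks).
        setTxtProgressBar(bar, min(n_tasks_processed, iterations))
    }
    # Close the progress bar.
    close(bar)
    # Wait for the process to close.
    process$wait()
}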
Finally, I personally prefer opening a cluster once and reusing it across
different parts of my code. In this scenario, I would switch from `callr::r_bg`
(i.e., a short-lived background R process) to `callr::r_session` (i.e., a
permanent R session) for more control (i.e., also see this question). A rough
sketch of this idea follows below.
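The sketch below assumes the `callr::r_session` API (i.e., `$new()`, `$run()`, `$call()`, `$poll_process()`, `$read()`, and `$close()`): the cluster is created once inside a persistent session, reused for a non-blocking parallel run, and the progress file is polled while the call is in flight. The task, the run count, and the crude `cat()`-based progress display are placeholders for illustration, and the functions passed to `$run()` and `$call()` are assumed to resolve `cluster` against the session's global environment.
# Start a permanent background R session.
session <- callr::r_session$new()
# Create the cluster once and keep it in the session's global environment.
session$run(function(cores, type) {
    # Store the cluster where subsequent calls in this session can find it.
    assign("cluster", parallel::makeCluster(cores, type = type), envir = .GlobalEnv)
    invisible(NULL)
}, args = list(cores = 4, type = "PSOCK"))
# A placeholder task that logs a new line to the progress file when it finishes.
task <- function(x, file_name, y) {
    # Write the progress on exit.
    on.exit(cat("\n", file = file_name, append = TRUE))
    # Wait a little and do something useful.
    Sys.sleep(0.5)
    return(x * y)
}
# How many times to run?
runs <- 40
# Prepare the file for logging the progress.
file_name <- prepare_file_for_logging()
# Launch the parallel task on the reusable cluster without blocking the main session.
session$call(function(x, fun, file_name, y) {
    parallel::parSapply(cluster, x, fun, file_name = file_name, y = y)
}, args = list(x = 1:runs, fun = task, file_name = file_name, y = 10))
# Poll the progress file until the asynchronous call is ready (i.e., timeout in milliseconds).
while (session$poll_process(250) != "ready") {
    # Count the processed tasks and display a crude progress message.
    n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))
    cat("\r > completed", n_tasks_processed, "out of", runs, "tasks")
}
# Fetch the results and clean up the progress file.
results <- session$read()$result
unlink(file_name)
# Reuse the cluster for other tasks; when completely done, stop it and close the session.
session$run(function() parallel::stopCluster(cluster))
session$close()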
I hope this helps others who have struggled with this issue as well.