mirai_map() performs asynchronous parallel/distributed map using mirai.
This function is similar to purrr::map(), but returns a
‘mirai_map’ object. It is also more advanced, as it allows multiple map
over the rows of a dataframe or matrix. It is in fact used to implement
all parallel map variations from that package.
The results of a mirai_map x may be collected using x[]. This waits for
all asynchronous operations to complete if still in progress.
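As a rough illustration of collection, the sketch below starts a small map, prints the map object while work may still be running, and then collects with `[]`. The daemon count, the sleep duration and the doubling function are arbitrary choices made for this example only.

```r
library(mirai)

daemons(2)

# mirai_map() returns straight away; the work runs on the daemons
m <- mirai_map(1:3, function(x) {
  Sys.sleep(0.2)  # stand-in for a slow computation (illustrative only)
  x * 2
})

print(m)    # prints the map object with its current completion count

res <- m[]  # waits until every element has finished, then returns a list

daemons(0)
```

Because collection is a separate step, other work can be done in the session between creating the map and calling `m[]`.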
Among the key advantages of mirai_map(): it does not rely on a
‘chunking’ algorithm that splits work into batches up front, as
implemented for instance in the parallel package. Chunking
cannot take into account varying or unpredictable compute times over the
indices, which mirai scheduling is designed to deal with
optimally. This is demonstrated in the example below.
library(mirai)
library(parallel)
cl <- make_cluster(4)
daemons(4)
#> [1] 4
vec <- c(1, 1, 4, 4, 1, 1, 1, 1)
system.time(mirai_map(vec, Sys.sleep)[])
#> user system elapsed
#> 0.003 0.009 4.007
system.time(parLapply(cl, vec, Sys.sleep))
#> user system elapsed
#> 0.006 0.017 8.004
daemons(0)
#> [1] 0
.args is used to specify further constant arguments to .f - the ‘mean’ and ‘sd’ in the example below:
with(
  daemons(3, dispatcher = FALSE),
  mirai_map(1:3, rnorm, .args = list(mean = 20, sd = 2))[]
)
#> [[1]]
#> [1] 21.4368
#>
#> [[2]]
#> [1] 18.48262 25.45403
#>
#> [[3]]
#> [1] 19.16141 22.79903 19.24614
Use ... to further specify objects referenced but not
defined in .f - the ‘do’ in the anonymous function below:
daemons(4)
#> [1] 4
ml <- mirai_map(
  c(a = 1, b = 2, c = 3),
  function(x) do(x, as.logical(x %% 2)),
  do = nanonext::random
)
ml
#> < mirai map [0/3] >
ml[]
#> $a
#> [1] "6a"
#>
#> $b
#> [1] 42 b7
#>
#> $c
#> [1] "08cbc5"
Use of mirai_map() requires that daemons() have previously been set, and will error if this is not the case.
When collecting the results, optionally specify arguments to []:
x[.flat] collects and flattens the results, checking that they are of the same type to avoid coercion.
x[.progress] collects results using a cli progress bar, if available, showing completion percentage and ETA, or else a simple text progress indicator of parts completed of the total. If the map operation completes quickly, the cli progress bar may not show at all, and this is by design.
x[.stop] collects the results applying early stopping, which stops at the first failure and cancels remaining computations. If the cli package is available, it will be used for displaying the error message.
Combinations of the above may be supplied in the fashion of x[.stop, .progress].
mirai_map(list(a = 1, b = "a", c = 3), function(x) exp(x))[.stop]
#> Error in `mirai_map()`:
#> ℹ In index: 2.
#> ℹ With name: b.
#> Caused by error in `exp()`:
#> ! non-numeric argument to mathematical function
with(
  daemons(4, dispatcher = FALSE, .compute = "sleep"),
  mirai_map(c(0.1, 0.2, 0.3), Sys.sleep)[.progress, .flat]
)
#> NULL
Multiple map is performed over the rows of a dataframe or matrix, as this is most often the desired behaviour.
As a dataframe often contains columns of differing type, it is unusual to want to map over the columns. This is nevertheless possible by simply transforming it beforehand into a list using as.list().
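A minimal sketch of mapping over columns, assuming a small made-up dataframe whose columns share one type. The column names, the values and the summing function are illustrative only.

```r
library(mirai)

# Hypothetical dataframe with two numeric columns
df <- data.frame(a = c(1, 2, 3), b = c(10, 20, 30))

daemons(2)

# as.list() turns the dataframe into a list of columns,
# so each call to .f receives one whole column as its argument
col_sums <- mirai_map(as.list(df), function(col) sum(col))[.flat]

daemons(0)
```

Here `col_sums` holds one total per column rather than one value per row.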
Multiple map allows mapping over 2 or more arguments by specifying a dataframe. One of those may be an index value for indexed map.
The function .f must take as many arguments as there are columns, either explicitly or via ... .
fruit <- c("melon", "grapes", "coconut")
# create a dataframe for indexed map:
df <- data.frame(i = seq_along(fruit), fruit = fruit)
with(
  daemons(3, dispatcher = FALSE, .compute = "fruit"),
  mirai_map(df, sprintf, .args = list(fmt = "%d. %s"))[.flat]
)
#> [1] "1. melon" "2. grapes" "3. coconut"
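The two ways of accepting the columns can be sketched side by side. The dataframe `xy` and both functions below are made up for illustration. In the explicit form the argument names are chosen to match the column names and their order, which is an assumption of this sketch that keeps it valid whether columns are passed by name or by position.

```r
library(mirai)

# Hypothetical data: one row per call, one column per argument of .f
xy <- data.frame(x = c(1, 2, 3), y = c(10, 20, 30))

daemons(2)

# Explicit arguments, named to match the columns
explicit <- mirai_map(xy, function(x, y) x + y)[.flat]

# The same map, absorbing all columns through ...
via_dots <- mirai_map(xy, function(...) sum(...))[.flat]

daemons(0)
```

Both calls compute one row-wise total per row of `xy`.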
mirai_map() maps a matrix over its
rows, consistent with the behaviour for dataframes. Note
that this is different to the behaviour of lapply() or
purrr::map(), which treat a matrix the same as an ordinary
vector.
If instead mapping over the columns is desired, simply take the transpose of the matrix beforehand using t().
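A short sketch of both directions on a made-up 2 x 3 matrix. It relies on the row-wise behaviour described above. The matrix has no dimnames and the function takes its inputs through `...`, both choices made for this example so that it does not depend on how arguments are named.

```r
library(mirai)

# Hypothetical matrix: rows are (1, 3, 5) and (2, 4, 6)
mat <- matrix(1:6, nrow = 2)

daemons(2)

# One call per row of the matrix
by_row <- mirai_map(mat, function(...) sum(...))[.flat]

# Transpose first to get one call per original column
by_col <- mirai_map(t(mat), function(...) sum(...))[.flat]

daemons(0)
```

`by_row` has one element per row of `mat`, while `by_col` has one element per column.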