Applies an R function to a Spark object (typically, a Spark DataFrame).
Usage
spark_apply(
  x,
  f,
  columns = NULL,
  memory = TRUE,
  group_by = NULL,
  packages = NULL,
  context = NULL,
  name = NULL,
  barrier = NULL,
  fetch_result_as_sdf = TRUE,
  partition_index_param = "",
  arrow_max_records_per_batch = NULL,
  auto_deps = FALSE,
  ...
)
Arguments
- x
An object (usually a spark_tbl) coercible to a Spark DataFrame.
- f
A function that transforms a data frame partition into a data frame. The function f has the signature f(df, context, group1, group2, ...), where df is a data frame with the data to be processed, context is the optional object passed as the context parameter, and group1 to groupN contain the values of the group_by columns. When group_by is not specified, f takes only one argument. See the sketch following this argument list for an example of the grouped signature.
Can also be an rlang anonymous function. For example, ~ .x + 1 defines an expression that adds one to the given .x data frame.
- columns
A vector of column names or a named vector of column types for the transformed object. When not specified, a sample of 10 rows is taken to infer the output columns automatically; to avoid this performance penalty, specify the column types. The sample size is configurable using the sparklyr.apply.schema.infer configuration option.
- memory
Boolean; should the table be cached in memory?
- group_by
Column name used to group by data frame partitions.
- packages
Boolean to distribute .libPaths() packages to each node, a list of packages to distribute, or a package bundle created with spark_apply_bundle().
Defaults to TRUE or the sparklyr.apply.packages value set in spark_config().
For clusters using Yarn cluster mode, packages can point to a package bundle created using spark_apply_bundle() and made available as a Spark file using config$sparklyr.shell.files. For clusters using Livy, packages can be manually installed on the driver node.
For offline clusters where available.packages() is not available, manually download the packages database from https://cran.r-project.org/web/packages/packages.rds and set Sys.setenv(sparklyr.apply.packagesdb = "<path-to-rds>"). Otherwise, all packages will be used by default.
For clusters where R packages are already installed on every worker node, the spark.r.libpaths config entry can be set in spark_config() to the local packages library. To specify multiple paths, collapse them (without spaces) with a comma delimiter (e.g., "/lib/path/one,/lib/path/two").
- context
Optional object to be serialized and passed back to f().
- name
Optional table name to use when registering the resulting data frame.
- barrier
Optional; set to TRUE to support Barrier Execution Mode in the scheduler.
- fetch_result_as_sdf
Whether to return the transformed results as a Spark DataFrame (defaults to TRUE). When set to FALSE, results are returned as a list of R objects instead.
NOTE: fetch_result_as_sdf must be set to FALSE when the transformation function returns R objects that cannot be stored in a Spark DataFrame (e.g., complex numbers or any other R data type that has no equivalent representation among Spark SQL data types).
- partition_index_param
Optional; if non-empty, then f also receives the index of the partition being processed as a named argument with this name, in addition to all positional argument(s) it will receive.
NOTE: when fetch_result_as_sdf is set to FALSE, the object returned from the transformation function must also be serializable by the base::serialize function in R.
- arrow_max_records_per_batch
Maximum size of each Arrow record batch, ignored if Arrow serialization is not enabled.
- auto_deps
[Experimental] Whether to infer all required R packages by examining the closure f() and only distribute the required R packages and their transitive dependencies to Spark worker nodes (default: FALSE). NOTE: this option only takes effect if packages is set to TRUE or is a character vector of R package names. If packages is a character vector of R package names, then both the set of packages specified by packages and the set of inferred packages will be distributed to Spark workers.
- ...
Optional arguments; currently unused.
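A minimal sketch of the grouped signature described under f, assuming a local connection and the built-in iris data; the threshold value and summary columns are illustrative, column names are assumed to be sanitized to underscores (e.g., Petal_Length) when copied to Spark, and the output schema is inferred from a sample since columns is not specified:

library(sparklyr)

sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris, overwrite = TRUE)

# f(df, context, group1): `df` holds one group's rows, `context` is the
# list passed below, and `group1` receives that group's Species value.
iris_tbl %>%
  spark_apply(
    function(df, context, group1) {
      data.frame(
        n_long  = sum(df$Petal_Length > context$threshold),
        n_total = nrow(df)
      )
    },
    group_by = "Species",
    context = list(threshold = 4)
  )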
Configuration
spark_config() settings can be specified to change the workers' environment.
For instance, to set additional environment variables on each worker node, use the sparklyr.apply.env.* config; to launch workers without --vanilla, set sparklyr.apply.options.vanilla to FALSE; to run a custom script before launching Rscript, use sparklyr.apply.options.rscript.before.
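For example, a sketch of setting these options before connecting; the variable name MY_SETTING is hypothetical, and it is assumed here that the suffix after sparklyr.apply.env. becomes the environment variable name seen by each worker:

library(sparklyr)

config <- spark_config()
# assumed to export MY_SETTING into each worker's environment
config$sparklyr.apply.env.MY_SETTING <- "some-value"
# launch worker R sessions without the --vanilla flag
config$sparklyr.apply.options.vanilla <- FALSE

sc <- spark_connect(master = "local", config = config)

sdf_len(sc, 1) %>%
  spark_apply(function(df) data.frame(value = Sys.getenv("MY_SETTING")))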
Examples
if (FALSE) { # \dontrun{
library(sparklyr)
sc <- spark_connect(master = "local[3]")
# create a Spark data frame with 10 elements, then multiply each by 10 in R
sdf_len(sc, 10) %>% spark_apply(function(df) df * 10)
# using barrier mode
sdf_len(sc, 3, repartition = 3) %>%
  spark_apply(nrow, barrier = TRUE, columns = c(id = "integer")) %>%
  collect()
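# sketch: return arbitrary R objects (here, complex numbers) as a list of
# R objects rather than a Spark data frame; requires fetch_result_as_sdf = FALSE
sdf_len(sc, 2, repartition = 2) %>%
  spark_apply(
    function(df) complex(real = df$id, imaginary = 1),
    fetch_result_as_sdf = FALSE
  )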
} # }