gather takes a channel as argument and returns a promise. All values emitted by the channel will be collected into a vector matching the prototype mode. After the source channel closes, the promise will resolve with the collected vector.
Method as.promise.channel is a synonym for gather.
collect and collector are used in the implementation of the above functions. collect calls the function fn given as its argument, supplying a callback of the form function (val, name=NULL). I like to call it emit. While fn is running, it can call emit(x) any number of times. After fn returns, all the values passed to emit are returned in a vector, with optional names.
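The behavior described above can be sketched in base R. This is only an illustration under stated assumptions, not the package's implementation: collect_sketch is a hypothetical stand-in name, and it grows its buffer one element at a time for simplicity.

```r
# Hypothetical stand-in for illustration; NOT the package's own collect().
collect_sketch <- function(fn, type = list()) {
  values <- vector(mode(type), 0)  # empty vector with the prototype's mode
  nms <- character(0)              # one (possibly empty) name per value
  emit <- function(val, name = NULL) {
    # Append to the buffers that live in collect_sketch's frame.
    values[[length(values) + 1]] <<- val
    nms[[length(nms) + 1]] <<- if (is.null(name)) "" else name
    invisible(val)
  }
  fn(emit)                         # fn may call emit() any number of times
  if (any(nzchar(nms))) names(values) <- nms
  values
}

# A numeric prototype yields a numeric vector.
squares <- collect_sketch(type = 0, function(emit) {
  for (i in 1:4) emit(i^2)
})

# The default list prototype can hold mixed values, here with names.
named <- collect_sketch(function(emit) {
  emit(1, name = "a")
  emit("two", name = "b")
})
```

Here squares holds the four emitted numbers in order, and named is a list whose elements carry the names passed to emit.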
collector() works similarly to collect() but does not gather values when your inner function returns. Instead, it provides your inner function with two callbacks, one to add a value and a second to extract the values collected so far, so you can use the second callback to extract values at a later time. For an example of collector usage see the definition of gather.
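The two-callback pattern can be sketched in base R as follows. This is a simplified illustration, not the package's code: collector_sketch is a hypothetical name, and both the argument order (add first, extract second) and the choice to return whatever the inner function returns are assumptions made for the example.

```r
# Hypothetical stand-in for illustration; NOT the package's own collector().
collector_sketch <- function(fn, type = list()) {
  values <- vector(mode(type), 0)  # buffer shared by both callbacks
  emit <- function(val) {
    values[[length(values) + 1]] <<- val
    invisible(val)
  }
  extract <- function() values     # snapshot of everything added so far
  # Assumption: the inner function's return value is passed through.
  fn(emit, extract)
}

# The inner function hands both callbacks back so they can be used later,
# for instance from event handlers that fire after it has returned.
handles <- collector_sketch(type = 0, function(emit, extract) {
  list(add = emit, result = extract)
})

handles$add(10)
handles$add(20)
partial <- handles$result()        # values collected so far
handles$add(30)
final <- handles$result()          # later extraction sees the new value too
```

Because the buffer outlives the inner function, values can keep arriving after it returns, which is the situation a promise-returning function such as gather has to handle.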
gather(ch, list()) returns a promise that eventually resolves with a list. If the channel emits an error, the promise will reject with that error. The partial results will be attached to the error and can be read with attr(err, "partialResults").
collect returns a vector of the same mode as type.
ch <- stream(for (i in 1:10) {
  await(delay(0.1))
  if (i %% 3 == 0) yield(i)
})
if (FALSE) ch |> gather(numeric(0)) |> then(\(x) cat(x, "\n"))
# cumulative sum with collect
cumsum <- function(vec) {
  total <- 0
  collect(type=0, function(emit) {
    for (i in vec) total <- emit(total+i)
  })
}
# `as.list.iteror` is implemented simply with `collect`:
as.list.iteror <- function(it) {
  collect(\(yield) repeat yield(nextOr(it, break)))
}