gather takes a channel as its argument and returns a
promise. Every value emitted by the channel is collected
into a vector whose mode matches that of the prototype argument.
After the source channel closes, the promise resolves with the
collected vector.
The method as.promise.channel is a synonym for gather.
collect and collector are used in the
implementation of the above functions. collect calls its
argument fn, supplying a callback of the form
function(val, name=NULL). I like to call it emit. While
fn is running, it can call emit(x) any number of times.
After fn returns, all the values passed to emit are returned
in a vector, optionally with names.
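The emit-callback pattern described above can be pictured with a small base-R stand-in. This is only a sketch: collect_sketch is a hypothetical name, and the body is an assumption about how such a helper could work, not the package's actual implementation.

```r
# Hypothetical stand-in for collect(); NOT the package's implementation.
collect_sketch <- function(fn, type = list()) {
  values <- vector(mode(type), 0)  # empty vector with the prototype's mode
  nms <- character(0)              # one name slot per collected value
  named <- FALSE                   # becomes TRUE once any name is supplied

  # The callback handed to fn: store the value and, if given, its name.
  emit <- function(val, name = NULL) {
    values[[length(values) + 1]] <<- val
    if (is.null(name)) {
      nms[[length(nms) + 1]] <<- ""
    } else {
      nms[[length(nms) + 1]] <<- name
      named <<- TRUE
    }
    invisible(val)
  }

  fn(emit)                         # fn may call emit() any number of times
  if (named) names(values) <- nms
  values
}

# Collect into a numeric vector.
squares <- collect_sketch(type = numeric(0), function(emit) {
  for (i in 1:4) emit(i^2)
})

# Collect into a named list (the default prototype is list()).
labelled <- collect_sketch(function(emit) {
  emit(1L, name = "a")
  emit("two", name = "b")
})
```

Here squares is the numeric vector 1, 4, 9, 16, and labelled is a list with elements a and b. The sketch grows its vector one element at a time for clarity; a real implementation would likely preallocate.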
collector() works similarly to collect() but does
not gather values when your inner function returns. Instead, it
provides your inner function with two callbacks: the first adds a
value and the second extracts the values collected so far. You can
keep the second callback and use it to extract values at a later
time. For an example of collector usage see the definition of gather.
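The two-callback arrangement can be illustrated the same way in base R. Again this is a hedged sketch: collector_sketch, the zero-argument extract(), and the choice to return whatever the inner function returns are all assumptions made for illustration, not the package's documented behaviour.

```r
# Hypothetical stand-in for collector(); NOT the package's implementation.
collector_sketch <- function(fn, type = list()) {
  values <- vector(mode(type), 0)  # empty vector with the prototype's mode

  # First callback: add one value to the collection.
  emit <- function(val) {
    values[[length(values) + 1]] <<- val
    invisible(val)
  }
  # Second callback: return everything collected so far.
  extract <- function() values

  # Assumption: the result of the inner function is passed straight through.
  fn(emit, extract)
}

# The inner function does no collecting itself; it just hands both
# callbacks back so they can be used after collector_sketch() returns.
handles <- collector_sketch(type = character(0), function(emit, extract) {
  list(emit = emit, extract = extract)
})

handles$emit("a")
handles$emit("b")
early <- handles$extract()  # snapshot after two values
handles$emit("c")
late <- handles$extract()   # snapshot after three values
```

Because both callbacks close over the same storage, values can be added and read long after the inner function has returned, which is the property an asynchronous consumer such as gather needs.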
gather(ch, list()) returns a promise that eventually
resolves with a list. If the channel emits an error, the promise
will reject with that error. The partial results collected up to
that point are attached to the error and can be read with
attr(err, "partialResults").
collect returns a vector of the same mode as type.
ch <- stream(for (i in 1:10) {
  await(delay(0.1))
  if (i %% 3 == 0) yield(i)
})
if (FALSE) ch |> gather(numeric(0)) |> then(\(x) cat(x, "\n"))
# cumulative sum with collect
cumsum <- function(vec) {
  total <- 0
  collect(type=0, function(emit) {
    for (i in vec) total <- emit(total+i)
  })
}
# `as.list.iteror` is implemented simply with `collect`:
as.list.iteror <- function(it) {
  collect(\(yield) repeat yield(nextOr(it, break)))
}