gather takes a channel as argument and returns a promise. All values emitted by the channel will be collected into a vector matching the prototype mode. After the source channel closes, the promise will resolve with the collected vector.

Method as.promise.channel is a synonym for gather.

collect and collector are used in the implementation of the above functions. collect calls its argument fn, supplying a callback of the form function(val, name=NULL); I like to call it emit. While fn is running, it can call emit(x) any number of times. After fn returns, all the values passed to emit are returned in a vector, with optional names.
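The emit-callback pattern described above can be sketched in base R. This is only an illustration under stated assumptions, not the package's implementation: collect_sketch is a hypothetical name, growing the vector one element at a time is a simplification, and having emit return its argument is an assumption.

```r
# Hypothetical stand-in for collect(); NOT the package's actual code.
collect_sketch <- function(fn, type = list()) {
  out <- vector(mode(type), 0)   # empty vector with the prototype's mode
  nms <- character(0)            # names supplied to emit(), "" when absent
  named <- FALSE
  emit <- function(val, name = NULL) {
    i <- length(out) + 1
    out[[i]] <<- val
    nms[[i]] <<- if (is.null(name)) "" else name
    if (!is.null(name)) named <<- TRUE
    invisible(val)               # assumption: emit hands back its argument
  }
  fn(emit)                       # fn may call emit() any number of times
  if (named) names(out) <- nms
  out
}

# Collect into a numeric vector.
squares <- collect_sketch(type = numeric(0), function(emit) {
  for (i in 1:4) emit(i^2)
})

# Collect into a list (the default prototype), with names.
named_values <- collect_sketch(function(emit) {
  emit(1, name = "a")
  emit("x", name = "b")
})
```

Here squares holds the four squares as a numeric vector, and named_values is a list with elements a and b.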

collector() works similarly to collect() but does not gather values when your inner function returns. Instead, it provides your inner function with two callbacks: one to add a value, and one to extract the values collected so far, which you can call at a later time. For an example of collector usage, see the definition of gather.
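The two-callback arrangement can be sketched in base R as follows. This is an illustration only: collector_sketch is a hypothetical name, the extract callback here takes no arguments, and returning whatever the inner function returns is an assumption rather than documented behavior.

```r
# Hypothetical stand-in for collector(); NOT the package's actual code.
collector_sketch <- function(fn, type = list()) {
  out <- vector(mode(type), 0)   # storage shared by both callbacks
  emit <- function(val) {
    out[[length(out) + 1]] <<- val
    invisible(val)
  }
  extract <- function() out      # snapshot of everything emitted so far
  fn(emit, extract)              # assumption: fn's result is passed through
}

# The inner function can hand the callbacks onward for later use.
handles <- collector_sketch(type = numeric(0), function(emit, extract) {
  list(emit = emit, extract = extract)
})

handles$emit(1)
handles$emit(2)
first <- handles$extract()       # values gathered up to this point

handles$emit(3)
second <- handles$extract()      # includes the later value as well
```

Because extraction is decoupled from the return of the inner function, the same storage can keep receiving values after a first snapshot has been taken, which is the property an asynchronous consumer such as gather needs.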

gather(ch, type = list())

# S3 method for channel
as.promise(x)

collect(fn, type = list())

collector(fn, type = list())

Arguments

ch

a channel object.

type

A prototype output vector (similar to the FUN.VALUE argument of vapply). Defaults to list().

x

a channel.

fn

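A function, which should accept a single argument, here called emit. For collector, fn is given two callbacks: one to add a value and one to extract the collected values.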

Value

gather(ch, list()) returns a promise that eventually resolves with a list. If the channel emits an error, the promise will reject with that error, and the partial results will be attached to the error as attr(err, "partialResults").
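As a synchronous analogy for the error behavior, the base R sketch below shows one way partial results could be attached to an error condition and read back from it. It does not use channels or promises; gather_sketch and its list-of-functions input are hypothetical and stand in for a channel that fails partway through.

```r
# Hypothetical, synchronous analogy; NOT the package's gather().
gather_sketch <- function(thunks) {
  results <- list()
  tryCatch(
    for (f in thunks) results[[length(results) + 1]] <- f(),
    error = function(err) {
      # Attach what was gathered before the failure, then re-signal.
      attr(err, "partialResults") <- results
      stop(err)
    }
  )
  results
}

# The third source fails, so only two values are gathered.
err <- tryCatch(
  gather_sketch(list(function() 1, function() 2, function() stop("boom"))),
  error = function(e) e
)

partial <- attr(err, "partialResults")
msg <- conditionMessage(err)
```

With a real promise, the same attribute would be read inside the rejection handler rather than in a tryCatch.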

collect returns a vector of the same mode as type.

Author

Peter Meilstrup

Examples


# a channel that yields the multiples of 3 between 1 and 10
ch <- stream(for (i in 1:10) {
  await(delay(0.1))
  if (i %% 3 == 0) yield(i)
})
if (FALSE) ch |> gather(numeric(0)) |> then(\(x) cat(x, "\n"))

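# cumulative sum with collect
# (note that this definition masks base::cumsum)
cumsum <- function(vec) {
  total <- 0
  collect(type=0, function(emit) {
    # this relies on emit() returning the value it was given
    for (i in vec) total <- emit(total+i)
  })
}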

# `as.list.iteror` is implemented simply with `collect`:
as.list.iteror <- function(it) {
  collect(\(yield) repeat yield(nextOr(it, break)))
}