(Experimental as of async 0.3) stream(...) constructs a channel
object, i.e. an asynchronous iterator, which will compute and
return values according to sequential code written in expr. A
stream is a coroutine wearing a channel interface in the same
way that async is a coroutine wearing a promise interface, and a
gen is a coroutine sitting behind an iteror interface.
expr: A coroutine expression, using some combination of
  yield, await, awaitNext, yieldFrom, standard control flow
  operators and other calls.
...: Undocumented.
split_pipes: See description under async; defaults to TRUE.
lazy: If TRUE, start paused, and pause after yield() (see the
  discussion of lazy and eager streams below).
compileLevel: Compilation level.
debugR: Set TRUE to single-step debug at R level. Use debugAsync()
  to enable or disable debugging on a stream after it has been created.
debugInternal: Set TRUE to single-step debug at coroutine
  implementation level.
trace: An optional tracing function.
An object with (at least) classes "stream", "channel", "coroutine", "iteror", "iter".
In a stream expression, you can call yield() to emit a value, and
await() to wait for a value from a promise. To have your stream
wait for values from another stream or channel, call
awaitNext(); you can also use awaitNext when you are writing an
async. You can also use a simple for loop to consume all future
values from a stream or channel.
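As a rough sketch of awaitNext() used inside an async: the names numbers, first_two and reporter are invented for illustration, and the sketch assumes that awaitNext(st), called with only the stream, returns the next emitted value.

```r
library(async)

# Hypothetical stream function: emits 1, 2, ..., n.
numbers <- stream(function(n) {
  for (i in 1:n) {
    yield(i)
  }
})

# Hypothetical async function: takes exactly two values from a stream
# with awaitNext() and resolves to their sum.
first_two <- async(function(st) {
  a <- awaitNext(st)
  b <- awaitNext(st)
  a + b
})

result <- first_two(numbers(5))  # an async, i.e. a promise-like object

# Report the sum once it is available.
reporter <- async({
  total <- await(result)
  cat("sum of first two values:", total, "\n")
})
```

Nothing is printed until R's event loop gets a chance to run, for example when control returns to the console. Only two values are requested, so the remaining values of the stream are never asked for.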
The lower-level way to consume values from a stream is to call
nextThen from the channel interface.
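A minimal sketch of the callback style follows. The argument names onNext, onError and onClose are recalled from the channel interface and should be checked against the nextThen help page; letters_st and received are invented names.

```r
library(async)

# Hypothetical stream that emits two values and then finishes.
letters_st <- stream({
  yield("a")
  yield("b")
})

received <- character()

# Request one value. Each call to nextThen asks for a single value;
# the callbacks are invoked once the stream has something to report.
nextThen(
  letters_st,
  onNext  = function(value) received <<- c(received, value),
  onError = function(err) message("stream stopped with an error"),
  onClose = function() message("stream finished")
)
```

Because only one request is made, a lazy stream is expected to stop at its first yield and leave the second value unproduced until nextThen is called again.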
Streams come in both "lazy" and "eager" varieties. If lazy=TRUE,
a stream starts idle, and does not process anything
until it is woken up by a call to its channel's nextThen. It will
pause after reaching yield if there are no more outstanding
requests. If lazy=FALSE, a stream will begin executing
immediately, not pausing on yield, possibly queuing up emitted
values until it needs to await something.
(For comparison, in this package, generators made with gen are
lazy: they do not start executing until a call to nextOr, and
they pause immediately after each yield. async blocks are eager:
they start at construction and run until they hit an await.)
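The difference can be observed with a side effect at the top of each stream body. This is only a sketch: events and note are invented helpers, and the call to later::run_now() is there in case an eager stream is started from the event loop rather than directly at construction.

```r
library(async)

events <- character()
note <- function(msg) events <<- c(events, msg)  # hypothetical logging helper

# Same body, different scheduling.
lazy_st  <- stream({ note("lazy started");  yield(1) }, lazy = TRUE)
eager_st <- stream({ note("eager started"); yield(1) }, lazy = FALSE)

# Give the event loop a chance to run any scheduled work.
later::run_now()

print(events)
```

Going by the description above, only the eager stream should have recorded an event at this point; the lazy stream stays idle until something requests a value from it, for instance through nextThen or awaitNext.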
As with its coroutine counterparts, if stream is given a function
expression, such as stream(function(...) ...), it returns a
"stream function", i.e. a function that constructs a stream object.
# emit values _no more than_ once per second
count_to <- stream(function(n, interval=1) {
  for (i in 1:n) {
    await(delay(interval))
    yield(i)
  }
})

# emit the running total of the values received from st
accumulate <- stream(function(st, sum=0) {
  for (i in st) {
    sum <- sum + i
    yield(sum)
  }
})

print_each <- async(function(st) for (i in st) print(i))

count_to(10) |> accumulate() |> print_each()
#> async(for (i in st) print(i))
#> <environment: 0x55b5d8a121b0>
#> <async [pending at `.for__received`]>
#> <Promise [pending]>