(Experimental as of async 0.3) stream(...) constructs a channel
object, i.e. an asynchronous iterator, which will compute and
return values according to sequential code written in expr. A stream
is a coroutine wearing a channel interface in the same way that async
is a coroutine wearing a promise interface, and a gen is a coroutine
sitting behind an iteror interface.
A coroutine expression, using some combination of yield, await,
awaitNext, yieldFrom, standard control flow operators and other calls.
Undocumented.
See description under async; defaults to TRUE.
If TRUE, start paused, and pause after yield() (see the discussion of lazy and eager streams below).
Compilation level.
Set TRUE to single-step debug at R level. Use debugAsync()
to enable or disable debugging on a stream after it has been created.
Set TRUE to single-step debug at coroutine implementation level.
An optional tracing function.
An object with (at least) classes "stream", "channel", "coroutine", "iteror", "iter".
In a stream expression, you can call yield() to emit a value, and
await() to wait for a value from a promise. To have your stream
wait for values from another stream or channel, call awaitNext();
you can also use awaitNext when you are writing an async. You can
also use a simple for loop to consume all future values from a
stream or channel.
The lower-level way to consume values from a stream is to call nextThen from the channel interface.
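As a rough illustration of the lower-level interface, the sketch below requests a single value from a small stream. The callback argument names (onNext, onError, onClose) are assumed from the channel interface and should be checked against its documentation. The callbacks may fire only once R's event loop gets a chance to run (for example at the console prompt), so no output is shown here.

```r
library(async)

# A tiny stream that emits two values and then finishes.
st <- stream({
  yield("a")
  yield("b")
})

# Request one value from the stream. The argument names below are
# assumptions about the channel interface, not confirmed by this page.
nextThen(st,
  onNext  = function(val) cat("got:", val, "\n"),
  onError = function(err) cat("error:", conditionMessage(err), "\n"),
  onClose = function() cat("stream closed\n"))
```

Each call to nextThen asks for one more value, so a consumer that wants everything would need to call it again from inside onNext. A for loop inside an async or another stream does that bookkeeping for you.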
Streams come in both "lazy" and "eager" varieties. If lazy=TRUE,
a stream starts idle, and does not process anything until it is
woken up by a call to its channel's nextThen. It will pause after
reaching yield if there are no more outstanding requests. If
lazy=FALSE, a stream will begin executing immediately, not pausing
on yield, possibly queuing up emitted values until it needs to
await something.
(For comparison, in this package, gen objects are lazy in that they
do not start executing until a call to nextOr, and they pause
immediately after yield, while async blocks are eager, starting at
construction and running until they hit an await.)
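The difference between the two varieties can be sketched by building the same stream body twice. This is a minimal illustration under the behavior described above, not a definitive test of it. The cat() calls are only there to make the start of execution visible, and the nextThen callback names (onNext, onError, onClose) are assumed from the channel interface.

```r
library(async)

# Lazy: per the description above, the body should not run until
# something requests a value from the stream.
lazy_st <- stream({
  cat("lazy: started\n")
  yield(1)
}, lazy = TRUE)

# Eager: the body should begin executing without any request,
# queuing the emitted value until a consumer asks for it.
eager_st <- stream({
  cat("eager: started\n")
  yield(1)
}, lazy = FALSE)

# Wake the lazy stream by asking for a value (callback names assumed).
nextThen(lazy_st,
  onNext  = function(val) cat("lazy emitted:", val, "\n"),
  onError = function(err) cat("error:", conditionMessage(err), "\n"),
  onClose = function() cat("lazy closed\n"))
```

An eager stream suits producers that should make progress regardless of demand. A lazy one avoids doing work, or holding queued values, before a consumer exists.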
Like its coroutine counterparts, if stream is given a function
expression, like stream(function(...) ...), it will return a
"stream function", i.e. a function that constructs a stream object.
# emit values _no more than_ once per second
count_to <- stream(function(n, interval=1) {
  for (i in 1:n) {
    await(delay(interval))
    yield(i)
  }
})
accumulate <- stream(function(st, sum=0) {
  for (i in st) {sum <- sum + i; yield(sum)}
})
print_each <- async(function(st) for (i in st) print(i))
count_to(10) |> accumulate() |> print_each()
#> async(for (i in st) print(i))
#> <environment: 0x55b5d8a121b0>
#> <async [pending at `.for__received`]>
#> <Promise [pending]>