The async
package allows you to write code that executes
incrementally.
When an ordinary R function is called, control does not return until
that function either produces a value or finishes with an error. An
async({}) block will run until it reaches an await(pr), then pause,
returning control to its caller until the value of the promise pr
is available. So a coroutine’s evaluation can be
temporally interleaved with other operations.
However, this does not require any changes to R’s execution model;
coroutines are implemented in terms of base R, within its
single-threaded execution model. In order to pause and resume, a
coroutine has to explicitly keep track of its internal state: when it
reaches a yield it saves its place, then resumes from that point on the
next call to nextOr.
In other words a generator is a kind of state machine sitting behind
the iteror
interface. An async
also operates
as a kind of state machine, one that interacts with
the promise
interface; its state advances according to its
program and how awaited promises and other calls resolve.
The particular state transitions, and the internal state that is
tracked, are constructed from the coroutine expression. In other words
you could say that the async
package implements a
mini-language for describing state machines.
The illusion that a coroutine works like a parallel stream of R code is due to this mini-language being constructed to function analogously to base R. However, the language is slightly different from base R, so this document tries to collect and explain those differences, and tries to paint a picture of how these objects operate.
The first thing to know about the coroutine mini-language is that a
yield()
or await()
can only appear within the arguments of “pausable” functions. The async
package has built-in pausable implementations of R’s most commonly used
control flow and error handling functions. The function
async::pausables()
will return a list of all pausable
functions it can see. As of this writing, the list is:
## [1] "(" "{" "&&" "<-" "<<-" "||"
## [7] "await" "awaitNext" "break" "for" "goto" "if"
## [13] "next" "nextOr" "on.exit" "repeat" "return" "switch"
## [19] "try" "tryCatch" "while" "yield" "yieldFrom"
While any R function can be used in a coroutine expression,
yield
or await
can only appear within the
arguments of the above functions. This restriction applies recursively;
the following is illegal because list
is not pausable.
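(A sketch reconstructing such a case; p stands in for some promise.)

async({
  list(await(p), "second element")  # await appears in an argument to list()
})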
## Error in trans_call(expr, path): A pause or break appears in an argument to `list`, which is not pausable. Consider using split_pipes=TRUE
The error message refers to the option split_pipes, which offers a
workaround. With split_pipes=TRUE, an await in the leftmost argument of
a function call can be rewritten using a temporary variable:
async({await(x) + 5}) is effectively rewritten in the style of the
“base R pipe”, i.e. async({await(x) ->.; . + 5}).
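A sketch of the workaround, assuming split_pipes is passed as an argument to the constructor (as the error message suggests):

async({
  list(await(p), "second element")  # the await is in the leftmost argument of list()
}, split_pipes = TRUE)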
Constructing a gen or async is somewhat like calling a function, in
that it creates a new environment, executes a given block of code with
respect to that environment, and supports actions like break, return,
stop, and on.exit that apply to a scope.
Coroutine constructors by themselves do not take arguments; to create a
generator with respect to some argument, you simply write a function
that takes an argument and returns a generator:
# given a sequence of sequences, concatenate them like c() into one sequence
chain <- function(sequences) {
  force(sequences)
  gen({
    for (seq in sequences) {
      yieldFrom(seq)
    }
  })
}
Note that it is a good practice in R to force()
the
function arguments, as in any case when a function returns an inner
construct that captures its scope. Because it’s easy to forget to force
arguments, the async
package offers a “generator function”
syntax. So the above can be rewritten as:
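(A sketch, reconstructing the rewritten form from the description that follows.)

chain <- gen(function(sequences) {  # gen(function(...)) defines a generator "factory"
  for (seq in sequences) {
    yieldFrom(seq)
  }
})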
All that is going on here is that when gen sees function at the head of
its expression, it adds the force() and moves the call to gen inside the
function, reproducing the earlier version.
Coroutines do not have a stack or a notion of procedure calls;
function definition is left to R, and the coroutine parser will ignore
anything written in a function definition. If you write an inner
function in a generator, that inner function will not be able to
yield
.
names <- c("Ana", "Bob", "Charlie")
g <- gen({
  greet <- function(name) yield(paste0("Good morning, ", name))
  for (i in names) {
    greet(i)
  }
})
as.list(g)
## Error in yield(paste0("Good morning, ", name)): yield() called outside a generator
However, you can use yieldFrom()
to return any number of
values from another iterator.
names <- c("Ana", "Bob", "Charlie")
greet <- gen(function(name) {
  yield(paste0("Good morning, ", name))
})
run(type="", {
  for (n in names) {
    yieldFrom(greet(n))
  }
})
## [1] "Good morning, Ana" "Good morning, Bob" "Good morning, Charlie"
for
loops in a coroutine can run over iterators as well
as plain vectors. The in
argument to a for
loop will be converted to the iteror
interface using
iteror(x)
.
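For instance (a sketch), a generator’s for loop can consume another generator directly:

numbers <- gen({ for (i in 1:3) yield(i * 10) })
doubled <- gen({
  for (x in numbers) {   # `numbers` is itself an iteror
    yield(x * 2)
  }
})
as.list(doubled)  # expected: list(20, 40, 60)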
When writing an async
or stream
routine,
for
loops can be used to consume values from
channels/streams, without explicitly calling await()
.
repeat
, while
and for
otherwise work like in base R, including support for next
and break
operators.
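A small sketch of next and break inside a generator loop:

odd_squares <- gen({
  for (i in 1:10) {
    if (i %% 2 == 0) next   # skip even numbers
    if (i > 7) break        # stop once past 7
    yield(i^2)
  }
})
as.list(odd_squares)  # expected: 1, 9, 25, 49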
You can also write looping code by using switch()
with
goto()
, described below.
if, ||, &&
Coroutine if
statements work similarly to base R.
Short-circuiting logical operators || and && evaluate their left
argument first, which must be a scalar logical. If the left argument
does not settle the result, they evaluate the right argument and return
it directly. That is, in a
coroutine yield(TRUE) && "string"
may return
“string”, while in base R this raises an error.
I write “may” above, because if you write ||
in a
coroutine expression you might be using either base R ||
or
the pausable version of ||
. In general you will only be
using pausable ||
if there is a pause somewhere on either
side of it. So if you write (x == 5) || FALSE
you will be
using base R ||
, since ==
does not pause.
Whereas if you write
(await(success) == 5) || yield("assertion")
you will be
using pausable ||
, because there is an await
on one side of it.
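A sketch contrasting the two cases (here success stands in for some promise resolving to a number):

a <- async({
  x <- (1 == 5) || FALSE                    # base R `||`: nothing pauses on either side
  y <- (await(success) == 5) || "fallback"  # pausable `||`: there is an await on its left side
  y                                         # may be TRUE, or the string "fallback"
})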
switch and goto
switch()
statements are more strict than in base R; the
input to a switch
must match a given branch or it
will raise an error. That is, in base R,
switch(-1, "not run")
or
switch("bar", foo="not run")
returns invisible
NULL
, while in a coroutine this is an error.
Additionally, switch
statements support a delimited
goto. Within a given switch
statement,
goto("other_branch")
stops executing the present branch and
jumps to the given sibling branch. Calling goto() without arguments
re-evaluates the switch input. For example, you might try to read an
input file that could be in a few different formats.
file_dataset <- async({
  filename <- await( download_file(url) )
  switch(getExtension(filename),
    "txt"=readTxt(filename),
    "csv"=tryCatch(readCSV(filename), error=goto("txt")),
    "json"={
      if (!await(validateSchema(filename))) goto("txt")
      tryCatch(read_json(filename), error=goto("txt"))
    },
    "zip"= {
      unzipped <- unzip_async(filename) |> await()
      filename <- unzipped
      on.exit(unlink(unzipped))
      goto(getExtension(unzipped))
    }
  )
})
Here, if there is an error in reading a .csv
or
.json
file we re-try ingesting it as a text file; on
encountering a zip
file we unzip it and try again with the
new filename.
Note that if a goto
appears inside of a
tryCatch(..., finally={...})
call, which is itself inside a
branch, the finally
clause will be executed before
jumping to the new branch.
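A sketch of that behavior:

r <- run({
  switch("a",
    "a" = tryCatch(
      goto("b"),                                 # leave branch "a"...
      finally = message("cleaning up branch a")  # ...but run finally first
    ),
    "b" = "landed in branch b")
})
# prints "cleaning up branch a"; r should be "landed in branch b"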
All of the sequencing we have been discussing throughout this document
is baked into a sort of graph structure when a coroutine is constructed.
See if you can follow the logic of the preceding async
through its graph:
graphAsync(file_dataset, type="svg")
Is it a bit spaghetti? Here we have nodes for each discrete step in
the computation. Nodes in reverse type are user-level R calls; there are
a few grouped nodes to handle an await or a tryCatch.
Each node in the graph is literally a function, and each line
represents where it calls the next function. You can step through these
if you call debugAsync(as, debugInternal=TRUE).
Saving state, then, is just remembering the last call. (Dotted lines,
here, represent where the coroutine saves its state rather than calling
the next node directly.)
Starting and finishing may mean different things for different coroutines:

- run is “eager”: it executes an entire coroutine expression immediately, without pausing.
- gen does not compute anything when it is first constructed; it only begins executing when there is a request for data via nextOr.
- async routines are “eager”: they begin executing immediately on creation, before the constructor returns. An async runs until it reaches the first await and then pauses, allowing the constructor to return. When the awaited value arrives, the async continues running.
- stream objects can run in either lazy or eager mode. If lazy=TRUE, they will not compute anything until something calls the channel’s nextThen method, and will pause after a yield if there are no more listeners. If lazy=FALSE, a stream starts executing when constructed, will not pause after yield, and may run ahead and queue up outgoing values until it reaches an await.
- await(p), in an async or stream, pauses execution until the given promise resolves.
- yield(val), in a generator, pauses execution and returns the given value as the result of nextOr().
- yield(val), in a stream, resolves the next listener with the given value. It may or may not pause, depending on whether lazy=TRUE and whether there are any more pending requests.
- yieldFrom(i) takes an iterator or a stream as argument and yields successive values until it is exhausted. yieldFrom(iter) is basically equivalent to for (i in iter) yield(i).

A coroutine might finish normally by reaching a call to return(...), or by simply reaching the end of its expression. If a coroutine reaches a return call, all enclosing tryCatch(finally=) and on.exit handlers will be executed before returning the value.

- When a generator finishes, nextOr() forces its or argument; the value returned by the generator expression is discarded.
- When an async finishes normally, its promise interface resolves with the return value.

A coroutine might finish abnormally if the user-level R code throws an error, or if there is an internal error in the coroutine implementation.

- When a gen or run routine finishes abnormally, all on.exit clauses are triggered, while the inciting error is allowed to propagate normally without being caught.
- When an async routine throws an error, it will be caught and the error will be forwarded to reject its promise.

In either case, on.exit handlers are run in a special phase after main execution finishes (either normally or abnormally). After all on.exit handlers have finished, the coroutine continues to propagate its error or its return value.
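As a sketch of that last point (assuming, as the promise interface above suggests, that an async can be handed to promises::then()):

a <- async({
  stop("something went wrong")   # the error is caught by the async...
})
promises::then(a,
  onRejected = function(err) message("async rejected"))  # ...and rejects its promise
# "async rejected" prints once the promise event loop is serviced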
If you were to do something as strange as return()
or
yield()
from an on.exit handler, any pending error will be
cancelled. (This is also how base R’s on.exit
behaves.)
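Taking that rule literally, a sketch like the following should return the handler’s value instead of raising the error:

x <- run({
  on.exit(return("recovered"))   # a return() in the exit handler...
  stop("boom")                   # ...cancels this pending error
})
# per the rule above, x should be "recovered"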
Errors during execution can come from user-level R code, or,
possibly, from within the coroutine implementation itself. The coroutine
implementation of tryCatch
tries to catch either type. When a coroutine enters a tryCatch, it
briefly saves state so that it can wind up an R-level tryCatch and
continue execution underneath it. This way it can catch exceptions coming from
both the coroutine implementation and user-level R calls.
A slight difference from base R tryCatch is that you can supply a
non-function value to error if you don’t care about inspecting the
error; the tryCatch will just return that value.
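For example (a sketch, with might_fail standing in for some promise that may reject):

a <- async({
  tryCatch(await(might_fail), error = NA)  # if might_fail rejects, this evaluates to NA,
})                                         # so the async resolves with NA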
For that matter, you can provide an error value or handler directly
to await
, and this might be more efficient as it doesn’t
have to wind up another tryCatch on the coroutine end of things.
You can also use break, next, or goto as the error value, to take those
actions on error.
A coroutine may have a list of exit handlers. The expr
argument to each on.exit
is pulled out at parse time to make each handler, so that its effective
scope is outside of any enclosing loops or tryCatch calls.
At run time, calling on.exit registers the given handler
to be executed later. When the coroutine finishes, either by normal
return or by error, the coroutine saves the return value and starts
executing each handler that was registered. After the last
on.exit
handler has been executed, the saved value is
returned.
A typical use of on.exit
might be to close a file or
other resource after you are done reading; here’s a simple generator
that reads a file one line at a time:
fileLines <- gen(function(...) {
  f <- file(..., open="rt")
  on.exit(close(f))
  while (length(line <- readLines(f, 1)) > 0) yield(line)
})
length(as.list(fileLines("language.Rmd")))
## [1] 306
It is possible (but rather strange) to call stop()
or
return()
from inside an on.exit
handler. In
that case, the new value and disposition override the old ones.
## Error in eval(val, targetEnv): an error
x
## [1] "this instead"
Here we have an odd situation where an error is printed out to the
console, but the value was returned from run
anyway. (Base
R also does this.)
tryCatch(expr={...}, finally={...})
will execute
expr
, and after expr
finishes, whether it
finishes normally or not, tryCatch
will then execute
finally
.
The finally
clause will also be executed if you exit
from expr
without finishing it, for instance by
break
or next
in a loop, or by
goto
in a switch statement.
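A sketch of finally running when a loop body is exited via break:

g <- gen({
  for (i in 1:3) {
    tryCatch({
      if (i == 3) break
      yield(i)
    }, finally = message("finished iteration ", i))
  }
})
as.list(g)  # expected: yields 1 and 2; the message prints for i = 1, 2 and 3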
Assuming finally
finishes normally,
tryCatch
will either return the saved return value or throw
the saved error from expr
. If the finally
clause throws an error then the original error will be lost.
Writing code in a coroutine’s on.exit
clause or in a
tryCatch(finally=)
call does not give any guarantee that
the cleanup code will ever run, because coroutines only execute “on
demand.” If you make a generator designed to yield ten items and only
call nextOr ten times, the generator will never run its exit handler.
The generator will just be paused, waiting to reach the end of
iteration; you need to call nextOr an eleventh time to allow it to
reach the end, run its exit handlers, and signal end of iteration.
finished <- FALSE
g <- gen({
  on.exit(finished <<- TRUE)
  for (i in 1:10) yield(i)
})
for (j in 1:10) nextOr(g, break)
finished
## [1] FALSE
nextOr(g, "stop")
## [1] "stop"
finished
## [1] TRUE
Similarly, an async
with an on.exit
or
finally
handler will not execute those handlers if it is
paused on an await
that never resolves. With those caveats
in mind, coroutines do implement both on.exit
and
finally
constructs and they are useful for sequencing
cleanup actions.
tryCatch and iteration performance
When a coroutine enters a tryCatch
it has to momentarily
save state so that it can wind up an R-level tryCatch
on
the stack and then resume. If there is a yield or await in the tryCatch
expression, the coroutine has to exit from its tryCatch and re-establish
it when execution resumes.
This means that yielding from inside a tryCatch can be more costly,
performance-wise, than a regular yield. This is part of what motivated
the development of the iterors package: the desire for a protocol for
iteration that did not involve using exceptions for normal flow
control. Just eliminating tryCatch
caused a
2x speedup for many generators, and the iterors
package
contains many other optimizations besides.
Generators and run routines try to operate without
using any tryCatch
handlers. This means that if a generator
encounters a problem, the error will bubble up through R’s normal error
handling mechanism, and it should be relatively easy to use
options(error=recover)
to debug the error.
async
and stream
are required to have a
global tryCatch
because any errors they encounter should be
forwarded to listeners. This, however, makes it a bit harder to debug
them.
The method debugAsync()
will toggle single-step debugging on a coroutine.
If you call debugAsync(co, R=TRUE)
the coroutine
co
will start a browser before executing each user-level R
expression. You can press “n” to step through each expression.
If you call debugAsync(co, internal=TRUE)
the browser
will be opened at the R implementation level. This will present you with
a twisty little path of tail calls; step through each call with n, but
make sure to use s to step into a tail call in order to follow
execution.
The async
package imports nseval, which can be very handy at the debugger
console. I often use get_function(), get_call(), and caller() to get a
handle on exactly where the debugger is paused. arg_env, arg_expr,
is_forced, and friends really help when dealing with R’s lazy
evaluation.