async: Generators, async/await, and asynchronous streams for R
This is an R package implementing generators, async blocks, and streams (collectively known as “coroutines.”)
New in version 0.3:

* Coroutines support single-step debugging. Use `debugAsync(obj, TRUE)` to pause before each call at R level. You can also use `debugAsync(obj, internal=TRUE)` to step through at the coroutine implementation level.
* `switch` supports `goto()` to transfer to a different branch.
* Coroutines support `on.exit()`.
* `gen(function(x, y) ...)` returns a function that constructs generators.
* `run(...)` will execute a generator expression immediately and collect the results in a list.
* `stream()` creates a coroutine backed by a `channel` class (asynchronous iterator).
* You can visualize a coroutine's graph with `graphAsync(gen)` (this requires the Graphviz `dot` command to be installed on your system).

For more details see NEWS.md.
`g <- gen({...})` is like a function that knows how to “pause.” The code in a generator runs until it hits a `yield()` call, then returns that value. The next time you call the generator it picks up where it left off and runs until the next `yield`.
From the outside, a generator implements the iteror interface. You extract each yielded value with `nextOr(g, or)`, where `or` is evaluated and returned once the generator is exhausted, and you can use generators anywhere you can use an iteror. The iteror class is cross-compatible with the iterators package.
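To make the `nextOr()` protocol concrete, here is a minimal base-R sketch of an object that behaves like an iteror: each call returns the next value, or evaluates the fallback once the values run out. The names `make_counter()` and `drain()` are hypothetical illustrations, not part of async or iterors, and real iterors are classed objects rather than bare closures.

```r
# Hypothetical sketch of nextOr-like semantics (not the iterors API).
make_counter <- function(values) {
  i <- 0
  # Return the next element, or evaluate `or` lazily once all values are used.
  function(or) {
    if (i >= length(values)) return(or)
    i <<- i + 1
    values[[i]]
  }
}

# Hypothetical helper mirroring as.list(): pull values until exhausted.
drain <- function(next_or) {
  out <- list()
  repeat {
    done <- FALSE
    # The fallback expression only runs (setting `done`) when exhausted.
    v <- next_or({ done <- TRUE; NULL })
    if (done) break
    out[[length(out) + 1]] <- v
  }
  out
}
```

Because `or` is only evaluated when the sequence is exhausted, the fallback can itself be an action, such as setting a flag as `drain()` does, or breaking out of a loop.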
Consider a sequence of numbers `x[i]`, starting with an arbitrary `x[1]`, where each subsequent element is produced by applying the rule:

* If `x[i]` is even, then the next value will be `x[i+1] = x[i]/2`.
* If `x[i]` is odd, the next value will be `x[i+1] = 3*x[i]+1`.

An infinite sequence of numbers will continue from each starting point `x[1]`, but it is conjectured that all sequences will eventually reach the loop 1, 4, 2, 1, 4, 2, ….
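The rule itself is a one-line function in plain R. `collatz_next()` is a hypothetical helper for illustration; applying it three times to 1 confirms that 1 falls into the 1, 4, 2 loop.

```r
# Hypothetical helper: one step of the Collatz rule.
collatz_next <- function(x) {
  if (x %% 2 == 0) x / 2 else 3 * x + 1
}

collatz_next(12)  # 6
collatz_next(3)   # 10
# Starting from 1 cycles through 4 and 2 back to 1:
collatz_next(collatz_next(collatz_next(1)))  # 1
```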
The following generator produces the Collatz sequence, starting from `x`, and terminating when (or if?) the sequence reaches 1.
```r
library(async)
collatz <- gen(function(x) {
  yield(x)
  while (x > 1) {
    if (x %% 2 == 0)
      yield(x <- x / 2L)
    else yield(x <- 3L * x + 1)
  }
})
```
The call to `gen` produces a generator constructor; calling `collatz(12)` then produces a generator. You can get values one at a time with `nextOr()`.
```r
ctz <- collatz(12)
nextOr(ctz, NA)
## [1] 12
nextOr(ctz, NA)
## [1] 6
nextOr(ctz, NA)
## [1] 3
nextOr(ctz, NA)
## [1] 10
nextOr(ctz, NA)
## [1] 5
```
You can also use any other method that applies to an iterator, like `as.list`.
```r
collatz(27L) |> as.list() |> as.numeric()
## [1] 27 82 41 124 62 31 94 47 142 71 214 107 322 161 484
## [16] 242 121 364 182 91 274 137 412 206 103 310 155 466 233 700
## [31] 350 175 526 263 790 395 1186 593 1780 890 445 1336 668 334 167
## [46] 502 251 754 377 1132 566 283 850 425 1276 638 319 958 479 1438
## [61] 719 2158 1079 3238 1619 4858 2429 7288 3644 1822 911 2734 1367 4102 2051
## [76] 6154 3077 9232 4616 2308 1154 577 1732 866 433 1300 650 325 976 488
## [91] 244 122 61 184 92 46 23 70 35 106 53 160 80 40 20
## [106] 10 5 16 8 4 2 1
# Try collatz(63728127L) |> as.list() |> as.numeric()...
```
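For comparison, an ordinary loop can collect the same sequence eagerly. Unlike the generator, it has to run to completion before returning anything, so it offers no way to take just the first few values of a (hypothetically) endless sequence. The name `collatz_vector()` is illustrative only.

```r
# Eager version: builds the whole sequence before returning it.
collatz_vector <- function(x) {
  out <- x
  while (x > 1) {
    x <- if (x %% 2 == 0) x / 2 else 3 * x + 1
    out <- c(out, x)
  }
  out
}

s <- collatz_vector(27)
length(s)  # 112 terms, matching the generator output above
max(s)     # 9232
```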
For more examples, see the “Clapping Music” vignette.
Like `gen`, `async({...})` takes a block of sequential code, which runs until it reaches a call to `await(p)`. The argument `p` should be a promise (as defined by the promises package), which represents an unfinished external computation. In turn, `async()` constructs and returns a promise.
An `async` block runs until it reaches a call to `await(p)` and pauses. When the promise `p` resolves, the `async` block continues. If `p` rejects, that is evaluated like an error; you can put `await(p)` inside a `tryCatch` to handle rejections.

When the `async` block finishes, or throws an error, its promise resolves or rejects.
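The resolve/reject lifecycle can be modelled with a toy promise in base R. This is only a sketch of the idea: `make_toy_promise()` and its `then()` are hypothetical and far simpler than the promises package (no chaining, no event loop; handlers run synchronously when the promise settles).

```r
# Toy promise: settles once, then runs the matching handler.
make_toy_promise <- function() {
  state <- "pending"
  value <- NULL
  handlers <- list()

  settle <- function(new_state, v) {
    if (state != "pending") return(invisible(NULL))  # settle only once
    state <<- new_state
    value <<- v
    for (h in handlers) h()  # run handlers registered while pending
    handlers <<- list()
    invisible(NULL)
  }

  then <- function(on_fulfilled, on_rejected = function(e) NULL) {
    h <- function() {
      if (state == "fulfilled") on_fulfilled(value) else on_rejected(value)
    }
    # Defer the handler if still pending, otherwise run it right away.
    if (state == "pending") handlers[[length(handlers) + 1]] <<- h else h()
    invisible(NULL)
  }

  list(
    resolve = function(v) settle("fulfilled", v),
    reject  = function(e) settle("rejected", e),
    then    = then,
    state   = function() state
  )
}
```

Conceptually, pausing at `await(p)` amounts to registering a pair of handlers like these: fulfilment resumes the block with the value, and rejection resumes it by raising an error.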
`async` doesn’t handle running concurrent tasks by itself; it builds on existing packages like `future` and `later`. The `later` package lets you assign tasks to be done in the event loop, when R is idle.
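Scheduling work on an event loop can be sketched with a simulated clock. `make_toy_loop()` is a hypothetical stand-in, not the `later` API: it keeps a queue of (due time, callback) pairs and runs them in due order instead of waiting for R to be idle. The usage below schedules each beep from inside the previous callback, which is roughly the callback style that `async`/`await` spares you from writing by hand.

```r
# Toy event loop with a simulated clock in seconds (not the later package).
make_toy_loop <- function() {
  queue <- list()
  now <- 0
  schedule <- function(f, delay = 0) {
    queue[[length(queue) + 1]] <<- list(due = now + delay, f = f)
    invisible(NULL)
  }
  run <- function() {
    while (length(queue) > 0) {
      dues <- vapply(queue, function(task) task$due, numeric(1))
      i <- which.min(dues)   # earliest task; ties keep insertion order
      task <- queue[[i]]
      queue[[i]] <<- NULL    # dequeue before running, so it may reschedule
      now <<- task$due       # jump the simulated clock forward
      task$f()
    }
  }
  list(schedule = schedule, run = run, now = function() now)
}

# Callback-style version of "beep 5 times at 10 second intervals".
loop <- make_toy_loop()
beep_times <- numeric(0)
beep <- function(i) {
  beep_times <<- c(beep_times, loop$now())
  if (i < 5) loop$schedule(function() beep(i + 1), delay = 10)
}
loop$schedule(function() beep(1), delay = 10)
loop$run()
beep_times  # 10 20 30 40 50
```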
Ring a bell 5 times at 10 second intervals (subject to R being idle):
```r
async({
  for (i in 1:5) {
    await(delay(10))  # delay() uses later::later()
    cat("Beep", i, "\n")
    beepr::beep(2)
  }
})
```
`async()` can be used in Shiny apps! For example, there is a version of the “Cranwhales” demo app that uses async/await.
`async()` allows you to naturally keep track of more than one concurrent process. The web spider vignette shows how this can improve the speed of web scraping using concurrent connections.
`async` can also work with `future` objects to run computations in parallel. Download, parse, and summarize a dataset in background processes like this:
```r
library(future)
library(dplyr)
library(ggplot2)
plan(multisession, workers = 2)  # two background R sessions
url <- "http://analytics.globalsuperhypermegamart.com/2020/March.csv.gz"
dest <- "March.csv.gz"
dataset <- async({
  if (!file.exists(dest)) {
    await(future({
      cat("Downloading\n")
      download.file(url, dest)
    }))
  }
  data <- await(future({
    cat("Parsing\n")
    read.csv(dest) |>
      mutate(time = hms::trunc_hms(time, 60*60)) |>
      group_by(time) |>
      summarize(sales = sum(amount))
  }))
  data  # the promise `dataset` resolves to the summarized table
})

# When the data is ready, plot it (in the main process):
async({
  p <- await(dataset) |>
    ggplot(aes(time, sales)) +
    geom_line() +
    xlab("Time") +
    ylab("Sales")
  print(p)
})
```
New in version 0.3 are asynchronous streams and channels. A channel is an interface for asynchronous iteration; `stream()` lets you create channels by writing code with `await` and `yield`. Here is an example of channels being used to “walk and chew gum concurrently”:
```r
walk <- stream({
  for (i in 1:10)
    for (step in c("left", "right")) {
      yield(step)
      await(delay(0.5))
    }
})
chewGum <- stream(for (i in 1:12) {
  yield("chew")
  await(delay(0.8))
})
printEach <- async(function(st) {
  for (each in st) {cat(each, ", ", sep="")}
  cat("\n")
})
all <- combine(walk, chewGum) |> printEach()
## left, chew, right, chew, left, right, chew, left, chew, right, left, chew, right, chew, left, right, chew, left, right, chew, left, chew, right, left, chew, right, chew, left, right, chew, left, right,
```
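The interleaving above can be approximated by stamping each value with the time it would be yielded and merging by time. This simulation assumes every `delay()` takes exactly its nominal time and that `combine()` forwards each value as soon as it arrives; real scheduling jitter can swap values that are (nearly) simultaneous, such as the pair at 4 seconds.

```r
# Simulated timestamps (seconds) for each yielded value.
walk_times <- seq(0, by = 0.5, length.out = 20)   # 10 x (left, right)
walk_vals  <- rep(c("left", "right"), times = 10)
chew_times <- seq(0, by = 0.8, length.out = 12)   # 12 chews
chew_vals  <- rep("chew", 12)

# Merge by time; order() is stable, so exact ties keep walk before chew.
times  <- c(walk_times, chew_times)
merged <- c(walk_vals, chew_vals)[order(times)]
cat(paste0(merged, ", "), "\n", sep = "")
```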
A longer article is forthcoming, but the basic gist is that the `async` package transforms your program into a state machine.
A coroutine expression is first scanned for uses of `await`, `yield`, `for`, `break`, and other control flow calls. Those calls are swapped out for implementations local to the `async` package. Other R calls are wrapped in functions, and all these functions are linked together so that each function calls the next in sequence. The result is a graph of functions calling each other, each call corresponding to a step in the program.
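To illustrate the idea (this is not the code the package actually generates), here is the body of `collatz` translated by hand into small linked functions, one per step, where each function ends by calling the next. In this sketch the `yield` step simply records its value and carries on, so the chain runs to completion; all helper names are hypothetical.

```r
# Hand-translated sketch: each step is a function that calls the next one.
collatz_chain <- function(x) {
  out <- numeric(0)
  emit   <- function() { out <<- c(out, x); loop() }          # yield(x)
  loop   <- function() if (x > 1) branch() else finish()      # while (x > 1)
  branch <- function() if (x %% 2 == 0) halve() else triple() # if / else
  halve  <- function() { x <<- x / 2; emit() }                # x <- x / 2
  triple <- function() { x <<- 3 * x + 1; emit() }            # x <- 3 * x + 1
  finish <- function() out                                    # end of block
  emit()
}

collatz_chain(12)  # 12 6 3 10 5 16 8 4 2 1
```

Because every step calls the next before returning, the R call stack grows with each step in this naive version; a generator avoids that by returning at each `yield` instead.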
As of `async` version 0.3 you can extract and visualize this graph with `graphAsync(g)`. (You will need Graphviz `dot` installed to render these graphs.)
```r
ctz <- collatz(23)
graphAsync(ctz, type="svg") # creates a file "ctz.svg"
```
Since each step in the program’s execution corresponds to a function call, when execution reaches a `yield`, the program’s state is just the “next function” that would have been called (that is, a continuation). To pause and resume execution, a generator saves that “next function” until the next time `nextOr()` is called.
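A minimal sketch of pausing by saving the continuation, using a hand-written Collatz generator: the `yield` step stores the function it would call next and returns its value instead of continuing. `make_collatz_gen()` and its internals are hypothetical names, and the returned function takes an `or` argument only in the spirit of `nextOr()`.

```r
# Sketch: a generator that pauses by saving its continuation.
make_collatz_gen <- function(x) {
  finished <- FALSE
  # "yield": remember where to resume, then hand the value back to the caller.
  yield_value <- function(v, resume) {
    cont <<- resume
    v
  }
  loop <- function() {
    if (x > 1) step() else { finished <<- TRUE; NULL }
  }
  step <- function() {
    x <<- if (x %% 2 == 0) x / 2 else 3 * x + 1
    yield_value(x, loop)
  }
  # Initial continuation: yield the starting value, then resume at the loop.
  cont <- function() yield_value(x, loop)

  function(or) {
    if (finished) return(or)
    v <- cont()  # run until the next yield (or the end)
    if (finished) or else v
  }
}

g <- make_collatz_gen(12)
g(NA)  # 12
g(NA)  # 6
```

Between calls, the only state kept is `x`, `finished`, and the saved `cont`; the R call stack unwinds completely at each pause.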
You can also enable single-stepping at the graph level by calling:
```r
debugAsync(ctz, internal=TRUE)
```