A channel is an object that represents a sequence of values yet to be determined. It is something like a combination of a promise and an iteror.

channel(obj, ...)

# S3 method for `function`
channel(
  obj,
  ...,
  max_queue = 500L,
  max_awaiting = 500L,
  wakeup = function(...) NULL
)

is.channel(x)

Arguments

obj

A user-provided function; it will receive three callback functions as arguments, in this order: emit(val), reject(err), and close().

...

Specialized channel methods may take other arguments.

max_queue

The maximum number of outgoing values to store if there are no listeners. Beyond this, calling emit will return an error.

max_awaiting

The maximum number of pending requests. If there are this many outstanding requests for values, calling nextThen(ch, ...) or nextElem(ch) will raise an error.

wakeup

You may optionally provide a callback function here. It will be called when the queue is empty and there is at least one listener/outstanding promise.

x

an object.

Value

a channel object, supporting methods "nextThen" and "nextOr"

is.channel(x) returns TRUE if its argument is a channel object.

Details

The channel interface is intended to represent and work with asynchronous, live data sources, for instance event logs, non-blocking connections, paginated query results, reactive values, and other processes that yield a sequence of values over time.

channel is an S3 generic and will attempt to convert the argument obj into a channel object according to its class.

The friendly way to obtain values from a channel is to use awaitNext or for loops within an async or stream coroutine.

The low-level interface to obtain values from a channel is to call nextThen(ch, onNext=, onError=, onClose=, ...), providing callback functions for at least onNext(val). Those callbacks will be appended to an internal queue and will be called as soon as data is available, in the order that requests were received.
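As a rough illustration of the callback interface described above, the sketch below requests values from a small channel with nextThen. The source channel, the `received` accumulator, and the `request_one` helper are hypothetical names introduced here; the sketch also assumes that each nextThen call requests a single value, so it is called once per expected value plus once more to observe the close.

```r
library(async)

# A small source channel for illustration; any channel object would do.
ch <- stream({
  yield("a")
  yield("b")
})

# Hypothetical accumulator for the values handed to onNext.
received <- character(0)

# Hypothetical helper: queue one request for the next value.
request_one <- function() {
  nextThen(ch,
    onNext  = function(val) received <<- c(received, val),
    onError = function(err) message("channel reported an error"),
    onClose = function() message("channel closed"))
}

# Requests are queued and answered in the order they were made.
request_one()
request_one()
request_one()  # no values should remain by now, so this one should see the close
```

Callbacks are delivered as data becomes available, so in a non-interactive script the event loop may need to be run (for example with later::run_now()) before `received` is filled in.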

You can also treat a channel as an iteror over promises, calling nextOr(pr) to return a promise representing the next available value. Each promise created this way will be resolved in the order that data comes in. Note that this interface has no separate end-of-iteration signal; instead, a promise will reject with the condition message "StopIteration" once the channel has closed.

Be careful with the iterator-over-promises interface though: if you call as.list.iteror(pr) you may get stuck in an infinite loop, because as.list keeps calling nextElem and receives more promises to represent values that exist only hypothetically. This is one reason for the max_awaiting limit.

The friendly way to create a channel with custom behavior is to use a stream coroutine. Inside of stream() call await to wait on promises, awaitNext to wait on other streams and yield to yield values. To signal end of iteration use return() (which will discard its value) and to signal an error use stop().
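A minimal sketch of the coroutine approach, assuming the async package is attached: a stream yields a few values, and an async coroutine consumes them with a for loop. The names `squares` and `total` are hypothetical and chosen only for this example.

```r
library(async)

# A hypothetical stream that yields the squares of 1 to 3 and then ends.
squares <- stream({
  for (i in 1:3) {
    yield(i^2)
  }
})

# An async coroutine that consumes the stream with a for loop.
# `total` is a promise for the sum of everything the stream yields.
total <- async({
  s <- 0
  for (x in squares) {
    s <- s + x
  }
  s
})
```

Because `total` is a promise, its value becomes available only after the event loop has had a chance to run the coroutines.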

The low-level interface to create a channel with custom behavior is to call channel(function(emit, reject, close) {...}), providing your own function definition; your function will receive those three callback functions as arguments. Then use whatever means you like to arrange for emit(val) to be called some time in the future as data comes in. When you are done emitting values, call the close() callback. To report an error, call reject(err); the next requestor will receive the error. If there is more than one listener, the other queued listeners will get a close signal.
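The following sketch shows one way the low-level constructor might be used. It assumes the later package is installed (it is used here only as a convenient way to schedule work for the future); the `ticker` name and the emitted values are made up for illustration.

```r
library(async)

# Hypothetical channel that emits two values shortly after creation.
ticker <- channel(function(emit, reject, close) {
  # Schedule the emissions to happen from the event loop, a little later.
  later::later(function() {
    emit(1)
    emit(2)
    close()  # signal that no more values will follow
    # On failure, one would call reject(err) instead of emit()/close().
  }, delay = 0.1)
})

is.channel(ticker)
```

Values emitted before anyone is listening are held in the channel's queue (up to max_queue), so consumers can attach after the channel has been created.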

Author

Peter Meilstrup