EsStream
Description
A source of asynchronous data events.
An <EsStream> provides a way to receive a sequence of events. Each event is either a data event, also called an element of the stream, or an error event, which is a notification that something has failed. When a stream has emitted all its events, a single "done" event notifies the listener that the end has been reached.
You #listen: on a stream to make it start generating events, and to set up listeners that receive the events. When you listen, you receive an <EsStreamSubscription> object which is the active object providing the events, and which can be used to stop listening again, or to temporarily pause events from the subscription.
There are two kinds of streams: "single-subscription" streams and "broadcast" streams.
Single-Subscription Streams
A single-subscription stream allows only a single listener during the whole lifetime of the stream. It doesn't start generating events until it has a listener, and it stops sending events when the listener is unsubscribed, even if the source of events could still provide more.
Listening twice on a single-subscription stream is not allowed, even after the first subscription has been canceled.
Single-subscription streams are generally used for streaming chunks of larger contiguous data like file I/O.
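The single-listener rule can be modeled in a few lines. The following is a minimal Python sketch of the semantics only, not the EsStream API: the class `SingleSubscriptionStream` and its `listen` method are hypothetical names, and events are delivered synchronously for brevity.

```python
class SingleSubscriptionStream:
    """Toy model: events are produced lazily and only one listener is ever allowed."""

    def __init__(self, source):
        self._source = source      # any iterable; not touched until listen()
        self._listened = False

    def listen(self, on_data):
        if self._listened:
            # A second listen is rejected, even if the first listener is finished.
            raise RuntimeError("stream has already been listened to")
        self._listened = True
        for element in self._source:   # generation starts only now
            on_data(element)


received = []
stream = SingleSubscriptionStream(iter([1, 2, 3]))
stream.listen(received.append)

try:
    stream.listen(received.append)
    second_listen_failed = False
except RuntimeError:
    second_listen_failed = True
```

In this model the source iterable is not consumed until `listen` is called, which mirrors the statement that a single-subscription stream does not generate events before it has a listener.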
Broadcast Streams
A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not.
Broadcast streams are used for independent events/observers.
If several listeners want to listen to a single-subscription stream, use #asBroadcastStream:onCancel: to create a broadcast stream on top of the non-broadcast stream.
On either kind of stream, stream transformations, such as #select: and #skip:, return a new stream with the same type of events as the one the method was called on, unless otherwise noted.
When an event is fired, the listener(s) at that time will receive the event. If a listener is added to a broadcast stream while an event is being fired, that listener will not receive the event currently being fired. If a listener is canceled, it immediately stops receiving events. Listening on a broadcast stream can be treated as listening on a new stream containing only the events that have not yet been emitted when the #listen: call occurs.
For example, the #first method listens to the stream, then returns the first event that listener receives. This is not necessarily the first event emitted by the stream, but the first of the remaining events of the broadcast stream.
When the "done" event is fired, subscribers are unsubscribed before receiving the event. After the event has been sent, the stream has no subscribers. Adding new subscribers to a broadcast stream after this point is allowed, but they will just receive a new "done" event as soon as possible.
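The delivery rules for broadcast streams can be illustrated with a small Python model. This is a sketch of the semantics under stated assumptions, not the EsStream implementation: `BroadcastStream`, `listen` and `fire` are hypothetical names, and delivery is synchronous.

```python
class BroadcastStream:
    """Toy model: an event goes to whoever is listening at the moment it fires."""

    def __init__(self):
        self._listeners = []

    def listen(self, on_data):
        self._listeners.append(on_data)

    def fire(self, event):
        # Iterate over a snapshot so that a listener added during delivery
        # does not receive the event currently being fired.
        for on_data in list(self._listeners):
            on_data(event)


stream = BroadcastStream()
early, late = [], []

stream.fire("a")             # no listeners yet: the event is simply dropped
stream.listen(early.append)
stream.fire("b")
stream.listen(late.append)   # joins after "b" was fired
stream.fire("c")
```

Each listener sees only the events fired after it subscribed, which is why listening on a broadcast stream behaves like listening on a new stream of the not-yet-emitted events.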
Stream subscriptions always respect "pause" requests. If necessary they need to buffer their input, but often, and preferably, they can simply request their input to pause too.
The default implementation of #isBroadcast returns false. A broadcast stream inheriting from <EsStream> must override #isBroadcast to return true if it wants to signal that it behaves like a broadcast stream.
Class Methods
empty
Creates an empty broadcast stream.
This is a stream which does nothing except send a done event
when it's listened to.
Answers:
<EsEmptyStream>
error:
Creates a stream which emits a single error event before completing.
This stream emits a single error event of @error and default stack trace
and then completes with a done event.
Arguments:
anError - <Object>
Answers:
<EsStream>
error:stackTrace:
Creates a stream which emits a single error event before completing.
This stream emits a single error event of @error and @stackTrace
and then completes with a done event.
Arguments:
anError - <Object>
aStackTrace - <EsAsyncStackTrace> or nil
Answers:
<EsStream>
fromCollection:
Creates a single-subscription stream that gets its data from @aCollection.
@aCollection is iterated when the stream receives a listener, and stops
iterating if the listener cancels the subscription, or if the
EsCollectionIterator>>moveNext method answers `false` or raises.
Iteration is suspended while the stream subscription is paused.
If calling EsCollectionIterator>>moveNext on Collection>>iterator raises,
the stream emits that error and then it closes.
If reading EsCollectionIterator>>current on `Collection>>iterator` raises,
the stream emits that error, but keeps iterating.
Arguments:
aCollection - <Collection>
Answers:
<EsStream>
fromFuture:
Creates a new single-subscription stream from the future.
When the future completes, the stream will fire one event, either
data or error, and then close with a done-event.
Arguments:
future - <EsFuture>
Answers:
<EsStream>
fromFutures:
Creates a single-subscription stream from a group of futures.
The stream reports the results of the futures on the stream in the order
in which the futures complete.
Each future provides either a data event or an error event,
depending on how the future completes.
If some futures have already completed when `EsStream fromFutures:` is called,
their results will be emitted in some unspecified order.
When all futures have completed, the stream is closed.
If @futures is empty, the stream closes as soon as possible.
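The completion-order behavior can be sketched with Python's `asyncio`. This is an analogy rather than the EsStream API: `finish_after`, `fail_after` and `events_in_completion_order` are hypothetical helpers, and events are modeled as `(kind, payload)` tuples.

```python
import asyncio


async def finish_after(delay, value):
    await asyncio.sleep(delay)
    return value


async def fail_after(delay):
    await asyncio.sleep(delay)
    raise ValueError("failed")


async def events_in_completion_order(awaitables):
    """Collect one data or error event per awaitable, in the order they complete."""
    events = []
    for next_done in asyncio.as_completed(awaitables):
        try:
            events.append(("data", await next_done))
        except Exception as exc:
            events.append(("error", exc))
    return events


events = asyncio.run(events_in_completion_order([
    finish_after(0.10, "slow"),
    fail_after(0.05),
    finish_after(0.01, "fast"),
]))
summary = [(kind, payload if kind == "data" else type(payload).__name__)
           for kind, payload in events]
```

The order of `summary` follows the delays, not the order in which the awaitables were passed in.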
Arguments:
futures - <Collection>
Answers:
<EsStream>
fromIterable:
Creates a single-subscription stream that gets its data from @iterable.
See implementors of #iterator for the current iterables.
An `iterable` is only required to implement a method named #iterator
that answers an <Object> that understands (see <EsCollectionIterator> for example)
#moveNext -> <Boolean>
#current -> <Object> or nil
The iterable is iterated when the stream receives a listener, and stops
iterating if the listener cancels the subscription, or if the
EsCollectionIterator>>moveNext method answers `false` or raises.
Iteration is suspended while the stream subscription is paused.
If calling EsCollectionIterator>>moveNext on Collection>>iterator raises,
the stream emits that error and then it closes.
If reading EsCollectionIterator>>current on `Collection>>iterator` raises,
the stream emits that error, but keeps iterating.
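The different treatment of failures in #moveNext and #current can be sketched as follows. This Python model is an illustration only: `move_next` and `current` stand in for the two iterator methods, `FlakyIterator` is a hypothetical iterator, and events are modeled as `(kind, payload)` tuples.

```python
def events_from_iterator(iterator):
    """Yield ('data', value) or ('error', exc) events, ending with ('done', None)."""
    while True:
        try:
            has_next = iterator.move_next()
        except Exception as exc:
            yield ("error", exc)   # failure while advancing: report, then close
            break
        if not has_next:
            break
        try:
            value = iterator.current()
        except Exception as exc:
            yield ("error", exc)   # failure while reading: report, keep iterating
            continue
        yield ("data", value)
    yield ("done", None)


class FlakyIterator:
    """Hypothetical iterator over 1, 2, 3 whose current() fails on the value 2."""

    def __init__(self):
        self._values = [1, 2, 3]
        self._index = -1

    def move_next(self):
        self._index += 1
        return self._index < len(self._values)

    def current(self):
        value = self._values[self._index]
        if value == 2:
            raise ValueError("cannot read 2")
        return value


event_kinds = [kind for kind, _ in events_from_iterator(FlakyIterator())]
```

The failed read produces an error event in place of the element, and iteration continues with the next element.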
Arguments:
iterable - An object that implements the method #iterator that answers an @Iterator
As an example, see Collection>>iterator
Answers:
<EsStream>
fromStream:transformed:
Creates a stream where all events of an existing stream are piped through
a sink-transformation.
@aTransformBlock is invoked when the answered stream is
listened to. All events from the @source are added into the event sink
that is answered from the invocation. The transformation puts all
transformed events into the sink the @aTransformBlock received during
its evaluation. Conceptually @aTransformBlock creates a transformation pipe
with the input sink being the answered <EsEventSink> and the output sink
being the sink it received.
This factory method is frequently used to build transformers.
The resulting stream is a broadcast stream if @source is.
Arguments:
stream - <EsStream> bound stream
aTransformBlock - <Block> 1-arg block that accepts an <EsEventSink> and answers an <EsEventSink>
[:sinkIn | <answer sinkOut>]
Answers:
<EsStream>
multi:
Creates a multi-subscription stream.
Each time the created stream is listened to,
the @onListen block is evaluated with a new <EsMultiStreamController>
which forwards events to the <EsStreamSubscription>
answered by that #listen* call.
This allows each listener to be treated as an individual stream.
The <EsMultiStreamController> does not support reading its
EsStreamController>>stream. Setting its EsStreamController>>onListen
has no effect since the @onListen handler is evaluated instead,
and the EsStreamController>>onListen won't be evaluated later.
The controller acts like an asynchronous controller,
but provides extra methods for delivering events synchronously.
A multi-subscription stream can behave like any other stream.
If the @onListen block raises on every call after the first,
the stream behaves like a single-subscription stream.
If the stream emits the same events to all current listeners,
it behaves like a broadcast stream.
The answered stream is NOT a broadcast stream; use #multi:isBroadcast: to mark it as one.
Arguments:
onListen - <Block> 1-arg block that accepts an <EsMultiStreamController>
Answers:
<EsStream>
multi:isBroadcast:
Creates a multi-subscription stream.
Each time the created stream is listened to,
the @onListen block is evaluated with a new <EsMultiStreamController>
which forwards events to the <EsStreamSubscription>
answered by that #listen* call.
This allows each listener to be treated as an individual stream.
The <EsMultiStreamController> does not support reading its
EsStreamController>>stream. Setting its EsStreamController>>onListen
has no effect since the @onListen handler is evaluated instead,
and the EsStreamController>>onListen won't be evaluated later.
The controller acts like an asynchronous controller,
but provides extra methods for delivering events synchronously.
If @isBroadcast is set to `true`, the answered stream's
EsStream>>isBroadcast will be `true`.
This has no effect on the stream behavior,
it is up to the @onListen block
to act like a broadcast stream if it claims to be one.
A multi-subscription stream can behave like any other stream.
If the @onListen block raises on every call after the first,
the stream behaves like a single-subscription stream.
If the stream emits the same events to all current listeners,
it behaves like a broadcast stream.
Arguments:
onListen - <Block> 1-arg block that accepts an <EsMultiStreamController>
isBroadcast - <Boolean>
Answers:
<EsStream>
periodic:
Creates a stream that repeatedly emits events at @period intervals.
The event values will be `nil`.
The @period must be a non-negative Duration of time.
Arguments:
period - <Duration> duration object
<Integer> duration in milliseconds
Answers:
<EsStream>
periodic:do:
Creates a stream that repeatedly emits events at @period intervals.
The event values are computed by evaluating @aComputationBlock.
The argument to this block is an integer that starts with 0 and is incremented for
every event.
The @period must be a non-negative Duration of time.
If @aComputationBlock is omitted, the event values will all be `nil`.
Arguments:
period - <Duration> duration object
<Integer> duration in milliseconds
aComputationBlock - <Block>
Answers:
<EsStream>
value:
Creates a stream which emits a single data event before closing.
This stream emits a single data event of @value
and then closes with a done event.
Arguments:
value - <Object>
Answers:
<EsStream>
Instance Methods
allSatisfy:
Checks whether @aSatisfyBlock evaluates to `true` for all elements provided by this stream.
Evaluates @aSatisfyBlock on each element of this stream.
If the block answers `false`, the answered future is completed with `false`
and processing stops.
If this stream ends without finding an element that @aSatisfyBlock rejects,
the answered future is completed with `true`.
If this stream emits an error, or if the evaluation of @aSatisfyBlock raises an error,
the answered future is completed with that error,
and processing stops.
Arguments:
aSatisfyBlock - <Block> 1-arg block
Answers:
<EsFuture> completes with <Boolean>
anySatisfy:
Checks whether @aSatisfyBlock evaluates to `true` for any element provided by this stream.
Evaluates @aSatisfyBlock on each element of this stream.
If the block answers `true`, the answered future is completed with `true`
and processing stops.
If this stream ends without finding an element that @aSatisfyBlock accepts,
the answered future is completed with `false`.
If this stream emits an error, or if the evaluation of @aSatisfyBlock signals an error,
the answered future is completed with that error,
and processing stops.
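The early-exit behavior ("processing stops") can be sketched in Python. This is a synchronous model with hypothetical names (`any_satisfy`, `source`), not the EsStream API; the `consumed` list only exists to show how far the source was read.

```python
def any_satisfy(events, predicate):
    for element in events:
        if predicate(element):
            return True    # stop consuming as soon as one element matches
    return False


consumed = []


def source():
    for n in [1, 3, 4, 5]:
        consumed.append(n)   # record every element the consumer pulled
        yield n


found_even = any_satisfy(source(), lambda n: n % 2 == 0)
```

The element 5 is never requested, because the search ends at the first match.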
Arguments:
aSatisfyBlock - <Block> 1-arg block
Answers:
<EsFuture> completes with <Boolean>
asBroadcastStream
Returns a multi-subscription stream that produces the same events as this.
The answered stream will subscribe to this stream when its first
subscriber is added, and will stay subscribed until this stream ends,
or a handler cancels the subscription.
Use the handler, for example, for pausing the underlying subscription
while having no subscribers to prevent losing events, or canceling the
subscription when there are no listeners.
Answers:
<EsStream> broadcast stream
asBroadcastStream:onCancel:
Returns a multi-subscription stream that produces the same events as this.
The answered stream will subscribe to this stream when its first
subscriber is added, and will stay subscribed until this stream ends,
or a handler cancels the subscription.
If @onListen is provided, it is evaluated with a subscription-like object
that represents the underlying subscription to this stream. It is
possible to pause, resume or cancel the subscription during the evaluation
of @onListen. It is not possible to change the event handlers, including
using EsStreamSubscription>>asFuture:
If @onCancel is provided, it is evaluated in a similar way to @onListen
when the answered stream stops having listeners. If it later gets
a new listener, the @onListen block is evaluated again.
Use the handler, for example, for pausing the underlying subscription
while having no subscribers to prevent losing events, or canceling the
subscription when there are no listeners.
Arguments:
onListen - <Block> 1-arg block that accepts an <EsStreamSubscription>.
Could be nil
onCancel - <Block> 1-arg block that accepts an <EsStreamSubscription>.
Could be nil
Answers:
<EsStream> broadcast stream
asOrderedCollection
Collects all elements of this stream into an <OrderedCollection>
Creates an ordered collection and adds all elements of this stream to it
in the order they arrive.
When this stream ends, the answered future is completed with that ordered
collection.
If this stream emits an error,
the answered future is completed with that error,
and processing stops.
Answers:
<EsFuture> completes with <OrderedCollection>
asSet
Collects all elements of this stream into a <Set>
Creates a set and adds all elements of this stream to it
in the order they arrive.
When this stream ends, the answered future is completed with that set.
If this stream emits an error,
the answered future is completed with that error,
and processing stops.
Answers:
<EsFuture> completes with <Set>
asyncCollect:
Transforms each element into a sequence of asynchronous events.
Returns a new stream and for each event of this stream, do the following:
* If the event is an error event or a done event, it is emitted directly
by the answered stream.
* Otherwise it is an element. Then the @aCollectBlock is called
with the element as argument to produce a convert-stream for the element.
* If that evaluation raises an exception, the error is emitted on the answered stream.
* If the evaluation answers `nil`, no further action is taken for the element.
* Otherwise, this stream is paused and convert-stream is listened to.
Every data and error event of the convert-stream is emitted on the answered
stream in the order it is produced.
When the convert-stream ends, this stream is resumed.
The answered stream is a broadcast stream if this stream is.
Arguments:
aCollectBlock - <Block> 1-arg block
Answers:
<EsStream>
asyncExpand:
Transforms each element into a sequence of asynchronous events.
Returns a new stream and for each event of this stream, do the following:
* If the event is an error event or a done event, it is emitted directly
by the answered stream.
* Otherwise it is an element. Then the @aConvertBlock is evaluated
with the element as argument to produce a convert-stream for the element.
* If that evaluation raises an exception, the error is emitted on the answered stream.
* If the block answers `nil`, no further action is taken for the element.
* Otherwise, this stream is paused and convert-stream is listened to.
Every data and error event of the convert-stream is emitted on the answered
stream in the order it is produced.
When the convert-stream ends, this stream is resumed.
The answered stream is a broadcast stream if this stream is.
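The steps above can be sketched with Python async generators. This is an analogy under stated assumptions, not the EsStream API: `async_expand`, `numbers`, `repeat` and `convert` are hypothetical names, and error events are not modeled. The outer stream is not advanced while a sub-stream is being consumed, which plays the role of the pause and resume described above.

```python
import asyncio


async def async_expand(source, convert):
    async for element in source:
        sub_stream = convert(element)
        if sub_stream is None:
            continue                      # nil result: nothing emitted for this element
        async for event in sub_stream:    # source is not advanced until this loop ends
            yield event


async def numbers():
    for n in (1, 2, 3):
        yield n


async def repeat(n):
    for _ in range(n):
        await asyncio.sleep(0)            # give other tasks a chance to run
        yield n


def convert(n):
    return None if n == 2 else repeat(n)


async def collect(stream):
    return [event async for event in stream]


expanded = asyncio.run(collect(async_expand(numbers(), convert)))
```

The element 2 contributes nothing because its conversion answers `None`, while 1 and 3 are each replaced by the events of their sub-stream, in order.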
Arguments:
aConvertBlock - <Block> 1-arg block
Answers:
<EsStream>
at:
Answers the value of the @index data event of this stream.
Stops listening to this stream after the (@index)th data event has been
received.
Internally the method cancels its subscription after these elements. This
means that single-subscription (non-broadcast) streams are closed and
cannot be reused after a call to this method.
If an error event occurs before the value is found, the future completes
with this error.
If a done event occurs before the value is found, the future completes
with an Exception.
Arguments:
index - <Integer>
Answers:
<EsFuture>
collect:
Transforms each element of this stream into a new stream event.
Creates a new stream that converts each element of this stream
to a new value using the 1-arg @aCollectBlock, and emits the result.
For each data event, `o`, in this stream, the answered stream
provides a data event with the value `aCollectBlock value: o`.
If @aCollectBlock raises an exception, the answered stream reports it as an error
event instead.
Error and done events are passed through unchanged to the answered stream.
The answered stream is a broadcast stream if this stream is.
@aCollectBlock is evaluated once per data event per listener.
If a broadcast stream is listened to more than once, each subscription
will individually evaluate @aCollectBlock on each data event.
Unlike #transform:, this method does not treat the stream as
chunks of a single value. Instead each event is converted independently
of the previous and following events, which may not always be correct.
For example, UTF-8 encoding, or decoding, will give wrong results
if a surrogate pair, or a multibyte UTF-8 encoding, is split into
separate events, and those events are then encoded or decoded
independently.
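The UTF-8 pitfall is easy to reproduce outside of streams. The following Python sketch is not EsStream code; it only contrasts decoding each chunk in isolation with a stateful decoder from the standard `codecs` module, which is the kind of chunk-aware conversion the text attributes to #transform:.

```python
import codecs

# "café" in UTF-8, with the two-byte sequence for "é" split across two chunks.
chunks = [b"caf\xc3", b"\xa9"]

# Per-event conversion: each chunk is decoded on its own and the first one fails.
try:
    per_event = [chunk.decode("utf-8") for chunk in chunks]
except UnicodeDecodeError:
    per_event = None

# Stateful conversion: the decoder carries the partial sequence to the next chunk.
decoder = codecs.getincrementaldecoder("utf-8")()
stateful = "".join(decoder.decode(chunk) for chunk in chunks)
stateful += decoder.decode(b"", final=True)   # flush; raises if bytes are left over
```

The per-event approach fails on the first chunk, while the stateful decoder reassembles the character correctly.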
Arguments:
aCollectBlock - <Block> 1-arg block
Answers:
<EsStream>
detect:
Finds the first element of this stream where @aDetectBlock evaluates to true.
Returns a future that is completed with the first element of this stream
that @aDetectBlock answers `true` for.
If this stream emits an error before the first matching element,
the answered future is completed with that error, and processing stops.
Stops listening to this stream after the first matching element or error
has been received.
Internally the method cancels its subscription after the first element that
matches the predicate. This means that single-subscription (non-broadcast)
streams are closed and cannot be reused after a call to this method.
If an error occurs, or if this stream ends without finding a match,
the answered future is completed with an error.
Arguments:
aDetectBlock - <Block> 1-arg block
Answers:
<EsFuture>
detect:ifNone:
Finds the first element of this stream where @aDetectBlock evaluates to true.
Returns a future that is completed with the first element of this stream
that @aDetectBlock answers `true` for.
If no such element is found before this stream is done, and
@aNoneBlock is provided (not nil), the result of evaluating @aNoneBlock
becomes the value of the future. If @aNoneBlock raises an exception, the answered
future is completed with that error.
If this stream emits an error before the first matching element,
the answered future is completed with that error, and processing stops.
Stops listening to this stream after the first matching element or error
has been received.
Internally the method cancels its subscription after the first element that
matches the predicate. This means that single-subscription (non-broadcast)
streams are closed and cannot be reused after a call to this method.
If an error occurs, or if this stream ends without finding a match and
with no @aNoneBlock provided,
the answered future is completed with an error.
Arguments:
aDetectBlock - <Block> 1-arg block
aNoneBlock - <Block> 0-arg block
Could be nil
Answers:
<EsFuture>
detectLast:
Finds the last element in this stream matching @aDetectBlock.
If this stream emits an error, the answered future is completed with that
error, and processing stops.
Otherwise as #detect:, except that the last matching element is found
instead of the first.
That means that a non-error result cannot be provided before this stream
is done.
Arguments:
aDetectBlock - <Block> 1-arg block
Answers:
<EsFuture>
detectLast:ifNone:
Finds the last element in this stream matching @aDetectBlock.
If this stream emits an error, the answered future is completed with that
error, and processing stops.
Otherwise as #detect:ifNone:, except that the last matching element is found
instead of the first.
That means that a non-error result cannot be provided before this stream
is done.
Arguments:
aDetectBlock - <Block> 1-arg block
aNoneBlock - <Block> 0-arg block
Could be nil
Answers:
<EsFuture>
distinct
Skips data events if they are equal to the previous data event.
The answered stream provides the same events as this stream, except
that it never provides two consecutive data events that are equal.
That is, errors are passed through to the answered stream, and
data events are passed through if they are distinct from the most
recently emitted data event.
Equality is determined by the '=' method on the last provided data element.
If equality raises an error, the data event is replaced by an error event
containing the raised error. The behavior is equivalent to the
original stream emitting the error event, and it doesn't change
what the most recently emitted data event is.
The answered stream is a broadcast stream if this stream is.
If a broadcast stream is listened to more than once, each subscription
will individually perform the `equals` test.
Answers:
<EsStream>
distinct:
Skips data events if they are equal to the previous data event.
The answered stream provides the same events as this stream, except
that it never provides two consecutive data events that are equal.
That is, errors are passed through to the answered stream, and
data events are passed through if they are distinct from the most
recently emitted data event.
Equality is determined by the evaluation of the provided @anEqualityBlock. If that is
omitted, the '=' method on the last provided data element is used.
If equality raises an error, the data event is replaced by an error event
containing the raised error. The behavior is equivalent to the
original stream emitting the error event, and it doesn't change
what the most recently emitted data event is.
The answered stream is a broadcast stream if this stream is.
If a broadcast stream is listened to more than once, each subscription
will individually perform the `equals` test.
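Only consecutive duplicates are removed; a value may reappear later in the stream. A minimal Python sketch of that rule follows. It is not the EsStream API: `distinct` here is a hypothetical generator function, error events are not modeled, and the two-argument `equals` callable (previous, current) is an assumption of this sketch.

```python
def distinct(events, equals=None):
    if equals is None:
        equals = lambda previous, current: previous == current
    no_previous = object()          # sentinel: nothing has been emitted yet
    previous = no_previous
    for element in events:
        if previous is not no_previous and equals(previous, element):
            continue                # same as the last emitted element: skip it
        previous = element
        yield element


deduplicated = list(distinct([1, 1, 2, 2, 1, 3, 3]))
case_insensitive = list(
    distinct(["a", "A", "b"], lambda p, c: p.lower() == c.lower())
)
```

Note that `1` appears twice in `deduplicated`, because the two occurrences are separated by a different value.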
Arguments:
anEqualityBlock - <Block> 1-arg block
Answers:
<EsStream>
do:
Evaluates @anActionBlock on each element of this stream.
Completes the answered <EsFuture> when all elements of this stream
have been processed.
If this stream emits an error, or if the call to @anActionBlock raises,
the answered future completes with that error, and processing stops.
Arguments:
anActionBlock - <Block> 1-arg block
Answers:
<EsFuture>
drain
Discards all data on this stream, but signals when it is done or an error
occurred.
When subscribing using this method, cancelOnError will be true. This means
that the future will complete with the first error on this stream and then
cancel the subscription.
If this stream emits an error, the answered future is completed with
that error, and processing is stopped.
In case of a `done` event the future completes with nil
Answers:
<EsFuture>
drain:
Discards all data on this stream, but signals when it is done or an error
occurred.
When subscribing using this method, cancelOnError will be true. This means
that the future will complete with the first error on this stream and then
cancel the subscription.
If this stream emits an error, the answered future is completed with
that error, and processing is stopped.
In case of a `done` event the future completes with the given
@futureValue.
Arguments:
futureValue - <Object>
Could be nil
Answers:
<EsFuture>
expand:
Transforms each element of this stream into a sequence of elements.
Returns a new stream where each element of this stream is replaced
by zero or more data events.
The event values are provided as a <Collection> by an evaluation of @aConvertBlock
with the element as argument, and the elements of the collection are
in iteration order.
If calling @aConvertBlock raises, or if the iteration of the answered values
raises, the error is emitted on the answered stream and iteration ends
for that element of this stream.
Error events and the done event of this stream are forwarded directly
to the answered stream.
The answered stream is a broadcast stream if this stream is.
If a broadcast stream is listened to more than once, each subscription
will individually evaluate @aConvertBlock and expand the events.
Arguments:
aConvertBlock - <Block> 1-arg block
Answers:
<EsStream>
first
The first element of this stream.
Stops listening to this stream after the first element has been received.
Internally the method cancels its subscription after the first element.
This means that single-subscription (non-broadcast) streams are closed
and cannot be reused after a call to this method.
If an error event occurs before the first data event, the answered future
is completed with that error.
If this stream is empty (a done event occurs before the first data event),
the answered future completes with an error.
Except for the type of the error, this method is equivalent to
`self at: 1`.
Answers:
<EsFuture>
handleError:if:
Creates a wrapper Stream that intercepts some errors from this stream.
If this stream sends an error that matches @shouldHandleError, then it is intercepted
by the @onError block handler.
An asynchronous error `error` is matched by a test function if
`@shouldHandleError value: error` answers true. If @shouldHandleError is omitted, every error is considered
matching.
If the error is intercepted, the @onError block can decide what to do
with it. It can raise if it wants to raise a new (or the same) error,
or simply return to make this stream forget the error.
If the received `error` value is raised again by the @onError block,
it acts like a `resignal` and it is emitted along with its original
stack trace, not the stack trace of the signal inside @onError
If you need to transform an error into a data event, use the more generic
EsStream>>transform: to handle the event by writing a data event to
the output sink.
The answered stream is a broadcast stream if this stream is.
If a broadcast stream is listened to more than once, each subscription
will individually perform the @shouldHandleError test and handle the error.
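The interception rules can be modeled in Python. This is a sketch of the semantics only: `handle_error` is a hypothetical function, events are `(kind, payload)` tuples, and stack traces are not modeled.

```python
def handle_error(events, on_error, should_handle=None):
    for kind, payload in events:
        matches = kind == "error" and (
            should_handle is None or should_handle(payload)
        )
        if not matches:
            yield (kind, payload)        # data and non-matching errors pass through
            continue
        try:
            on_error(payload)            # returning normally swallows the error
        except Exception as raised:
            yield ("error", raised)      # raising emits a (new or the same) error


logged = []
source_events = [
    ("data", 1),
    ("error", KeyError("k")),     # matches the test below: intercepted and swallowed
    ("error", ValueError("v")),   # does not match: passed through unchanged
    ("data", 2),
]
handled = list(
    handle_error(source_events, logged.append, lambda e: isinstance(e, KeyError))
)
handled_kinds = [kind for kind, _ in handled]
```

Only the error that matched the test reached the handler; the other error is still visible to downstream listeners.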
Arguments:
onError - <Block> 0, 1 or 2-arg culled block with error<Object>, stackTrace<EsAsyncStackTrace> args
shouldHandleError - <Block> 1-arg block that answers <Boolean>
Answers:
<EsStream>
includes:
Answers whether @anObject occurs in the elements provided by this stream.
Compares each element of this stream to @anObject using `=`
If an equal element is found, the answered future is completed with `true`.
If this stream ends without finding a match, the future is completed with
`false`.
If this stream emits an error, or the call to `=` raises an error,
the answered future is completed with that error,
and processing stops.
Arguments:
anObject - <Object>
Answers:
<EsFuture> completes with <Boolean>
inject:into:
Combines a sequence of values by repeatedly applying @anInjectBlock.
This method maintains a value, starting with @initialValue and updated for
each element of this stream.
For each element, the value is updated to the result of evaluating
@anInjectBlock with the previous value and the element.
When this stream is done, the answered future is completed with
the value at that time.
For an empty stream, the future is completed with @initialValue
If this stream emits an error, or the call to @anInjectBlock raises,
the answered future is completed with that error,
and processing is stopped.
Arguments:
initialValue - <Object>
anInjectBlock - <Block> 2-arg block with previous<Object>, each element<Object>
Answers:
<EsFuture>
isBroadcast
Answers whether this stream is a broadcast stream.
Answers:
<Boolean>
isEmpty
Answers whether this stream is empty (contains no elements).
Waits for the first element of this stream, then completes the answered
future with `false`.
If this stream ends without emitting any elements, the answered future is
completed with `true`.
If the first event is an error, the answered future is completed with that
error.
This operation listens to this stream, and a non-broadcast stream cannot
be reused after checking whether it is empty.
Answers:
<EsFuture> completes with <Boolean>
iterator
Answers an object that can iterate the values of this stream
Answers:
<EsStreamIterator>
joinUsing:
Combines the string representation of elements into a single string.
Each element is converted to a string using its Object>>asString method.
If @aSeparator is provided, it is inserted between element string
representations.
The answered future is completed with the combined string when this stream
is done.
If this stream emits an error, or the call to Object>>asString raises an exception,
the answered future is completed with that error,
and processing stops.
Arguments:
aSeparator - <EsString>
Answers:
<EsFuture> completes with <EsString>
last
The last element of this stream.
If this stream emits an error event,
the answered future is completed with that error
and processing stops.
If this stream is empty (the done event is the first event),
the answered future completes with an error.
Answers:
<EsFuture>
listen:
Adds a subscription to this stream.
See #listen:onError:onDone:cancelOnError: for a detailed description
Arguments:
onData - <Block> 1-arg or nil
Answers:
<EsStreamSubscription>
listen:onDone:
Adds a subscription to this stream.
See #listen:onError:onDone:cancelOnError: for a detailed description
Arguments:
onData - <Block> 1-arg or nil
onDone - <Block> 0-arg or nil
Answers:
<EsStreamSubscription>
listen:onDone:cancelOnError:
Adds a subscription to this stream.
See #listen:onError:onDone:cancelOnError: for a detailed description
Arguments:
onData - <Block> 1-arg or nil
onDone - <Block> 0-arg or nil
cancelOnError - <Boolean> or nil
Answers:
<EsStreamSubscription>
listen:onError:
Adds a subscription to this stream.
See #listen:onError:onDone:cancelOnError: for a detailed description
Arguments:
onData - <Block> 1-arg or nil
onError - <Block> 1-arg, 2-arg or nil
Answers:
<EsStreamSubscription>
listen:onError:cancelOnError:
Adds a subscription to this stream.
See #listen:onError:onDone:cancelOnError: for a detailed description
Arguments:
onData - <Block> 1-arg or nil
onError - <Block> 1-arg, 2-arg or nil
cancelOnError - <Boolean> or nil
Answers:
<EsStreamSubscription>
listen:onError:onDone:
Adds a subscription to this stream.
See #listen:onError:onDone:cancelOnError: for a detailed description
Arguments:
onData - <Block> 1-arg or nil
onError - <Block> 1-arg, 2-arg or nil
onDone - <Block> 0-arg or nil
Answers:
<EsStreamSubscription>
listen:onError:onDone:cancelOnError:
Adds a subscription to this stream.
Returns an <EsStreamSubscription> which handles events from this stream using
the provided @onData, @onError and @onDone handlers.
The handlers can be changed on the subscription, but they start out
as the provided block handlers.
On each data event from this stream, the subscriber's @onData block handler
is called. If @onData is `nil`, nothing happens.
On errors from this stream, the @onError block handler is called with the
error object and possibly a stack trace.
The @onError block must be a 1-arg block that accepts an error object or
a 2-arg block that accepts an error object and an <EsAsyncStackTrace>
If @onError accepts two arguments, it is invoked with the error object
and the stack trace.
Otherwise it is called with just the error object.
If @onError is nil, any errors on this stream are considered unhandled,
and will be passed to the <EsAsyncUncaughtErrorHandler>.
By default unhandled async errors are treated
as if they were uncaught top-level errors.
If this stream closes and sends a done event, the @onDone handler is
called. If @onDone is `nil`, nothing happens.
If @cancelOnError is `true`, the subscription is automatically canceled
when the first error event is delivered. The default is `false`.
While a subscription is paused, or when it has been canceled,
the subscription doesn't receive events and none of the
event handler blocks are called.
Arguments:
onData - <Block> 1-arg or nil
onError - <Block> 1-arg, 2-arg or nil
onDone - <Block> 0-arg or nil
cancelOnError - <Boolean> or nil
Answers:
<EsStreamSubscription>
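As a minimal sketch of a full subscription, the call might look like the following. It assumes `stream` is an existing <EsStream>; how that stream is created is outside this section. The 2-arg @onError block is used so that the stack trace is passed as well.

```smalltalk
| subscription |
"Assumption: `stream` is an existing EsStream instance."
subscription := stream
    listen: [:data | Transcript show: 'data: ', data printString; cr]
    onError: [:error :stackTrace | Transcript show: 'error: ', error printString; cr]
    onDone: [Transcript show: 'done'; cr]
    cancelOnError: true.
```

Because @cancelOnError is `true` here, the first error event cancels the subscription, and no handler block is evaluated after that.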
next:
Provides at most the first @count data events of this stream.
Returns a stream that emits the same events that this stream would
if listened to at the same time,
until either this stream ends or it has emitted @count data events,
at which point the answered stream is done.
If this stream produces fewer than @count data events before it's done,
so will the answered stream.
Starts listening to this stream when the answered stream is listened to
and stops listening when the first @count data events have been received.
This means that if this is a single-subscription (non-broadcast) stream,
it cannot be reused after the answered stream has been listened to.
If this is a broadcast stream, the answered stream is a broadcast stream.
In that case, the events are only counted from the time
the answered stream is listened to.
Arguments:
count - <Integer>
Answers:
<EsStream>
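A short sketch of limiting a stream to its first three data events, assuming `stream` is an existing <EsStream> (its creation is not covered here):

```smalltalk
"Assumption: `stream` is an existing EsStream instance."
(stream next: 3)
    listen: [:data | Transcript show: data printString; cr]
    onError: [:error | Transcript show: 'error: ', error printString; cr]
    onDone: [Transcript show: 'done after at most 3 data events'; cr].
```

If `stream` ends before emitting three data events, the @onDone block is still evaluated, just earlier.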
nextWhile:
Forwards data events while @aWhileTrueBlock evaluates to `true`.
Returns a stream that provides the same events as this stream
until @aWhileTrueBlock fails for a data event.
The answered stream is done when either this stream is done,
or when this stream first emits a data event that makes @aWhileTrueBlock
evaluate to `false`.
@aWhileTrueBlock is considered failing if it answers a non-`true` value
or if it raises an exception. If @aWhileTrueBlock raises an exception,
the error is emitted as the last event on the answered stream.
Stops listening to this stream after the accepted elements.
Internally the method cancels its subscription after these elements. This
means that single-subscription (non-broadcast) streams are closed and
cannot be reused after a call to this method.
The answered stream is a broadcast stream if this stream is.
For a broadcast stream, the events are only tested from the time
the answered stream is listened to.
Arguments:
aWhileTrueBlock - <Block> 1-arg
Answers:
<EsStream>
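The following sketch forwards data events only while they stay below a limit. It assumes `stream` is an existing <EsStream> that emits integers; both are illustrative assumptions.

```smalltalk
"Assumption: `stream` is an existing EsStream that emits integers."
(stream nextWhile: [:each | each < 10])
    listen: [:data | Transcript show: data printString; cr]
    onError: [:error | Transcript show: 'error: ', error printString; cr]
    onDone: [Transcript show: 'done'; cr].
```

The first data event that is 10 or greater is not forwarded; it ends the answered stream.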
pipe:
Pipes the events of this stream into @streamConsumer.
All events of this stream are added to @streamConsumer using
@streamConsumer>>addStream:.
The @streamConsumer is closed when this stream has been successfully added
to it - when the future answered by #addStream: completes without an error.
Returns a future which completes when this stream has been consumed
and the consumer has been closed.
The answered future completes with the same result as the future answered
by @streamConsumer>>close.
If the call to @streamConsumer>>addStream: fails in some way, this
method fails in the same way.
@StreamConsumer
- See methods in category @StreamConsumer in class <EsStreamSink>
Arguments:
streamConsumer - <Object> that implements the @StreamConsumer interface.
Answers:
<EsFuture>
reduce:
Combines a sequence of values by repeatedly applying @aCombineBlock.
This method maintains a value,
starting with the first element of this stream
and updating it for each further element of this stream.
For each element after the first,
the value is updated to the result of evaluating @aCombineBlock
with the previous value and the element.
When this stream is done, the answered future is completed with
the value at that time.
If this stream is empty, the answered future is completed with
an error.
If this stream emits an error, or the call to @aCombineBlock raises,
the answered future is completed with that error,
and processing is stopped.
Arguments:
aCombineBlock - <Block> 2-arg block
Answers:
<EsFuture>
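As a hedged sketch, summing the elements of a stream could be written as below. It assumes `stream` is an existing <EsStream> that emits numbers, and that the answered <EsFuture> responds to #then: with a 1-arg block; #then: is not described in this section.

```smalltalk
"Assumptions: `stream` is an existing EsStream that emits numbers;
 EsFuture>>then: is assumed and not documented in this section."
(stream reduce: [:sum :each | sum + each])
    then: [:total | Transcript show: 'total: ', total printString; cr].
```

The first element serves as the initial value of `sum`, so @aCombineBlock is first evaluated with the first and second elements.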
select:
Creates a new stream from this stream that discards some elements.
The new stream sends the same error and done events as this stream,
but it only sends the data events that satisfy @aSelectBlock.
If @aSelectBlock raises an exception, the data event is dropped and the
error is emitted on the answered stream instead.
The answered stream is a broadcast stream if this stream is.
If a broadcast stream is listened to more than once, each subscription
will individually evaluate @aSelectBlock.
Arguments:
aSelectBlock - <Block> 1-arg block
Answers:
<EsStream>
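A small sketch that keeps only the even elements, assuming `stream` is an existing <EsStream> that emits integers:

```smalltalk
"Assumption: `stream` is an existing EsStream that emits integers."
(stream select: [:each | each even])
    listen: [:data | Transcript show: data printString; cr]
    onError: [:error | Transcript show: 'error: ', error printString; cr].
```

Error events pass through unchanged, which is why an @onError block is supplied here as well.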
single
The single element of this stream.
If this stream emits an error event,
the answered future is completed with that error
and processing stops.
If this stream is empty or has more than one element,
the answered future completes with an error.
Answers:
<EsFuture>
singleDetect:ifNone:
Finds the single element in this stream matching @aDetectBlock.
Like #detectLast:, except that it is an error if more than one
matching element occurs in this stream.
Arguments:
aDetectBlock - <Block> 1-arg block
aNoneBlock - <Block> 0-arg block
Could be nil
Answers:
<EsFuture>
size
The number of elements in this stream.
Waits for all elements of this stream. When this stream ends,
the answered future is completed with the number of elements.
If this stream emits an error,
the answered future is completed with that error,
and processing stops.
This operation listens to this stream, and a non-broadcast stream cannot
be reused after finding its size.
Answers:
<EsFuture> completes with <Integer>
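For illustration, counting the elements might look like this. It assumes `stream` is an existing <EsStream> and that the answered <EsFuture> responds to #then:, which is not described in this section.

```smalltalk
"Assumptions: `stream` is an existing EsStream;
 EsFuture>>then: is assumed and not documented in this section."
stream size
    then: [:count | Transcript show: 'elements: ', count printString; cr].
```

The block is evaluated only after the stream is done, since every element has to be counted first.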
skip:
Skips the first @count data events from this stream.
Returns a stream that emits the same events as this stream would
if listened to at the same time, except that the first @count
data events are not emitted.
The answered stream is done when this stream is.
If this stream emits fewer than @count data events
before being done, the answered stream emits no data events.
The answered stream is a broadcast stream if this stream is.
For a broadcast stream, the events are only counted from the time
the answered stream is listened to.
Arguments:
count - <Integer>
Answers:
<EsStream>
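Since #skip: answers a stream, it can be chained with #next: to pick a window of events. The sketch below assumes `stream` is an existing <EsStream>.

```smalltalk
"Assumption: `stream` is an existing EsStream instance."
((stream skip: 2) next: 3)
    listen: [:data | Transcript show: data printString; cr]
    onError: [:error | Transcript show: 'error: ', error printString; cr].
```

This forwards the third, fourth, and fifth data events, provided the stream emits that many.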
skipWhile:
Skips data events from this stream while they are matched by @aWhileTrueBlock.
Returns a stream that emits the same events as this stream,
except that data events are not emitted until a data event fails @aWhileTrueBlock.
The test fails when called with a data event
if it answers a non-`true` value or if the evaluation of @aWhileTrueBlock raises an exception.
If @aWhileTrueBlock raises, the error is emitted as an error event
on the answered stream instead of the data event,
otherwise the event that made @aWhileTrueBlock answer non-true is emitted as the
first data event.
Error and done events are provided by the answered stream unmodified.
The answered stream is a broadcast stream if this stream is.
For a broadcast stream, the events are only tested from the time
the answered stream is listened to.
Arguments:
aWhileTrueBlock - <Block> 1-arg
Answers:
<EsStream>
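A brief sketch that drops leading zeros and forwards everything from the first non-zero element onward, assuming `stream` is an existing <EsStream> that emits integers:

```smalltalk
"Assumption: `stream` is an existing EsStream that emits integers."
(stream skipWhile: [:each | each = 0])
    listen: [:data | Transcript show: data printString; cr]
    onError: [:error | Transcript show: 'error: ', error printString; cr].
```

Once one element has failed the test, later zeros are forwarded too, because @aWhileTrueBlock is no longer consulted.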
timeout:onTimeout:
Creates a new stream with the same events as this stream.
Whenever more than @timeLimit passes between two events from this stream,
the @onTimeout block is evaluated, which can emit further events on
the answered stream.
The countdown doesn't start until the answered stream is listened to.
The countdown is reset every time an event is forwarded from this stream,
or when this stream is paused and resumed.
The @onTimeout block is evaluated with one argument: an
<EsEventSink> that allows putting events into the answered stream.
This `EsEventSink` is only valid during the evaluation of @onTimeout.
Calling EsEventSink>>close on the sink passed to @onTimeout closes the
answered stream, and no further events are processed.
If @onTimeout is omitted, a timeout will just put an <EsAsyncTimeoutException>
into the error channel of the answered stream.
If the evaluation of @onTimeout raises an exception, the error is emitted on the answered
stream.
The answered stream is a broadcast stream if this stream is.
If a broadcast stream is listened to more than once, each subscription
will have its own timer that starts counting on listen,
and the subscriptions' timers can be paused individually.
Arguments:
timeLimit - <Duration> duration object
<Integer> duration in milliseconds
onTimeout - <Block> 1-arg block that takes an <EsEventSink> as an argument
Answers:
<EsStream>
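As a minimal sketch, the following closes the answered stream when more than five seconds pass between events. It assumes `stream` is an existing <EsStream>; the time limit is given as an <Integer> in milliseconds, and only EsEventSink>>close is used on the sink.

```smalltalk
"Assumption: `stream` is an existing EsStream instance."
(stream
    timeout: 5000
    onTimeout: [:sink | sink close])
    listen: [:data | Transcript show: data printString; cr]
    onError: [:error | Transcript show: 'error: ', error printString; cr].
```

The countdown starts only when #listen:onError: is sent, not when #timeout:onTimeout: answers the new stream.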
transform:
Applies @streamTransformer to this stream.
Answers the transformed stream,
that is, the result of `@streamTransformer bind: self`.
This method simply allows writing the call to `streamTransformer bind:`
in a chained fashion, like
```
((stream collect: [:each | mapping value: each]) transform: transformation) asOrderedCollection
```
which can be more convenient than calling `bind:` directly.
The @streamTransformer can return any stream.
Whether the answered stream is a broadcast stream or not,
and which elements it will contain,
is entirely up to the transformation.
This method should always be used for transformations which treat
the entire stream as representing a single value
which has perhaps been split into several parts for transport,
like a file being read from disk or being fetched over a network.
The transformation will then produce a new stream which
transforms the stream's value incrementally.
The resulting stream may again be chunks of the result, but does not have to
correspond to specific events from the source stream.
Arguments:
streamTransformer - <EsStreamTransformer>
Answers:
<EsStream>
Last modified date: 04/21/2022