public class SchedulerWhen extends Scheduler implements Subscription
Scheduler
. The only parameter is a
function that flattens an Observable
of Observable
of
Completable
s into just one Completable
. There must be a chain
of operators connecting the returned value to the source Observable
otherwise any work scheduled on the returned Scheduler
will not be
executed.
When Scheduler.createWorker()
is invoked a Observable
of
Completable
s is onNext'd to the combinator to be flattened. If the
inner Observable
is not immediately subscribed to an calls to
Worker#schedule
are buffered. Once the Observable
is
subscribed to actions are then onNext'd as Completable
s.
Finally the actions scheduled on the parent Scheduler
when the inner
most Completable
s are subscribed to.
When the Scheduler.Worker
is unsubscribed the Completable
emits an
onComplete and triggers any behavior in the flattening operator. The
Observable
and all Completable
s give to the flattening
function never onError.
Limit the amount concurrency two at a time without creating a new fix size thread pool:
Scheduler limitScheduler = Schedulers.computation().when(workers -> { // use merge max concurrent to limit the number of concurrent // callbacks two at a time return Completable.merge(Observable.merge(workers), 2); });
This is a slightly different way to limit the concurrency but it has some
interesting benefits and drawbacks to the method above. It works by limited
the number of concurrent Scheduler.Worker
s rather than individual actions.
Generally each Observable
uses its own Scheduler.Worker
. This means
that this will essentially limit the number of concurrent subscribes. The
danger comes from using operators like
Observable.zip(Observable, Observable, rx.functions.Func2)
where
subscribing to the first Observable
could deadlock the subscription
to the second.
Scheduler limitScheduler = Schedulers.computation().when(workers -> { // use merge max concurrent to limit the number of concurrent // Observables two at a time return Completable.merge(Observable.merge(workers, 2)); });Slowing down the rate to no more than than 1 a second. This suffers from the same problem as the one above I could find an
Observable
operator
that limits the rate without dropping the values (aka leaky bucket
algorithm).
Scheduler slowScheduler = Schedulers.computation().when(workers -> { // use concatenate to make each worker happen one at a time. return Completable.concat(workers.map(actions -> { // delay the starting of the next worker by 1 second. return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS)); })); });
Scheduler.Worker
Constructor and Description |
---|
SchedulerWhen(Func1<Observable<Observable<Completable>>,Completable> combine,
Scheduler actualScheduler) |
Modifier and Type | Method and Description |
---|---|
Scheduler.Worker |
createWorker()
Retrieves or creates a new
Scheduler.Worker that represents serial execution of actions. |
boolean |
isUnsubscribed()
Indicates whether this
Subscription is currently unsubscribed. |
void |
unsubscribe()
Stops the receipt of notifications on the
Subscriber that was registered when this Subscription
was received. |
public SchedulerWhen(Func1<Observable<Observable<Completable>>,Completable> combine, Scheduler actualScheduler)
public void unsubscribe()
Subscription
Subscriber
that was registered when this Subscription
was received.
This allows deregistering an Subscriber
before it has finished receiving all events (i.e. before
onCompleted is called).
unsubscribe
in interface Subscription
public boolean isUnsubscribed()
Subscription
Subscription
is currently unsubscribed.isUnsubscribed
in interface Subscription
true
if this Subscription
is currently unsubscribed, false
otherwisepublic Scheduler.Worker createWorker()
Scheduler
Scheduler.Worker
that represents serial execution of actions.
When work is completed it should be unsubscribed using Subscription.unsubscribe()
.
Work on a Scheduler.Worker
is guaranteed to be sequential.
createWorker
in class Scheduler
Copyright © 2017. All rights reserved.