public final class QueueDrainHelper extends Object
Modifier and Type | Method and Description |
---|---|
static <T,U> boolean |
checkTerminated(boolean d,
boolean empty,
Observer<?> observer,
boolean delayError,
SimpleQueue<?> q,
Disposable disposable,
ObservableQueueDrain<T,U> qd) |
static <T,U> boolean |
checkTerminated(boolean d,
boolean empty,
org.reactivestreams.Subscriber<?> s,
boolean delayError,
SimpleQueue<?> q,
QueueDrain<T,U> qd) |
static <T> SimpleQueue<T> |
createQueue(int capacityHint)
Creates a queue: spsc-array if capacityHint is positive and
spsc-linked-array if capacityHint is negative; in both cases, the
capacity is the absolute value of prefetch.
|
static <T,U> void |
drainLoop(SimplePlainQueue<T> q,
Observer<? super U> a,
boolean delayError,
Disposable dispose,
ObservableQueueDrain<T,U> qd) |
static <T,U> void |
drainMaxLoop(SimplePlainQueue<T> q,
org.reactivestreams.Subscriber<? super U> a,
boolean delayError,
Disposable dispose,
QueueDrain<T,U> qd)
Drain the queue but give up with an error if there aren't enough requests.
|
static <T> void |
postComplete(org.reactivestreams.Subscriber<? super T> actual,
Queue<T> queue,
AtomicLong state,
BooleanSupplier isCancelled)
Signals the completion of the main sequence and switches to post-completion replay mode.
|
static <T> boolean |
postCompleteRequest(long n,
org.reactivestreams.Subscriber<? super T> actual,
Queue<T> queue,
AtomicLong state,
BooleanSupplier isCancelled)
Accumulates requests (not validated) and handles the completed mode draining of the queue based on the requests.
|
static void |
request(org.reactivestreams.Subscription s,
int prefetch)
Requests Long.MAX_VALUE if prefetch is negative or the exact
amount if prefetch is positive.
|
public static <T,U> void drainMaxLoop(SimplePlainQueue<T> q, org.reactivestreams.Subscriber<? super U> a, boolean delayError, Disposable dispose, QueueDrain<T,U> qd)
T
- the queue value typeU
- the emission value typeq
- the queuea
- the subscriberdelayError
- true if errors should be delayed after all normal itemsdispose
- the disposable to call when termination happens and cleanup is necessaryqd
- the QueueDrain instance that gives status information to the drain logicpublic static <T,U> boolean checkTerminated(boolean d, boolean empty, org.reactivestreams.Subscriber<?> s, boolean delayError, SimpleQueue<?> q, QueueDrain<T,U> qd)
public static <T,U> void drainLoop(SimplePlainQueue<T> q, Observer<? super U> a, boolean delayError, Disposable dispose, ObservableQueueDrain<T,U> qd)
public static <T,U> boolean checkTerminated(boolean d, boolean empty, Observer<?> observer, boolean delayError, SimpleQueue<?> q, Disposable disposable, ObservableQueueDrain<T,U> qd)
public static <T> SimpleQueue<T> createQueue(int capacityHint)
T
- the value type of the queuecapacityHint
- the capacity hint, negative value will create an array-based SPSC queuepublic static void request(org.reactivestreams.Subscription s, int prefetch)
s
- the Subscription to request fromprefetch
- the prefetch valuepublic static <T> boolean postCompleteRequest(long n, org.reactivestreams.Subscriber<? super T> actual, Queue<T> queue, AtomicLong state, BooleanSupplier isCancelled)
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onComplete() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.
T
- the value type emittedn
- the request amount, positive (not validated)actual
- the target Subscriber to send events toqueue
- the queue to drain if in the post-complete statestate
- holds the request amount and the post-completed flagisCancelled
- a supplier that returns true if the drain has been cancelledpublic static <T> void postComplete(org.reactivestreams.Subscriber<? super T> actual, Queue<T> queue, AtomicLong state, BooleanSupplier isCancelled)
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onComplete() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.
The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
T
- the value type emittedactual
- the target Subscriber to send events toqueue
- the queue to drain if in the post-complete statestate
- holds the request amount and the post-completed flagisCancelled
- a supplier that returns true if the drain has been cancelledCopyright © 2019. All rights reserved.