public final class BackpressureUtils extends Object
Modifier and Type | Method and Description |
---|---|
static long | addCap(long a, long b) Adds two positive longs and caps the result at Long.MAX_VALUE. |
static long | getAndAddRequest(AtomicLong requested, long n) Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics). |
static long | multiplyCap(long a, long b) Multiplies two positive longs and caps the result at Long.MAX_VALUE. |
static <T,R> void | postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform) Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values. |
static <T> void | postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual) Signals the completion of the main sequence and switches to post-completion replay mode. |
static <T,R> boolean | postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform) Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values. |
static <T> boolean | postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests. |
static long | produced(AtomicLong requested, long n) Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE. |
static boolean | validate(long n) Validates the requested amount and returns true if it is positive. |
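The arithmetic and request-accounting helpers summarized above can be approximated in plain Java. The following is a minimal sketch under stated assumptions, not the library source: the class name RequestAccountingSketch is hypothetical, the overflow checks are one common way to implement capping, and returning the remaining amount from produced is an assumption, since this page does not state the return value.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the helpers described above; not the library source. */
final class RequestAccountingSketch {

    /** Adds two non-negative longs and caps the result at Long.MAX_VALUE. */
    static long addCap(long a, long b) {
        long u = a + b;
        // The sum of two non-negative longs wraps to a negative value on overflow.
        return u < 0L ? Long.MAX_VALUE : u;
    }

    /** Multiplies two non-negative longs and caps the result at Long.MAX_VALUE. */
    static long multiplyCap(long a, long b) {
        long u = a * b;
        // Overflow is only possible when an operand is 2^31 or larger.
        if (((a | b) >>> 31) != 0L && b != 0L && u / b != a) {
            return Long.MAX_VALUE;
        }
        return u;
    }

    /** Adds n to requested in a CAS loop and returns the value before the addition. */
    static long getAndAddRequest(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = addCap(current, n);
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /** Subtracts n unless the amount is Long.MAX_VALUE (assumed to return the remainder). */
    static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // unbounded: nothing to subtract
            }
            long next = current - n;
            if (next < 0L) {
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Returns true for a positive amount, false for zero, and throws for a negative one. */
    static boolean validate(long n) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        return n != 0L;
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(5L);
        System.out.println(getAndAddRequest(requested, 10L)); // value before the addition
        System.out.println(produced(requested, 4L));          // amount still outstanding
        System.out.println(addCap(Long.MAX_VALUE, 1L) == Long.MAX_VALUE);
    }
}
```

Capping at Long.MAX_VALUE matters because that value is conventionally treated as an unbounded request, which is also why the sketch leaves it untouched in produced.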
public static long getAndAddRequest(AtomicLong requested, long n)
Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics). If the addition overflows, sets the requested field to Long.MAX_VALUE.
Parameters:
requested - the atomic long that should be updated
n - the number of requests to add to the requested count, positive (not validated)
public static long multiplyCap(long a, long b)
Multiplies two positive longs and caps the result at Long.MAX_VALUE.
Parameters:
a - the first value
b - the second value
public static long addCap(long a, long b)
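Adds two positive longs and caps the result at Long.MAX_VALUE.
Parameters:
a - the first value
b - the second value
public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual)
Signals the completion of the main sequence and switches to post-completion replay mode.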
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values remain available after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead help drain the queue.
The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong) as the mode flag, since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
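As a rough illustration of the bit layout described above, the mode flag and the requested amount can share one long. This is a sketch only: the class name CompletedBitSketch, the constant names and the helper methods are hypothetical and are not taken from this page.

```java
/** Hypothetical illustration of packing a mode flag and a request amount into one long. */
final class CompletedBitSketch {
    /** Only bit 63 is set: marks completed mode. */
    static final long COMPLETED_MASK = Long.MIN_VALUE;
    /** Bits 0-62 are set: selects the requested amount. */
    static final long REQUESTED_MASK = Long.MAX_VALUE;

    /** True if the state has bit 63 set. */
    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    /** Extracts the requested amount held in bits 0-62. */
    static long requestedAmount(long state) {
        return state & REQUESTED_MASK;
    }

    /** Sets bit 63 while leaving the requested amount unchanged. */
    static long markCompleted(long state) {
        return state | COMPLETED_MASK;
    }

    public static void main(String[] args) {
        long active = 5L;                       // active mode, five outstanding requests
        long completed = markCompleted(active); // same amount, completed mode
        System.out.println(isCompleted(active));
        System.out.println(isCompleted(completed));
        System.out.println(requestedAmount(completed));
    }
}
```

Because valid request amounts are never negative, bit 63 is otherwise unused, so a single compare-and-set can update the mode and the amount together.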
Type Parameters:
T - the value type to emit
Parameters:
requested - the holder of the current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values remain available after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead help drain the queue.
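The two modes described above can be sketched end to end in self-contained Java. This is an illustrative approximation, not the library's implementation: the class PostCompleteSketch and its methods done, request and drain are hypothetical names, a Consumer and a Runnable stand in for the Subscriber, unsubscription checks are omitted, and the TERMINATED shortcut for an empty queue is a choice made for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical sketch of the active/completed protocol; not the library source. */
final class PostCompleteSketch {
    static final long COMPLETED_MASK = Long.MIN_VALUE; // bit 63: completed mode
    static final long REQUESTED_MASK = Long.MAX_VALUE; // bits 0-62: requested amount
    static final long TERMINATED = -1L;                // completed flag plus capped amount

    /** Switches to completed mode and replays the queue as far as requests allow. */
    static <T> void done(AtomicLong requested, Queue<T> queue,
                         Consumer<? super T> onNext, Runnable onCompleted) {
        for (;;) {
            long r = requested.get();
            if ((r & COMPLETED_MASK) != 0L) {
                return; // already in completed mode
            }
            boolean empty = queue.isEmpty();
            long u = empty ? TERMINATED : (r | COMPLETED_MASK);
            if (requested.compareAndSet(r, u)) {
                if (empty) {
                    onCompleted.run(); // nothing to replay
                } else if (r != 0L) {
                    drain(requested, queue, onNext, onCompleted);
                }
                return;
            }
        }
    }

    /** Returns true in active mode (caller forwards n upstream), false in completed mode. */
    static <T> boolean request(AtomicLong requested, long n, Queue<T> queue,
                               Consumer<? super T> onNext, Runnable onCompleted) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == 0L) {
            return (requested.get() & COMPLETED_MASK) == 0L;
        }
        for (;;) {
            long r = requested.get();
            long flag = r & COMPLETED_MASK;
            long sum = (r & REQUESTED_MASK) + n;
            if (sum < 0L) {
                sum = Long.MAX_VALUE; // cap on overflow
            }
            if (requested.compareAndSet(r, sum | flag)) {
                if (r == COMPLETED_MASK) {
                    // (completed, 0) -> (completed, n): this caller performs the replay
                    drain(requested, queue, onNext, onCompleted);
                    return false;
                }
                return flag == 0L;
            }
        }
    }

    /** Emits queued values while requests remain; completes once the queue is empty. */
    private static <T> void drain(AtomicLong requested, Queue<T> queue,
                                  Consumer<? super T> onNext, Runnable onCompleted) {
        long r = requested.get();
        long e = COMPLETED_MASK; // emission counter that carries the completed flag
        for (;;) {
            while (e != r) {
                T v = queue.poll();
                if (v == null) {
                    onCompleted.run();
                    return;
                }
                onNext.accept(v);
                e++;
            }
            if (queue.isEmpty()) {
                onCompleted.run();
                return;
            }
            r = requested.get();
            if (r == e) {
                // No new requests arrived: subtract what was emitted and stop if none remain.
                r = requested.addAndGet(-(e & REQUESTED_MASK));
                if (r == COMPLETED_MASK) {
                    return;
                }
                e = COMPLETED_MASK;
            }
        }
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>();
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        List<String> events = new ArrayList<>();
        AtomicLong requested = new AtomicLong();
        Consumer<Integer> onNext = v -> events.add("next:" + v);
        Runnable onCompleted = () -> events.add("completed");

        done(requested, queue, onNext, onCompleted);        // no requests yet: nothing is emitted
        request(requested, 2L, queue, onNext, onCompleted); // replays two queued values
        request(requested, 5L, queue, onNext, onCompleted); // replays the rest, then completes
        System.out.println(events);
    }
}
```

In this sketch only the caller that moves the state from (completed, 0) to (completed, n) runs the drain loop, so concurrent requesters add to the amount without touching the queue.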
Type Parameters:
T - the value type to emit
Parameters:
requested - the holder of the current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
public static <T,R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
Signals the completion of the main sequence and switches to post-completion replay mode, and allows exit transformation on the queued values.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values remain available after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead help drain the queue.
The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong) as the mode flag, since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
Type Parameters:
T - the value type in the queue
R - the value type to emit
Parameters:
requested - the holder of the current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply to the dequeued value to get the value to be emitted
public static <T,R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T,? extends R> exitTransform)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests, and allows exit transformation on the queued values.
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values remain available after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but instead help drain the queue.
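To show where an exit transformation fits, the sketch below converts each queued value as it leaves the queue. It illustrates only the transformation step and leaves out the atomic request coordination. The names ExitTransformSketch, Timed and emitUpTo are hypothetical, and java.util.function.Function stands in for Func1 to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical illustration of transforming queued values on the way out. */
final class ExitTransformSketch {

    /** Hypothetical queue entry: a value stored together with its arrival time. */
    static final class Timed {
        final long timestampMillis;
        final String value;

        Timed(long timestampMillis, String value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** Emits at most n queued entries, mapping each from T to R, and returns the count. */
    static <T, R> long emitUpTo(long n, Queue<T> queue, Consumer<? super R> downstream,
                                Function<? super T, ? extends R> exitTransform) {
        long emitted = 0L;
        while (emitted != n) {
            T entry = queue.poll();
            if (entry == null) {
                break; // queue exhausted before the request was satisfied
            }
            downstream.accept(exitTransform.apply(entry));
            emitted++;
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Timed> queue = new ArrayDeque<>();
        queue.offer(new Timed(1L, "a"));
        queue.offer(new Timed(2L, "b"));

        List<String> received = new ArrayList<>();
        Consumer<String> sink = received::add;
        Function<Timed, String> unwrap = timed -> timed.value; // drop the timestamp on exit

        long emitted = emitUpTo(1L, queue, sink, unwrap);
        System.out.println(emitted + " emitted: " + received);
    }
}
```

Keeping the queue typed as T and converting only on exit lets the queue hold bookkeeping data, such as the timestamp here, that the downstream never sees.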
Type Parameters:
T - the value type in the queue
R - the value type to emit
Parameters:
requested - the holder of the current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply to the dequeued value to get the value to be emitted
public static long produced(AtomicLong requested, long n)
Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
Parameters:
requested - the requested amount holder
n - the value to subtract from the requested amount, has to be positive (not verified)
Throws:
IllegalStateException - if n is greater than the current requested amount, which indicates a bug in the request accounting logic
public static boolean validate(long n)
Validates the requested amount and returns true if it is positive.
Parameters:
n - the requested amount
Throws:
IllegalArgumentException - if n is negative
Copyright © 2017. All rights reserved.