public final class BackpressureUtils
Utility functions for use with backpressure.
-
-
Method Summary
Modifier and Type Method Description
static long getAndAddRequest(AtomicLong requested, long n)
Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics).
static long multiplyCap(long a, long b)
Multiplies two positive longs and caps the result at Long.MAX_VALUE.
static long addCap(long a, long b)
Adds two positive longs and caps the result at Long.MAX_VALUE.
static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual)
Signals the completion of the main sequence and switches to post-completion replay mode.
static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)
Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
static long produced(AtomicLong requested, long n)
Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
static boolean validate(long n)
Validates the requested amount and returns true if it is positive.
-
Method Detail
-
getAndAddRequest
static long getAndAddRequest(AtomicLong requested, long n)
Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics). If the addition overflows, sets the requested field to Long.MAX_VALUE.
- Parameters:
requested - atomic long that should be updated
n - the number of requests to add to the requested count, positive (not validated)
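The compare-and-set loop with an overflow cap described above can be sketched as follows. This is a minimal illustration, not the library source: the class name RequestAdder and the method name getAndAddCapped are hypothetical, and the sketch assumes both the current value and n are non-negative, so an overflow shows up as a negative sum.

```java
import java.util.concurrent.atomic.AtomicLong;

final class RequestAdder {
    private RequestAdder() { }

    // Hypothetical stand-in for getAndAddRequest; assumes n >= 0 (not validated).
    static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                // The sum of two non-negative longs wrapped around: cap it.
                next = Long.MAX_VALUE;
            }
            // Retry if another thread changed the value in the meantime.
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(5L);
        long before = getAndAddCapped(requested, 3L);
        System.out.println(before + " -> " + requested.get());
        getAndAddCapped(requested, Long.MAX_VALUE);
        System.out.println(requested.get() == Long.MAX_VALUE);
    }
}
```

Returning the previous value lets a caller detect the transition from zero outstanding requests, which is a common reason to prefer this shape over a plain add.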
-
multiplyCap
static long multiplyCap(long a, long b)
Multiplies two positive longs and caps the result at Long.MAX_VALUE.
- Parameters:
a - the first value
b - the second value
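One way to implement a capped multiplication is to check the operands against Long.MAX_VALUE before multiplying. The sketch below is an assumption about how this could be done, not the library's code; MultiplyCapSketch and multiplyCapped are hypothetical names, and the inputs are assumed to be non-negative.

```java
final class MultiplyCapSketch {
    private MultiplyCapSketch() { }

    // Illustrative only; assumes a >= 0 and b >= 0.
    static long multiplyCapped(long a, long b) {
        if (a == 0L || b == 0L) {
            return 0L;
        }
        // For positive operands, a * b overflows exactly when a > MAX_VALUE / b.
        if (a > Long.MAX_VALUE / b) {
            return Long.MAX_VALUE;
        }
        return a * b;
    }

    public static void main(String[] args) {
        System.out.println(multiplyCapped(6L, 7L));
        System.out.println(multiplyCapped(Long.MAX_VALUE / 2, 3L) == Long.MAX_VALUE);
    }
}
```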
-
addCap
static long addCap(long a, long b)
Adds two positive longs and caps the result at Long.MAX_VALUE.
- Parameters:
a - the first value
b - the second value
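A capped addition can rely on the fact that the sum of two non-negative longs turns negative when it overflows. The following is a minimal sketch under that assumption; AddCapSketch and addCapped are hypothetical names and this is not the library source.

```java
final class AddCapSketch {
    private AddCapSketch() { }

    // Illustrative only; assumes a >= 0 and b >= 0.
    static long addCapped(long a, long b) {
        long sum = a + b;
        // A negative sum of two non-negative operands means the addition wrapped.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        System.out.println(addCapped(2L, 3L));
        System.out.println(addCapped(Long.MAX_VALUE, 1L) == Long.MAX_VALUE);
    }
}
```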
-
postCompleteDone
static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual)
Signals the completion of the main sequence and switches to post-completion replay mode.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream and instead help drain the queue.
The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong), since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
- Parameters:
requested - the holder of current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
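The bit layout described above (bit 63 as a completion flag, bits 0-62 as the request amount) can be illustrated with a small sketch. The class CompletedFlagSketch, its constants, and its helper methods are hypothetical names introduced for illustration; they are not part of the documented API.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CompletedFlagSketch {
    // Hypothetical constants: only bit 63 set, and bits 0-62 set.
    static final long COMPLETED_MASK = Long.MIN_VALUE;
    static final long REQUESTED_MASK = Long.MAX_VALUE;

    private CompletedFlagSketch() { }

    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    static long requestedAmount(long state) {
        return state & REQUESTED_MASK;
    }

    // Sets the completion flag once; returns true if this call made the switch.
    static boolean markCompleted(AtomicLong state) {
        for (;;) {
            long current = state.get();
            if (isCompleted(current)) {
                return false;
            }
            if (state.compareAndSet(current, current | COMPLETED_MASK)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong state = new AtomicLong(7L);
        System.out.println(markCompleted(state));
        System.out.println(isCompleted(state.get()) + " " + requestedAmount(state.get()));
    }
}
```

Keeping the flag and the amount in one long means both can be read and updated with a single atomic operation, so the mode switch and the outstanding request count never go out of sync.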
-
postCompleteRequest
static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream and instead help drain the queue.
- Parameters:
requested - the holder of current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
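The interplay of the two modes can be sketched in a simplified form. This is not the library implementation: PostCompleteSketch and its methods are hypothetical, a Consumer and a Runnable stand in for the Subscriber, unsubscription is ignored, and the meaning of the boolean result (true while still in active mode, so the request may be relayed upstream) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

final class PostCompleteSketch {
    static final long COMPLETED_MASK = Long.MIN_VALUE; // bit 63
    static final long REQUESTED_MASK = Long.MAX_VALUE; // bits 0-62

    private PostCompleteSketch() { }

    // Switches to completed mode and drains if requests are already outstanding.
    static <T> void done(AtomicLong requested, Queue<T> queue,
                         Consumer<? super T> onNext, Runnable onCompleted) {
        for (;;) {
            long r = requested.get();
            if ((r & COMPLETED_MASK) != 0L) {
                return; // already completed
            }
            if (requested.compareAndSet(r, r | COMPLETED_MASK)) {
                if (r != 0L) {
                    drain(requested, queue, onNext, onCompleted);
                }
                return;
            }
        }
    }

    // Accumulates n; returns true in active mode (assumed meaning), false otherwise.
    static <T> boolean request(AtomicLong requested, long n, Queue<T> queue,
                               Consumer<? super T> onNext, Runnable onCompleted) {
        if (n <= 0L) {
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        for (;;) {
            long r = requested.get();
            long flag = r & COMPLETED_MASK;
            long sum = (r & REQUESTED_MASK) + n;
            if (sum < 0L) {
                sum = Long.MAX_VALUE; // cap on overflow
            }
            if (requested.compareAndSet(r, sum | flag)) {
                if (r == COMPLETED_MASK) {
                    // Completed with nothing outstanding: this caller drains.
                    drain(requested, queue, onNext, onCompleted);
                    return false;
                }
                return flag == 0L;
            }
        }
    }

    private static <T> void drain(AtomicLong requested, Queue<T> queue,
                                  Consumer<? super T> onNext, Runnable onCompleted) {
        long r = requested.get();
        long e = COMPLETED_MASK; // emitted count, carrying the flag bit
        for (;;) {
            while (e != r) {
                T v = queue.poll();
                if (v == null) {
                    onCompleted.run();
                    return;
                }
                onNext.accept(v);
                e++;
            }
            if (queue.isEmpty()) {
                onCompleted.run();
                return;
            }
            r = requested.get();
            if (r == e) {
                // Subtract what was emitted; stop if nothing else was requested.
                r = requested.addAndGet(-(e & REQUESTED_MASK));
                if (r == COMPLETED_MASK) {
                    return;
                }
                e = COMPLETED_MASK;
            }
        }
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        List<Integer> out = new ArrayList<>();
        AtomicLong requested = new AtomicLong();
        done(requested, queue, out::add, () -> System.out.println("completed"));
        request(requested, 2L, queue, out::add, () -> System.out.println("completed"));
        System.out.println(out);
        request(requested, 5L, queue, out::add, () -> System.out.println("completed"));
        System.out.println(out);
    }
}
```

In this sketch only the caller that moves the counter from "completed with zero outstanding" to a non-zero amount enters the drain loop; concurrent requesters merely add to the counter, which the draining thread picks up on its next pass.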
-
postCompleteDone
static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)
Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream and instead help drain the queue.
The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong), since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
- Parameters:
requested - the holder of current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
-
postCompleteRequest
static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream and instead help drain the queue.
- Parameters:
requested - the holder of current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
-
produced
static long produced(AtomicLong requested, long n)
Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
- Parameters:
requested - the requested amount holder
n - the value to subtract from the requested amount, has to be positive (not verified)
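A subtraction that leaves Long.MAX_VALUE untouched (treating it as "unbounded") can be sketched with a compare-and-set loop. ProducedSketch and producedSketch are hypothetical names; returning the new amount and throwing IllegalStateException when more is produced than was requested are assumptions of this sketch, not facts stated above.

```java
import java.util.concurrent.atomic.AtomicLong;

final class ProducedSketch {
    private ProducedSketch() { }

    // Illustrative only; assumes n > 0 (not verified).
    static long producedSketch(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                // Unbounded mode: nothing to account for.
                return Long.MAX_VALUE;
            }
            long next = current - n;
            if (next < 0L) {
                // Assumed behavior: treat over-production as a programming error.
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(10L);
        System.out.println(producedSketch(requested, 3L));
        requested.set(Long.MAX_VALUE);
        System.out.println(producedSketch(requested, 3L) == Long.MAX_VALUE);
    }
}
```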
-
validate
static boolean validate(long n)
Validates the requested amount and returns true if it is positive.
- Parameters:
n - the requested amount
-