-
public final class BackpressureUtils
Utility functions for use with backpressure.
-
-
Method Summary
Modifier and Type | Method | Description
static long | getAndAddRequest(AtomicLong requested, long n) | Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics).
static long | multiplyCap(long a, long b) | Multiplies two positive longs and caps the result at Long.MAX_VALUE.
static long | addCap(long a, long b) | Adds two positive longs and caps the result at Long.MAX_VALUE.
static <T> void | postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual) | Signals the completion of the main sequence and switches to post-completion replay mode.
static <T> boolean | postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual) | Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
static <T, R> void | postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform) | Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
static <T, R> boolean | postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform) | Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
static long | produced(AtomicLong requested, long n) | Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
static boolean | validate(long n) | Validates the requested amount and returns true if it is positive.
-
Method Detail
-
getAndAddRequest
static long getAndAddRequest(AtomicLong requested, long n)
Adds n (not validated) to requested and returns the value prior to addition once the addition is successful (uses CAS semantics). If the addition overflows, sets the requested field to Long.MAX_VALUE.
- Parameters:
requested - atomic long that should be updated
n - the number of requests to add to the requested count, positive (not validated)
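The compare-and-set loop described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the library's source: the class name RequestAddSketch is hypothetical, and overflow is detected by the sum turning negative, which relies on n being positive.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for illustration; not the library class.
final class RequestAddSketch {

    /** Adds n to requested with a CAS loop, capping at Long.MAX_VALUE; returns the prior value. */
    static long getAndAddRequest(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0) {
                // A negative sum of two non-negative longs means the addition overflowed.
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current; // value prior to the addition
            }
            // Another thread changed the value in between; retry.
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(5);
        System.out.println(getAndAddRequest(requested, 10)); // prints 5
        System.out.println(requested.get());                 // prints 15
        getAndAddRequest(requested, Long.MAX_VALUE);
        System.out.println(requested.get() == Long.MAX_VALUE); // prints true
    }
}
```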
-
multiplyCap
static long multiplyCap(long a, long b)
Multiplies two positive longs and caps the result at Long.MAX_VALUE.
- Parameters:
a - the first value
b - the second value
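One way a capped multiplication could be written is shown below. It is only a sketch: the class name MultiplyCapSketch is hypothetical, the division-based overflow check is one of several possible techniques, and the inputs are assumed to be non-negative as the description requires.

```java
// Hypothetical stand-in for illustration; not the library class.
final class MultiplyCapSketch {

    /** Multiplies two non-negative longs, returning Long.MAX_VALUE if the product would overflow. */
    static long multiplyCap(long a, long b) {
        if (a == 0 || b == 0) {
            return 0; // also avoids dividing by zero below
        }
        // For positive values, a * b exceeds Long.MAX_VALUE exactly when a > MAX_VALUE / b.
        if (a > Long.MAX_VALUE / b) {
            return Long.MAX_VALUE;
        }
        return a * b;
    }

    public static void main(String[] args) {
        System.out.println(multiplyCap(6, 7));                               // prints 42
        System.out.println(multiplyCap(Long.MAX_VALUE, 2) == Long.MAX_VALUE); // prints true
    }
}
```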
-
addCap
static long addCap(long a, long b)
Adds two positive longs and caps the result at Long.MAX_VALUE.
- Parameters:
a - the first value
b - the second value
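A capped addition can be sketched in a few lines. The class name AddCapSketch is hypothetical, and the sign check is valid only because both inputs are assumed to be non-negative.

```java
// Hypothetical stand-in for illustration; not the library class.
final class AddCapSketch {

    /** Adds two non-negative longs, returning Long.MAX_VALUE if the sum would overflow. */
    static long addCap(long a, long b) {
        long sum = a + b;
        // With non-negative inputs, a negative sum can only come from overflow.
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        System.out.println(addCap(2, 3));                               // prints 5
        System.out.println(addCap(Long.MAX_VALUE, 1) == Long.MAX_VALUE); // prints true
    }
}
```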
-
postCompleteDone
static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual)
Signals the completion of the main sequence and switches to post-completion replay mode.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed, but in completed mode, requests no longer reach the upstream but help in draining the queue.
The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
- Parameters:
requested - the holder of current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
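The bit-63 encoding described above can be illustrated with a small sketch. The class CompletedFlagSketch and the names COMPLETED_MASK, REQUESTED_MASK, and markCompleted are hypothetical and chosen for this example. Only the mode switch is shown; draining the queue is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in showing only the mode flag; not the library class.
final class CompletedFlagSketch {
    static final long COMPLETED_MASK = Long.MIN_VALUE; // only bit 63 set
    static final long REQUESTED_MASK = Long.MAX_VALUE; // bits 0-62 set

    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0;
    }

    static long requestedAmount(long state) {
        return state & REQUESTED_MASK;
    }

    /** Atomically sets bit 63 and returns the state seen before the switch. */
    static long markCompleted(AtomicLong state) {
        for (;;) {
            long current = state.get();
            if (isCompleted(current)) {
                return current; // already in completed mode
            }
            if (state.compareAndSet(current, current | COMPLETED_MASK)) {
                return current;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong state = new AtomicLong(3); // active mode, 3 outstanding requests
        markCompleted(state);
        System.out.println(isCompleted(state.get()));     // prints true
        System.out.println(requestedAmount(state.get())); // prints 3
    }
}
```

Because the request amount never uses bit 63, a single compare-and-set can switch modes without losing the outstanding request count.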
-
postCompleteRequest
static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed, but in completed mode, requests no longer reach the upstream but help in draining the queue.
- Parameters:
requested - the holder of current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
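Completed-mode draining can be pictured with the single-threaded simplification below. It is not the library's algorithm: the real method coordinates concurrent callers through the AtomicLong, whereas this sketch only shows that each request releases at most that many queued values and that completion is signalled once the queue is empty. The class CompletedDrainSketch and the method drain are hypothetical, and Consumer and Runnable stand in for the subscriber's onNext and onCompleted.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, single-threaded illustration; not the library class.
final class CompletedDrainSketch {

    /**
     * Emits at most `requested` queued items and returns how many were emitted.
     * Runs onCompleted when the queue is empty afterwards; callers are expected
     * to stop calling once that has happened.
     */
    static <T> long drain(long requested, Queue<T> queue,
                          Consumer<? super T> onNext, Runnable onCompleted) {
        long emitted = 0;
        while (emitted != requested) {
            T item = queue.poll();
            if (item == null) {
                break; // nothing left to replay
            }
            onNext.accept(item);
            emitted++;
        }
        if (queue.isEmpty()) {
            onCompleted.run();
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        drain(2, queue, System.out::println, () -> System.out.println("done")); // prints a, b
        drain(2, queue, System.out::println, () -> System.out.println("done")); // prints c, done
    }
}
```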
-
postCompleteDone
static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)
Signals the completion of the main sequence and switches to post-completion replay mode and allows exit transformation on the queued values.
Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed, but in completed mode, requests no longer reach the upstream but help in draining the queue.
The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
- Parameters:
requested - the holder of current requested amount
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
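The role of the exit transformation can be shown with a simplified, single-threaded sketch: each value is transformed at the moment it leaves the queue, so the queue may hold an internal representation that differs from what is emitted. The class ExitTransformSketch and the method drainTransformed are hypothetical, and java.util.function.Function stands in for Func1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical, single-threaded illustration; not the library class.
final class ExitTransformSketch {

    /** Dequeues at most `requested` items, applying exitTransform to each before collecting it. */
    static <T, R> List<R> drainTransformed(long requested, Queue<T> queue,
                                           Function<? super T, ? extends R> exitTransform) {
        List<R> emitted = new ArrayList<>();
        while (emitted.size() < requested) {
            T item = queue.poll();
            if (item == null) {
                break; // queue exhausted
            }
            emitted.add(exitTransform.apply(item));
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(Arrays.asList(1, 2, 3));
        List<String> first = drainTransformed(2, queue, i -> "#" + i);
        System.out.println(first);        // prints [#1, #2]
        System.out.println(queue.size()); // prints 1
    }
}
```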
-
postCompleteRequest
static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)
Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests and allows exit transformation on the queued values.
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onCompleted() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed, but in completed mode, requests no longer reach the upstream but help in draining the queue.
- Parameters:
requested - the holder of current requested amount
n - the value requested
queue - the queue holding values to be emitted after completion
actual - the subscriber to receive the values
exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
-
produced
static long produced(AtomicLong requested, long n)
Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
- Parameters:
requested - the requested amount holder
n - the value to subtract from the requested amount, has to be positive (not verified)
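The atomic subtraction can be sketched with the same CAS-loop pattern. Two behaviours here are assumptions not stated in the description: the sketch returns the new amount, and it throws IllegalStateException if more is produced than was requested. The class name ProducedSketch is hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for illustration; not the library class.
final class ProducedSketch {

    /** Subtracts n from requested unless it holds Long.MAX_VALUE (treated as unbounded). */
    static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // unbounded mode: nothing to account for
            }
            long next = current - n;
            if (next < 0) {
                // Assumption: over-production is treated as a programming error.
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next; // assumption: the new amount is returned
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(10);
        System.out.println(produced(requested, 3)); // prints 7
        AtomicLong unbounded = new AtomicLong(Long.MAX_VALUE);
        System.out.println(produced(unbounded, 3) == Long.MAX_VALUE); // prints true
    }
}
```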
-
validate
static boolean validate(long n)
Validates the requested amount and returns true if it is positive.
- Parameters:
n - the requested amount
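A typical use of such a check is to guard a request handler so that only positive amounts are accumulated. The sketch below assumes non-positive amounts are simply ignored; how the real method treats zero or negative input beyond returning a boolean is not stated here. The class ValidateSketch and its request method are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for illustration; not the library class.
final class ValidateSketch {
    private final AtomicLong requested = new AtomicLong();

    /** Returns true only for positive request amounts. */
    static boolean validate(long n) {
        return n > 0;
    }

    /** Accumulates n into the requested count if it passes validation, capping at Long.MAX_VALUE. */
    void request(long n) {
        if (!validate(n)) {
            return; // assumption: invalid amounts are ignored
        }
        requested.accumulateAndGet(n, (a, b) -> {
            long sum = a + b;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
    }

    long requested() {
        return requested.get();
    }

    public static void main(String[] args) {
        ValidateSketch sketch = new ValidateSketch();
        sketch.request(0);
        sketch.request(-5);
        sketch.request(3);
        System.out.println(sketch.requested()); // prints 3
    }
}
```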
-