Package 

Class BackpressureUtils

    • Method Summary

      Modifier and Type Method Description
      static long getAndAddRequest(AtomicLong requested, long n) Adds n (not validated) to requested and returns the value prior to the addition once the addition succeeds (uses CAS semantics).
      static long multiplyCap(long a, long b) Multiplies two positive longs and caps the result at Long.MAX_VALUE.
      static long addCap(long a, long b) Adds two positive longs and caps the result at Long.MAX_VALUE.
      static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual) Signals the completion of the main sequence and switches to post-completion replay mode.
      static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual) Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
      static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform) Signals the completion of the main sequence, switches to post-completion replay mode, and allows an exit transformation on the queued values.
      static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform) Accumulates requests (validated), handles the completed-mode draining of the queue based on the requests, and allows an exit transformation on the queued values.
      static long produced(AtomicLong requested, long n) Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
      static boolean validate(long n) Validates the requested amount and returns true if it is positive.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Method Detail

      • getAndAddRequest

         static long getAndAddRequest(AtomicLong requested, long n)

        Adds n (not validated) to requested and returns the value prior to the addition once the addition succeeds (uses CAS semantics). If the addition overflows, requested is set to Long.MAX_VALUE.

        Parameters:
        requested - atomic long that should be updated
        n - the number of requests to add to the requested count, positive (not validated)
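
        The retry loop described above can be sketched as follows. This is a minimal illustration, not the library source: the class RequestAccumulatorSketch and the method getAndAddCapped are hypothetical names, and the overflow test assumes both the current value and n are non-negative.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class RequestAccumulatorSketch {
    private RequestAccumulatorSketch() { }

    /** Hypothetical stand-in for getAndAddRequest; assumes n is non-negative. */
    public static long getAndAddCapped(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                // The sum of two non-negative longs wrapped around: cap it.
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return current; // the value prior to the addition
            }
            // Another thread changed the value in between: retry.
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(5L);
        long before = getAndAddCapped(requested, 10L);
        System.out.println(before + " -> " + requested.get()); // prints "5 -> 15"
    }
}
```

        Returning the previous value lets a caller detect the transition from zero outstanding requests, which is a common trigger for starting a drain loop.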
      • multiplyCap

         static long multiplyCap(long a, long b)

        Multiplies two positive longs and caps the result at Long.MAX_VALUE.

        Parameters:
        a - the first value
        b - the second value
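
        One way to implement a capped multiplication is a division-based overflow check, sketched below. The class CappedMultiplySketch and the method multiplyCapped are hypothetical names; the check assumes both inputs are non-negative and may differ from what the library does internally.

```java
public final class CappedMultiplySketch {
    private CappedMultiplySketch() { }

    /** Hypothetical stand-in for multiplyCap; assumes a and b are non-negative. */
    public static long multiplyCapped(long a, long b) {
        if (a == 0L || b == 0L) {
            return 0L;
        }
        // For positive values, a * b overflows exactly when a > MAX_VALUE / b.
        if (a > Long.MAX_VALUE / b) {
            return Long.MAX_VALUE;
        }
        return a * b;
    }

    public static void main(String[] args) {
        System.out.println(multiplyCapped(3L, 4L));                               // prints 12
        System.out.println(multiplyCapped(Long.MAX_VALUE, 2L) == Long.MAX_VALUE); // prints true
    }
}
```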
      • addCap

         static long addCap(long a, long b)

        Adds two positive longs and caps the result at Long.MAX_VALUE.

        Parameters:
        a - the first value
        b - the second value
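
        A capped addition can rely on the fact that the sum of two non-negative longs turns negative when it overflows. The sketch below uses the hypothetical names CappedAddSketch and addCapped and assumes non-negative inputs; it is not the library source.

```java
public final class CappedAddSketch {
    private CappedAddSketch() { }

    /** Hypothetical stand-in for addCap; assumes a and b are non-negative. */
    public static long addCapped(long a, long b) {
        long sum = a + b;
        // A negative sum of two non-negative values means the addition wrapped.
        return sum < 0L ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        System.out.println(addCapped(2L, 3L));                               // prints 5
        System.out.println(addCapped(Long.MAX_VALUE, 1L) == Long.MAX_VALUE); // prints true
    }
}
```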
      • postCompleteDone

         static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual)

        Signals the completion of the main sequence and switches to post-completion replay mode.

        Don't modify the queue after calling this method!

        Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.

        The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong) to mark the completed mode, since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

        Parameters:
        requested - the holder of current requested amount
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
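
        The bit-63 encoding described above can be illustrated in isolation. In this sketch the class CompletedFlagSketch, the two mask constants, and the helper methods are hypothetical names; it only shows how one AtomicLong can carry both the mode flag and the requested amount, and it leaves out the queue draining that the real method performs.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class CompletedFlagSketch {
    private CompletedFlagSketch() { }

    static final long COMPLETED_MASK = Long.MIN_VALUE; // only bit 63 set
    static final long REQUESTED_MASK = Long.MAX_VALUE; // bits 0-62 set

    /** True if the state word is in completed mode. */
    public static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    /** Extracts the requested amount stored in bits 0-62. */
    public static long requestedAmount(long state) {
        return state & REQUESTED_MASK;
    }

    /** Atomically sets bit 63; returns true only for the call that flipped it. */
    public static boolean markCompleted(AtomicLong requested) {
        for (;;) {
            long current = requested.get();
            if (isCompleted(current)) {
                return false; // already in completed mode
            }
            if (requested.compareAndSet(current, current | COMPLETED_MASK)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(7L);
        System.out.println(markCompleted(requested));          // prints true
        System.out.println(isCompleted(requested.get()));      // prints true
        System.out.println(requestedAmount(requested.get()));  // prints 7
    }
}
```

        Keeping the flag and the amount in one word means a single compareAndSet can switch modes without losing requests that arrive concurrently.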
      • postCompleteRequest

         static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual)

        Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.

        Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.

        Parameters:
        requested - the holder of current requested amount
        n - the value requested
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
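
        The two modes can be illustrated with a deliberately simplified, single-threaded model. Everything in this sketch is hypothetical: the class PostCompleteSketch, its methods, the use of java.util.function.Consumer in place of a Subscriber, and the convention that request returns true when the amount should be relayed upstream. It omits the atomic state handling entirely and only shows when the queue is and is not touched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public final class PostCompleteSketch<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final Consumer<? super T> downstream;
    private long requested;     // outstanding demand
    private boolean completed;  // false = active mode, true = completed mode

    public PostCompleteSketch(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    /** Stores a value while the source is still active. */
    public void offer(T value) {
        queue.offer(value);
    }

    /** Returns true in active mode (assumed to mean: relay n upstream). */
    public boolean request(long n) {
        if (n <= 0L) {
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        long sum = requested + n;
        requested = sum < 0L ? Long.MAX_VALUE : sum; // cap on overflow
        if (!completed) {
            return true;  // active mode: the queue is not accessed
        }
        drain();          // completed mode: the request drains the queue
        return false;
    }

    /** Switches to completed mode and replays what the demand allows. */
    public void complete() {
        completed = true;
        drain();
    }

    private void drain() {
        while (requested > 0L && !queue.isEmpty()) {
            downstream.accept(queue.poll());
            if (requested != Long.MAX_VALUE) {
                requested--; // Long.MAX_VALUE is treated as unbounded demand
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        PostCompleteSketch<Integer> sketch = new PostCompleteSketch<>(received::add);
        sketch.offer(1);
        sketch.offer(2);
        sketch.offer(3);
        System.out.println(sketch.request(2L)); // prints true (active mode)
        sketch.complete();
        System.out.println(received);           // prints [1, 2]
        System.out.println(sketch.request(5L)); // prints false (completed mode)
        System.out.println(received);           // prints [1, 2, 3]
    }
}
```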
      • postCompleteDone

         static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)

        Signals the completion of the main sequence, switches to post-completion replay mode, and allows an exit transformation on the queued values.

        Don't modify the queue after calling this method!

        Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.

        The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong) to mark the completed mode, since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

        Parameters:
        requested - the holder of current requested amount
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
        exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
      • postCompleteRequest

         static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<out Object> actual, Func1<out Object, out R> exitTransform)

        Accumulates requests (validated), handles the completed-mode draining of the queue based on the requests, and allows an exit transformation on the queued values.

        Post-completion backpressure handles the case when a source produces values based on requests while it is active, but more values are available even after its completion. In this case, onCompleted() can't just emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help drain the queue.

        Parameters:
        requested - the holder of current requested amount
        n - the value requested
        queue - the queue holding values to be emitted after completion
        actual - the subscriber to receive the values
        exitTransform - the transformation to apply on the dequeued value to get the value to be emitted
      • produced

         static long produced(AtomicLong requested, long n)

        Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.

        Parameters:
        requested - the requested amount holder
        n - the value to subtract from the requested amount, has to be positive (not verified)
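
        The subtraction with the Long.MAX_VALUE exemption can be sketched as a CAS loop. The class ProducedSketch and the method subtractProduced are hypothetical names; returning the updated amount and throwing when more is produced than was requested are assumptions of this sketch, since the description above does not specify either.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class ProducedSketch {
    private ProducedSketch() { }

    /** Hypothetical stand-in for produced; assumes n is positive. */
    public static long subtractProduced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // unbounded demand is never decremented
            }
            long next = current - n;
            if (next < 0L) {
                // Assumption: producing more than requested is a programming error.
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong(10L);
        System.out.println(subtractProduced(requested, 4L)); // prints 6
        requested.set(Long.MAX_VALUE);
        System.out.println(subtractProduced(requested, 4L) == Long.MAX_VALUE); // prints true
    }
}
```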
      • validate

         static boolean validate(long n)

        Validates the requested amount and returns true if it is positive.

        Parameters:
        n - the requested amount
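
        A typical use of such a check is to guard the accumulation of requests. In the sketch below, the class ValidateSketch and the method validatePositive are hypothetical names, and rejecting negative amounts with an IllegalArgumentException is an assumption; the description above only states that true is returned for positive amounts.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class ValidateSketch {
    private ValidateSketch() { }

    /** Hypothetical stand-in for validate. */
    public static boolean validatePositive(long n) {
        if (n < 0L) {
            // Assumption: a negative request is treated as a caller error.
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        return n != 0L; // zero is a no-op, positive amounts are accepted
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        long n = 3L;
        if (validatePositive(n)) {
            requested.addAndGet(n); // only positive amounts are accumulated
        }
        System.out.println(requested.get()); // prints 3
    }
}
```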