src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SynchronousPublisher.java
branchhttp-client-branch
changeset 55763 634d8e14c172
equal deleted inserted replaced
55762:e947a3a50a95 55763:634d8e14c172
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal.common;
       
    27 
       
    28 import java.util.Map;
       
    29 import java.util.Objects;
       
    30 import java.util.Optional;
       
    31 import java.util.WeakHashMap;
       
    32 import java.util.concurrent.Flow.Publisher;
       
    33 import java.util.concurrent.Flow.Subscriber;
       
    34 import java.util.concurrent.Flow.Subscription;
       
    35 import java.util.concurrent.atomic.AtomicInteger;
       
    36 import java.util.concurrent.atomic.AtomicReference;
       
    37 import java.util.concurrent.locks.ReentrantLock;
       
    38 
       
    39 /**
       
    40  * This publisher signals {@code onNext} synchronously and
       
    41  * {@code onComplete}/{@code onError} asynchronously to its only subscriber.
       
    42  *
       
    43  * <p> This publisher supports a single subscriber over this publisher's
       
    44  * lifetime. {@code signalComplete} and {@code signalError} may be called before
       
    45  * the subscriber has subscribed.
       
    46  *
       
    47  * <p> The subscriber's requests are signalled to the subscription supplied to
       
    48  * the {@code feedback} method.
       
    49  *
       
    50  * <p> {@code subscribe} and {@code feedback} methods can be called in any
       
    51  * order.
       
    52  *
       
    53  * <p> {@code signalNext} may be called recursively, the implementation will
       
    54  * bound the depth of the recursion.
       
    55  *
       
    56  * <p> It is always an error to call {@code signalNext} without a sufficient
       
    57  * demand.
       
    58  *
       
    59  * <p> If subscriber throws an exception from any of its methods, the
       
    60  * subscription will be cancelled.
       
    61  */
       
    62 public final class SynchronousPublisher<T> implements Publisher<T> {
       
    63     /*
       
    64      * PENDING, ACTIVE and CANCELLED are states. TERMINATE and DELIVERING are
       
    65      * state modifiers, they cannot appear in the state on their own.
       
    66      *
       
    67      * PENDING, ACTIVE and CANCELLED are mutually exclusive states. Any two of
       
    68      * those bits cannot be set at the same time in state.
       
    69      *
       
    70      * PENDING -----------------> ACTIVE <------> DELIVERING
       
    71      *    |                         |
       
    72      *    +------> TERMINATE <------+
       
    73      *    |            |            |
       
    74      *    |            v            |
       
    75      *    +------> CANCELLED <------+
       
    76      *
       
    77      * The following states are allowed:
       
    78      *
       
    79      *     PENDING
       
    80      *     PENDING | TERMINATE,
       
    81      *     ACTIVE,
       
    82      *     ACTIVE | DELIVERING,
       
    83      *     ACTIVE | TERMINATE,
       
    84      *     ACTIVE | DELIVERING | TERMINATE
       
    85      *     CANCELLED
       
    86      */
       
    87     /**
       
    88      * A state modifier meaning {@code onSubscribe} has not been called yet.
       
    89      *
       
    90      * <p> After {@code onSubscribe} has been called the machine can transition
       
    91      * into {@code ACTIVE}, {@code PENDING | TERMINATE} or {@code CANCELLED}.
       
    92      */
       
    93     private static final int PENDING = 1;
       
    94     /**
       
    95      * A state modifier meaning {@code onSubscribe} has been called, no error
       
    96      * and no completion has been signalled and {@code onNext} may be called.
       
    97      */
       
    98     private static final int ACTIVE = 2;
       
    99     /**
       
   100      * A state modifier meaning no calls to subscriber may be made.
       
   101      *
       
   102      * <p> Once this modifier is set, it will not be unset. It's a final state.
       
   103      */
       
   104     private static final int CANCELLED = 4;
       
   105     /**
       
   106      * A state modifier meaning {@code onNext} is being called and no other
       
   107      * signal may be made.
       
   108      *
       
   109      * <p> This bit can be set at any time. signalNext uses it to ensure the
       
   110      * method is called sequentially.
       
   111      */
       
   112     private static final int DELIVERING = 8;
       
   113     /**
       
   114      * A state modifier meaning the next call must be either {@code onComplete}
       
   115      * or {@code onError}.
       
   116      *
       
    117      * <p> The concrete method depends on the value of {@code terminationType}.
       
   118      * {@code TERMINATE} bit cannot appear on its own, it can be set only with
       
   119      * {@code PENDING} or {@code ACTIVE}.
       
   120      */
       
   121     private static final int TERMINATE = 16;
       
   122     /**
       
   123      * Current demand. If fulfilled, no {@code onNext} signals may be made.
       
   124      */
       
   125     private final Demand demand = new Demand();
       
   126     /**
       
   127      * The current state of the subscription. Contains disjunctions of the above
       
   128      * state modifiers.
       
   129      */
       
   130     private final AtomicInteger state = new AtomicInteger(PENDING);
       
   131     /**
       
   132      * A convenient way to represent 3 values: not set, completion and error.
       
   133      */
       
   134     private final AtomicReference<Optional<Throwable>> terminationType
       
   135             = new AtomicReference<>();
       
   136     /**
       
   137      * {@code signalNext} uses this lock to ensure the method is called in a
       
   138      * thread-safe manner.
       
   139      */
       
   140     private final ReentrantLock nextLock = new ReentrantLock();
       
   141     private T next;
       
   142 
       
   143     private final Object lock = new Object();
       
   144     /**
       
   145      * This map stores the subscribers attempted to subscribe to this publisher.
       
   146      * It is needed so this publisher does not call {@code onSubscribe} on a
       
   147      * subscriber more than once (Rule 2.12).
       
   148      *
       
   149      * <p> It will most likely have a single entry for the only subscriber.
       
   150      * Because this publisher is one-off, subscribing to it more than once is an
       
   151      * error.
       
   152      */
       
   153     private final Map<Subscriber<?>, Object> knownSubscribers
       
   154             = new WeakHashMap<>(1, 1);
       
   155     /**
       
   156      * The active subscriber. This reference will be reset to {@code null} once
       
   157      * the subscription becomes cancelled (Rule 3.13).
       
   158      */
       
   159     private volatile Subscriber<? super T> subscriber;
       
   160     /**
       
   161      * A temporary subscription that receives all calls to
       
   162      * {@code request}/{@code cancel} until two things happen: (1) the feedback
       
   163      * becomes set and (2) {@code onSubscribe} method is called on the
       
   164      * subscriber.
       
   165      *
       
   166      * <p> The first condition is obvious. The second one is about not
       
   167      * propagating requests to {@code feedback} until {@code onSubscribe} call
       
   168      * has been finished. The reason is that Rule 1.3 requires the subscriber
       
   169      * methods to be called in a thread-safe manner. This, in particular,
       
   170      * implies that if called from multiple threads, the calls must not be
       
    171      * concurrent. If, for instance, {@code subscription.request(long)} (and
       
   172      * this is a usual state of affairs) is called from within
       
   173      * {@code onSubscribe} call, the publisher will have to resort to some sort
       
   174      * of queueing (locks, queues, etc.) of possibly arriving {@code onNext}
       
   175      * signals while in {@code onSubscribe}. This publisher doesn't queue
       
   176      * signals, instead it "queues" requests. Because requests are just numbers
       
   177      * and requests are additive, the effective queue is a single number of
       
   178      * total requests made so far.
       
   179      */
       
   180     private final TemporarySubscription temporarySubscription
       
   181             = new TemporarySubscription();
       
   182     private volatile Subscription feedback;
       
   183     /**
       
   184      * Keeping track of whether a subscription may be made. (The {@code
       
   185      * subscriber} field may later become {@code null}, but this flag is
       
    186      * permanent. Once {@code true} forever {@code true}.)
       
   187      */
       
   188     private boolean subscribed;
       
   189 
       
   190     @Override
       
   191     public void subscribe(Subscriber<? super T> sub) {
       
   192         Objects.requireNonNull(sub);
       
   193         boolean success = false;
       
   194         boolean duplicate = false;
       
   195         synchronized (lock) {
       
   196             if (!subscribed) {
       
   197                 subscribed = true;
       
   198                 subscriber = sub;
       
   199                 assert !knownSubscribers.containsKey(subscriber);
       
   200                 knownSubscribers.put(subscriber, null);
       
   201                 success = true;
       
   202             } else if (sub.equals(subscriber)) {
       
   203                 duplicate = true;
       
   204             } else if (!knownSubscribers.containsKey(sub)) {
       
   205                 knownSubscribers.put(sub, null);
       
   206             } else {
       
   207                 return;
       
   208             }
       
   209         }
       
   210         if (success) {
       
   211             signalSubscribe();
       
   212         } else if (duplicate) {
       
   213             signalError(new IllegalStateException("Duplicate subscribe"));
       
   214         } else {
       
   215             // This is a best-effort attempt for an isolated publisher to call
       
   216             // a foreign subscriber's methods in a sequential order. However it
       
   217             // cannot be guaranteed unless all publishers share information on
       
   218             // all subscribers in the system. This publisher does its job right.
       
   219             sub.onSubscribe(new NopSubscription());
       
   220             sub.onError(new IllegalStateException("Already subscribed"));
       
   221         }
       
   222     }
       
   223 
       
   224     /**
       
   225      * Accepts a subscription that is signalled with the subscriber's requests.
       
   226      *
       
   227      * @throws NullPointerException
       
   228      *         if {@code subscription} is {@code null}
       
   229      * @throws IllegalStateException
       
   230      *         if there is a feedback subscription already
       
   231      */
       
   232     public void feedback(Subscription subscription) {
       
   233         Objects.requireNonNull(subscription);
       
   234         synchronized (lock) {
       
   235             if (feedback != null) {
       
   236                 throw new IllegalStateException(
       
   237                         "Already has a feedback subscription");
       
   238             }
       
   239             feedback = subscription;
       
   240             if ((state.get() & PENDING) == 0) {
       
   241                 temporarySubscription.replaceWith(new PermanentSubscription());
       
   242             }
       
   243         }
       
   244     }
       
   245 
       
   246     /**
       
   247      * Tries to deliver the specified item to the subscriber.
       
   248      *
       
   249      * <p> The item may not be delivered even if there is a demand. This can
       
   250      * happen as a result of subscriber cancelling the subscription by
       
   251      * signalling {@code cancel} or this publisher cancelling the subscription
       
   252      * by signaling {@code onError} or {@code onComplete}.
       
   253      *
       
   254      * <p> Given no exception is thrown, a call to this method decremented the
       
   255      * demand.
       
   256      *
       
   257      * @param item
       
   258      *         the item to deliver to the subscriber
       
   259      *
       
   260      * @return {@code true} iff the subscriber has received {@code item}
       
   261      * @throws NullPointerException
       
   262      *         if {@code item} is {@code null}
       
   263      * @throws IllegalStateException
       
   264      *         if there is no demand
       
   265      * @throws IllegalStateException
       
   266      *         the method is called concurrently
       
   267      */
       
   268     public boolean signalNext(T item) {
       
   269         Objects.requireNonNull(item);
       
   270         if (!nextLock.tryLock()) {
       
   271             throw new IllegalStateException("Concurrent signalling");
       
   272         }
       
   273         boolean recursion = false;
       
   274         try {
       
   275             next = item;
       
   276             while (true) {
       
   277                 int s = state.get();
       
   278                 if ((s & DELIVERING) == DELIVERING) {
       
   279                     recursion = true;
       
   280                     break;
       
   281                 } else if (state.compareAndSet(s, s | DELIVERING)) {
       
   282                     break;
       
   283                 }
       
   284             }
       
   285             if (!demand.tryDecrement()) {
       
   286                 // Hopefully this will help to find bugs in this publisher's
       
   287                 // clients. Because signalNext should never be issues without
       
   288                 // having a sufficient demand. Even if the thing is cancelled!
       
   289 //                next = null;
       
   290                 throw new IllegalStateException("No demand");
       
   291             }
       
   292             if (recursion) {
       
   293                 return true;
       
   294             }
       
   295             while (next != null) {
       
   296                 int s = state.get();
       
   297                 if ((s & (ACTIVE | TERMINATE)) == (ACTIVE | TERMINATE)) {
       
   298                     if (state.compareAndSet(
       
   299                             s, CANCELLED | (s & ~(ACTIVE | TERMINATE)))) {
       
   300                         // terminationType must be read only after the
       
   301                         // termination condition has been observed
       
   302                         // (those have been stored in the opposite order)
       
   303                         Optional<Throwable> t = terminationType.get();
       
   304                         dispatchTerminationAndUnsubscribe(t);
       
   305                         return false;
       
   306                     }
       
   307                 } else if ((s & ACTIVE) == ACTIVE) {
       
   308                     try {
       
   309                         T t = next;
       
   310                         next = null;
       
   311                         subscriber.onNext(t);
       
   312                     } catch (Throwable t) {
       
   313                         cancelNow();
       
   314                         throw t;
       
   315                     }
       
   316                 } else if ((s & CANCELLED) == CANCELLED) {
       
   317                     return false;
       
   318                 } else if ((s & PENDING) == PENDING) {
       
   319                     // Actually someone called signalNext even before
       
   320                     // onSubscribe has been called, but from this publisher's
       
   321                     // API point of view it's still "No demand"
       
   322                     throw new IllegalStateException("No demand");
       
   323                 } else {
       
   324                     throw new InternalError(String.valueOf(s));
       
   325                 }
       
   326             }
       
   327             return true;
       
   328         } finally {
       
   329             while (!recursion) { // If the call was not recursive unset the bit
       
   330                 int s = state.get();
       
   331                 if ((s & DELIVERING) != DELIVERING) {
       
   332                     throw new InternalError(String.valueOf(s));
       
   333                 } else if (state.compareAndSet(s, s & ~DELIVERING)) {
       
   334                     break;
       
   335                 }
       
   336             }
       
   337             nextLock.unlock();
       
   338         }
       
   339     }
       
   340 
       
   341     /**
       
   342      * Cancels the subscription by signalling {@code onError} to the subscriber.
       
   343      *
       
   344      * <p> Will not signal {@code onError} if the subscription has been
       
   345      * cancelled already.
       
   346      *
       
   347      * <p> This method may be called at any time.
       
   348      *
       
   349      * @param error
       
   350      *         the error to signal
       
   351      *
       
   352      * @throws NullPointerException
       
   353      *         if {@code error} is {@code null}
       
   354      */
       
   355     public void signalError(Throwable error) {
       
   356         terminateNow(Optional.of(error));
       
   357     }
       
   358 
       
   359     /**
       
   360      * Cancels the subscription by signalling {@code onComplete} to the
       
   361      * subscriber.
       
   362      *
       
   363      * <p> Will not signal {@code onComplete} if the subscription has been
       
   364      * cancelled already.
       
   365      *
       
   366      * <p> This method may be called at any time.
       
   367      */
       
   368     public void signalComplete() {
       
   369         terminateNow(Optional.empty());
       
   370     }
       
   371 
       
   372     /**
       
   373      * Must be called first and at most once.
       
   374      */
       
   375     private void signalSubscribe() {
       
   376         assert subscribed;
       
   377         try {
       
   378             subscriber.onSubscribe(temporarySubscription);
       
   379         } catch (Throwable t) {
       
   380             cancelNow();
       
   381             throw t;
       
   382         }
       
   383         while (true) {
       
   384             int s = state.get();
       
   385             if ((s & (PENDING | TERMINATE)) == (PENDING | TERMINATE)) {
       
   386                 if (state.compareAndSet(
       
   387                         s, CANCELLED | (s & ~(PENDING | TERMINATE)))) {
       
   388                     Optional<Throwable> t = terminationType.get();
       
   389                     dispatchTerminationAndUnsubscribe(t);
       
   390                     return;
       
   391                 }
       
   392             } else if ((s & PENDING) == PENDING) {
       
   393                 if (state.compareAndSet(s, ACTIVE | (s & ~PENDING))) {
       
   394                     synchronized (lock) {
       
   395                         if (feedback != null) {
       
   396                             temporarySubscription
       
   397                                     .replaceWith(new PermanentSubscription());
       
   398                         }
       
   399                     }
       
   400                     return;
       
   401                 }
       
   402             } else { // It should not be in any other state
       
   403                 throw new InternalError(String.valueOf(s));
       
   404             }
       
   405         }
       
   406     }
       
   407 
       
   408     private void unsubscribe() {
       
   409         subscriber = null;
       
   410     }
       
   411 
       
   412     private final static class NopSubscription implements Subscription {
       
   413 
       
   414         @Override
       
   415         public void request(long n) { }
       
   416         @Override
       
   417         public void cancel() { }
       
   418     }
       
   419 
       
   420     private final class PermanentSubscription implements Subscription {
       
   421 
       
   422         @Override
       
   423         public void request(long n) {
       
   424             if (n <= 0) {
       
   425                 signalError(new IllegalArgumentException(
       
   426                         "non-positive subscription request"));
       
   427             } else {
       
   428                 demand.increase(n);
       
   429                 feedback.request(n);
       
   430             }
       
   431         }
       
   432 
       
   433         @Override
       
   434         public void cancel() {
       
   435             if (cancelNow()) {
       
   436                 unsubscribe();
       
   437                 // feedback.cancel() is called at most once
       
   438                 // (let's not assume idempotency)
       
   439                 feedback.cancel();
       
   440             }
       
   441         }
       
   442     }
       
   443 
       
   444     /**
       
   445      * Cancels the subscription unless it has been cancelled already.
       
   446      *
       
   447      * @return {@code true} iff the subscription has been cancelled as a result
       
   448      *         of this call
       
   449      */
       
   450     private boolean cancelNow() {
       
   451         while (true) {
       
   452             int s = state.get();
       
   453             if ((s & CANCELLED) == CANCELLED) {
       
   454                 return false;
       
   455             } else if ((s & (ACTIVE | PENDING)) != 0) {
       
   456                 // ACTIVE or PENDING
       
   457                 if (state.compareAndSet(
       
   458                         s, CANCELLED | (s & ~(ACTIVE | PENDING)))) {
       
   459                     unsubscribe();
       
   460                     return true;
       
   461                 }
       
   462             } else {
       
   463                 throw new InternalError(String.valueOf(s));
       
   464             }
       
   465         }
       
   466     }
       
   467 
       
   468     /**
       
   469      * Terminates this subscription unless is has been cancelled already.
       
   470      *
       
   471      * @param t the type of termination
       
   472      */
       
   473     private void terminateNow(Optional<Throwable> t) {
       
   474         // Termination condition must be set only after the termination
       
   475         // type has been set (those will be read in the opposite order)
       
   476         if (!terminationType.compareAndSet(null, t)) {
       
   477             return;
       
   478         }
       
   479         while (true) {
       
   480             int s = state.get();
       
   481             if ((s & CANCELLED) == CANCELLED) {
       
   482                 return;
       
   483             } else if ((s & (PENDING | DELIVERING)) != 0) {
       
   484                 // PENDING or DELIVERING (which implies ACTIVE)
       
   485                 if (state.compareAndSet(s, s | TERMINATE)) {
       
   486                     return;
       
   487                 }
       
   488             } else if ((s & ACTIVE) == ACTIVE) {
       
   489                 if (state.compareAndSet(s, CANCELLED | (s & ~ACTIVE))) {
       
   490                     dispatchTerminationAndUnsubscribe(t);
       
   491                     return;
       
   492                 }
       
   493             } else {
       
   494                 throw new InternalError(String.valueOf(s));
       
   495             }
       
   496         }
       
   497     }
       
   498 
       
   499     private void dispatchTerminationAndUnsubscribe(Optional<Throwable> t) {
       
   500         try {
       
   501             t.ifPresentOrElse(subscriber::onError, subscriber::onComplete);
       
   502         } finally {
       
   503             unsubscribe();
       
   504         }
       
   505     }
       
   506 }