src/java.net.http/share/classes/java/net/http/internal/common/SubscriberWrapper.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal.common;
       
    27 
       
    28 import java.io.Closeable;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.nio.ByteBuffer;
       
    31 import java.util.List;
       
    32 import java.util.Objects;
       
    33 import java.util.concurrent.CompletableFuture;
       
    34 import java.util.concurrent.ConcurrentLinkedQueue;
       
    35 import java.util.concurrent.Flow;
       
    36 import java.util.concurrent.Flow.Subscriber;
       
    37 import java.util.concurrent.atomic.AtomicLong;
       
    38 import java.util.concurrent.atomic.AtomicReference;
       
    39 
       
    40 /**
       
    41  * A wrapper for a Flow.Subscriber. This wrapper delivers data to the wrapped
       
    42  * Subscriber which is supplied to the constructor. This class takes care of
       
    43  * downstream flow control automatically and upstream flow control automatically
       
    44  * by default.
       
    45  * <p>
       
    46  * Processing is done by implementing the {@link #incoming(List, boolean)} method
       
    47  * which supplies buffers from upstream. This method (or any other method)
       
    48  * can then call the outgoing() method to deliver processed buffers downstream.
       
    49  * <p>
       
    50  * Upstream error signals are delivered downstream directly. Cancellation from
       
    51  * downstream is also propagated upstream immediately.
       
    52  * <p>
       
    53  * Each SubscriberWrapper has a {@link java.util.concurrent.CompletableFuture}{@code <Void>}
       
    54  * which propagates completion/errors from downstream to upstream. Normal completion
       
    55  * can only occur after onComplete() is called, but errors can be propagated upwards
       
    56  * at any time.
       
    57  */
       
    58 public abstract class SubscriberWrapper
       
    59     implements FlowTube.TubeSubscriber, Closeable, Flow.Processor<List<ByteBuffer>,List<ByteBuffer>>
       
    60                 // TODO: SSLTube Subscriber will never change? Does this really need to be a TS?
       
    61 {
       
    62     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    63     final System.Logger logger =
       
    64             Utils.getDebugLogger(this::dbgString, DEBUG);
       
    65 
       
    66     public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE }
       
    67 
       
    68     volatile Flow.Subscription upstreamSubscription;
       
    69     final SubscriptionBase downstreamSubscription;
       
    70     volatile boolean upstreamCompleted;
       
    71     volatile boolean downstreamCompleted;
       
    72     volatile boolean completionAcknowledged;
       
    73     private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber;
       
    74     // processed byte to send to the downstream subscriber.
       
    75     private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ;
       
    76     private final CompletableFuture<Void> cf;
       
    77     private final SequentialScheduler pushScheduler;
       
    78     private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
    79     final AtomicLong upstreamWindow = new AtomicLong(0);
       
    80 
       
    81     /**
       
    82      * Wraps the given downstream subscriber. For each call to {@link
       
    83      * #onNext(List<ByteBuffer>) } the given filter function is invoked
       
    84      * and the list (if not empty) returned is passed downstream.
       
    85      *
       
    86      * A {@code CompletableFuture} is supplied which can be used to signal an
       
    87      * error from downstream and which terminates the wrapper or which signals
       
    88      * completion of downstream activity which can be propagated upstream. Error
       
    89      * completion can be signaled at any time, but normal completion must not be
       
    90      * signaled before onComplete() is called.
       
    91      */
       
    92     public SubscriberWrapper()
       
    93     {
       
    94         this.outputQ = new ConcurrentLinkedQueue<>();
       
    95         this.cf = new MinimalFuture<>();
       
    96         this.pushScheduler =
       
    97                 SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
       
    98         this.downstreamSubscription = new SubscriptionBase(pushScheduler,
       
    99                                                            this::downstreamCompletion);
       
   100     }
       
   101 
       
   102     @Override
       
   103     public final void subscribe(Subscriber<?  super List<ByteBuffer>> downstreamSubscriber) {
       
   104         Objects.requireNonNull(downstreamSubscriber);
       
   105         this.downstreamSubscriber = downstreamSubscriber;
       
   106     }
       
   107 
       
   108     /**
       
   109      * Wraps the given downstream wrapper in this. For each call to
       
   110      * {@link #onNext(List<ByteBuffer>) } the incoming() method is called.
       
   111      *
       
   112      * The {@code downstreamCF} from the downstream wrapper is linked to this
       
   113      * wrappers notifier.
       
   114      *
       
   115      * @param downstreamWrapper downstream destination
       
   116      */
       
   117     public SubscriberWrapper(Subscriber<? super List<ByteBuffer>> downstreamWrapper)
       
   118     {
       
   119         this();
       
   120         subscribe(downstreamWrapper);
       
   121     }
       
   122 
       
   123     /**
       
   124      * Delivers data to be processed by this wrapper. Generated data to be sent
       
   125      * downstream, must be provided to the {@link #outgoing(List, boolean)}}
       
   126      * method.
       
   127      *
       
   128      * @param buffers a List of ByteBuffers.
       
   129      * @param complete if true then no more data will be added to the list
       
   130      */
       
   131     protected abstract void incoming(List<ByteBuffer> buffers, boolean complete);
       
   132 
       
   133     /**
       
   134      * This method is called to determine the window size to use at any time. The
       
   135      * current window is supplied together with the current downstream queue size.
       
   136      * {@code 0} should be returned if no change is
       
   137      * required or a positive integer which will be added to the current window.
       
   138      * The default implementation maintains a downstream queue size of no greater
       
   139      * than 5. The method can be overridden if required.
       
   140      *
       
   141      * @param currentWindow the current upstream subscription window
       
   142      * @param downstreamQsize the current number of buffers waiting to be sent
       
   143      *                        downstream
       
   144      *
       
   145      * @return value to add to currentWindow
       
   146      */
       
   147     protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
       
   148         if (downstreamQsize > 5) {
       
   149             return 0;
       
   150         }
       
   151 
       
   152         if (currentWindow == 0) {
       
   153             return 1;
       
   154         } else {
       
   155             return 0;
       
   156         }
       
   157     }
       
   158 
       
   159     /**
       
   160      * Override this if anything needs to be done after the upstream subscriber
       
   161      * has subscribed
       
   162      */
       
   163     protected void onSubscribe() {
       
   164     }
       
   165 
       
   166     /**
       
   167      * Override this if anything needs to be done before checking for error
       
   168      * and processing the input queue.
       
   169      * @return
       
   170      */
       
   171     protected SchedulingAction enterScheduling() {
       
   172         return SchedulingAction.CONTINUE;
       
   173     }
       
   174 
       
   175     protected boolean signalScheduling() {
       
   176         if (downstreamCompleted || pushScheduler.isStopped()) {
       
   177             return false;
       
   178         }
       
   179         pushScheduler.runOrSchedule();
       
   180         return true;
       
   181     }
       
   182 
       
   183     /**
       
   184      * Delivers buffers of data downstream. After incoming()
       
   185      * has been called complete == true signifying completion of the upstream
       
   186      * subscription, data may continue to be delivered, up to when outgoing() is
       
   187      * called complete == true, after which, the downstream subscription is
       
   188      * completed.
       
   189      *
       
   190      * It's an error to call outgoing() with complete = true if incoming() has
       
   191      * not previously been called with it.
       
   192      */
       
   193     public void outgoing(ByteBuffer buffer, boolean complete) {
       
   194         Objects.requireNonNull(buffer);
       
   195         assert !complete || !buffer.hasRemaining();
       
   196         outgoing(List.of(buffer), complete);
       
   197     }
       
   198 
       
   199     /**
       
   200      * Sometime it might be necessary to complete the downstream subscriber
       
   201      * before the upstream completes. For instance, when an SSL server
       
   202      * sends a notify_close. In that case we should let the outgoing
       
   203      * complete before upstream us completed.
       
   204      * @return true, may be overridden by subclasses.
       
   205      */
       
   206     public boolean closing() {
       
   207         return false;
       
   208     }
       
   209 
       
   210     public void outgoing(List<ByteBuffer> buffers, boolean complete) {
       
   211         Objects.requireNonNull(buffers);
       
   212         if (complete) {
       
   213             assert Utils.remaining(buffers) == 0;
       
   214             boolean closing = closing();
       
   215             logger.log(Level.DEBUG,
       
   216                     "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s",
       
   217                     upstreamCompleted, downstreamCompleted, closing);
       
   218             if (!upstreamCompleted && !closing)
       
   219                 throw new IllegalStateException("upstream not completed");
       
   220             completionAcknowledged = true;
       
   221         } else {
       
   222             logger.log(Level.DEBUG, () -> "Adding "
       
   223                                    + Utils.remaining(buffers)
       
   224                                    + " to outputQ queue");
       
   225             outputQ.add(buffers);
       
   226         }
       
   227         logger.log(Level.DEBUG, () -> "pushScheduler "
       
   228                    + (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
       
   229         pushScheduler.runOrSchedule();
       
   230     }
       
   231 
       
   232     /**
       
   233      * Returns a CompletableFuture which completes when this wrapper completes.
       
   234      * Normal completion happens with the following steps (in order):
       
   235      *   1. onComplete() is called
       
   236      *   2. incoming() called with complete = true
       
   237      *   3. outgoing() may continue to be called normally
       
   238      *   4. outgoing called with complete = true
       
   239      *   5. downstream subscriber is called onComplete()
       
   240      *
       
   241      * If the subscription is canceled or onComplete() is invoked the
       
   242      * CompletableFuture completes exceptionally. Exceptional completion
       
   243      * also occurs if downstreamCF completes exceptionally.
       
   244      */
       
   245     public CompletableFuture<Void> completion() {
       
   246         return cf;
       
   247     }
       
   248 
       
   249     /**
       
   250      * Invoked whenever it 'may' be possible to push buffers downstream.
       
   251      */
       
   252     class DownstreamPusher implements Runnable {
       
   253         @Override
       
   254         public void run() {
       
   255             try {
       
   256                 run1();
       
   257             } catch (Throwable t) {
       
   258                 errorCommon(t);
       
   259             }
       
   260         }
       
   261 
       
   262         private void run1() {
       
   263             if (downstreamCompleted) {
       
   264                 logger.log(Level.DEBUG, "DownstreamPusher: downstream is already completed");
       
   265                 return;
       
   266             }
       
   267             switch (enterScheduling()) {
       
   268                 case CONTINUE: break;
       
   269                 case RESCHEDULE: pushScheduler.runOrSchedule(); return;
       
   270                 case RETURN: return;
       
   271                 default:
       
   272                     errorRef.compareAndSet(null,
       
   273                             new InternalError("unknown scheduling command"));
       
   274                     break;
       
   275             }
       
   276             // If there was an error, send it downstream.
       
   277             Throwable error = errorRef.get();
       
   278             if (error != null) {
       
   279                 synchronized(this) {
       
   280                     if (downstreamCompleted) return;
       
   281                     downstreamCompleted = true;
       
   282                 }
       
   283                 logger.log(Level.DEBUG,
       
   284                         () -> "DownstreamPusher: forwarding error downstream: " + error);
       
   285                 pushScheduler.stop();
       
   286                 outputQ.clear();
       
   287                 downstreamSubscriber.onError(error);
       
   288                 return;
       
   289             }
       
   290 
       
   291             // OK - no error, let's proceed
       
   292             if (!outputQ.isEmpty()) {
       
   293                 logger.log(Level.DEBUG,
       
   294                     "DownstreamPusher: queue not empty, downstreamSubscription: %s",
       
   295                      downstreamSubscription);
       
   296             } else {
       
   297                 logger.log(Level.DEBUG,
       
   298                        "DownstreamPusher: queue empty, downstreamSubscription: %s",
       
   299                        downstreamSubscription);
       
   300             }
       
   301 
       
   302             final boolean dbgOn = logger.isLoggable(Level.DEBUG);
       
   303             while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
       
   304                 List<ByteBuffer> b = outputQ.poll();
       
   305                 if (dbgOn) logger.log(Level.DEBUG,
       
   306                                             "DownstreamPusher: Pushing "
       
   307                                             + Utils.remaining(b)
       
   308                                             + " bytes downstream");
       
   309                 downstreamSubscriber.onNext(b);
       
   310             }
       
   311             upstreamWindowUpdate();
       
   312             checkCompletion();
       
   313         }
       
   314     }
       
   315 
       
   316     void upstreamWindowUpdate() {
       
   317         long downstreamQueueSize = outputQ.size();
       
   318         long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
       
   319         if (n > 0)
       
   320             upstreamRequest(n);
       
   321     }
       
   322 
       
   323     @Override
       
   324     public void onSubscribe(Flow.Subscription subscription) {
       
   325         if (upstreamSubscription != null) {
       
   326             throw new IllegalStateException("Single shot publisher");
       
   327         }
       
   328         this.upstreamSubscription = subscription;
       
   329         upstreamRequest(upstreamWindowUpdate(0, 0));
       
   330         logger.log(Level.DEBUG,
       
   331                "calling downstreamSubscriber::onSubscribe on %s",
       
   332                downstreamSubscriber);
       
   333         downstreamSubscriber.onSubscribe(downstreamSubscription);
       
   334         onSubscribe();
       
   335     }
       
   336 
       
   337     @Override
       
   338     public void onNext(List<ByteBuffer> item) {
       
   339         logger.log(Level.DEBUG, "onNext");
       
   340         long prev = upstreamWindow.getAndDecrement();
       
   341         if (prev <= 0)
       
   342             throw new IllegalStateException("invalid onNext call");
       
   343         incomingCaller(item, false);
       
   344         upstreamWindowUpdate();
       
   345     }
       
   346 
       
   347     private void upstreamRequest(long n) {
       
   348         logger.log(Level.DEBUG, "requesting %d", n);
       
   349         upstreamWindow.getAndAdd(n);
       
   350         upstreamSubscription.request(n);
       
   351     }
       
   352 
       
   353     protected void requestMore() {
       
   354         if (upstreamWindow.get() == 0) {
       
   355             upstreamRequest(1);
       
   356         }
       
   357     }
       
   358 
       
   359     public long upstreamWindow() {
       
   360         return upstreamWindow.get();
       
   361     }
       
   362 
       
   363     @Override
       
   364     public void onError(Throwable throwable) {
       
   365         logger.log(Level.DEBUG, () -> "onError: " + throwable);
       
   366         errorCommon(Objects.requireNonNull(throwable));
       
   367     }
       
   368 
       
   369     protected boolean errorCommon(Throwable throwable) {
       
   370         assert throwable != null ||
       
   371                 (throwable = new AssertionError("null throwable")) != null;
       
   372         if (errorRef.compareAndSet(null, throwable)) {
       
   373             logger.log(Level.DEBUG, "error", throwable);
       
   374             pushScheduler.runOrSchedule();
       
   375             upstreamCompleted = true;
       
   376             cf.completeExceptionally(throwable);
       
   377             return true;
       
   378         }
       
   379         return false;
       
   380     }
       
   381 
       
   382     @Override
       
   383     public void close() {
       
   384         errorCommon(new RuntimeException("wrapper closed"));
       
   385     }
       
   386 
       
   387     private void incomingCaller(List<ByteBuffer> l, boolean complete) {
       
   388         try {
       
   389             incoming(l, complete);
       
   390         } catch(Throwable t) {
       
   391             errorCommon(t);
       
   392         }
       
   393     }
       
   394 
       
   395     @Override
       
   396     public void onComplete() {
       
   397         logger.log(Level.DEBUG, () -> "upstream completed: " + toString());
       
   398         upstreamCompleted = true;
       
   399         incomingCaller(Utils.EMPTY_BB_LIST, true);
       
   400         // pushScheduler will call checkCompletion()
       
   401         pushScheduler.runOrSchedule();
       
   402     }
       
   403 
       
   404     /** Adds the given data to the input queue. */
       
   405     public void addData(ByteBuffer l) {
       
   406         if (upstreamSubscription == null) {
       
   407             throw new IllegalStateException("can't add data before upstream subscriber subscribes");
       
   408         }
       
   409         incomingCaller(List.of(l), false);
       
   410     }
       
   411 
       
   412     void checkCompletion() {
       
   413         if (downstreamCompleted || !upstreamCompleted) {
       
   414             return;
       
   415         }
       
   416         if (!outputQ.isEmpty()) {
       
   417             return;
       
   418         }
       
   419         if (errorRef.get() != null) {
       
   420             pushScheduler.runOrSchedule();
       
   421             return;
       
   422         }
       
   423         if (completionAcknowledged) {
       
   424             logger.log(Level.DEBUG, "calling downstreamSubscriber.onComplete()");
       
   425             downstreamSubscriber.onComplete();
       
   426             // Fix me subscriber.onComplete.run();
       
   427             downstreamCompleted = true;
       
   428             cf.complete(null);
       
   429         }
       
   430     }
       
   431 
       
   432     // called from the downstream Subscription.cancel()
       
   433     void downstreamCompletion() {
       
   434         upstreamSubscription.cancel();
       
   435         cf.complete(null);
       
   436     }
       
   437 
       
   438     public void resetDownstreamDemand() {
       
   439         downstreamSubscription.demand.reset();
       
   440     }
       
   441 
       
   442     @Override
       
   443     public String toString() {
       
   444         StringBuilder sb = new StringBuilder();
       
   445         sb.append("SubscriberWrapper:")
       
   446           .append(" upstreamCompleted: ").append(Boolean.toString(upstreamCompleted))
       
   447           .append(" upstreamWindow: ").append(upstreamWindow.toString())
       
   448           .append(" downstreamCompleted: ").append(Boolean.toString(downstreamCompleted))
       
   449           .append(" completionAcknowledged: ").append(Boolean.toString(completionAcknowledged))
       
   450           .append(" outputQ size: ").append(Integer.toString(outputQ.size()))
       
   451           //.append(" outputQ: ").append(outputQ.toString())
       
   452           .append(" cf: ").append(cf.toString())
       
   453           .append(" downstreamSubscription: ").append(downstreamSubscription.toString());
       
   454 
       
   455         return sb.toString();
       
   456     }
       
   457 
       
   458     public String dbgString() {
       
   459         return "SubscriberWrapper";
       
   460     }
       
   461 }