src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
branch http-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56165 8a6065d830b9
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import java.io.IOException;
       
    29 import java.lang.System.Logger.Level;
       
    30 import java.net.InetSocketAddress;
       
    31 import java.net.http.HttpResponse.BodyHandler;
       
    32 import java.net.http.HttpResponse.BodySubscriber;
       
    33 import java.nio.ByteBuffer;
       
    34 import java.util.Objects;
       
    35 import java.util.concurrent.CompletableFuture;
       
    36 import java.util.LinkedList;
       
    37 import java.util.List;
       
    38 import java.util.concurrent.ConcurrentLinkedDeque;
       
    39 import java.util.concurrent.Executor;
       
    40 import java.util.concurrent.Flow;
       
    41 import jdk.internal.net.http.common.Demand;
       
    42 import jdk.internal.net.http.common.Log;
       
    43 import jdk.internal.net.http.common.FlowTube;
       
    44 import jdk.internal.net.http.common.SequentialScheduler;
       
    45 import jdk.internal.net.http.common.MinimalFuture;
       
    46 import jdk.internal.net.http.common.Utils;
       
    47 import static java.net.http.HttpClient.Version.HTTP_1_1;
       
    48 
       
    49 /**
       
    50  * Encapsulates one HTTP/1.1 request/response exchange.
       
    51  */
       
    52 class Http1Exchange<T> extends ExchangeImpl<T> {
       
    53 
       
    // Class-wide switch for debug logging, copied from Utils.DEBUG.
    static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
    // Per-instance debug logger; messages are tagged through dbgString().
    final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
    // Logger for static nested types that have no exchange instance at hand.
    private static final System.Logger DEBUG_LOGGER =
            Utils.getDebugLogger("Http1Exchange"::toString, DEBUG);

    final HttpRequestImpl request; // main request
    // Produces the request headers and the request body subscriber.
    final Http1Request requestAction;
    // Created by sendHeadersAsync() before the headers are written.
    private volatile Http1Response<T> response;
    final HttpConnection connection;
    final HttpClientImpl client;
    final Executor executor;
    // Receives incoming data from the connection's flow.
    private final Http1AsyncReceiver asyncReceiver;

    /** Records a possible cancellation raised before any operation
     * has been initiated, or an error received while sending the request. */
    private Throwable failed;
    // Futures that cancelImpl() completes exceptionally when still pending.
    private final List<CompletableFuture<?>> operations; // used for cancel

    /** Must be held when operating on any internal state or data. */
    private final Object lock = new Object();

    /** Holds the outgoing data: either the headers or a request body part, or
     * an error from the request body publisher. At most there can be ~2 pieces
     * of outgoing data (onComplete|onError can be invoked without demand). */
    final ConcurrentLinkedDeque<DataPair> outgoing = new ConcurrentLinkedDeque<>();

    /** The write publisher, responsible for writing the complete request (both
     * headers and body, if any). */
    private final Http1Publisher writePublisher = new Http1Publisher();

    /** Completed when the headers have been published, or there is an error. */
    private final CompletableFuture<ExchangeImpl<T>> headersSentCF  = new MinimalFuture<>();
    /** Completed when the body has been published, or there is an error. */
    private final CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();

    /** The subscriber to the request's body publisher. May be null. */
    private volatile Http1BodySubscriber bodySubscriber;
       
    91 
       
    /**
     * Phases the exchange moves through while the request is being written.
     * {@code ERROR} and {@code COMPLETED} are terminal.
     */
    enum State {
        INITIAL,
        HEADERS,
        BODY,
        ERROR,        // terminal state
        COMPLETING,
        COMPLETED     // terminal state
    }
       
    98 
       
    // Current phase of the request-writing state machine. Every assignment to
    // this field in this class is made while holding 'lock'; some assertions
    // read it without the lock.
    private State state = State.INITIAL;
       
   100 
       
   101     /** A carrier for either data or an error. Used to carry data, and communicate
       
   102      * errors from the request ( both headers and body ) to the exchange. */
       
   103     static class DataPair {
       
   104         Throwable throwable;
       
   105         List<ByteBuffer> data;
       
   106         DataPair(List<ByteBuffer> data, Throwable throwable){
       
   107             this.data = data;
       
   108             this.throwable = throwable;
       
   109         }
       
   110         @Override
       
   111         public String toString() {
       
   112             return "DataPair [data=" + data + ", throwable=" + throwable + "]";
       
   113         }
       
   114     }
       
   115 
       
   116     /** An abstract supertype for HTTP/1.1 body subscribers. There are two
       
   117      * concrete implementations: {@link Http1Request.StreamSubscriber}, and
       
   118      * {@link Http1Request.FixedContentSubscriber}, for receiving chunked and
       
   119      * fixed length bodies, respectively. */
       
   120     static abstract class Http1BodySubscriber implements Flow.Subscriber<ByteBuffer> {
       
   121         protected volatile Flow.Subscription subscription;
       
   122         protected volatile boolean complete;
       
   123 
       
   124         /** Final sentinel in the stream of request body. */
       
   125         static final List<ByteBuffer> COMPLETED = List.of(ByteBuffer.allocate(0));
       
   126 
       
   127         void request(long n) {
       
   128             DEBUG_LOGGER.log(Level.DEBUG, () ->
       
   129                 "Http1BodySubscriber requesting " + n + ", from " + subscription);
       
   130             subscription.request(n);
       
   131         }
       
   132 
       
   133         static Http1BodySubscriber completeSubscriber() {
       
   134             return new Http1BodySubscriber() {
       
   135                 @Override public void onSubscribe(Flow.Subscription subscription) { error(); }
       
   136                 @Override public void onNext(ByteBuffer item) { error(); }
       
   137                 @Override public void onError(Throwable throwable) { error(); }
       
   138                 @Override public void onComplete() { error(); }
       
   139                 private void error() {
       
   140                     throw new InternalError("should not reach here");
       
   141                 }
       
   142             };
       
   143         }
       
   144     }
       
   145 
       
   146     @Override
       
   147     public String toString() {
       
   148         return "HTTP/1.1 " + request.toString();
       
   149     }
       
   150 
       
   151     HttpRequestImpl request() {
       
   152         return request;
       
   153     }
       
   154 
       
   155     Http1Exchange(Exchange<T> exchange, HttpConnection connection)
       
   156         throws IOException
       
   157     {
       
   158         super(exchange);
       
   159         this.request = exchange.request();
       
   160         this.client = exchange.client();
       
   161         this.executor = exchange.executor();
       
   162         this.operations = new LinkedList<>();
       
   163         operations.add(headersSentCF);
       
   164         operations.add(bodySentCF);
       
   165         if (connection != null) {
       
   166             this.connection = connection;
       
   167         } else {
       
   168             InetSocketAddress addr = request.getAddress();
       
   169             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
       
   170         }
       
   171         this.requestAction = new Http1Request(request, this);
       
   172         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
       
   173         asyncReceiver.subscribe(new InitialErrorReceiver());
       
   174     }
       
   175 
       
   176     /** An initial receiver that handles no data, but cancels the request if
       
   177      * it receives an error. Will be replaced when reading response body. */
       
   178     final class InitialErrorReceiver implements Http1AsyncReceiver.Http1AsyncDelegate {
       
   179         volatile AbstractSubscription s;
       
   180         @Override
       
   181         public boolean tryAsyncReceive(ByteBuffer ref) {
       
   182             return false;  // no data has been processed, leave it in the queue
       
   183         }
       
   184 
       
   185         @Override
       
   186         public void onReadError(Throwable ex) {
       
   187             cancelImpl(ex);
       
   188         }
       
   189 
       
   190         @Override
       
   191         public void onSubscribe(AbstractSubscription s) {
       
   192             this.s = s;
       
   193         }
       
   194 
       
   195         public AbstractSubscription subscription() {
       
   196             return s;
       
   197         }
       
   198     }
       
   199 
       
   200     @Override
       
   201     HttpConnection connection() {
       
   202         return connection;
       
   203     }
       
   204 
       
   205     private void connectFlows(HttpConnection connection) {
       
   206         FlowTube tube =  connection.getConnectionFlow();
       
   207         debug.log(Level.DEBUG, "%s connecting flows", tube);
       
   208 
       
   209         // Connect the flow to our Http1TubeSubscriber:
       
   210         //   asyncReceiver.subscriber().
       
   211         tube.connectFlows(writePublisher,
       
   212                           asyncReceiver.subscriber());
       
   213     }
       
   214 
       
   215     @Override
       
   216     CompletableFuture<ExchangeImpl<T>> sendHeadersAsync() {
       
   217         // create the response before sending the request headers, so that
       
   218         // the response can set the appropriate receivers.
       
   219         debug.log(Level.DEBUG, "Sending headers only");
       
   220         if (response == null) {
       
   221             response = new Http1Response<>(connection, this, asyncReceiver);
       
   222         }
       
   223 
       
   224         debug.log(Level.DEBUG, "response created in advance");
       
   225         // If the first attempt to read something triggers EOF, or
       
   226         // IOException("channel reset by peer"), we're going to retry.
       
   227         // Instruct the asyncReceiver to throw ConnectionExpiredException
       
   228         // to force a retry.
       
   229         asyncReceiver.setRetryOnError(true);
       
   230 
       
   231         CompletableFuture<Void> connectCF;
       
   232         if (!connection.connected()) {
       
   233             debug.log(Level.DEBUG, "initiating connect async");
       
   234             connectCF = connection.connectAsync();
       
   235             synchronized (lock) {
       
   236                 operations.add(connectCF);
       
   237             }
       
   238         } else {
       
   239             connectCF = new MinimalFuture<>();
       
   240             connectCF.complete(null);
       
   241         }
       
   242 
       
   243         return connectCF
       
   244                 .thenCompose(unused -> {
       
   245                     CompletableFuture<Void> cf = new MinimalFuture<>();
       
   246                     try {
       
   247                         connectFlows(connection);
       
   248 
       
   249                         debug.log(Level.DEBUG, "requestAction.headers");
       
   250                         List<ByteBuffer> data = requestAction.headers();
       
   251                         synchronized (lock) {
       
   252                             state = State.HEADERS;
       
   253                         }
       
   254                         debug.log(Level.DEBUG, "setting outgoing with headers");
       
   255                         assert outgoing.isEmpty() : "Unexpected outgoing:" + outgoing;
       
   256                         appendToOutgoing(data);
       
   257                         cf.complete(null);
       
   258                         return cf;
       
   259                     } catch (Throwable t) {
       
   260                         debug.log(Level.DEBUG, "Failed to send headers: %s", t);
       
   261                         connection.close();
       
   262                         cf.completeExceptionally(t);
       
   263                         return cf;
       
   264                     } })
       
   265                 .thenCompose(unused -> headersSentCF);
       
   266     }
       
   267 
       
   268     @Override
       
   269     CompletableFuture<ExchangeImpl<T>> sendBodyAsync() {
       
   270         assert headersSentCF.isDone();
       
   271         try {
       
   272             bodySubscriber = requestAction.continueRequest();
       
   273             if (bodySubscriber == null) {
       
   274                 bodySubscriber = Http1BodySubscriber.completeSubscriber();
       
   275                 appendToOutgoing(Http1BodySubscriber.COMPLETED);
       
   276             } else {
       
   277                 bodySubscriber.request(1);  // start
       
   278             }
       
   279         } catch (Throwable t) {
       
   280             connection.close();
       
   281             bodySentCF.completeExceptionally(t);
       
   282         }
       
   283         return bodySentCF;
       
   284     }
       
   285 
       
   286     @Override
       
   287     CompletableFuture<Response> getResponseAsync(Executor executor) {
       
   288         CompletableFuture<Response> cf = response.readHeadersAsync(executor);
       
   289         Throwable cause;
       
   290         synchronized (lock) {
       
   291             operations.add(cf);
       
   292             cause = failed;
       
   293             failed = null;
       
   294         }
       
   295 
       
   296         if (cause != null) {
       
   297             Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
       
   298                             + "\n\tCompleting exceptionally with {2}\n",
       
   299                          request.uri(),
       
   300                          request.timeout().isPresent() ?
       
   301                             // calling duration.toMillis() can throw an exception.
       
   302                             // this is just debugging, we don't care if it overflows.
       
   303                             (request.timeout().get().getSeconds() * 1000
       
   304                              + request.timeout().get().getNano() / 1000000) : -1,
       
   305                          cause);
       
   306             boolean acknowledged = cf.completeExceptionally(cause);
       
   307             debug.log(Level.DEBUG,
       
   308                       () -> acknowledged
       
   309                             ? ("completed response with " + cause)
       
   310                             : ("response already completed, ignoring " + cause));
       
   311         }
       
   312         return cf;
       
   313     }
       
   314 
       
   315     @Override
       
   316     CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
       
   317                                        boolean returnConnectionToPool,
       
   318                                        Executor executor)
       
   319     {
       
   320         BodySubscriber<T> bs = handler.apply(response.responseCode(),
       
   321                                              response.responseHeaders());
       
   322         CompletableFuture<T> bodyCF = response.readBody(bs,
       
   323                                                         returnConnectionToPool,
       
   324                                                         executor);
       
   325         return bodyCF;
       
   326     }
       
   327 
       
   328     @Override
       
   329     CompletableFuture<Void> ignoreBody() {
       
   330         return response.ignoreBody(executor);
       
   331     }
       
   332 
       
   333     ByteBuffer drainLeftOverBytes() {
       
   334         synchronized (lock) {
       
   335             asyncReceiver.stop();
       
   336             return asyncReceiver.drain(Utils.EMPTY_BYTEBUFFER);
       
   337         }
       
   338     }
       
   339 
       
   340     void released() {
       
   341         Http1Response<T> resp = this.response;
       
   342         if (resp != null) resp.completed();
       
   343         asyncReceiver.clear();
       
   344     }
       
   345 
       
   346     void completed() {
       
   347         Http1Response<T> resp = this.response;
       
   348         if (resp != null) resp.completed();
       
   349     }
       
   350 
       
   351     /**
       
   352      * Cancel checks to see if request and responseAsync finished already.
       
   353      * If not it closes the connection and completes all pending operations
       
   354      */
       
   355     @Override
       
   356     void cancel() {
       
   357         cancelImpl(new IOException("Request cancelled"));
       
   358     }
       
   359 
       
   360     /**
       
   361      * Cancel checks to see if request and responseAsync finished already.
       
   362      * If not it closes the connection and completes all pending operations
       
   363      */
       
   364     @Override
       
   365     void cancel(IOException cause) {
       
   366         cancelImpl(cause);
       
   367     }
       
   368 
       
   369     private void cancelImpl(Throwable cause) {
       
   370         LinkedList<CompletableFuture<?>> toComplete = null;
       
   371         int count = 0;
       
   372         synchronized (lock) {
       
   373             if (failed == null)
       
   374                 failed = cause;
       
   375             if (requestAction != null && requestAction.finished()
       
   376                     && response != null && response.finished()) {
       
   377                 return;
       
   378             }
       
   379             connection.close();   // TODO: ensure non-blocking if holding the lock
       
   380             writePublisher.writeScheduler.stop();
       
   381             if (operations.isEmpty()) {
       
   382                 Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
       
   383                                 + "\n\tCan''t cancel yet with {2}",
       
   384                              request.uri(),
       
   385                              request.timeout().isPresent() ?
       
   386                                 // calling duration.toMillis() can throw an exception.
       
   387                                 // this is just debugging, we don't care if it overflows.
       
   388                                 (request.timeout().get().getSeconds() * 1000
       
   389                                  + request.timeout().get().getNano() / 1000000) : -1,
       
   390                              cause);
       
   391             } else {
       
   392                 for (CompletableFuture<?> cf : operations) {
       
   393                     if (!cf.isDone()) {
       
   394                         if (toComplete == null) toComplete = new LinkedList<>();
       
   395                         toComplete.add(cf);
       
   396                         count++;
       
   397                     }
       
   398                 }
       
   399                 operations.clear();
       
   400             }
       
   401         }
       
   402         Log.logError("Http1Exchange.cancel: count=" + count);
       
   403         if (toComplete != null) {
       
   404             // We might be in the selector thread in case of timeout, when
       
   405             // the SelectorManager calls purgeTimeoutsAndReturnNextDeadline()
       
   406             // There may or may not be other places that reach here
       
   407             // from the SelectorManager thread, so just make sure we
       
   408             // don't complete any CF from within the selector manager
       
   409             // thread.
       
   410             Executor exec = client.isSelectorThread()
       
   411                             ? executor
       
   412                             : this::runInline;
       
   413             while (!toComplete.isEmpty()) {
       
   414                 CompletableFuture<?> cf = toComplete.poll();
       
   415                 exec.execute(() -> {
       
   416                     if (cf.completeExceptionally(cause)) {
       
   417                         debug.log(Level.DEBUG, "completed cf with %s",
       
   418                                  (Object) cause);
       
   419                     }
       
   420                 });
       
   421             }
       
   422         }
       
   423     }
       
   424 
       
   425     private void runInline(Runnable run) {
       
   426         assert !client.isSelectorThread();
       
   427         run.run();
       
   428     }
       
   429 
       
   430     /** Returns true if this exchange was canceled. */
       
   431     boolean isCanceled() {
       
   432         synchronized (lock) {
       
   433             return failed != null;
       
   434         }
       
   435     }
       
   436 
       
   437     /** Returns the cause for which this exchange was canceled, if available. */
       
   438     Throwable getCancelCause() {
       
   439         synchronized (lock) {
       
   440             return failed;
       
   441         }
       
   442     }
       
   443 
       
   444     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just a Throwable. */
       
   445     void appendToOutgoing(Throwable throwable) {
       
   446         appendToOutgoing(new DataPair(null, throwable));
       
   447     }
       
   448 
       
   449     /** Convenience for {@link #appendToOutgoing(DataPair)}, with just data. */
       
   450     void appendToOutgoing(List<ByteBuffer> item) {
       
   451         appendToOutgoing(new DataPair(item, null));
       
   452     }
       
   453 
       
   454     private void appendToOutgoing(DataPair dp) {
       
   455         debug.log(Level.DEBUG, "appending to outgoing " + dp);
       
   456         outgoing.add(dp);
       
   457         writePublisher.writeScheduler.runOrSchedule();
       
   458     }
       
   459 
       
   460     /** Tells whether, or not, there is any outgoing data that can be published,
       
   461      * or if there is an error. */
       
   462     private boolean hasOutgoing() {
       
   463         return !outgoing.isEmpty();
       
   464     }
       
   465 
       
   466     // Invoked only by the publisher
       
   467     // ALL tasks should execute off the Selector-Manager thread
       
   468     /** Returns the next portion of the HTTP request, or the error. */
       
   469     private DataPair getOutgoing() {
       
   470         final Executor exec = client.theExecutor();
       
   471         final DataPair dp = outgoing.pollFirst();
       
   472 
       
   473         if (dp == null)  // publisher has not published anything yet
       
   474             return null;
       
   475 
       
   476         synchronized (lock) {
       
   477             if (dp.throwable != null) {
       
   478                 state = State.ERROR;
       
   479                 exec.execute(() -> {
       
   480                     connection.close();
       
   481                     headersSentCF.completeExceptionally(dp.throwable);
       
   482                     bodySentCF.completeExceptionally(dp.throwable);
       
   483                 });
       
   484                 return dp;
       
   485             }
       
   486 
       
   487             switch (state) {
       
   488                 case HEADERS:
       
   489                     state = State.BODY;
       
   490                     // completeAsync, since dependent tasks should run in another thread
       
   491                     debug.log(Level.DEBUG, "initiating completion of headersSentCF");
       
   492                     headersSentCF.completeAsync(() -> this, exec);
       
   493                     break;
       
   494                 case BODY:
       
   495                     if (dp.data == Http1BodySubscriber.COMPLETED) {
       
   496                         state = State.COMPLETING;
       
   497                         debug.log(Level.DEBUG, "initiating completion of bodySentCF");
       
   498                         bodySentCF.completeAsync(() -> this, exec);
       
   499                     } else {
       
   500                         debug.log(Level.DEBUG, "requesting more body from the subscriber");
       
   501                         exec.execute(() -> bodySubscriber.request(1));
       
   502                     }
       
   503                     break;
       
   504                 case INITIAL:
       
   505                 case ERROR:
       
   506                 case COMPLETING:
       
   507                 case COMPLETED:
       
   508                 default:
       
   509                     assert false : "Unexpected state:" + state;
       
   510             }
       
   511 
       
   512             return dp;
       
   513         }
       
   514     }
       
   515 
       
   516     /** A Publisher of HTTP/1.1 headers and request body. */
       
   517     final class Http1Publisher implements FlowTube.TubePublisher {
       
   518 
       
   519         final System.Logger  debug = Utils.getDebugLogger(this::dbgString);
       
   520         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
       
   521         volatile boolean cancelled;
       
   522         final Http1WriteSubscription subscription = new Http1WriteSubscription();
       
   523         final Demand demand = new Demand();
       
   524         final SequentialScheduler writeScheduler =
       
   525                 SequentialScheduler.synchronizedScheduler(new WriteTask());
       
   526 
       
   527         @Override
       
   528         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   529             assert state == State.INITIAL;
       
   530             Objects.requireNonNull(s);
       
   531             assert subscriber == null;
       
   532 
       
   533             subscriber = s;
       
   534             debug.log(Level.DEBUG, "got subscriber: %s", s);
       
   535             s.onSubscribe(subscription);
       
   536         }
       
   537 
       
   538         volatile String dbgTag;
       
   539         String dbgString() {
       
   540             String tag = dbgTag;
       
   541             Object flow = connection.getConnectionFlow();
       
   542             if (tag == null && flow != null) {
       
   543                 dbgTag = tag = "Http1Publisher(" + flow + ")";
       
   544             } else if (tag == null) {
       
   545                 tag = "Http1Publisher(?)";
       
   546             }
       
   547             return tag;
       
   548         }
       
   549 
       
   550         final class WriteTask implements Runnable {
       
   551             @Override
       
   552             public void run() {
       
   553                 assert state != State.COMPLETED : "Unexpected state:" + state;
       
   554                 debug.log(Level.DEBUG, "WriteTask");
       
   555                 if (subscriber == null) {
       
   556                     debug.log(Level.DEBUG, "no subscriber yet");
       
   557                     return;
       
   558                 }
       
   559                 debug.log(Level.DEBUG, () -> "hasOutgoing = " + hasOutgoing());
       
   560                 while (hasOutgoing() && demand.tryDecrement()) {
       
   561                     DataPair dp = getOutgoing();
       
   562 
       
   563                     if (dp.throwable != null) {
       
   564                         debug.log(Level.DEBUG, "onError");
       
   565                         // Do not call the subscriber's onError, it is not required.
       
   566                         writeScheduler.stop();
       
   567                     } else {
       
   568                         List<ByteBuffer> data = dp.data;
       
   569                         if (data == Http1BodySubscriber.COMPLETED) {
       
   570                             synchronized (lock) {
       
   571                                 assert state == State.COMPLETING : "Unexpected state:" + state;
       
   572                                 state = State.COMPLETED;
       
   573                             }
       
   574                             debug.log(Level.DEBUG,
       
   575                                      "completed, stopping %s", writeScheduler);
       
   576                             writeScheduler.stop();
       
   577                             // Do nothing more. Just do not publish anything further.
       
   578                             // The next Subscriber will eventually take over.
       
   579 
       
   580                         } else {
       
   581                             debug.log(Level.DEBUG, () ->
       
   582                                     "onNext with " + Utils.remaining(data) + " bytes");
       
   583                             subscriber.onNext(data);
       
   584                         }
       
   585                     }
       
   586                 }
       
   587             }
       
   588         }
       
   589 
       
   590         final class Http1WriteSubscription implements Flow.Subscription {
       
   591 
       
   592             @Override
       
   593             public void request(long n) {
       
   594                 if (cancelled)
       
   595                     return;  //no-op
       
   596                 demand.increase(n);
       
   597                 debug.log(Level.DEBUG,
       
   598                         "subscription request(%d), demand=%s", n, demand);
       
   599                 writeScheduler.runOrSchedule(client.theExecutor());
       
   600             }
       
   601 
       
   602             @Override
       
   603             public void cancel() {
       
   604                 debug.log(Level.DEBUG, "subscription cancelled");
       
   605                 if (cancelled)
       
   606                     return;  //no-op
       
   607                 cancelled = true;
       
   608                 writeScheduler.stop();
       
   609             }
       
   610         }
       
   611     }
       
   612 
       
   613     String dbgString() {
       
   614         return "Http1Exchange";
       
   615     }
       
   616 }