src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/Http1AsyncReceiver.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.io.EOFException;
       
    29 import java.io.IOException;
       
    30 import java.lang.System.Logger.Level;
       
    31 import java.nio.ByteBuffer;
       
    32 import java.util.Arrays;
       
    33 import java.util.HashSet;
       
    34 import java.util.List;
       
    35 import java.util.Set;
       
    36 import java.util.concurrent.ConcurrentLinkedDeque;
       
    37 import java.util.concurrent.Executor;
       
    38 import java.util.concurrent.Flow;
       
    39 import java.util.concurrent.atomic.AtomicBoolean;
       
    40 import java.util.concurrent.atomic.AtomicLong;
       
    41 import java.util.concurrent.atomic.AtomicReference;
       
    42 import java.util.stream.Collectors;
       
    43 import jdk.incubator.http.internal.common.Demand;
       
    44 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
       
    45 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    46 import jdk.incubator.http.internal.common.ConnectionExpiredException;
       
    47 import jdk.incubator.http.internal.common.Utils;
       
    48 
       
    49 
       
    50 /**
       
    51  * A helper class that will queue up incoming data until the receiving
       
    52  * side is ready to handle it.
       
    53  */
       
    54 class Http1AsyncReceiver {
       
    55 
       
    56     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
       
    57     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
       
    58 
       
    59     /**
       
    60      * A delegate that can asynchronously receive data from an upstream flow,
       
    61      * parse, it, then possibly transform it and either store it (response
       
    62      * headers) or possibly pass it to a downstream subscriber (response body).
       
    63      * Usually, there will be one Http1AsyncDelegate in charge of receiving
       
    64      * and parsing headers, and another one in charge of receiving, parsing,
       
    65      * and forwarding body. Each will sequentially subscribe with the
       
    66      * Http1AsyncReceiver in turn. There may be additional delegates which
       
    67      * subscribe to the Http1AsyncReceiver, mainly for the purpose of handling
       
    68      * errors while the connection is busy transmitting the request body and the
       
    69      * Http1Exchange::readBody method hasn't been called yet, and response
       
    70      * delegates haven't subscribed yet.
       
    71      */
       
    72     static interface Http1AsyncDelegate {
       
    73         /**
       
    74          * Receives and handles a byte buffer reference.
       
    75          * @param ref A byte buffer reference coming from upstream.
       
    76          * @return false, if the byte buffer reference should be kept in the queue.
       
    77          *         Usually, this means that either the byte buffer reference
       
    78          *         was handled and parsing is finished, or that the receiver
       
    79          *         didn't handle the byte reference at all.
       
    80          *         There may or may not be any remaining data in the
       
    81          *         byte buffer, and the byte buffer reference must not have
       
    82          *         been cleared.
       
    83          *         true, if the byte buffer reference was fully read and
       
    84          *         more data can be received.
       
    85          */
       
    86         public boolean tryAsyncReceive(ByteBuffer ref);
       
    87 
       
    88         /**
       
    89          * Called when an exception is raised.
       
    90          * @param ex The raised Throwable.
       
    91          */
       
    92         public void onReadError(Throwable ex);
       
    93 
       
    94         /**
       
    95          * Must be called before any other method on the delegate.
       
    96          * The subscription can be either used directly by the delegate
       
    97          * to request more data (e.g. if the delegate is a header parser),
       
    98          * or can be forwarded to a downstream subscriber (if the delegate
       
    99          * is a body parser that wraps a response BodySubscriber).
       
   100          * In all cases, it is the responsibility of the delegate to ensure
       
   101          * that request(n) and demand.tryDecrement() are called appropriately.
       
   102          * No data will be sent to {@code tryAsyncReceive} unless
       
   103          * the subscription has some demand.
       
   104          *
       
   105          * @param s A subscription that allows the delegate to control the
       
   106          *          data flow.
       
   107          */
       
   108         public void onSubscribe(AbstractSubscription s);
       
   109 
       
   110         /**
       
   111          * Returns the subscription that was passed to {@code onSubscribe}
       
   112          * @return the subscription that was passed to {@code onSubscribe}..
       
   113          */
       
   114         public AbstractSubscription subscription();
       
   115 
       
   116     }
       
   117 
       
   118     /**
       
   119      * A simple subclass of AbstractSubscription that ensures the
       
   120      * SequentialScheduler will be run when request() is called and demand
       
   121      * becomes positive again.
       
   122      */
       
   123     private static final class Http1AsyncDelegateSubscription
       
   124             extends AbstractSubscription
       
   125     {
       
   126         private final Runnable onCancel;
       
   127         private final SequentialScheduler scheduler;
       
   128         Http1AsyncDelegateSubscription(SequentialScheduler scheduler,
       
   129                                        Runnable onCancel) {
       
   130             this.scheduler = scheduler;
       
   131             this.onCancel = onCancel;
       
   132         }
       
   133         @Override
       
   134         public void request(long n) {
       
   135             final Demand demand = demand();
       
   136             if (demand.increase(n)) {
       
   137                 scheduler.runOrSchedule();
       
   138             }
       
   139         }
       
   140         @Override
       
   141         public void cancel() { onCancel.run();}
       
   142     }
       
   143 
       
   144     private final ConcurrentLinkedDeque<ByteBuffer> queue
       
   145             = new ConcurrentLinkedDeque<>();
       
   146     private final SequentialScheduler scheduler =
       
   147             SequentialScheduler.synchronizedScheduler(this::flush);
       
   148     private final Executor executor;
       
   149     private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
       
   150     private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
       
   151     private final AtomicLong received = new AtomicLong();
       
   152     final AtomicBoolean canRequestMore = new AtomicBoolean();
       
   153 
       
   154     private volatile Throwable error;
       
   155     private volatile Http1AsyncDelegate delegate;
       
   156     // This reference is only used to prevent early GC of the exchange.
       
   157     private volatile Http1Exchange<?>  owner;
       
   158     // Only used for checking whether we run on the selector manager thread.
       
   159     private final HttpClientImpl client;
       
   160     private boolean retry;
       
   161 
       
   162     public Http1AsyncReceiver(Executor executor, Http1Exchange<?> owner) {
       
   163         this.pendingDelegateRef = new AtomicReference<>();
       
   164         this.executor = executor;
       
   165         this.owner = owner;
       
   166         this.client = owner.client;
       
   167     }
       
   168 
       
   169     // This is the main loop called by the SequentialScheduler.
       
   170     // It attempts to empty the queue until the scheduler is stopped,
       
   171     // or the delegate is unregistered, or the delegate is unable to
       
   172     // process the data (because it's not ready or already done), which
       
   173     // it signals by returning 'true';
       
   174     private void flush() {
       
   175         ByteBuffer buf;
       
   176         try {
       
   177             assert !client.isSelectorThread() :
       
   178                     "Http1AsyncReceiver::flush should not run in the selector: "
       
   179                     + Thread.currentThread().getName();
       
   180 
       
   181             // First check whether we have a pending delegate that has
       
   182             // just subscribed, and if so, create a Subscription for it
       
   183             // and call onSubscribe.
       
   184             handlePendingDelegate();
       
   185 
       
   186             // Then start emptying the queue, if possible.
       
   187             while ((buf = queue.peek()) != null) {
       
   188                 Http1AsyncDelegate delegate = this.delegate;
       
   189                 debug.log(Level.DEBUG, "Got %s bytes for delegate %s",
       
   190                                        buf.remaining(), delegate);
       
   191                 if (!hasDemand(delegate)) {
       
   192                     // The scheduler will be invoked again later when the demand
       
   193                     // becomes positive.
       
   194                     return;
       
   195                 }
       
   196 
       
   197                 assert delegate != null;
       
   198                 debug.log(Level.DEBUG, "Forwarding %s bytes to delegate %s",
       
   199                           buf.remaining(), delegate);
       
   200                 // The delegate has demand: feed it the next buffer.
       
   201                 if (!delegate.tryAsyncReceive(buf)) {
       
   202                     final long remaining = buf.remaining();
       
   203                     debug.log(Level.DEBUG, () -> {
       
   204                         // If the scheduler is stopped, the queue may already
       
   205                         // be empty and the reference may already be released.
       
   206                         String remstr = scheduler.isStopped() ? "" :
       
   207                                 " remaining in ref: "
       
   208                                 + remaining;
       
   209                         remstr =  remstr
       
   210                                 + " total remaining: " + remaining();
       
   211                         return "Delegate done: " + remaining;
       
   212                     });
       
   213                     canRequestMore.set(false);
       
   214                     // The last buffer parsed may have remaining unparsed bytes.
       
   215                     // Don't take it out of the queue.
       
   216                     return; // done.
       
   217                 }
       
   218 
       
   219                 // removed parsed buffer from queue, and continue with next
       
   220                 // if available
       
   221                 ByteBuffer parsed = queue.remove();
       
   222                 canRequestMore.set(queue.isEmpty());
       
   223                 assert parsed == buf;
       
   224             }
       
   225 
       
   226             // queue is empty: let's see if we should request more
       
   227             checkRequestMore();
       
   228 
       
   229         } catch (Throwable t) {
       
   230             Throwable x = error;
       
   231             if (x == null) error = t; // will be handled in the finally block
       
   232             debug.log(Level.DEBUG, "Unexpected error caught in flush()", t);
       
   233         } finally {
       
   234             // Handles any pending error.
       
   235             // The most recently subscribed delegate will get the error.
       
   236             checkForErrors();
       
   237         }
       
   238     }
       
   239 
       
   240     /**
       
   241      * Must be called from within the scheduler main loop.
       
   242      * Handles any pending errors by calling delegate.onReadError().
       
   243      * If the error can be forwarded to the delegate, stops the scheduler.
       
   244      */
       
   245     private void checkForErrors() {
       
   246         // Handles any pending error.
       
   247         // The most recently subscribed delegate will get the error.
       
   248         // If the delegate is null, the error will be handled by the next
       
   249         // delegate that subscribes.
       
   250         // If the queue is not empty, wait until it it is empty before
       
   251         // handling the error.
       
   252         Http1AsyncDelegate delegate = pendingDelegateRef.get();
       
   253         if (delegate == null) delegate = this.delegate;
       
   254         Throwable x = error;
       
   255         if (delegate != null && x != null && queue.isEmpty()) {
       
   256             // forward error only after emptying the queue.
       
   257             final Object captured = delegate;
       
   258             debug.log(Level.DEBUG, () -> "flushing " + x
       
   259                     + "\n\t delegate: " + captured
       
   260                     + "\t\t queue.isEmpty: " + queue.isEmpty());
       
   261             scheduler.stop();
       
   262             delegate.onReadError(x);
       
   263         }
       
   264     }
       
   265 
       
   266     /**
       
   267      * Must be called from within the scheduler main loop.
       
   268      * Figure out whether more data should be requested from the
       
   269      * Http1TubeSubscriber.
       
   270      */
       
   271     private void checkRequestMore() {
       
   272         Http1AsyncDelegate delegate = this.delegate;
       
   273         boolean more = this.canRequestMore.get();
       
   274         boolean hasDemand = hasDemand(delegate);
       
   275         debug.log(Level.DEBUG, () -> "checkRequestMore: "
       
   276                   + "canRequestMore=" + more + ", hasDemand=" + hasDemand
       
   277                   + (delegate == null ? ", delegate=null" : ""));
       
   278         if (hasDemand) {
       
   279             subscriber.requestMore();
       
   280         }
       
   281     }
       
   282 
       
   283     /**
       
   284      * Must be called from within the scheduler main loop.
       
   285      * Return true if the delegate is not null and has some demand.
       
   286      * @param delegate The Http1AsyncDelegate delegate
       
   287      * @return true if the delegate is not null and has some demand
       
   288      */
       
   289     private boolean hasDemand(Http1AsyncDelegate delegate) {
       
   290         if (delegate == null) return false;
       
   291         AbstractSubscription subscription = delegate.subscription();
       
   292         long demand = subscription.demand().get();
       
   293         debug.log(Level.DEBUG, "downstream subscription demand is %s", demand);
       
   294         return demand > 0;
       
   295     }
       
   296 
       
   297     /**
       
   298      * Must be called from within the scheduler main loop.
       
   299      * Handles pending delegate subscription.
       
   300      * Return true if there was some pending delegate subscription and a new
       
   301      * delegate was subscribed, false otherwise.
       
   302      *
       
   303      * @return true if there was some pending delegate subscription and a new
       
   304      *         delegate was subscribed, false otherwise.
       
   305      */
       
   306     private boolean handlePendingDelegate() {
       
   307         Http1AsyncDelegate pending = pendingDelegateRef.get();
       
   308         if (pending != null && pendingDelegateRef.compareAndSet(pending, null)) {
       
   309             Http1AsyncDelegate delegate = this.delegate;
       
   310             if (delegate != null) unsubscribe(delegate);
       
   311             Runnable cancel = () -> {
       
   312                 debug.log(Level.DEBUG, "Downstream subscription cancelled by %s", pending);
       
   313                 // The connection should be closed, as some data may
       
   314                 // be left over in the stream.
       
   315                 try {
       
   316                     setRetryOnError(false);
       
   317                     onReadError(new IOException("subscription cancelled"));
       
   318                     unsubscribe(pending);
       
   319                 } finally {
       
   320                     Http1Exchange<?> exchg = owner;
       
   321                     stop();
       
   322                     if (exchg != null) exchg.connection().close();
       
   323                 }
       
   324             };
       
   325             // The subscription created by a delegate is only loosely
       
   326             // coupled with the upstream subscription. This is partly because
       
   327             // the header/body parser work with a flow of ByteBuffer, whereas
       
   328             // we have a flow List<ByteBuffer> upstream.
       
   329             Http1AsyncDelegateSubscription subscription =
       
   330                     new Http1AsyncDelegateSubscription(scheduler, cancel);
       
   331             pending.onSubscribe(subscription);
       
   332             this.delegate = delegate = pending;
       
   333             final Object captured = delegate;
       
   334             debug.log(Level.DEBUG, () -> "delegate is now " + captured
       
   335                   + ", demand=" + subscription.demand().get()
       
   336                   + ", canRequestMore=" + canRequestMore.get()
       
   337                   + ", queue.isEmpty=" + queue.isEmpty());
       
   338             return true;
       
   339         }
       
   340         return false;
       
   341     }
       
   342 
       
   343     synchronized void setRetryOnError(boolean retry) {
       
   344         this.retry = retry;
       
   345     }
       
   346 
       
   347     void clear() {
       
   348         debug.log(Level.DEBUG, "cleared");
       
   349         this.pendingDelegateRef.set(null);
       
   350         this.delegate = null;
       
   351         this.owner = null;
       
   352     }
       
   353 
       
   354     void subscribe(Http1AsyncDelegate delegate) {
       
   355         synchronized(this) {
       
   356             pendingDelegateRef.set(delegate);
       
   357         }
       
   358         if (queue.isEmpty()) {
       
   359             canRequestMore.set(true);
       
   360         }
       
   361         debug.log(Level.DEBUG, () ->
       
   362                 "Subscribed pending " + delegate + " queue.isEmpty: "
       
   363                 + queue.isEmpty());
       
   364         // Everything may have been received already. Make sure
       
   365         // we parse it.
       
   366         if (client.isSelectorThread()) {
       
   367             scheduler.runOrSchedule(executor);
       
   368         } else {
       
   369             scheduler.runOrSchedule();
       
   370         }
       
   371     }
       
   372 
       
   373     // Used for debugging only!
       
   374     long remaining() {
       
   375         return Utils.remaining(queue.toArray(Utils.EMPTY_BB_ARRAY));
       
   376     }
       
   377 
       
   378     void unsubscribe(Http1AsyncDelegate delegate) {
       
   379         synchronized(this) {
       
   380             if (this.delegate == delegate) {
       
   381                 debug.log(Level.DEBUG, "Unsubscribed %s", delegate);
       
   382                 this.delegate = null;
       
   383             }
       
   384         }
       
   385     }
       
   386 
       
   387     // Callback: Consumer of ByteBuffer
       
   388     private void asyncReceive(ByteBuffer buf) {
       
   389         debug.log(Level.DEBUG, "Putting %s bytes into the queue", buf.remaining());
       
   390         received.addAndGet(buf.remaining());
       
   391         queue.offer(buf);
       
   392 
       
   393         // This callback is called from within the selector thread.
       
   394         // Use an executor here to avoid doing the heavy lifting in the
       
   395         // selector.
       
   396         scheduler.runOrSchedule(executor);
       
   397     }
       
   398 
       
   399     // Callback: Consumer of Throwable
       
   400     void onReadError(Throwable ex) {
       
   401         Http1AsyncDelegate delegate;
       
   402         Throwable recorded;
       
   403         debug.log(Level.DEBUG, "onError: %s", (Object) ex);
       
   404         synchronized (this) {
       
   405             delegate = this.delegate;
       
   406             recorded = error;
       
   407             if (recorded == null) {
       
   408                 // retry is set to true by HttpExchange when the connection is
       
   409                 // already connected, which means it's been retrieved from
       
   410                 // the pool.
       
   411                 if (retry && (ex instanceof IOException)) {
       
   412                     // could be either EOFException, or
       
   413                     // IOException("connection reset by peer), or
       
   414                     // SSLHandshakeException resulting from the server having
       
   415                     // closed the SSL session.
       
   416                     if (received.get() == 0) {
       
   417                         // If we receive such an exception before having
       
   418                         // received any byte, then in this case, we will
       
   419                         // throw ConnectionExpiredException
       
   420                         // to try & force a retry of the request.
       
   421                         retry = false;
       
   422                         ex = new ConnectionExpiredException(
       
   423                                 "subscription is finished", ex);
       
   424                     }
       
   425                 }
       
   426                 error = ex;
       
   427             }
       
   428             final Throwable t = (recorded == null ? ex : recorded);
       
   429             debug.log(Level.DEBUG, () -> "recorded " + t
       
   430                     + "\n\t delegate: " + delegate
       
   431                     + "\t\t queue.isEmpty: " + queue.isEmpty(), ex);
       
   432         }
       
   433         if (queue.isEmpty() || pendingDelegateRef.get() != null) {
       
   434             // This callback is called from within the selector thread.
       
   435             // Use an executor here to avoid doing the heavy lifting in the
       
   436             // selector.
       
   437             scheduler.runOrSchedule(executor);
       
   438         }
       
   439     }
       
   440 
       
   441     void stop() {
       
   442         debug.log(Level.DEBUG, "stopping");
       
   443         scheduler.stop();
       
   444         delegate = null;
       
   445         owner  = null;
       
   446     }
       
   447 
       
   448     /**
       
   449      * Returns the TubeSubscriber for reading from the connection flow.
       
   450      * @return the TubeSubscriber for reading from the connection flow.
       
   451      */
       
   452     TubeSubscriber subscriber() {
       
   453         return subscriber;
       
   454     }
       
   455 
       
   456     /**
       
   457      * A simple tube subscriber for reading from the connection flow.
       
   458      */
       
   459     final class Http1TubeSubscriber implements TubeSubscriber {
       
   460         volatile Flow.Subscription subscription;
       
   461         volatile boolean completed;
       
   462         volatile boolean dropped;
       
   463 
       
   464         public void onSubscribe(Flow.Subscription subscription) {
       
   465             // supports being called multiple time.
       
   466             // doesn't cancel the previous subscription, since that is
       
   467             // most probably the same as the new subscription.
       
   468             assert this.subscription == null || dropped == false;
       
   469             this.subscription = subscription;
       
   470             dropped = false;
       
   471             canRequestMore.set(true);
       
   472             if (delegate != null) {
       
   473                 scheduler.runOrSchedule(executor);
       
   474             }
       
   475         }
       
   476 
       
   477         void requestMore() {
       
   478             Flow.Subscription s = subscription;
       
   479             if (s == null) return;
       
   480             if (canRequestMore.compareAndSet(true, false)) {
       
   481                 if (!completed && !dropped) {
       
   482                     debug.log(Level.DEBUG,
       
   483                         "Http1TubeSubscriber: requesting one more from upstream");
       
   484                     s.request(1);
       
   485                     return;
       
   486                 }
       
   487             }
       
   488             debug.log(Level.DEBUG, "Http1TubeSubscriber: no need to request more");
       
   489         }
       
   490 
       
   491         @Override
       
   492         public void onNext(List<ByteBuffer> item) {
       
   493             canRequestMore.set(item.isEmpty());
       
   494             for (ByteBuffer buffer : item) {
       
   495                 asyncReceive(buffer);
       
   496             }
       
   497         }
       
   498 
       
   499         @Override
       
   500         public void onError(Throwable throwable) {
       
   501             onReadError(throwable);
       
   502             completed = true;
       
   503         }
       
   504 
       
   505         @Override
       
   506         public void onComplete() {
       
   507             onReadError(new EOFException("EOF reached while reading"));
       
   508             completed = true;
       
   509         }
       
   510 
       
   511         public void dropSubscription() {
       
   512             debug.log(Level.DEBUG, "Http1TubeSubscriber: dropSubscription");
       
   513             // we could probably set subscription to null here...
       
   514             // then we might not need the 'dropped' boolean?
       
   515             dropped = true;
       
   516         }
       
   517 
       
   518     }
       
   519 
       
   520     // Drains the content of the queue into a single ByteBuffer.
       
   521     // The scheduler must be permanently stopped before calling drain().
       
   522     ByteBuffer drain(ByteBuffer initial) {
       
   523         // Revisit: need to clean that up.
       
   524         //
       
   525         ByteBuffer b = initial = (initial == null ? Utils.EMPTY_BYTEBUFFER : initial);
       
   526         assert scheduler.isStopped();
       
   527 
       
   528         if (queue.isEmpty()) return b;
       
   529 
       
   530         // sanity check: we shouldn't have queued the same
       
   531         // buffer twice.
       
   532         ByteBuffer[] qbb = queue.toArray(new ByteBuffer[queue.size()]);
       
   533         assert java.util.stream.Stream.of(qbb)
       
   534                 .collect(Collectors.toSet())
       
   535                 .size() == qbb.length : debugQBB(qbb);
       
   536 
       
   537         // compute the number of bytes in the queue, the number of bytes
       
   538         // in the initial buffer
       
   539         // TODO: will need revisiting - as it is not guaranteed that all
       
   540         // data will fit in single BB!
       
   541         int size = Utils.remaining(qbb, Integer.MAX_VALUE);
       
   542         int remaining = b.remaining();
       
   543         int free = b.capacity() - b.position() - remaining;
       
   544         debug.log(Level.DEBUG,
       
   545             "Flushing %s bytes from queue into initial buffer (remaining=%s, free=%s)",
       
   546             size, remaining, free);
       
   547 
       
   548         // check whether the initial buffer has enough space
       
   549         if (size > free) {
       
   550             debug.log(Level.DEBUG,
       
   551                     "Allocating new buffer for initial: %s", (size + remaining));
       
   552             // allocates a new buffer and copy initial to it
       
   553             b = ByteBuffer.allocate(size + remaining);
       
   554             Utils.copy(initial, b);
       
   555             assert b.position() == remaining;
       
   556             b.flip();
       
   557             assert b.position() == 0;
       
   558             assert b.limit() == remaining;
       
   559             assert b.remaining() == remaining;
       
   560         }
       
   561 
       
   562         // store position and limit
       
   563         int pos = b.position();
       
   564         int limit = b.limit();
       
   565         assert limit - pos == remaining;
       
   566         assert b.capacity() >= remaining + size
       
   567                 : "capacity: " + b.capacity()
       
   568                 + ", remaining: " + b.remaining()
       
   569                 + ", size: " + size;
       
   570 
       
   571         // prepare to copy the content of the queue
       
   572         b.position(limit);
       
   573         b.limit(pos + remaining + size);
       
   574         assert b.remaining() >= size :
       
   575                 "remaining: " + b.remaining() + ", size: " + size;
       
   576 
       
   577         // copy the content of the queue
       
   578         int count = 0;
       
   579         for (int i=0; i<qbb.length; i++) {
       
   580             ByteBuffer b2 = qbb[i];
       
   581             int r = b2.remaining();
       
   582             assert b.remaining() >= r : "need at least " + r + " only "
       
   583                     + b.remaining() + " available";
       
   584             int copied = Utils.copy(b2, b);
       
   585             assert copied == r : "copied="+copied+" available="+r;
       
   586             assert b2.remaining() == 0;
       
   587             count += copied;
       
   588         }
       
   589         assert count == size;
       
   590         assert b.position() == pos + remaining + size :
       
   591                 "b.position="+b.position()+" != "+pos+"+"+remaining+"+"+size;
       
   592 
       
   593         // reset limit and position
       
   594         b.limit(limit+size);
       
   595         b.position(pos);
       
   596 
       
   597         // we can clear the refs
       
   598         queue.clear();
       
   599         final ByteBuffer bb = b;
       
   600         debug.log(Level.DEBUG, () -> "Initial buffer now has " + bb.remaining()
       
   601                 + " pos=" + bb.position() + " limit=" + bb.limit());
       
   602 
       
   603         return b;
       
   604     }
       
   605 
       
   606     private String debugQBB(ByteBuffer[] qbb) {
       
   607         StringBuilder msg = new StringBuilder();
       
   608         List<ByteBuffer> lbb = Arrays.asList(qbb);
       
   609         Set<ByteBuffer> sbb = new HashSet<>(Arrays.asList(qbb));
       
   610 
       
   611         int uniquebb = sbb.size();
       
   612         msg.append("qbb: ").append(lbb.size())
       
   613            .append(" (unique: ").append(uniquebb).append("), ")
       
   614            .append("duplicates: ");
       
   615         String sep = "";
       
   616         for (ByteBuffer b : lbb) {
       
   617             if (!sbb.remove(b)) {
       
   618                 msg.append(sep)
       
   619                    .append(String.valueOf(b))
       
   620                    .append("[remaining=")
       
   621                    .append(b.remaining())
       
   622                    .append(", position=")
       
   623                    .append(b.position())
       
   624                    .append(", capacity=")
       
   625                    .append(b.capacity())
       
   626                    .append("]");
       
   627                 sep = ", ";
       
   628             }
       
   629         }
       
   630         return msg.toString();
       
   631     }
       
   632 
       
   633     volatile String dbgTag;
       
   634     String dbgString() {
       
   635         String tag = dbgTag;
       
   636         if (tag == null) {
       
   637             String flowTag = null;
       
   638             Http1Exchange<?> exchg = owner;
       
   639             Object flow = (exchg != null)
       
   640                     ? exchg.connection().getConnectionFlow()
       
   641                     : null;
       
   642             flowTag = tag = flow == null ? null: (String.valueOf(flow));
       
   643             if (flowTag != null) {
       
   644                 dbgTag = tag = flowTag + " Http1AsyncReceiver";
       
   645             } else {
       
   646                 tag = "Http1AsyncReceiver";
       
   647             }
       
   648         }
       
   649         return tag;
       
   650     }
       
   651 }