http-client-branch: unused byte buffers must be returned to the pool before exiting from SocketTube::readAvailable + minor test fixes http-client-branch
authordfuchs
Wed, 25 Apr 2018 19:51:58 +0100
branchhttp-client-branch
changeset 56481 247ed0848e48
parent 56480 97bff67fed21
child 56482 5158cd0b906e
http-client-branch: unused byte buffers must be returned to the pool before exiting from SocketTube::readAvailable + minor test fixes
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
test/jdk/java/net/httpclient/DependentPromiseActionsTest.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Wed Apr 25 19:51:58 2018 +0100
@@ -379,11 +379,13 @@
                                     subscriber.onError(t);
                                 } finally {
                                     cf.completeExceptionally(t);
-                                    connection.close();
                                 }
                             }
                         } finally {
                             bodyReader.onComplete(t);
+                            if (t != null) {
+                                connection.close();
+                            }
                         }
                     }));
                 CompletableFuture<State> bodyReaderCF = bodyReader.completion();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Wed Apr 25 19:51:58 2018 +0100
@@ -1198,7 +1198,7 @@
         private final ByteBuffer[] pool = new ByteBuffer[POOL_SIZE];
         private final HttpClientImpl client;
         private final Logger debug;
-        int tail; // no need for volatile: only accessed in SM thread.
+        private int tail, count; // no need for volatile: only accessed in SM thread.
         public SSLDirectBufferSupplier(HttpClientImpl client) {
             this.client = Objects.requireNonNull(client);
             this.debug = client.debug;
@@ -1208,18 +1208,20 @@
         @Override
         public ByteBuffer get() {
             assert client.isSelectorThread();
-            ByteBuffer buf = tail == 0 ? null : pool[--tail];
-            if (buf == null) {
+            assert tail <= POOL_SIZE : "allocate tail is " + tail;
+            ByteBuffer buf;
+            if (tail == 0) {
                 if (debug.on()) {
                     // should not appear more than SocketTube.MAX_BUFFERS
                     debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
                 }
+                assert count++ < POOL_SIZE : "trying to allocate more than "
+                            + POOL_SIZE + " buffers";
                 buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
             } else {
-                // if (debug.on()) { // this trace is mostly noise.
-                //    debug.log("ByteBuffer.recycle(%d)", buf.remaining());
-                // }
-                assert buf == pool[tail];
+                assert tail > 0 : "non positive tail value: " + tail;
+                tail--;
+                buf = pool[tail];
                 pool[tail] = null;
             }
             assert buf.isDirect();
@@ -1237,9 +1239,15 @@
             assert client.isSelectorThread();
             assert buffer.isDirect();
             assert !buffer.hasRemaining();
+            assert tail < POOL_SIZE : "recycle tail is " + tail;
+            assert tail >= 0;
             buffer.position(0);
             buffer.limit(buffer.capacity());
-            pool[tail++] = buffer;
+            // don't fail if assertions are off. we have asserted above.
+            if (tail < POOL_SIZE) {
+                pool[tail] = buffer;
+                tail++;
+            }
             assert tail <= POOL_SIZE;
             assert tail > 0;
         }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Wed Apr 25 19:51:58 2018 +0100
@@ -784,7 +784,7 @@
                         if (demand.tryDecrement()) {
                             // we have demand.
                             try {
-                                List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
+                                List<ByteBuffer> bytes = readAvailable(current.bufferSource);
                                 if (bytes == EOF) {
                                     if (!completed) {
                                         if (debug.on()) debug.log("got read EOF");
@@ -918,6 +918,8 @@
     public interface BufferSource {
         /**
          * Returns a buffer to read data from the socket.
+         *
+         * @implNote
          * Different implementation can have different strategies, as to
          * which kind of buffer to return, or whether to return the same
          * buffer. The only constraints are that
@@ -926,6 +928,7 @@
          *   c. the buffer limit indicates where to stop reading.
          *   d. the buffer is 'free' - that is - it is not used
          *      or retained by anybody else
+         *
          * @return A buffer to read data from the socket.
          */
         ByteBuffer getBuffer();
@@ -935,6 +938,7 @@
          * be sent downstream to the subscriber. May return a new
          * list, or append to the given list.
          *
+         * @implNote
          * Different implementation can have different strategies, but
          * must obviously be consistent with the implementation of the
          * getBuffer() method. For instance, an implementation could
@@ -950,7 +954,18 @@
          * @return A possibly new list where a buffer containing the
          *         data read from the socket has been added.
          */
-        List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
+        List<ByteBuffer> append(List<ByteBuffer> list, ByteBuffer buffer, int start);
+
+        /**
+         * Called when a buffer obtained from {@code getBuffer} will not
+         * be used because no data has been read.
+         *
+         * @implNote This method can be used, if necessary, to return
+         *  the unused buffer to the pull.
+         *
+         * @param buffer The unused buffer.
+         */
+        default void returnUnused(ByteBuffer buffer) { }
     }
 
     // An implementation of BufferSource used for unencrypted data.
@@ -958,7 +973,7 @@
     // by forwarding read only buffer slices downstream.
     // Buffers allocated through this source are simply GC'ed when
     // they are no longer referenced.
-    static final class SliceBufferSource implements BufferSource {
+    private static final class SliceBufferSource implements BufferSource {
         private final Supplier<ByteBuffer> factory;
         private volatile ByteBuffer current;
         public SliceBufferSource() {
@@ -1003,10 +1018,10 @@
     // This buffer source use direct byte buffers that will be
     // recycled by the SocketTube subscriber.
     //
-    static final class SSLDirectBufferSource implements BufferSource {
-        private final Supplier<ByteBuffer> factory;
+    private static final class SSLDirectBufferSource implements BufferSource {
+        private final BufferSupplier factory;
         private final HttpClientImpl client;
-        private volatile ByteBuffer current;
+        private ByteBuffer current;
 
         public SSLDirectBufferSource(HttpClientImpl client) {
             this.client = Objects.requireNonNull(client);
@@ -1049,6 +1064,23 @@
             // add the buffer to the list
             return SocketTube.listOf(list, buf);
         }
+
+        @Override
+        public void returnUnused(ByteBuffer buffer) {
+            // if current is not null it will not be added to the
+            // list. We need to recycle it now to prevent
+            // the buffer supplier pool to grow over more than
+            // MAX_BUFFERS.
+            assert buffer == current;
+            ByteBuffer buf = current;
+            if (buf != null) {
+                assert buf.position() == 0;
+                current = null;
+                // the supplier assert if buf has remaining
+                buf.limit(buf.position());
+                factory.recycle(buf);
+            }
+        }
     }
 
     // ===================================================================== //
@@ -1079,6 +1111,9 @@
                 }
             } catch (IOException x) {
                 if (buf.position() == pos && list == null) {
+                    // make sure that the buffer source will recycle
+                    // 'buf' if needed
+                    buffersSource.returnUnused(buf);
                     // no bytes have been read, just throw...
                     throw x;
                 } else {
@@ -1094,6 +1129,7 @@
                 // returned if read == -1. If some data has already been read,
                 // then it must be returned. -1 will be returned next time
                 // the caller attempts to read something.
+                buffersSource.returnUnused(buf);
                 if (list == null) {
                     // nothing read - list was null - return EOF or NOTHING
                     list = read == -1 ? EOF : NOTHING;
@@ -1103,7 +1139,6 @@
 
             // check whether this buffer has still some free space available.
             // if so, we will keep it for the next round.
-            final boolean hasRemaining = buf.hasRemaining();
             list = buffersSource.append(list, buf, pos);
 
             if (read <= 0 || list.size() == MAX_BUFFERS) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Tue Apr 24 19:45:20 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Apr 25 19:51:58 2018 +0100
@@ -197,9 +197,11 @@
         sb.append("SSL: id ").append(id);
         sb.append(" HS state: " + states(handshakeState));
         sb.append(" Engine state: " + engine.getHandshakeStatus().toString());
-        sb.append(" LL : ");
-        for (String s: stateList) {
-            sb.append(s).append(" ");
+        if (stateList != null) {
+            sb.append(" LL : ");
+            for (String s : stateList) {
+                sb.append(s).append(" ");
+            }
         }
         sb.append("\r\n");
         sb.append("Reader:: ").append(reader.toString());
@@ -813,13 +815,16 @@
     }
 
     final AtomicInteger handshakeState;
-    final ConcurrentLinkedQueue<String> stateList = new ConcurrentLinkedQueue<>();
+    final ConcurrentLinkedQueue<String> stateList =
+            debug.on() ? new ConcurrentLinkedQueue<>() : null;
 
     private boolean doHandshake(EngineResult r, int caller) {
         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
-        stateList.add(r.handshakeStatus().toString());
-        stateList.add(Integer.toString(caller));
+        if (stateList != null && debug.on()) {
+            stateList.add(r.handshakeStatus().toString());
+            stateList.add(Integer.toString(caller));
+        }
         switch (r.handshakeStatus()) {
             case NEED_TASK:
                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
--- a/test/jdk/java/net/httpclient/DependentPromiseActionsTest.java	Tue Apr 24 19:45:20 2018 +0100
+++ b/test/jdk/java/net/httpclient/DependentPromiseActionsTest.java	Wed Apr 25 19:51:58 2018 +0100
@@ -177,6 +177,8 @@
         };
     }
 
+    enum SubscriberType {EAGER, LAZZY}
+
     static final class SemaphoreStallerSupplier
             implements Supplier<SemaphoreStaller> {
         @Override
@@ -291,7 +293,8 @@
         String test = format("testAsStringAsync(%s, %b, %s)",
                 uri, sameClient, stallers);
         testDependent(test, uri, sameClient, BodyHandlers::ofString,
-                this::finish, this::extractString, stallers);
+                this::finish, this::extractString, stallers,
+                SubscriberType.EAGER);
     }
 
     @Test(dataProvider = "variants")
@@ -303,7 +306,8 @@
         String test = format("testAsLinesAsync(%s, %b, %s)",
                 uri, sameClient, stallers);
         testDependent(test, uri, sameClient, BodyHandlers::ofLines,
-                this::finish, this::extractStream, stallers);
+                this::finish, this::extractStream, stallers,
+                SubscriberType.LAZZY);
     }
 
     @Test(dataProvider = "variants")
@@ -315,19 +319,22 @@
         String test = format("testAsInputStreamAsync(%s, %b, %s)",
                 uri, sameClient, stallers);
         testDependent(test, uri, sameClient, BodyHandlers::ofInputStream,
-                this::finish, this::extractInputStream, stallers);
+                this::finish, this::extractInputStream, stallers,
+                SubscriberType.LAZZY);
     }
 
     private <T,U> void testDependent(String name, String uri, boolean sameClient,
                                      Supplier<BodyHandler<T>> handlers,
                                      Finisher finisher,
                                      Extractor<T> extractor,
-                                     Supplier<Staller> stallers)
+                                     Supplier<Staller> stallers,
+                                     SubscriberType subscriberType)
             throws Exception
     {
         out.printf("%n%s%s%n", now(), name);
         try {
-            testDependent(uri, sameClient, handlers, finisher, extractor, stallers);
+            testDependent(uri, sameClient, handlers, finisher,
+                          extractor, stallers, subscriberType);
         } catch (Error | Exception x) {
             FAILURES.putIfAbsent(name, x);
             throw x;
@@ -338,7 +345,8 @@
                                      Supplier<BodyHandler<T>> handlers,
                                      Finisher finisher,
                                      Extractor<T> extractor,
-                                     Supplier<Staller> stallers)
+                                     Supplier<Staller> stallers,
+                                     SubscriberType subscriberType)
             throws Exception
     {
         HttpClient client = null;
@@ -355,7 +363,7 @@
             System.out.println("try stalling in " + where);
             CompletableFuture<HttpResponse<T>> responseCF =
                     client.sendAsync(req, handler, promiseHandler);
-            assert !responseCF.isDone();
+            assert subscriberType == SubscriberType.LAZZY || !responseCF.isDone();
             finisher.finish(where, responseCF, promiseHandler, extractor);
         }
     }