http-client-branch: do not set receive buffer size unless explicitly requested http-client-branch
authordfuchs
Thu, 19 Apr 2018 16:47:52 +0100
branchhttp-client-branch
changeset 56463 b583caf69b39
parent 56453 e86bb3d45b9a
child 56474 fe2bf7b369b8
http-client-branch: do not set receive buffer size unless explicitly requested
src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java
src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java
test/jdk/java/net/httpclient/TimeoutOrdering.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java	Thu Apr 19 16:47:52 2018 +0100
@@ -356,6 +356,7 @@
                 // be left over in the stream.
                 try {
                     setRetryOnError(false);
+                    pending.close(null);
                     onReadError(new IOException("subscription cancelled"));
                     unsubscribe(pending);
                 } finally {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Thu Apr 19 16:47:52 2018 +0100
@@ -375,9 +375,12 @@
                     (t) -> {
                         try {
                             if (t != null) {
-                                subscriber.onError(t);
-                                connection.close();
-                                cf.completeExceptionally(t);
+                                try {
+                                    subscriber.onError(t);
+                                } finally {
+                                    cf.completeExceptionally(t);
+                                    connection.close();
+                                }
                             }
                         } finally {
                             bodyReader.onComplete(t);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Thu Apr 19 16:47:52 2018 +0100
@@ -1160,7 +1160,8 @@
     // used for the connection window
     int getReceiveBufferSize() {
         return Utils.getIntegerNetProperty(
-                "jdk.httpclient.receiveBufferSize", 2 * 1024 * 1024
+                "jdk.httpclient.receiveBufferSize",
+                0 // only set the size if > 0
         );
     }
 }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Thu Apr 19 16:47:52 2018 +0100
@@ -145,31 +145,39 @@
         try {
             this.chan = SocketChannel.open();
             chan.configureBlocking(false);
-            int bufsize = client.getReceiveBufferSize();
-            if (!trySetReceiveBufferSize(bufsize)) {
-                trySetReceiveBufferSize(256*1024);
+            trySetReceiveBufferSize(client.getReceiveBufferSize());
+            if (debug.on()) {
+                int bufsize = getInitialBufferSize();
+                debug.log("Initial receive buffer size is: %d", bufsize);
             }
             chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
-            // wrap the connected channel in a Tube for async reading and writing
+            // wrap the channel in a Tube for async reading and writing
             tube = new SocketTube(client(), chan, Utils::getBuffer);
         } catch (IOException e) {
             throw new InternalError(e);
         }
     }
 
-    private boolean trySetReceiveBufferSize(int bufsize) {
+    private int getInitialBufferSize() {
         try {
-            chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
+            return chan.getOption(StandardSocketOptions.SO_RCVBUF);
+        } catch(IOException x) {
             if (debug.on())
-                debug.log("Receive buffer size is %s",
-                          chan.getOption(StandardSocketOptions.SO_RCVBUF));
-            return true;
+                debug.log("Failed to get initial receive buffer size on %s", chan);
+        }
+        return 0;
+    }
+
+    private void trySetReceiveBufferSize(int bufsize) {
+        try {
+            if (bufsize > 0) {
+                chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
+            }
         } catch(IOException x) {
             if (debug.on())
                 debug.log("Failed to set receive buffer size to %d on %s",
                           bufsize, chan);
         }
-        return false;
     }
 
     @Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java	Thu Apr 19 16:47:52 2018 +0100
@@ -992,7 +992,7 @@
             case 1: return List.of(list.get(0), item);
             case 2: return List.of(list.get(0), list.get(1), item);
             default: // slow path if MAX_BUFFERS > 3
-                ArrayList<T> res = new ArrayList<>(list);
+                List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
                 res.add(item);
                 return res;
         }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Thu Apr 19 16:47:52 2018 +0100
@@ -33,7 +33,6 @@
 import javax.net.ssl.SSLEngineResult.Status;
 import javax.net.ssl.SSLException;
 import java.io.IOException;
-import java.lang.System.Logger.Level;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -214,8 +213,13 @@
      * TARGET_BUFSIZE bytes in readBuf
      */
     class Reader extends SubscriberWrapper {
+        // Maximum record size is 16k.
+        // Because SocketTube can feeds us up to 3 16K buffers,
+        // then setting this size to 16K means that the readBuf
+        // can store up to 64K-1 (16K-1 + 3*16K)
+        static final int TARGET_BUFSIZE = 16 * 1024;
+
         final SequentialScheduler scheduler;
-        static final int TARGET_BUFSIZE = 16 * 1024;
         volatile ByteBuffer readBuf;
         volatile boolean completing;
         final Object readBufferLock = new Object();
@@ -250,7 +254,7 @@
                 debugr.log("Adding %d bytes to read buffer",
                            Utils.remaining(buffers));
             addToReadBuf(buffers, complete);
-            scheduler.runOrSchedule();
+            scheduler.runOrSchedule(exec);
         }
 
         @Override
@@ -270,6 +274,9 @@
         @Override
         protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
             if (readBuf.remaining() > TARGET_BUFSIZE) {
+                if (debugr.on())
+                    debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
+                               readBuf.remaining());
                 return 0;
             } else {
                 return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -293,7 +300,7 @@
         }
 
         void schedule() {
-            scheduler.runOrSchedule();
+            scheduler.runOrSchedule(exec);
         }
 
         void stop() {
@@ -303,6 +310,11 @@
 
         AtomicInteger count = new AtomicInteger(0);
 
+        // minimum number of bytes required to call unwrap.
+        // Usually this is 0, unless there was a buffer underflow.
+        // In this case we need to wait for more bytes than what
+        // we had before calling unwrap() again.
+        volatile int minBytesRequired;
         // work function where it all happens
         void processData() {
             try {
@@ -313,15 +325,23 @@
                            + ", engine handshake status:" + engine.getHandshakeStatus());
                 int len;
                 boolean complete = false;
-                while ((len = readBuf.remaining()) > 0) {
+                while (readBuf.remaining() > (len = minBytesRequired)) {
                     boolean handshaking = false;
                     try {
                         EngineResult result;
                         synchronized (readBufferLock) {
                             complete = this.completing;
+                            if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
+                            // Unless there is a BUFFER_UNDERFLOW, we should try to
+                            // unwrap any number of bytes. Set minBytesRequired to 0:
+                            // we only need to do that if minBytesRequired is not already 0.
+                            len = len > 0 ? minBytesRequired = 0 : len;
                             result = unwrapBuffer(readBuf);
-                            if (debugr.on())
-                                debugr.log("Unwrapped: %s", result.result);
+                            len = readBuf.remaining();
+                            if (debugr.on()) {
+                                debugr.log("Unwrapped: result: %s", result.result);
+                                debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
+                            }
                         }
                         if (result.bytesProduced() > 0) {
                             if (debugr.on())
@@ -332,12 +352,19 @@
                         if (result.status() == Status.BUFFER_UNDERFLOW) {
                             if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
                             // not enough data in the read buffer...
-                            requestMore();
+                            // no need to try to unwrap again unless we get more bytes
+                            // than minBytesRequired = len in the read buffer.
+                            minBytesRequired = len;
                             synchronized (readBufferLock) {
-                                // check if we have received some data
+                                // more bytes could already have been added...
+                                assert readBuf.remaining() >= len;
+                                // check if we have received some data, and if so
+                                // we can just re-spin the loop
                                 if (readBuf.remaining() > len) continue;
-                                return;
                             }
+                            // request more data and return.
+                            requestMore();
+                            return;
                         }
                         if (complete && result.status() == Status.CLOSED) {
                             if (debugr.on()) debugr.log("Closed: completing");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java	Thu Apr 19 16:47:52 2018 +0100
@@ -306,14 +306,16 @@
                                downstreamSubscription);
             }
 
+            boolean datasent = false;
             while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
                 List<ByteBuffer> b = outputQ.poll();
                 if (debug.on())
                     debug.log("DownstreamPusher: Pushing %d bytes downstream",
                               Utils.remaining(b));
                 downstreamSubscriber.onNext(b);
+                datasent = true;
             }
-            upstreamWindowUpdate();
+            if (datasent) upstreamWindowUpdate();
             checkCompletion();
         }
     }
--- a/test/jdk/java/net/httpclient/TimeoutOrdering.java	Wed Apr 18 12:55:11 2018 +0100
+++ b/test/jdk/java/net/httpclient/TimeoutOrdering.java	Thu Apr 19 16:47:52 2018 +0100
@@ -77,21 +77,22 @@
                                          .build();
 
                 final HttpRequest req = requests[i];
+                final int j = i;
                 CompletableFuture<HttpResponse<Object>> response = client
                     .sendAsync(req, BodyHandlers.replacing(null))
                     .whenComplete((HttpResponse<Object> r, Throwable t) -> {
                         if (r != null) {
-                            out.println("Unexpected response: " + r);
+                            out.println("Unexpected response for r" + j + ": " + r);
                             error = true;
                         }
                         if (t != null) {
                             if (!(t.getCause() instanceof HttpTimeoutException)) {
-                                out.println("Wrong exception type:" + t.toString());
+                                out.println("Wrong exception type for r" + j + ": " + t.toString());
                                 Throwable c = t.getCause() == null ? t : t.getCause();
                                 c.printStackTrace();
                                 error = true;
                             } else {
-                                out.println("Caught expected timeout: " + t.getCause());
+                                out.println("Caught expected timeout for r" + j + ": " + t.getCause());
                             }
                         }
                         queue.add(req);
@@ -117,16 +118,21 @@
                                          .build();
 
                 final HttpRequest req = requests[i];
+                final int j = i;
                 executor.execute(() -> {
                     try {
-                        client.send(req, BodyHandlers.replacing(null));
+                        HttpResponse<?> r = client.send(req, BodyHandlers.replacing(null));
+                        out.println("Unexpected response for r" + j + ": " + r);
+                        error = true;
                     } catch (HttpTimeoutException e) {
-                        out.println("Caught expected timeout: " + e);
-                        queue.offer(req);
+                        out.println("Caught expected timeout for r" + j +": " + e);
                     } catch (IOException | InterruptedException ee) {
                         Throwable c = ee.getCause() == null ? ee : ee.getCause();
+                        out.println("Wrong exception type for r" + j + ": " + c.toString());
                         c.printStackTrace();
                         error = true;
+                    } finally {
+                        queue.offer(req);
                     }
                 });
             }