http-client-branch: connection shutdown should no cancel streams immediately if data is pending in the stream queue. http-client-branch
authordfuchs
Fri, 04 May 2018 17:02:19 +0100
branchhttp-client-branch
changeset 56531 15ff86a732ea
parent 56530 561ec1470111
child 56532 a594484f54db
http-client-branch: connection shutdown should no cancel streams immediately if data is pending in the stream queue.
src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java
src/java.net.http/share/classes/jdk/internal/net/http/Stream.java
src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java	Fri May 04 17:02:19 2018 +0100
@@ -273,7 +273,7 @@
      */
     private final WindowController windowController = new WindowController();
     private final FramesController framesController = new FramesController();
-    private final Http2TubeSubscriber subscriber = new Http2TubeSubscriber();
+    private final Http2TubeSubscriber subscriber;
     final ConnectionWindowUpdateSender windowUpdater;
     private volatile Throwable cause;
     private volatile Supplier<ByteBuffer> initial;
@@ -290,6 +290,7 @@
                             String key) {
         this.connection = connection;
         this.client2 = client2;
+        this.subscriber = new Http2TubeSubscriber(client2.client());
         this.nextstreamid = nextstreamid;
         this.key = key;
         this.clientSettings = this.client2.getClientSettings();
@@ -643,7 +644,7 @@
         client2.deleteConnection(this);
         List<Stream<?>> c = new LinkedList<>(streams.values());
         for (Stream<?> s : c) {
-            s.cancelImpl(t);
+            s.connectionClosing(t);
         }
         connection.close();
     }
@@ -1158,14 +1159,19 @@
      * A simple tube subscriber for reading from the connection flow.
      */
     final class Http2TubeSubscriber implements TubeSubscriber {
-        volatile Flow.Subscription subscription;
-        volatile boolean completed;
-        volatile boolean dropped;
-        volatile Throwable error;
-        final ConcurrentLinkedQueue<ByteBuffer> queue
+        private volatile Flow.Subscription subscription;
+        private volatile boolean completed;
+        private volatile boolean dropped;
+        private volatile Throwable error;
+        private final ConcurrentLinkedQueue<ByteBuffer> queue
                 = new ConcurrentLinkedQueue<>();
-        final SequentialScheduler scheduler =
+        private final SequentialScheduler scheduler =
                 SequentialScheduler.synchronizedScheduler(this::processQueue);
+        private final HttpClientImpl client;
+        
+        Http2TubeSubscriber(HttpClientImpl client) {
+            this.client = Objects.requireNonNull(client);
+        }
 
         final void processQueue() {
             try {
@@ -1189,6 +1195,12 @@
             }
         }
 
+        private final void runOrSchedule() {
+            if (client.isSelectorThread()) {
+                scheduler.runOrSchedule(client.theExecutor());
+            } else scheduler.runOrSchedule();
+        }
+
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
             // supports being called multiple time.
@@ -1212,7 +1224,7 @@
             if (debug.on()) debug.log(() -> "onNext: got " + Utils.remaining(item)
                     + " bytes in " + item.size() + " buffers");
             queue.addAll(item);
-            scheduler.runOrSchedule(client().theExecutor());
+            runOrSchedule();
         }
 
         @Override
@@ -1220,7 +1232,7 @@
             if (debug.on()) debug.log(() -> "onError: " + throwable);
             error = throwable;
             completed = true;
-            scheduler.runOrSchedule(client().theExecutor());
+            runOrSchedule();
         }
 
         @Override
@@ -1228,7 +1240,7 @@
             if (debug.on()) debug.log("EOF");
             error = new EOFException("EOF reached while reading");
             completed = true;
-            scheduler.runOrSchedule(client().theExecutor());
+            runOrSchedule();
         }
 
         @Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Stream.java	Fri May 04 17:02:19 2018 +0100
@@ -1064,6 +1064,15 @@
         cancelImpl(cause);
     }
 
+    void connectionClosing(Throwable cause) {
+        Flow.Subscriber<?> subscriber =
+                responseSubscriber == null ? pendingResponseSubscriber : responseSubscriber;
+        errorRef.compareAndSet(null, cause);
+        if (subscriber != null && !sched.isStopped() && !inputQ.isEmpty()) {
+            sched.runOrSchedule();
+        } else cancelImpl(cause);
+    }
+
     // This method sends a RST_STREAM frame
     void cancelImpl(Throwable e) {
         errorRef.compareAndSet(null, e);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java	Fri May 04 17:02:19 2018 +0100
@@ -480,6 +480,11 @@
                         return new EngineResult(sslResult);
                     case OK:
                         int size = dst.position();
+                        if (debug.on()) {
+                            debugr.log("Decoded " + size + " bytes out of " + len
+                                    + " into buffer of " + dst.capacity()
+                                    + " remaining to decode: " + src.remaining());
+                        }
                         // if the record payload was bigger than what was originally
                         // allocated, then sets the adaptiveAppBufferSize to size
                         // and we will use that new size as a guess for the next app
@@ -764,7 +769,7 @@
                             // copy off the bytes to a smaller buffer, and keep
                             // the writeBuffer for next time.
                             dst.flip();
-                            dest = Utils.copy(dst);
+                            dest = Utils.copyAligned(dst);
                             dst.clear();
                         } else {
                             // more than half the buffer was used.
@@ -775,8 +780,8 @@
                             writeBuffer = null;
                         }
                         if (debugw.on())
-                            debugw.log("OK => produced: %d, not wrapped: %d",
-                                       dest.remaining(),  Utils.remaining(src));
+                            debugw.log("OK => produced: %d bytes into %d, not wrapped: %d",
+                                       dest.remaining(),  dest.capacity(), Utils.remaining(src));
                         return new EngineResult(sslResult, dest);
                     case BUFFER_UNDERFLOW:
                         // Shouldn't happen.  Doesn't returns when wrap()
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Fri May 04 14:32:26 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java	Fri May 04 17:02:19 2018 +0100
@@ -534,6 +534,16 @@
         return dst;
     }
 
+    public static ByteBuffer copyAligned(ByteBuffer src) {
+        int len = src.remaining();
+        int size = ((len + 7) >> 3) << 3;
+        assert size >= len;
+        ByteBuffer dst = ByteBuffer.allocate(size);
+        dst.put(src);
+        dst.flip();
+        return dst;
+    }
+
     public static String dump(Object... objects) {
         return Arrays.toString(objects);
     }