http-client-branch: removed AsyncConnection interface and associated methods in subclasses http-client-branch
authordfuchs
Thu, 09 Nov 2017 14:28:00 +0000
branchhttp-client-branch
changeset 55795 074bb951658a
parent 55794 08e58c1c75fb
child 55796 dfc99ea2b65a
http-client-branch: removed AsyncConnection interface and associated methods in subclasses
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncConnection.java	Thu Nov 09 14:24:43 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import jdk.incubator.http.internal.common.ByteBufferReference;
-
-import java.io.IOException;
-
-/**
- * Implemented by classes that offer an asynchronous interface.
- *
- * PlainHttpConnection, AsyncSSLConnection.
- *
- * setAsyncCallbacks() is called to set the callback for reading
- * and error notification. Reads all happen on the selector thread, which
- * must not block.
- *
- * Writing uses the same write() methods as used in blocking mode.
- * Queues are employed on the writing side to buffer data while it is waiting
- * to be sent. This strategy relies on HTTP/2 protocol flow control to stop
- * outgoing queue from continually growing. Writes can be initiated by the
- * calling thread, but if socket becomes full then the queue is emptied by
- * the selector thread
- */
-interface AsyncConnection {
-
-    /**
-     * In async mode, this method puts buffers at the end of the send queue.
-     * When in async mode, calling this method should later be followed by
-     * subsequent flushAsync invocation.
-     * That allows multiple threads to put buffers into the queue while some other
-     * thread is writing.
-     */
-    void writeAsync(ByteBufferReference[] buffers) throws IOException;
-
-    /**
-     * In async mode, this method may put buffers at the beginning of send queue,
-     * breaking frames sequence and allowing to write these buffers before other
-     * buffers in the queue.
-     * When in async mode, calling this method should later be followed by
-     * subsequent flushAsync invocation.
-     * That allows multiple threads to put buffers into the queue while some other
-     * thread is writing.
-     */
-    void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException;
-
-    /**
-     * This method should be called after any writeAsync/writeAsyncUnordered
-     * invocation.
-     * If there is a race to flushAsync from several threads one thread
-     * (race winner) capture flush operation and write the whole queue content.
-     * Other threads (race losers) exits from the method (not blocking)
-     * and continue execution.
-     */
-    void flushAsync() throws IOException;
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Thu Nov 09 14:28:00 2017 +0000
@@ -93,21 +93,6 @@
     }
 
     @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        writePublisher.writeAsync(buffers);
-    }
-
-    @Override
-    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        writePublisher.writeAsyncUnordered(buffers);
-    }
-
-    @Override
-    public void flushAsync() throws IOException {
-        writePublisher.flushAsync();
-    }
-
-    @Override
     public void closeExceptionally(Throwable cause) {
         debug.log(Level.DEBUG, () -> "closing: " + cause);
         plainConnection.close();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java	Thu Nov 09 14:28:00 2017 +0000
@@ -94,21 +94,6 @@
     }
 
     @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        writePublisher.writeAsync(buffers);
-    }
-
-    @Override
-    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        writePublisher.writeAsyncUnordered(buffers);
-    }
-
-    @Override
-    public void flushAsync() throws IOException {
-        writePublisher.flushAsync();
-    }
-
-    @Override
     public void close() {
         plainConnection.close();
     }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Thu Nov 09 14:28:00 2017 +0000
@@ -512,7 +512,7 @@
     }
 
     /** A Publisher of HTTP/1.1 headers and request body. */
-    final class Http1Publisher implements HttpConnection.HttpPublisher {
+    final class Http1Publisher implements FlowTube.TubePublisher {
 
         final System.Logger  debug = Utils.getDebugLogger(this::dbgString);
         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Thu Nov 09 14:28:00 2017 +0000
@@ -276,7 +276,7 @@
 //    static final char CR = '\r';
 //    static final char LF = '\n';
 
-// ================ Support for plugging into AsyncConnection =================
+// ================ Support for plugging into Http1Receiver   =================
 // ============================================================================
 
     // Callback: Error receiver: Consumer of Throwable.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java	Thu Nov 09 14:28:00 2017 +0000
@@ -45,9 +45,32 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 import javax.net.ssl.SSLEngine;
-import jdk.incubator.http.internal.common.*;
+import jdk.incubator.http.HttpConnection.HttpPublisher;
+import jdk.incubator.http.internal.common.ByteBufferPool;
+import jdk.incubator.http.internal.common.ByteBufferReference;
+import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
-import jdk.incubator.http.internal.frame.*;
+import jdk.incubator.http.internal.common.HttpHeadersImpl;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.Utils;
+import jdk.incubator.http.internal.frame.ContinuationFrame;
+import jdk.incubator.http.internal.frame.DataFrame;
+import jdk.incubator.http.internal.frame.ErrorFrame;
+import jdk.incubator.http.internal.frame.FramesDecoder;
+import jdk.incubator.http.internal.frame.FramesEncoder;
+import jdk.incubator.http.internal.frame.GoAwayFrame;
+import jdk.incubator.http.internal.frame.HeaderFrame;
+import jdk.incubator.http.internal.frame.HeadersFrame;
+import jdk.incubator.http.internal.frame.Http2Frame;
+import jdk.incubator.http.internal.frame.MalformedFrame;
+import jdk.incubator.http.internal.frame.OutgoingHeaders;
+import jdk.incubator.http.internal.frame.PingFrame;
+import jdk.incubator.http.internal.frame.PushPromiseFrame;
+import jdk.incubator.http.internal.frame.ResetFrame;
+import jdk.incubator.http.internal.frame.SettingsFrame;
+import jdk.incubator.http.internal.frame.WindowUpdateFrame;
 import jdk.incubator.http.internal.hpack.Encoder;
 import jdk.incubator.http.internal.hpack.Decoder;
 import jdk.incubator.http.internal.hpack.DecodingCallback;
@@ -330,13 +353,7 @@
     private void connectFlows(HttpConnection connection) {
         FlowTube tube =  connection.getConnectionFlow();
         // Connect the flow to our Http2TubeSubscriber:
-        // Using connection.publisher() here is a hack that
-        // allows us to continue calling connection.writeAsync()
-        // and connection.flushAsync() transparently.
-        // We will eventually need to implement our own publisher
-        // to write to the flow instead.
-        tube.connectFlows(connection.publisher(), // hack
-                      subscriber);
+        tube.connectFlows(connection.publisher(), subscriber);
     }
 
     final HttpClientImpl client() {
@@ -458,6 +475,14 @@
 //        return words.stream().collect(Collectors.joining(" "));
 //    }
 
+    private HttpPublisher publisher() {
+        return connection.publisher();
+    }
+
+    private List<ByteBuffer> toBuffers(ByteBufferReference[] refs) {
+        return List.of(ByteBufferReference.toBuffers(refs));
+    }
+
     private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
             throws IOException
     {
@@ -802,8 +827,9 @@
         ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
         Log.logFrames(sf, "OUT");
         // send preface bytes and SettingsFrame together
-        connection.writeAsync(new ByteBufferReference[] {ref});
-        connection.flushAsync();
+        HttpPublisher publisher = publisher();
+        publisher.enqueue(List.of(ref.get()));
+        publisher.signalEnqueued();
         // mark preface sent.
         framesController.markPrefaceSent();
         Log.logTrace("PREFACE_BYTES sent");
@@ -979,18 +1005,19 @@
 
     void sendFrame(Http2Frame frame) {
         try {
+            HttpPublisher publisher = publisher();
             synchronized (sendlock) {
                 if (frame instanceof OutgoingHeaders) {
                     @SuppressWarnings("unchecked")
                     OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
                     Stream<?> stream = registerNewStream(oh);
                     // provide protection from inserting unordered frames between Headers and Continuation
-                    connection.writeAsync(encodeHeaders(oh, stream));
+                    publisher.enqueue(toBuffers(encodeHeaders(oh, stream)));
                 } else {
-                    connection.writeAsync(encodeFrame(frame));
+                    publisher.enqueue(toBuffers(encodeFrame(frame)));
                 }
             }
-            connection.flushAsync();
+            publisher.signalEnqueued();
         } catch (IOException e) {
             if (!closed) {
                 Log.logError(e);
@@ -1006,8 +1033,9 @@
 
     void sendDataFrame(DataFrame frame) {
         try {
-            connection.writeAsync(encodeFrame(frame));
-            connection.flushAsync();
+            HttpPublisher publisher = publisher();
+            publisher.enqueue(toBuffers(encodeFrame(frame)));
+            publisher.signalEnqueued();
         } catch (IOException e) {
             if (!closed) {
                 Log.logError(e);
@@ -1023,8 +1051,9 @@
      */
     void sendUnorderedFrame(Http2Frame frame) {
         try {
-            connection.writeAsyncUnordered(encodeFrame(frame));
-            connection.flushAsync();
+            HttpPublisher publisher = publisher();
+            publisher.enqueueUnordered(toBuffers(encodeFrame(frame)));
+            publisher.signalEnqueued();
         } catch (IOException e) {
             if (!closed) {
                 Log.logError(e);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java	Thu Nov 09 14:28:00 2017 +0000
@@ -31,7 +31,6 @@
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.Arrays;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,7 +38,6 @@
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.Flow;
 import jdk.incubator.http.HttpClient.Version;
-import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.Demand;
 import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.SequentialScheduler;
@@ -58,7 +56,7 @@
  *      AsyncSSLConnection: TLS channel direct to server
  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  */
-abstract class HttpConnection implements Closeable, AsyncConnection {
+abstract class HttpConnection implements Closeable {
 
     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
@@ -123,7 +121,11 @@
                 (connected() ? !getConnectionFlow().isFinished() : true);
     }
 
-    interface HttpPublisher extends FlowTube.TubePublisher { }
+    interface HttpPublisher extends FlowTube.TubePublisher {
+        void enqueue(List<ByteBuffer> buffers) throws IOException;
+        void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
+        void signalEnqueued() throws IOException;
+    }
 
     /**
      * Returns the HTTP publisher associated with this connection.  May be null
@@ -289,8 +291,11 @@
 
     abstract FlowTube getConnectionFlow();
 
-    // This queue and publisher are temporary, and only needed because
-    // the calling code still uses writeAsync/flushAsync
+    /**
+     * A publisher that makes it possible to publish (write)
+     * ordered (normal priority) and unordered (high priority)
+     * buffers downstream.
+     */
     final class PlainHttpPublisher implements HttpPublisher {
         final Object reading;
         PlainHttpPublisher() {
@@ -314,6 +319,7 @@
                 }
                 this.subscriber = subscriber;
             }
+            // TODO: should we do this in the flow?
             subscriber.onSubscribe(subscription);
             signal();
         }
@@ -364,25 +370,23 @@
             }
         }
 
-        public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-            List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
-            queue.add(l);
-            int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
+        @Override
+        public void enqueue(List<ByteBuffer> buffers) throws IOException {
+            queue.add(buffers);
+            int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
         }
 
-        public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
+        @Override
+        public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
             // Unordered frames are sent before existing frames.
-            List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
-            int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
-            queue.addFirst(l);
+            int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
+            queue.addFirst(buffers);
             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
         }
 
-        public void flushAsync() throws IOException {
-            // ### Remove flushAsync
-            // no-op. Should not be needed now with Tube.
-            // Tube.write will initiate the low-level write
+        @Override
+        public void signalEnqueued() throws IOException {
             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
             signal();
         }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java	Thu Nov 09 14:28:00 2017 +0000
@@ -37,7 +37,6 @@
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.Log;
 import jdk.incubator.http.internal.common.MinimalFuture;
@@ -53,8 +52,6 @@
     private final Object reading = new Object();
     protected final SocketChannel chan;
     private final FlowTube tube;
-    // The PlainHttpPublisher is a temporary hack needed because we still
-    // use writeAsync/flushAsync
     private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
     private volatile boolean connected;
     private boolean closed;
@@ -157,21 +154,6 @@
     @Override
     HttpPublisher publisher() { return writePublisher; }
 
-    @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        writePublisher.writeAsync(buffers);
-    }
-
-    @Override
-    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        writePublisher.writeAsyncUnordered(buffers);
-    }
-
-    @Override
-    public void flushAsync() throws IOException {
-        writePublisher.flushAsync();
-    }
-
 
     @Override
     public String toString() {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Thu Nov 09 14:28:00 2017 +0000
@@ -112,21 +112,6 @@
     }
 
     @Override
-    public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-        delegate.writeAsync(buffers);
-    }
-
-    @Override
-    public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
-        delegate.writeAsyncUnordered(buffers);
-    }
-
-    @Override
-    public void flushAsync() throws IOException {
-        delegate.flushAsync();
-    }
-
-    @Override
     public void close() {
         delegate.close();
         connected = false;
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java	Thu Nov 09 14:24:43 2017 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java	Thu Nov 09 14:28:00 2017 +0000
@@ -207,16 +207,6 @@
         @Override public HttpPublisher publisher() {return error();}
         @Override public CompletableFuture<Void> connectAsync() {return error();}
         @Override SocketChannel channel() {return error();}
-        @Override public void flushAsync() throws IOException {error();}
-        @Override
-        public void writeAsync(ByteBufferReference[] buffers) throws IOException {
-            error();
-        }
-        @Override
-        public void writeAsyncUnordered(ByteBufferReference[] buffers)
-                throws IOException {
-            error();
-        }
         @Override
         HttpConnection.DetachedConnectionChannel detachChannel() {
             return error();