http-client-branch: removed AsyncConnection interface and associated methods in subclasses
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncConnection.java Thu Nov 09 14:24:43 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,79 +0,0 @@
-/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http;
-
-import jdk.incubator.http.internal.common.ByteBufferReference;
-
-import java.io.IOException;
-
-/**
- * Implemented by classes that offer an asynchronous interface.
- *
- * PlainHttpConnection, AsyncSSLConnection.
- *
- * setAsyncCallbacks() is called to set the callback for reading
- * and error notification. Reads all happen on the selector thread, which
- * must not block.
- *
- * Writing uses the same write() methods as used in blocking mode.
- * Queues are employed on the writing side to buffer data while it is waiting
- * to be sent. This strategy relies on HTTP/2 protocol flow control to stop
- * outgoing queue from continually growing. Writes can be initiated by the
- * calling thread, but if socket becomes full then the queue is emptied by
- * the selector thread
- */
-interface AsyncConnection {
-
- /**
- * In async mode, this method puts buffers at the end of the send queue.
- * When in async mode, calling this method should later be followed by
- * subsequent flushAsync invocation.
- * That allows multiple threads to put buffers into the queue while some other
- * thread is writing.
- */
- void writeAsync(ByteBufferReference[] buffers) throws IOException;
-
- /**
- * In async mode, this method may put buffers at the beginning of send queue,
- * breaking frames sequence and allowing to write these buffers before other
- * buffers in the queue.
- * When in async mode, calling this method should later be followed by
- * subsequent flushAsync invocation.
- * That allows multiple threads to put buffers into the queue while some other
- * thread is writing.
- */
- void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException;
-
- /**
- * This method should be called after any writeAsync/writeAsyncUnordered
- * invocation.
- * If there is a race to flushAsync from several threads one thread
- * (race winner) capture flush operation and write the whole queue content.
- * Other threads (race losers) exits from the method (not blocking)
- * and continue execution.
- */
- void flushAsync() throws IOException;
-}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Thu Nov 09 14:28:00 2017 +0000
@@ -93,21 +93,6 @@
}
@Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- writePublisher.writeAsync(buffers);
- }
-
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- writePublisher.writeAsyncUnordered(buffers);
- }
-
- @Override
- public void flushAsync() throws IOException {
- writePublisher.flushAsync();
- }
-
- @Override
public void closeExceptionally(Throwable cause) {
debug.log(Level.DEBUG, () -> "closing: " + cause);
plainConnection.close();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java Thu Nov 09 14:28:00 2017 +0000
@@ -94,21 +94,6 @@
}
@Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- writePublisher.writeAsync(buffers);
- }
-
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- writePublisher.writeAsyncUnordered(buffers);
- }
-
- @Override
- public void flushAsync() throws IOException {
- writePublisher.flushAsync();
- }
-
- @Override
public void close() {
plainConnection.close();
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Thu Nov 09 14:28:00 2017 +0000
@@ -512,7 +512,7 @@
}
/** A Publisher of HTTP/1.1 headers and request body. */
- final class Http1Publisher implements HttpConnection.HttpPublisher {
+ final class Http1Publisher implements FlowTube.TubePublisher {
final System.Logger debug = Utils.getDebugLogger(this::dbgString);
volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Thu Nov 09 14:28:00 2017 +0000
@@ -276,7 +276,7 @@
// static final char CR = '\r';
// static final char LF = '\n';
-// ================ Support for plugging into AsyncConnection =================
+// ================ Support for plugging into Http1Receiver =================
// ============================================================================
// Callback: Error receiver: Consumer of Throwable.
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Thu Nov 09 14:28:00 2017 +0000
@@ -45,9 +45,32 @@
import java.util.function.Function;
import java.util.function.Supplier;
import javax.net.ssl.SSLEngine;
-import jdk.incubator.http.internal.common.*;
+import jdk.incubator.http.HttpConnection.HttpPublisher;
+import jdk.incubator.http.internal.common.ByteBufferPool;
+import jdk.incubator.http.internal.common.ByteBufferReference;
+import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
-import jdk.incubator.http.internal.frame.*;
+import jdk.incubator.http.internal.common.HttpHeadersImpl;
+import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
+import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.Utils;
+import jdk.incubator.http.internal.frame.ContinuationFrame;
+import jdk.incubator.http.internal.frame.DataFrame;
+import jdk.incubator.http.internal.frame.ErrorFrame;
+import jdk.incubator.http.internal.frame.FramesDecoder;
+import jdk.incubator.http.internal.frame.FramesEncoder;
+import jdk.incubator.http.internal.frame.GoAwayFrame;
+import jdk.incubator.http.internal.frame.HeaderFrame;
+import jdk.incubator.http.internal.frame.HeadersFrame;
+import jdk.incubator.http.internal.frame.Http2Frame;
+import jdk.incubator.http.internal.frame.MalformedFrame;
+import jdk.incubator.http.internal.frame.OutgoingHeaders;
+import jdk.incubator.http.internal.frame.PingFrame;
+import jdk.incubator.http.internal.frame.PushPromiseFrame;
+import jdk.incubator.http.internal.frame.ResetFrame;
+import jdk.incubator.http.internal.frame.SettingsFrame;
+import jdk.incubator.http.internal.frame.WindowUpdateFrame;
import jdk.incubator.http.internal.hpack.Encoder;
import jdk.incubator.http.internal.hpack.Decoder;
import jdk.incubator.http.internal.hpack.DecodingCallback;
@@ -330,13 +353,7 @@
private void connectFlows(HttpConnection connection) {
FlowTube tube = connection.getConnectionFlow();
// Connect the flow to our Http2TubeSubscriber:
- // Using connection.publisher() here is a hack that
- // allows us to continue calling connection.writeAsync()
- // and connection.flushAsync() transparently.
- // We will eventually need to implement our own publisher
- // to write to the flow instead.
- tube.connectFlows(connection.publisher(), // hack
- subscriber);
+ tube.connectFlows(connection.publisher(), subscriber);
}
final HttpClientImpl client() {
@@ -458,6 +475,14 @@
// return words.stream().collect(Collectors.joining(" "));
// }
+ private HttpPublisher publisher() {
+ return connection.publisher();
+ }
+
+ private List<ByteBuffer> toBuffers(ByteBufferReference[] refs) {
+ return List.of(ByteBufferReference.toBuffers(refs));
+ }
+
private void decodeHeaders(HeaderFrame frame, DecodingCallback decoder)
throws IOException
{
@@ -802,8 +827,9 @@
ByteBufferReference ref = framesEncoder.encodeConnectionPreface(PREFACE_BYTES, sf);
Log.logFrames(sf, "OUT");
// send preface bytes and SettingsFrame together
- connection.writeAsync(new ByteBufferReference[] {ref});
- connection.flushAsync();
+ HttpPublisher publisher = publisher();
+ publisher.enqueue(List.of(ref.get()));
+ publisher.signalEnqueued();
// mark preface sent.
framesController.markPrefaceSent();
Log.logTrace("PREFACE_BYTES sent");
@@ -979,18 +1005,19 @@
void sendFrame(Http2Frame frame) {
try {
+ HttpPublisher publisher = publisher();
synchronized (sendlock) {
if (frame instanceof OutgoingHeaders) {
@SuppressWarnings("unchecked")
OutgoingHeaders<Stream<?>> oh = (OutgoingHeaders<Stream<?>>) frame;
Stream<?> stream = registerNewStream(oh);
// provide protection from inserting unordered frames between Headers and Continuation
- connection.writeAsync(encodeHeaders(oh, stream));
+ publisher.enqueue(toBuffers(encodeHeaders(oh, stream)));
} else {
- connection.writeAsync(encodeFrame(frame));
+ publisher.enqueue(toBuffers(encodeFrame(frame)));
}
}
- connection.flushAsync();
+ publisher.signalEnqueued();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
@@ -1006,8 +1033,9 @@
void sendDataFrame(DataFrame frame) {
try {
- connection.writeAsync(encodeFrame(frame));
- connection.flushAsync();
+ HttpPublisher publisher = publisher();
+ publisher.enqueue(toBuffers(encodeFrame(frame)));
+ publisher.signalEnqueued();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
@@ -1023,8 +1051,9 @@
*/
void sendUnorderedFrame(Http2Frame frame) {
try {
- connection.writeAsyncUnordered(encodeFrame(frame));
- connection.flushAsync();
+ HttpPublisher publisher = publisher();
+ publisher.enqueueUnordered(toBuffers(encodeFrame(frame)));
+ publisher.signalEnqueued();
} catch (IOException e) {
if (!closed) {
Log.logError(e);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Thu Nov 09 14:28:00 2017 +0000
@@ -31,7 +31,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
-import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
@@ -39,7 +38,6 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Flow;
import jdk.incubator.http.HttpClient.Version;
-import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.SequentialScheduler;
@@ -58,7 +56,7 @@
* AsyncSSLConnection: TLS channel direct to server
* AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
*/
-abstract class HttpConnection implements Closeable, AsyncConnection {
+abstract class HttpConnection implements Closeable {
static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
final System.Logger debug = Utils.getDebugLogger(this::dbgString, DEBUG);
@@ -123,7 +121,11 @@
(connected() ? !getConnectionFlow().isFinished() : true);
}
- interface HttpPublisher extends FlowTube.TubePublisher { }
+ interface HttpPublisher extends FlowTube.TubePublisher {
+ void enqueue(List<ByteBuffer> buffers) throws IOException;
+ void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
+ void signalEnqueued() throws IOException;
+ }
/**
* Returns the HTTP publisher associated with this connection. May be null
@@ -289,8 +291,11 @@
abstract FlowTube getConnectionFlow();
- // This queue and publisher are temporary, and only needed because
- // the calling code still uses writeAsync/flushAsync
+ /**
+ * A publisher that makes it possible to publish (write)
+ * ordered (normal priority) and unordered (high priority)
+ * buffers downstream.
+ */
final class PlainHttpPublisher implements HttpPublisher {
final Object reading;
PlainHttpPublisher() {
@@ -314,6 +319,7 @@
}
this.subscriber = subscriber;
}
+ // TODO: should we do this in the flow?
subscriber.onSubscribe(subscription);
signal();
}
@@ -364,25 +370,23 @@
}
}
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
- queue.add(l);
- int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
+ @Override
+ public void enqueue(List<ByteBuffer> buffers) throws IOException {
+ queue.add(buffers);
+ int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
}
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
+ @Override
+ public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
// Unordered frames are sent before existing frames.
- List<ByteBuffer> l = Arrays.asList(ByteBufferReference.toBuffers(buffers));
- int bytes = l.stream().mapToInt(ByteBuffer::remaining).sum();
- queue.addFirst(l);
+ int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
+ queue.addFirst(buffers);
debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
}
- public void flushAsync() throws IOException {
- // ### Remove flushAsync
- // no-op. Should not be needed now with Tube.
- // Tube.write will initiate the low-level write
+ @Override
+ public void signalEnqueued() throws IOException {
debug.log(Level.DEBUG, "signalling the publisher of the write queue");
signal();
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Thu Nov 09 14:28:00 2017 +0000
@@ -37,7 +37,6 @@
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
@@ -53,8 +52,6 @@
private final Object reading = new Object();
protected final SocketChannel chan;
private final FlowTube tube;
- // The PlainHttpPublisher is a temporary hack needed because we still
- // use writeAsync/flushAsync
private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
@@ -157,21 +154,6 @@
@Override
HttpPublisher publisher() { return writePublisher; }
- @Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- writePublisher.writeAsync(buffers);
- }
-
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- writePublisher.writeAsyncUnordered(buffers);
- }
-
- @Override
- public void flushAsync() throws IOException {
- writePublisher.flushAsync();
- }
-
@Override
public String toString() {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Thu Nov 09 14:24:43 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Thu Nov 09 14:28:00 2017 +0000
@@ -112,21 +112,6 @@
}
@Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- delegate.writeAsync(buffers);
- }
-
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- delegate.writeAsyncUnordered(buffers);
- }
-
- @Override
- public void flushAsync() throws IOException {
- delegate.flushAsync();
- }
-
- @Override
public void close() {
delegate.close();
connected = false;
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java Thu Nov 09 14:24:43 2017 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/ConnectionPoolTest.java Thu Nov 09 14:28:00 2017 +0000
@@ -207,16 +207,6 @@
@Override public HttpPublisher publisher() {return error();}
@Override public CompletableFuture<Void> connectAsync() {return error();}
@Override SocketChannel channel() {return error();}
- @Override public void flushAsync() throws IOException {error();}
- @Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- error();
- }
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers)
- throws IOException {
- error();
- }
@Override
HttpConnection.DetachedConnectionChannel detachChannel() {
return error();