--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Fri Nov 03 10:01:08 2017 -0700
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainHttpConnection.java Wed Dec 06 11:11:59 2017 -0800
@@ -26,73 +26,42 @@
package jdk.incubator.http;
import java.io.IOException;
+import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Consumer;
-import java.util.function.Supplier;
-
-import jdk.incubator.http.internal.common.AsyncWriteQueue;
-import jdk.incubator.http.internal.common.ByteBufferReference;
+import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
/**
- * Plain raw TCP connection direct to destination. 2 modes
- * 1) Blocking used by http/1. In this case the connect is actually non
- * blocking but the request is sent blocking. The first byte of a response
- * is received non-blocking and the remainder of the response is received
- * blocking
- * 2) Non-blocking. In this case (for http/2) the connection is actually opened
- * blocking but all reads and writes are done non-blocking under the
- * control of a Http2Connection object.
+ * Plain raw TCP connection direct to destination.
+ * The connection operates in asynchronous non-blocking mode.
+ * All reads and writes are done non-blocking.
*/
-class PlainHttpConnection extends HttpConnection implements AsyncConnection {
+class PlainHttpConnection extends HttpConnection {
+ private final Object reading = new Object();
protected final SocketChannel chan;
+ private final FlowTube tube;
+ private final PlainHttpPublisher writePublisher = new PlainHttpPublisher(reading);
private volatile boolean connected;
private boolean closed;
// should be volatile to provide proper synchronization(visibility) action
- private volatile Consumer<ByteBufferReference> asyncReceiver;
- private volatile Consumer<Throwable> errorReceiver;
- private volatile Supplier<ByteBufferReference> readBufferSupplier;
- private boolean asyncReading;
- private final AsyncWriteQueue asyncOutputQ = new AsyncWriteQueue(this::asyncOutput);
-
- private final Object reading = new Object();
-
- @Override
- public void startReading() {
- try {
- synchronized(reading) {
- asyncReading = true;
- }
- client.registerEvent(new ReadEvent());
- } catch (IOException e) {
- shutdown();
- }
- }
-
- @Override
- public void stopAsyncReading() {
- synchronized(reading) {
- asyncReading = false;
- }
- client.cancelRegistration(chan);
- }
-
- class ConnectEvent extends AsyncEvent {
- CompletableFuture<Void> cf;
+ final class ConnectEvent extends AsyncEvent {
+ private final CompletableFuture<Void> cf;
ConnectEvent(CompletableFuture<Void> cf) {
- super(AsyncEvent.BLOCKING);
this.cf = cf;
}
@@ -109,38 +78,53 @@
@Override
public void handle() {
try {
- chan.finishConnect();
- } catch (IOException e) {
- cf.completeExceptionally(e);
- return;
+ assert !connected : "Already connected";
+ assert !chan.isBlocking() : "Unexpected blocking channel";
+ debug.log(Level.DEBUG, "ConnectEvent: finishing connect");
+ boolean finished = chan.finishConnect();
+ assert finished : "Expected channel to be connected";
+ debug.log(Level.DEBUG,
+ "ConnectEvent: connect finished: %s", finished);
+ connected = true;
+ // complete async since the event runs on the SelectorManager thread
+ cf.completeAsync(() -> null, client().theExecutor());
+ } catch (Throwable e) {
+ client().theExecutor().execute( () -> cf.completeExceptionally(e));
}
- connected = true;
- cf.complete(null);
}
@Override
- public void abort() {
+ public void abort(IOException ioe) {
close();
+ client().theExecutor().execute( () -> cf.completeExceptionally(ioe));
}
}
@Override
public CompletableFuture<Void> connectAsync() {
- CompletableFuture<Void> plainFuture = new MinimalFuture<>();
+ CompletableFuture<Void> cf = new MinimalFuture<>();
try {
- chan.configureBlocking(false);
- chan.connect(address);
- client.registerEvent(new ConnectEvent(plainFuture));
- } catch (IOException e) {
- plainFuture.completeExceptionally(e);
+ assert !connected : "Already connected";
+ assert !chan.isBlocking() : "Unexpected blocking channel";
+ boolean finished = false;
+ PrivilegedExceptionAction<Boolean> pa = () -> chan.connect(address);
+ try {
+ finished = AccessController.doPrivileged(pa);
+ } catch (PrivilegedActionException e) {
+ cf.completeExceptionally(e.getCause());
+ }
+ if (finished) {
+ debug.log(Level.DEBUG, "connect finished without blocking");
+ connected = true;
+ cf.complete(null);
+ } else {
+ debug.log(Level.DEBUG, "registering connect event");
+ client().registerEvent(new ConnectEvent(cf));
+ }
+ } catch (Throwable throwable) {
+ cf.completeExceptionally(throwable);
}
- return plainFuture;
- }
-
- @Override
- public void connect() throws IOException {
- chan.connect(address);
- connected = true;
+ return cf;
}
@Override
@@ -148,106 +132,29 @@
return chan;
}
+ @Override
+ final FlowTube getConnectionFlow() {
+ return tube;
+ }
+
PlainHttpConnection(InetSocketAddress addr, HttpClientImpl client) {
super(addr, client);
try {
this.chan = SocketChannel.open();
+ chan.configureBlocking(false);
int bufsize = client.getReceiveBufferSize();
chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
+ // wrap the connected channel in a Tube for async reading and writing
+ tube = new SocketTube(client(), chan, Utils::getBuffer);
} catch (IOException e) {
throw new InternalError(e);
}
}
@Override
- long write(ByteBuffer[] buffers, int start, int number) throws IOException {
- if (getMode() != Mode.ASYNC) {
- return chan.write(buffers, start, number);
- }
- // async
- buffers = Utils.reduce(buffers, start, number);
- long n = Utils.remaining(buffers);
- asyncOutputQ.put(ByteBufferReference.toReferences(buffers));
- flushAsync();
- return n;
- }
-
- @Override
- long write(ByteBuffer buffer) throws IOException {
- if (getMode() != Mode.ASYNC) {
- return chan.write(buffer);
- }
- // async
- long n = buffer.remaining();
- asyncOutputQ.put(ByteBufferReference.toReferences(buffer));
- flushAsync();
- return n;
- }
-
- // handle registered WriteEvent; invoked from SelectorManager thread
- void flushRegistered() {
- if (getMode() == Mode.ASYNC) {
- try {
- asyncOutputQ.flushDelayed();
- } catch (IOException e) {
- // Only IOException caused by closed Queue is expected here
- shutdown();
- }
- }
- }
+ HttpPublisher publisher() { return writePublisher; }
- @Override
- public void writeAsync(ByteBufferReference[] buffers) throws IOException {
- if (getMode() != Mode.ASYNC) {
- chan.write(ByteBufferReference.toBuffers(buffers));
- ByteBufferReference.clear(buffers);
- } else {
- asyncOutputQ.put(buffers);
- }
- }
-
- @Override
- public void writeAsyncUnordered(ByteBufferReference[] buffers) throws IOException {
- if (getMode() != Mode.ASYNC) {
- chan.write(ByteBufferReference.toBuffers(buffers));
- ByteBufferReference.clear(buffers);
- } else {
- // Unordered frames are sent before existing frames.
- asyncOutputQ.putFirst(buffers);
- }
- }
-
- @Override
- public void flushAsync() throws IOException {
- if (getMode() == Mode.ASYNC) {
- asyncOutputQ.flush();
- }
- }
-
- @Override
- public void enableCallback() {
- // not used
- assert false;
- }
-
- boolean asyncOutput(ByteBufferReference[] refs, AsyncWriteQueue delayCallback) {
- try {
- ByteBuffer[] bufs = ByteBufferReference.toBuffers(refs);
- while (Utils.remaining(bufs) > 0) {
- long n = chan.write(bufs);
- if (n == 0) {
- delayCallback.setDelayed(refs);
- client.registerEvent(new WriteEvent());
- return false;
- }
- }
- ByteBufferReference.clear(refs);
- } catch (IOException e) {
- shutdown();
- }
- return true;
- }
@Override
public String toString() {
@@ -255,7 +162,7 @@
}
/**
- * Close this connection
+ * Closes this connection
*/
@Override
public synchronized void close() {
@@ -264,80 +171,23 @@
}
closed = true;
try {
- Log.logError("Closing: " + toString());
+ Log.logTrace("Closing: " + toString());
chan.close();
} catch (IOException e) {}
}
@Override
void shutdownInput() throws IOException {
+ debug.log(Level.DEBUG, "Shutting down input");
chan.shutdownInput();
}
@Override
void shutdownOutput() throws IOException {
+ debug.log(Level.DEBUG, "Shutting down output");
chan.shutdownOutput();
}
- void shutdown() {
- close();
- errorReceiver.accept(new IOException("Connection aborted"));
- }
-
- void asyncRead() {
- synchronized (reading) {
- try {
- while (asyncReading) {
- ByteBufferReference buf = readBufferSupplier.get();
- int n = chan.read(buf.get());
- if (n == -1) {
- throw new IOException();
- }
- if (n == 0) {
- buf.clear();
- return;
- }
- buf.get().flip();
- asyncReceiver.accept(buf);
- }
- } catch (IOException e) {
- shutdown();
- }
- }
- }
-
- @Override
- protected ByteBuffer readImpl() throws IOException {
- ByteBuffer dst = ByteBuffer.allocate(8192);
- int n = readImpl(dst);
- if (n > 0) {
- return dst;
- } else if (n == 0) {
- return Utils.EMPTY_BYTEBUFFER;
- } else {
- return null;
- }
- }
-
- private int readImpl(ByteBuffer buf) throws IOException {
- int mark = buf.position();
- int n;
- // FIXME: this hack works in conjunction with the corresponding change
- // in jdk.incubator.http.RawChannel.registerEvent
- //if ((n = buffer.remaining()) != 0) {
- //buf.put(buffer);
- //} else {
- n = chan.read(buf);
- //}
- if (n == -1) {
- return -1;
- }
- Utils.flipToMark(buf, mark);
- // String s = "Receive (" + n + " bytes) ";
- //debugPrint(s, buf);
- return n;
- }
-
@Override
ConnectionPool.CacheKey cacheKey() {
return new ConnectionPool.CacheKey(address, null);
@@ -348,98 +198,6 @@
return connected;
}
- // used for all output in HTTP/2
- class WriteEvent extends AsyncEvent {
- WriteEvent() {
- super(0);
- }
-
- @Override
- public SelectableChannel channel() {
- return chan;
- }
-
- @Override
- public int interestOps() {
- return SelectionKey.OP_WRITE;
- }
-
- @Override
- public void handle() {
- flushRegistered();
- }
-
- @Override
- public void abort() {
- shutdown();
- }
- }
-
- // used for all input in HTTP/2
- class ReadEvent extends AsyncEvent {
- ReadEvent() {
- super(AsyncEvent.REPEATING); // && !BLOCKING
- }
-
- @Override
- public SelectableChannel channel() {
- return chan;
- }
-
- @Override
- public int interestOps() {
- return SelectionKey.OP_READ;
- }
-
- @Override
- public void handle() {
- asyncRead();
- }
-
- @Override
- public void abort() {
- shutdown();
- }
-
- @Override
- public String toString() {
- return super.toString() + "/" + chan;
- }
- }
-
- // used in blocking channels only
- class ReceiveResponseEvent extends AsyncEvent {
- CompletableFuture<Void> cf;
-
- ReceiveResponseEvent(CompletableFuture<Void> cf) {
- super(AsyncEvent.BLOCKING);
- this.cf = cf;
- }
- @Override
- public SelectableChannel channel() {
- return chan;
- }
-
- @Override
- public void handle() {
- cf.complete(null);
- }
-
- @Override
- public int interestOps() {
- return SelectionKey.OP_READ;
- }
-
- @Override
- public void abort() {
- close();
- }
-
- @Override
- public String toString() {
- return super.toString() + "/" + chan;
- }
- }
@Override
boolean isSecure() {
@@ -451,24 +209,91 @@
return false;
}
- @Override
- public void setAsyncCallbacks(Consumer<ByteBufferReference> asyncReceiver,
- Consumer<Throwable> errorReceiver,
- Supplier<ByteBufferReference> readBufferSupplier) {
- this.asyncReceiver = asyncReceiver;
- this.errorReceiver = errorReceiver;
- this.readBufferSupplier = readBufferSupplier;
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
+ private static final class PlainDetachedChannel
+ extends DetachedConnectionChannel {
+ final PlainHttpConnection plainConnection;
+ boolean closed;
+ PlainDetachedChannel(PlainHttpConnection conn) {
+ // We're handing the connection channel over to a web socket.
+ // We need the selector manager's thread to stay alive until
+ // the WebSocket is closed.
+ conn.client().webSocketOpen();
+ this.plainConnection = conn;
+ }
+
+ @Override
+ SocketChannel channel() {
+ return plainConnection.channel();
+ }
+
+ @Override
+ ByteBuffer read() throws IOException {
+ ByteBuffer dst = ByteBuffer.allocate(8192);
+ int n = readImpl(dst);
+ if (n > 0) {
+ return dst;
+ } else if (n == 0) {
+ return Utils.EMPTY_BYTEBUFFER;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void close() {
+ HttpClientImpl client = plainConnection.client();
+ try {
+ plainConnection.close();
+ } finally {
+ // notify the HttpClientImpl that the websocket is no
+ // no longer operating.
+ synchronized(this) {
+ if (closed == true) return;
+ closed = true;
+ }
+ client.webSocketClose();
+ }
+ }
+
+ @Override
+ public long write(ByteBuffer[] buffers, int start, int number)
+ throws IOException
+ {
+ return channel().write(buffers, start, number);
+ }
+
+ @Override
+ public void shutdownInput() throws IOException {
+ plainConnection.shutdownInput();
+ }
+
+ @Override
+ public void shutdownOutput() throws IOException {
+ plainConnection.shutdownOutput();
+ }
+
+ private int readImpl(ByteBuffer buf) throws IOException {
+ int mark = buf.position();
+ int n;
+ n = channel().read(buf);
+ if (n == -1) {
+ return -1;
+ }
+ Utils.flipToMark(buf, mark);
+ return n;
+ }
}
+ // Support for WebSocket/RawChannelImpl which unfortunately
+ // still depends on synchronous read/writes.
+ // It should be removed when RawChannelImpl moves to using asynchronous APIs.
@Override
- CompletableFuture<Void> whenReceivingResponse() {
- CompletableFuture<Void> cf = new MinimalFuture<>();
- try {
- ReceiveResponseEvent evt = new ReceiveResponseEvent(cf);
- client.registerEvent(evt);
- } catch (IOException e) {
- cf.completeExceptionally(e);
- }
- return cf;
+ DetachedConnectionChannel detachChannel() {
+ client().cancelRegistration(channel());
+ return new PlainDetachedChannel(this);
}
+
}