http-client-branch: use direct buffer pool for reading off SSL encrypted buffers from the socket + minor test fixes.
--- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLConnection.java Mon Apr 23 15:45:40 2018 +0100
@@ -61,6 +61,7 @@
// create the SSLTube wrapping the SocketTube, with the given engine
flow = new SSLTube(engine,
client().theExecutor(),
+ client().getSSLBufferSupplier()::recycle,
plainConnection.getConnectionFlow());
return null; } );
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/AsyncSSLTunnelConnection.java Mon Apr 23 15:45:40 2018 +0100
@@ -65,6 +65,7 @@
// create the SSLTube wrapping the SocketTube, with the given engine
flow = new SSLTube(engine,
client().theExecutor(),
+ client().getSSLBufferSupplier()::recycle,
plainConnection.getConnectionFlow());
return null;} );
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http2Connection.java Mon Apr 23 15:45:40 2018 +0100
@@ -131,22 +131,38 @@
* in case of SSL connection.
*
* 1. Outgoing frames encoded to ByteBuffers.
- * Outgoing ByteBuffers are created with requited size and frequently small (except DataFrames, etc)
+ * Outgoing ByteBuffers are created with required size and frequently small (except DataFrames, etc)
* At this place no pools at all. All outgoing buffers should be collected by GC.
*
* 2. Incoming ByteBuffers (decoded to frames).
* Here, total elimination of BB pool is not a good idea.
* We don't know how many bytes we will receive through network.
- * So here we allocate buffer of reasonable size. The following life of the BB:
- * - If all frames decoded from the BB are other than DataFrame and HeaderFrame (and HeaderFrame subclasses)
- * BB is returned to pool,
- * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer obtained by slice() method.
- * Such BB is never returned to pool and will be GCed.
- * - If we decoded HeadersFrame from the BB. Then header decoding is performed inside processFrame method and
- * the buffer could be release to pool.
+ *
+ * So here is a strategy we could try to implement:
+ * We allocate buffer of reasonable size. The following life of the BB:
+ * - If all frames decoded from the BB are other than DataFrame andHeaderFrame
+ * (and HeaderFrame subclasses) BB is returned to pool,
+ * - If we decoded DataFrame from the BB. In that case DataFrame refers to subbuffer
+ * obtained by slice() method. Such BB is never returned to pool and will be GCed.
+ * - If we decoded HeadersFrame from the BB. Then header decoding is performed
+ * inside processFrame method and the buffer could be release to pool.
*
- * 3. SLL encrypted buffers. Here another pool was introduced and all net buffers are to/from the pool,
- * because of we can't predict size encrypted packets.
+ * At this moment we do not implement this strategy.
+ * Instead we only use a pool for recycling SSL encrypted buffers read from
+ * the socket (see 3).
+ *
+ * 3. SSL encrypted buffers. Here another pool was introduced and all net buffers are to/from
+ * the pool, because of we can't predict size encrypted packets.
+ *
+ * At the moment we only recycle encrypted buffers read from the socket, and we have
+ * a pool of maximum 3 (SocketTube.MAX_BUFFERS = 3) direct buffers which are shared by
+ * all connections on a given client.
+ * This pool is used by all SSL connections - whether HTTP/1.1 or HTTP/2, but only
+ * for SSL encrypted buffers that circulate between the SocketTube publisher and
+ * the SSLFlowDelegate Reader. Limiting the pool to this particular segment allows
+ * us to use direct buffer and avoid one more copy.
+ * See HttpClientImpl.SSLDirectBufferSupplier, SocketTube.SSLDirectBufferSource, and
+ * SSLTube.recycler.
*
*/
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Mon Apr 23 15:45:40 2018 +0100
@@ -28,12 +28,12 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.net.Authenticator;
import java.net.CookieHandler;
import java.net.ProxySelector;
+import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
@@ -47,7 +47,6 @@
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -57,6 +56,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -70,6 +70,7 @@
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.PushPromiseHandler;
import java.net.http.WebSocket;
+import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Log;
import jdk.internal.net.http.common.Logger;
import jdk.internal.net.http.common.Pair;
@@ -138,6 +139,13 @@
private final long id;
private final String dbgTag;
+ // The SSL DirectBuffer Supplier provides the ability to recycle
+ // buffers used between the socket reader and the SSLEngine, or
+ // more precisely between the SocketTube publisher and the
+ // SSLFlowDelegate reader.
+ private final SSLDirectBufferSupplier sslBufferSupplier
+ = new SSLDirectBufferSupplier(this);
+
// This reference is used to keep track of the facade HttpClient
// that was returned to the application code.
// It makes it possible to know when the application no longer
@@ -1164,4 +1172,70 @@
0 // only set the size if > 0
);
}
+
+ // Optimization for reading SSL encrypted data
+ // --------------------------------------------
+
+ // Returns a BufferSupplier that can be used for reading
+ // encrypted bytes of the socket. These buffers can then
+ // be recycled by the SSLFlowDelegate::Reader after their
+ // content has been copied in the SSLFlowDelegate::Reader
+ // readBuf.
+ // Because allocating, reading, copying, and recycling
+ // all happen in the SelectorManager thread,
+ // then this BufferSupplier can be shared between all
+ // the SSL connections managed by this client.
+ BufferSupplier getSSLBufferSupplier() {
+ return sslBufferSupplier;
+ }
+
+ // An implementation of BufferSupplier that manage a pool of
+ // maximum 3 direct byte buffers (SocketTube.MAX_BUFFERS) that
+ // are used for reading encrypted bytes off the socket before
+ // copying before unwrapping.
+ private static final class SSLDirectBufferSupplier implements BufferSupplier {
+ private final ConcurrentLinkedQueue<ByteBuffer> queue = new ConcurrentLinkedQueue<>();
+ private final HttpClientImpl client;
+ private final Logger debug;
+ public SSLDirectBufferSupplier(HttpClientImpl client) {
+ this.client = Objects.requireNonNull(client);
+ this.debug = client.debug;
+ }
+
+ // get a buffer from the pool, or allocate a new one if needed.
+ @Override
+ public ByteBuffer get() {
+ assert client.isSelectorThread();
+ ByteBuffer buf = queue.poll();
+ if (buf == null) {
+ if (debug.on()) {
+ // should not appear more than SocketTube.MAX_BUFFERS
+ debug.log("ByteBuffer.allocateDirect(%d)", Utils.BUFSIZE);
+ }
+ buf = ByteBuffer.allocateDirect(Utils.BUFSIZE);
+ } else {
+ // if (debug.on()) { // this trace is mostly noise.
+ // debug.log("ByteBuffer.recycle(%d)", buf.remaining());
+ // }
+ }
+ assert buf.isDirect();
+ assert buf.position() == 0;
+ assert buf.hasRemaining();
+ assert buf.limit() == Utils.BUFSIZE;
+ return buf;
+ }
+
+ // return the buffer to the pool
+ @Override
+ public void recycle(ByteBuffer buffer) {
+ assert client.isSelectorThread();
+ assert buffer.isDirect();
+ assert !buffer.hasRemaining();
+ buffer.position(0);
+ buffer.limit(buffer.capacity());
+ queue.offer(buffer);
+ assert queue.size() <= SocketTube.MAX_BUFFERS;
+ }
+ }
+
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Mon Apr 23 15:45:40 2018 +0100
@@ -26,7 +26,6 @@
package jdk.internal.net.http;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
@@ -39,6 +38,7 @@
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import jdk.internal.net.http.common.BufferSupplier;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.FlowTube;
import jdk.internal.net.http.common.Logger;
@@ -59,7 +59,7 @@
private final HttpClientImpl client;
private final SocketChannel channel;
- private final Supplier<ByteBuffer> buffersSource;
+ private final SliceBufferSource sliceBuffersSource;
private final Object lock = new Object();
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
private final InternalReadPublisher readPublisher;
@@ -67,10 +67,11 @@
private final long id = IDS.incrementAndGet();
public SocketTube(HttpClientImpl client, SocketChannel channel,
- Supplier<ByteBuffer> buffersSource) {
+ Supplier<ByteBuffer> buffersFactory) {
this.client = client;
this.channel = channel;
- this.buffersSource = buffersSource;
+ this.sliceBuffersSource = new SliceBufferSource(buffersFactory);
+
this.readPublisher = new InternalReadPublisher();
this.writeSubscriber = new InternalWriteSubscriber();
}
@@ -564,6 +565,7 @@
final InternalReadSubscription impl;
final TubeSubscriber subscriber;
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+ final BufferSource bufferSource;
volatile boolean subscribed;
volatile boolean cancelled;
volatile boolean completed;
@@ -571,6 +573,9 @@
public ReadSubscription(InternalReadSubscription impl,
TubeSubscriber subscriber) {
this.impl = impl;
+ this.bufferSource = subscriber.supportsRecycling()
+ ? new SSLDirectBufferSource(client)
+ : SocketTube.this.sliceBuffersSource;
this.subscriber = subscriber;
}
@@ -779,7 +784,7 @@
if (demand.tryDecrement()) {
// we have demand.
try {
- List<ByteBuffer> bytes = readAvailable();
+ List<ByteBuffer> bytes = readAvailable(subscription.bufferSource);
if (bytes == EOF) {
if (!completed) {
if (debug.on()) debug.log("got read EOF");
@@ -906,6 +911,147 @@
}
// ===================================================================== //
+ // Buffer Management //
+ // ===================================================================== //
+
+ // This interface is used by readAvailable(BufferSource);
+ public interface BufferSource {
+ /**
+ * Returns a buffer to read data from the socket.
+ * Different implementation can have different strategies, as to
+ * which kind of buffer to return, or whether to return the same
+ * buffer. The only constraints are that
+ * a. the buffer returned must not be null
+ * b. the buffer position indicates where to start reading
+ * c. the buffer limit indicates where to stop reading.
+ * d. the buffer is 'free' - that is - it is not used
+ * or retained by anybody else
+ * @return A buffer to read data from the socket.
+ */
+ ByteBuffer getBuffer();
+
+ /**
+ * Append the data read into the buffer to the list of buffer to
+ * be sent downstream to the subscriber. May return a new
+ * list, or append to the given list.
+ *
+ * Different implementation can have different strategies, but
+ * must obviously be consistent with the implementation of the
+ * getBuffer() method. For instance, an implementation could
+ * decide to add the buffer to the list and return a new buffer
+ * next time getBuffer() is called, or could decide to add a buffer
+ * slice to the list and return the same buffer (if remaining
+ * space is available) next time getBuffer() is called.
+ *
+ * @param list The list before adding the data. Can be null.
+ * @param buffer The buffer containing the data to add to the list.
+ * @param start The start position at which data were read.
+ * The current buffer position indicates the end.
+ * @return A possibly new list where a buffer containing the
+ * data read from the socket has been added.
+ */
+ List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buffer, int start);
+ }
+
+ // An implementation of BufferSource used for unencrypted data.
+ // This buffer source uses heap buffers and avoids wasting memory
+ // by forwarding read only buffer slices downstream.
+ // Buffers allocated through this source are simply GC'ed when
+ // they are no longer referenced.
+ static final class SliceBufferSource implements BufferSource {
+ private final Supplier<ByteBuffer> factory;
+ private volatile ByteBuffer current;
+ public SliceBufferSource() {
+ this(Utils::getBuffer);
+ }
+ public SliceBufferSource(Supplier<ByteBuffer> factory) {
+ this.factory = Objects.requireNonNull(factory);
+ }
+
+ // reuse the same buffer if some space remains available.
+ // otherwise, returns a new heap buffer.
+ @Override
+ public final ByteBuffer getBuffer() {
+ ByteBuffer buf = current;
+ buf = (buf == null || !buf.hasRemaining())
+ ? (current = factory.get()) : buf;
+ assert buf.hasRemaining();
+ return buf;
+ }
+
+ // Adds a read only slice to the list, potentially returning a
+ // new list with with that slice at the end.
+ @Override
+ public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+ // creates a slice to add to the list
+ int limit = buf.limit();
+ buf.limit(buf.position());
+ buf.position(start);
+ ByteBuffer slice = buf.slice();
+
+ // restore buffer state to what it was before creating the slice
+ buf.position(buf.limit());
+ buf.limit(limit);
+
+ // add the buffer to the list
+ return SocketTube.listOf(list, slice.asReadOnlyBuffer());
+ }
+ }
+
+
+ // An implementation of BufferSource used for encrypted data.
+ // This buffer source use direct byte buffers that will be
+ // recycled by the SocketTube subscriber.
+ //
+ static final class SSLDirectBufferSource implements BufferSource {
+ private final Supplier<ByteBuffer> factory;
+ private final HttpClientImpl client;
+ private volatile ByteBuffer current;
+
+ public SSLDirectBufferSource(HttpClientImpl client) {
+ this.client = Objects.requireNonNull(client);
+ this.factory = Objects.requireNonNull(client.getSSLBufferSupplier());
+ }
+
+ // Obtain a 'free' byte buffer from the pool, or return
+ // the same buffer if nothing was read at the previous cycle.
+ // The subscriber will be responsible for recycling this
+ // buffer into the pool (see SSLFlowDelegate.Reader)
+ @Override
+ public final ByteBuffer getBuffer() {
+ assert client.isSelectorThread();
+ ByteBuffer buf = current;
+ if (buf == null) {
+ buf = current = factory.get();
+ }
+ assert buf.hasRemaining();
+ assert buf.position() == 0;
+ return buf;
+ }
+
+ // Adds the buffer to the list. The buffer will be later returned to the
+ // pool by the subscriber (see SSLFlowDelegate.Reader).
+ // The next buffer returned by getBuffer() will be obtained from the
+ // pool. It might be the same buffer or another one.
+ // Because socket tube can read up to MAX_BUFFERS = 3 buffers, and because
+ // recycling will happen in the flow before onNext returns, then the
+ // pool can not grow larger than MAX_BUFFERS = 3 buffers, even though
+ // it's shared by all SSL connections opened on that client.
+ @Override
+ public final List<ByteBuffer> append(List <ByteBuffer> list, ByteBuffer buf, int start) {
+ assert client.isSelectorThread();
+ assert buf.isDirect();
+ assert start == 0;
+ assert current == buf;
+ current = null;
+ buf.limit(buf.position());
+ buf.position(start);
+ // add the buffer to the list
+ return SocketTube.listOf(list, buf);
+ }
+ }
+
+ // ===================================================================== //
// Socket Channel Read/Write //
// ===================================================================== //
static final int MAX_BUFFERS = 3;
@@ -918,11 +1064,8 @@
// is inserted into the returned buffer list, and if the current buffer
// has remaining space, that space will be used to read more data when
// the channel becomes readable again.
- private volatile ByteBuffer current;
- private List<ByteBuffer> readAvailable() throws IOException {
- ByteBuffer buf = current;
- buf = (buf == null || !buf.hasRemaining())
- ? (current = buffersSource.get()) : buf;
+ private List<ByteBuffer> readAvailable(BufferSource buffersSource) throws IOException {
+ ByteBuffer buf = buffersSource.getBuffer();
assert buf.hasRemaining();
int read;
@@ -961,31 +1104,20 @@
// check whether this buffer has still some free space available.
// if so, we will keep it for the next round.
final boolean hasRemaining = buf.hasRemaining();
-
- // creates a slice to add to the list
- int limit = buf.limit();
- buf.limit(buf.position());
- buf.position(pos);
- ByteBuffer slice = buf.slice();
+ list = buffersSource.append(list, buf, pos);
- // restore buffer state to what it was before creating the slice
- buf.position(buf.limit());
- buf.limit(limit);
-
- // add the buffer to the list
- list = addToList(list, slice.asReadOnlyBuffer());
if (read <= 0 || list.size() == MAX_BUFFERS) {
break;
}
- buf = hasRemaining ? buf : (current = buffersSource.get());
+ buf = buffersSource.getBuffer();
pos = buf.position();
assert buf.hasRemaining();
}
return list;
}
- private <T> List<T> addToList(List<T> list, T item) {
+ private static <T> List<T> listOf(List<T> list, T item) {
int size = list == null ? 0 : list.size();
switch (size) {
case 0: return List.of(item);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/BufferSupplier.java Mon Apr 23 15:45:40 2018 +0100
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package jdk.internal.net.http.common;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.Supplier;
+
+/**
+ * This interface allows to recycle buffers used for SSL decryption.
+ * Buffers that are used for reading SSL encrypted data are typically
+ * very short lived, as it is necessary to aggregate their content
+ * before calling SSLEngine::unwrap.
+ * Because both reading and copying happen in the SelectorManager
+ * thread, then it makes it possible to pool these buffers and
+ * recycle them for the next socket read, instead of simply
+ * letting them be GC'ed. That also makes it possible to use
+ * direct byte buffers, and avoid another layer of copying by
+ * the SocketChannel implementation.
+ *
+ * The HttpClientImpl has an implementation of this interface
+ * that allows to reuse the same 3 direct buffers for reading
+ * off SSL encrypted data from the socket.
+ * The BufferSupplier::get method is called by SocketTube
+ * (see SocketTube.SSLDirectBufferSource) and BufferSupplier::recycle
+ * is called by SSLFlowDelegate.Reader.
+ **/
+public interface BufferSupplier extends Supplier<ByteBuffer> {
+ /**
+ * Returns a buffer to read encrypted data off the socket.
+ * @return a buffer to read encrypted data off the socket.
+ */
+ ByteBuffer get();
+
+ /**
+ * Returns a buffer to the pool.
+ *
+ * @param buffer This must be a buffer previously obtained
+ * by calling BufferSupplier::get. The caller must
+ * not touch the buffer after returning it to
+ * the pool.
+ */
+ void recycle(ByteBuffer buffer);
+}
+
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferPool.java Thu Apr 19 16:47:52 2018 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package jdk.internal.net.http.common;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * The class provides reuse of ByteBuffers.
- * It is supposed that all requested buffers have the same size for a long period of time.
- * That is why there is no any logic splitting buffers into different buckets (by size). It's unnecessary.
- *
- * At the same moment it is allowed to change requested buffers size (all smaller buffers will be discarded).
- * It may be needed for example, if after rehandshaking netPacketBufferSize was changed.
- */
-public class ByteBufferPool {
-
- private final java.util.Queue<ByteBuffer> pool = new ConcurrentLinkedQueue<>();
-
- public ByteBufferPool() {
- }
-
- public ByteBufferReference get(int size) {
- ByteBuffer buffer;
- while ((buffer = pool.poll()) != null) {
- if (buffer.capacity() >= size) {
- return ByteBufferReference.of(buffer, this);
- }
- }
- return ByteBufferReference.of(ByteBuffer.allocate(size), this);
- }
-
- public void release(ByteBuffer buffer) {
- buffer.clear();
- pool.offer(buffer);
- }
-
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/ByteBufferReference.java Thu Apr 19 16:47:52 2018 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,88 +0,0 @@
-/*
- * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-package jdk.internal.net.http.common;
-
-import java.nio.ByteBuffer;
-import java.util.Objects;
-import java.util.function.Supplier;
-
-public class ByteBufferReference implements Supplier<ByteBuffer> {
-
- private ByteBuffer buffer;
- private final ByteBufferPool pool;
-
- public static ByteBufferReference of(ByteBuffer buffer) {
- return of(buffer, null);
- }
-
- public static ByteBufferReference of(ByteBuffer buffer, ByteBufferPool pool) {
- Objects.requireNonNull(buffer);
- return new ByteBufferReference(buffer, pool);
- }
-
- public static ByteBuffer[] toBuffers(ByteBufferReference... refs) {
- ByteBuffer[] bufs = new ByteBuffer[refs.length];
- for (int i = 0; i < refs.length; i++) {
- bufs[i] = refs[i].get();
- }
- return bufs;
- }
-
- public static ByteBufferReference[] toReferences(ByteBuffer... buffers) {
- ByteBufferReference[] refs = new ByteBufferReference[buffers.length];
- for (int i = 0; i < buffers.length; i++) {
- refs[i] = of(buffers[i]);
- }
- return refs;
- }
-
-
- public static void clear(ByteBufferReference[] refs) {
- for(ByteBufferReference ref : refs) {
- ref.clear();
- }
- }
-
- private ByteBufferReference(ByteBuffer buffer, ByteBufferPool pool) {
- this.buffer = buffer;
- this.pool = pool;
- }
-
- @Override
- public ByteBuffer get() {
- ByteBuffer buf = this.buffer;
- assert buf!=null : "getting ByteBuffer after clearance";
- return buf;
- }
-
- public void clear() {
- ByteBuffer buf = this.buffer;
- assert buf!=null : "double ByteBuffer clearance";
- this.buffer = null;
- if (pool != null) {
- pool.release(buf);
- }
- }
-}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/FlowTube.java Mon Apr 23 15:45:40 2018 +0100
@@ -63,6 +63,8 @@
*/
default void dropSubscription() { }
+ default boolean supportsRecycling() { return false; }
+
}
/**
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Mon Apr 23 15:45:40 2018 +0100
@@ -45,6 +45,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
/**
* Implements SSL using two SubscriberWrappers.
@@ -96,6 +97,7 @@
volatile boolean close_notify_received;
final CompletableFuture<Void> readerCF;
final CompletableFuture<Void> writerCF;
+ final Consumer<ByteBuffer> recycler;
static AtomicInteger scount = new AtomicInteger(1);
final int id;
@@ -109,8 +111,23 @@
Subscriber<? super List<ByteBuffer>> downReader,
Subscriber<? super List<ByteBuffer>> downWriter)
{
+ this(engine, exec, null, downReader, downWriter);
+ }
+
+ /**
+ * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
+ * Flow.Subscriber requires an associated {@link CompletableFuture}
+ * for errors that need to be signaled from downstream to upstream.
+ */
+ public SSLFlowDelegate(SSLEngine engine,
+ Executor exec,
+ Consumer<ByteBuffer> recycler,
+ Subscriber<? super List<ByteBuffer>> downReader,
+ Subscriber<? super List<ByteBuffer>> downWriter)
+ {
this.id = scount.getAndIncrement();
this.tubeName = String.valueOf(downWriter);
+ this.recycler = recycler;
this.reader = new Reader();
this.writer = new Writer();
this.engine = engine;
@@ -212,7 +229,7 @@
* Upstream subscription strategy is to try and keep no more than
* TARGET_BUFSIZE bytes in readBuf
*/
- class Reader extends SubscriberWrapper {
+ class Reader extends SubscriberWrapper implements FlowTube.TubeSubscriber {
// Maximum record size is 16k.
// Because SocketTube can feeds us up to 3 16K buffers,
// then setting this size to 16K means that the readBuf
@@ -237,6 +254,11 @@
readBuf.limit(0); // keep in read mode
}
+ @Override
+ public boolean supportsRecycling() {
+ return recycler != null;
+ }
+
protected SchedulingAction enterScheduling() {
return enterReadScheduling();
}
@@ -292,6 +314,11 @@
reallocReadBuf();
readBuf.put(buf);
readBuf.flip();
+ // should be safe to call inside lock
+ // since the only implementation
+ // offers the buffer to an unbounded queue.
+ // WARNING: do not touch buf after this point!
+ if (recycler != null) recycler.accept(buf);
}
if (complete) {
this.completing = complete;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLTube.java Mon Apr 23 15:45:40 2018 +0100
@@ -77,6 +77,13 @@
private volatile boolean finished;
public SSLTube(SSLEngine engine, Executor executor, FlowTube tube) {
+ this(engine, executor, null, tube);
+ }
+
+ public SSLTube(SSLEngine engine,
+ Executor executor,
+ Consumer<ByteBuffer> recycler,
+ FlowTube tube) {
Objects.requireNonNull(engine);
Objects.requireNonNull(executor);
this.tube = Objects.requireNonNull(tube);
@@ -85,15 +92,17 @@
this.engine = engine;
sslDelegate = new SSLTubeFlowDelegate(engine,
executor,
+ recycler,
readSubscriber,
tube);
}
final class SSLTubeFlowDelegate extends SSLFlowDelegate {
SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+ Consumer<ByteBuffer> recycler,
SSLSubscriberWrapper readSubscriber,
FlowTube tube) {
- super(engine, executor, readSubscriber, tube);
+ super(engine, executor, recycler, readSubscriber, tube);
}
protected SchedulingAction enterReadScheduling() {
readSubscriber.processPendingSubscriber();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Thu Apr 19 16:47:52 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/Utils.java Mon Apr 23 15:45:40 2018 +0100
@@ -581,25 +581,6 @@
return (int) remain;
}
- public static long remaining(ByteBufferReference[] refs) {
- long remain = 0;
- for (ByteBufferReference ref : refs) {
- remain += ref.get().remaining();
- }
- return remain;
- }
-
- public static int remaining(ByteBufferReference[] refs, int max) {
- long remain = 0;
- for (ByteBufferReference ref : refs) {
- remain += ref.get().remaining();
- if (remain > max) {
- throw new IllegalArgumentException("too many bytes");
- }
- }
- return (int) remain;
- }
-
public static int remaining(ByteBuffer[] refs, int max) {
long remain = 0;
for (ByteBuffer b : refs) {
@@ -623,7 +604,6 @@
public static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0);
public static final ByteBuffer[] EMPTY_BB_ARRAY = new ByteBuffer[0];
public static final List<ByteBuffer> EMPTY_BB_LIST = List.of();
- public static final ByteBufferReference[] EMPTY_BBR_ARRAY = new ByteBufferReference[0];
/**
* Returns a slice of size {@code amount} from the given buffer. If the
--- a/test/jdk/java/net/httpclient/ProxyServer.java Thu Apr 19 16:47:52 2018 +0100
+++ b/test/jdk/java/net/httpclient/ProxyServer.java Mon Apr 23 15:45:40 2018 +0100
@@ -73,12 +73,10 @@
*/
public void close() throws IOException {
if (debug) System.out.println("Proxy: closing");
- done = true;
+ done = true;
listener.close();
for (Connection c : connections) {
- if (c.running()) {
- c.close();
- }
+ c.close();
}
}
@@ -179,7 +177,9 @@
return out.isAlive() || in.isAlive();
}
- public void close() throws IOException {
+ private volatile boolean closing;
+ public synchronized void close() throws IOException {
+ closing = true;
if (debug) System.out.println("Closing connection (proxy)");
if (serverSocket != null) serverSocket.close();
if (clientSocket != null) clientSocket.close();
@@ -238,7 +238,13 @@
i++;
commonInit(dest, 80);
- serverOut.write(buf, i, buf.length-i);
+ OutputStream sout;
+ synchronized (this) {
+ if (closing) return;
+ sout = serverOut;
+ }
+ // might fail if we're closing but we don't care.
+ sout.write(buf, i, buf.length-i);
proxyCommon();
} catch (URISyntaxException e) {
@@ -246,7 +252,8 @@
}
}
- void commonInit(String dest, int defaultPort) throws IOException {
+ synchronized void commonInit(String dest, int defaultPort) throws IOException {
+ if (closing) return;
int port;
String[] hostport = dest.split(":");
if (hostport.length == 1) {
@@ -261,7 +268,8 @@
serverIn = new BufferedInputStream(serverSocket.getInputStream());
}
- void proxyCommon() throws IOException {
+ synchronized void proxyCommon() throws IOException {
+ if (closing) return;
out = new Thread(() -> {
try {
byte[] bb = new byte[8000];
@@ -269,6 +277,7 @@
while ((n = clientIn.read(bb)) != -1) {
serverOut.write(bb, 0, n);
}
+ closing = true;
serverSocket.close();
clientSocket.close();
} catch (IOException e) {
@@ -284,6 +293,7 @@
while ((n = serverIn.read(bb)) != -1) {
clientOut.write(bb, 0, n);
}
+ closing = true;
serverSocket.close();
clientSocket.close();
} catch (IOException e) {
@@ -302,7 +312,9 @@
}
void doTunnel(String dest) throws IOException {
+ if (closing) return; // no need to go further.
commonInit(dest, 443);
+ // might fail if we're closing, but we don't care.
clientOut.write("HTTP/1.1 200 OK\r\n\r\n".getBytes());
proxyCommon();
}
--- a/test/jdk/java/net/httpclient/SmallTimeout.java Thu Apr 19 16:47:52 2018 +0100
+++ b/test/jdk/java/net/httpclient/SmallTimeout.java Mon Apr 23 15:45:40 2018 +0100
@@ -85,12 +85,14 @@
ss.setReuseAddress(false);
ss.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
int port = ss.getLocalPort();
- URI uri = new URI("http://localhost:" + port + "/");
+ URI u = new URI("http://localhost:" + port + "/");
HttpRequest[] requests = new HttpRequest[TIMEOUTS.length];
out.println("--- TESTING Async");
for (int i = 0; i < TIMEOUTS.length; i++) {
+ final int n = i;
+ URI uri = new URI(u.toString() + "/r" + n);
requests[i] = HttpRequest.newBuilder(uri)
.timeout(Duration.ofMillis(TIMEOUTS[i]))
.GET()
@@ -102,24 +104,25 @@
.whenComplete((HttpResponse<Object> r, Throwable t) -> {
Throwable cause = null;
if (r != null) {
- out.println("Unexpected response: " + r);
- cause = new RuntimeException("Unexpected response");
+ out.println("Unexpected response for r" + n + ": " + r);
+ cause = new RuntimeException("Unexpected response for r" + n);
error = true;
}
if (t != null) {
if (!(t.getCause() instanceof HttpTimeoutException)) {
- out.println("Wrong exception type:" + t.toString());
+ out.println("Wrong exception type for r" + n + ":" + t.toString());
Throwable c = t.getCause() == null ? t : t.getCause();
c.printStackTrace();
cause = c;
error = true;
} else {
- out.println("Caught expected timeout: " + t.getCause());
+ out.println("Caught expected timeout for r" + n +": " + t.getCause());
}
}
if (t == null && r == null) {
- out.println("Both response and throwable are null!");
- cause = new RuntimeException("Both response and throwable are null!");
+ out.println("Both response and throwable are null for r" + n + "!");
+ cause = new RuntimeException("Both response and throwable are null for r"
+ + n + "!");
error = true;
}
queue.add(HttpResult.of(req,cause));
@@ -134,11 +137,14 @@
// Repeat blocking in separate threads. Use queue to wait.
out.println("--- TESTING Sync");
+ System.err.println("================= TESTING Sync =====================");
// For running blocking response tasks
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < TIMEOUTS.length; i++) {
+ final int n = i;
+ URI uri = new URI(u.toString()+"/sync/r" + n);
requests[i] = HttpRequest.newBuilder(uri)
.timeout(Duration.ofMillis(TIMEOUTS[i]))
.GET()
@@ -148,11 +154,13 @@
executor.execute(() -> {
Throwable cause = null;
try {
- client.send(req, BodyHandlers.replacing(null));
+ HttpResponse<?> r = client.send(req, BodyHandlers.replacing(null));
+ out.println("Unexpected success for r" + n +": " + r);
} catch (HttpTimeoutException e) {
- out.println("Caught expected timeout: " + e);
+ out.println("Caught expected timeout for r" + n +": " + e);
} catch (Throwable ee) {
Throwable c = ee.getCause() == null ? ee : ee.getCause();
+ out.println("Unexpected exception for r" + n + ": " + c);
c.printStackTrace();
cause = c;
error = true;