--- a/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java Fri Apr 29 13:46:19 2016 -0700
+++ b/jdk/src/java.httpclient/share/classes/java/net/http/HttpClientImpl.java Sat Apr 30 00:30:31 2016 +0100
@@ -23,28 +23,32 @@
*/
package java.net.http;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLParameters;
import java.io.IOException;
import java.net.Authenticator;
import java.net.CookieManager;
import java.net.ProxySelector;
import java.net.URI;
-import static java.net.http.Utils.BUFSIZE;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
-import static java.nio.channels.SelectionKey.OP_CONNECT;
-import static java.nio.channels.SelectionKey.OP_READ;
-import static java.nio.channels.SelectionKey.OP_WRITE;
import java.nio.channels.Selector;
-import java.util.*;
-import java.util.stream.Stream;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
-import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLParameters;
+import java.util.stream.Stream;
+
+import static java.net.http.Utils.BUFSIZE;
/**
* Client implementation. Contains all configuration information and also
@@ -53,6 +57,9 @@
*/
class HttpClientImpl extends HttpClient implements BufferHandler {
+ private static final ThreadFactory defaultFactory =
+ (r -> new Thread(null, r, "HttpClient_worker", 0, true));
+
private final CookieManager cookieManager;
private final Redirect followRedirects;
private final ProxySelector proxySelector;
@@ -67,7 +74,6 @@
private final SelectorManager selmgr;
private final FilterFactory filters;
private final Http2ClientImpl client2;
- private static final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
private final LinkedList<TimeoutEvent> timeouts;
public static HttpClientImpl create(HttpClientBuilderImpl builder) {
@@ -115,7 +121,6 @@
throw new InternalError(e);
}
selmgr.setDaemon(true);
- selmgr.setName("HttpSelector");
filters = new FilterFactory();
initFilters();
}
@@ -135,7 +140,7 @@
* 4) - mark connection as blocking
* 5) - call AsyncEvent.handle()
*
- * If exchange needs to block again, then call registerEvent() again
+ * If exchange needs to block again, then call registerEvent() again
*/
void registerEvent(AsyncEvent exchange) throws IOException {
selmgr.register(exchange);
@@ -145,35 +150,56 @@
return client2;
}
- LinkedList<ByteBuffer> freelist = new LinkedList<>();
+ /**
+ * We keep one size of buffer on free list. That size may increase
+ * depending on demand. If that happens we dispose of free buffers
+ * that are smaller than new size.
+ */
+ private final LinkedList<ByteBuffer> freelist = new LinkedList<>();
+ int currentSize = BUFSIZE;
@Override
- public synchronized ByteBuffer getBuffer() {
- if (freelist.isEmpty()) {
- return ByteBuffer.allocate(BUFSIZE);
+ public synchronized ByteBuffer getBuffer(int size) {
+
+ ByteBuffer buf;
+ if (size == -1)
+ size = currentSize;
+
+ if (size > currentSize)
+ currentSize = size;
+
+ while (!freelist.isEmpty()) {
+ buf = freelist.removeFirst();
+ if (buf.capacity() < currentSize)
+ continue;
+ buf.clear();
+ return buf;
}
- return freelist.removeFirst();
+ return ByteBuffer.allocate(size);
}
@Override
public synchronized void returnBuffer(ByteBuffer buffer) {
- buffer.clear();
freelist.add(buffer);
}
+ @Override
+ public synchronized void setMinBufferSize(int n) {
+ currentSize = Math.max(n, currentSize);
+ }
// Main loop for this client's selector
+ private final class SelectorManager extends Thread {
- class SelectorManager extends Thread {
- final Selector selector;
- boolean closed;
-
- final List<AsyncEvent> readyList;
- final List<AsyncEvent> registrations;
+ private final Selector selector;
+ private volatile boolean closed;
+ private final List<AsyncEvent> readyList;
+ private final List<AsyncEvent> registrations;
SelectorManager() throws IOException {
- readyList = new LinkedList<>();
- registrations = new LinkedList<>();
+ super(null, null, "SelectorManager", 0, false);
+ readyList = new ArrayList<>();
+ registrations = new ArrayList<>();
selector = Selector.open();
}
@@ -193,32 +219,13 @@
closed = true;
try {
selector.close();
- } catch (IOException e) {}
- }
-
- private List<AsyncEvent> copy(List<AsyncEvent> list) {
- LinkedList<AsyncEvent> c = new LinkedList<>();
- for (AsyncEvent e : list) {
- c.add(e);
- }
- return c;
- }
-
- String opvals(int i) {
- StringBuilder sb = new StringBuilder();
- if ((i & OP_READ) != 0)
- sb.append("OP_READ ");
- if ((i & OP_CONNECT) != 0)
- sb.append("OP_CONNECT ");
- if ((i & OP_WRITE) != 0)
- sb.append("OP_WRITE ");
- return sb.toString();
+ } catch (IOException ignored) { }
}
@Override
public void run() {
try {
- while (true) {
+ while (!Thread.currentThread().isInterrupted()) {
synchronized (this) {
for (AsyncEvent exchange : registrations) {
SelectableChannel c = exchange.channel();
@@ -229,7 +236,7 @@
if (key == null) {
sa = new SelectorAttachment(c, selector);
} else {
- sa = (SelectorAttachment)key.attachment();
+ sa = (SelectorAttachment) key.attachment();
}
sa.register(exchange);
} catch (IOException e) {
@@ -243,6 +250,7 @@
}
long timeval = getTimeoutValue();
long now = System.currentTimeMillis();
+ //debugPrint(selector);
int n = selector.select(timeval);
if (n == 0) {
signalTimeouts(now);
@@ -251,7 +259,7 @@
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
- SelectorAttachment sa = (SelectorAttachment)key.attachment();
+ SelectorAttachment sa = (SelectorAttachment) key.attachment();
int eventsOccurred = key.readyOps();
sa.events(eventsOccurred).forEach(readyList::add);
sa.resetInterestOps(eventsOccurred);
@@ -260,10 +268,8 @@
selector.selectedKeys().clear();
for (AsyncEvent exchange : readyList) {
- if (exchange instanceof AsyncEvent.Blocking) {
+ if (exchange.blocking()) {
exchange.channel().configureBlocking(true);
- } else {
- assert exchange instanceof AsyncEvent.NonBlocking;
}
executor.synchronize();
handleEvent(exchange); // will be delegated to executor
@@ -272,14 +278,26 @@
}
} catch (Throwable e) {
if (!closed) {
- System.err.println("HttpClientImpl terminating on error");
// This terminates thread. So, better just print stack trace
String err = Utils.stackTrace(e);
Log.logError("HttpClientImpl: fatal error: " + err);
}
+ } finally {
+ shutdown();
}
}
+ void debugPrint(Selector selector) {
+ System.err.println("Selector: debugprint start");
+ Set<SelectionKey> keys = selector.keys();
+ for (SelectionKey key : keys) {
+ SelectableChannel c = key.channel();
+ int ops = key.interestOps();
+ System.err.printf("selector chan:%s ops:%d\n", c, ops);
+ }
+ System.err.println("Selector: debugprint end");
+ }
+
void handleEvent(AsyncEvent e) {
if (closed) {
e.abort();
@@ -303,7 +321,7 @@
private final SelectableChannel chan;
private final Selector selector;
private final ArrayList<AsyncEvent> pending;
- private int interestops;
+ private int interestOps;
SelectorAttachment(SelectableChannel chan, Selector selector) {
this.pending = new ArrayList<>();
@@ -312,53 +330,53 @@
}
void register(AsyncEvent e) throws ClosedChannelException {
- int newops = e.interestOps();
- boolean reRegister = (interestops & newops) != newops;
- interestops |= newops;
+ int newOps = e.interestOps();
+ boolean reRegister = (interestOps & newOps) != newOps;
+ interestOps |= newOps;
pending.add(e);
if (reRegister) {
// first time registration happens here also
- chan.register(selector, interestops, this);
+ chan.register(selector, interestOps, this);
}
}
- int interestOps() {
- return interestops;
- }
-
/**
* Returns a Stream<AsyncEvents> containing only events that are
- * registered with the given {@code interestop}.
+ * registered with the given {@code interestOps}.
*/
- Stream<AsyncEvent> events(int interestop) {
+ Stream<AsyncEvent> events(int interestOps) {
return pending.stream()
- .filter(ev -> (ev.interestOps() & interestop) != 0);
+ .filter(ev -> (ev.interestOps() & interestOps) != 0);
}
/**
- * Removes any events with the given {@code interestop}, and if no
+ * Removes any events with the given {@code interestOps}, and if no
* events remaining, cancels the associated SelectionKey.
*/
- void resetInterestOps(int interestop) {
- int newops = 0;
+ void resetInterestOps(int interestOps) {
+ int newOps = 0;
Iterator<AsyncEvent> itr = pending.iterator();
while (itr.hasNext()) {
AsyncEvent event = itr.next();
int evops = event.interestOps();
- if ((evops & interestop) != 0) {
+ if (event.repeating()) {
+ newOps |= evops;
+ continue;
+ }
+ if ((evops & interestOps) != 0) {
itr.remove();
} else {
- newops |= evops;
+ newOps |= evops;
}
}
- interestops = newops;
+ this.interestOps = newOps;
SelectionKey key = chan.keyFor(selector);
- if (newops == 0) {
+ if (newOps == 0) {
key.cancel();
} else {
- key.interestOps(newops);
+ key.interestOps(newOps);
}
}
}
@@ -366,7 +384,8 @@
/**
* Creates a HttpRequest associated with this group.
*
- * @throws IllegalStateException if the group has been stopped
+ * @throws IllegalStateException
+ * if the group has been stopped
*/
@Override
public HttpRequestBuilderImpl request() {
@@ -376,7 +395,8 @@
/**
* Creates a HttpRequest associated with this group.
*
- * @throws IllegalStateException if the group has been stopped
+ * @throws IllegalStateException
+ * if the group has been stopped
*/
@Override
public HttpRequestBuilderImpl request(URI uri) {
@@ -444,16 +464,12 @@
return version.equals(Version.HTTP_2);
}
- //void setHttp2NotSupported(String host) {
- //http2NotSupported.put(host, false);
- //}
-
- final void initFilters() {
+ private void initFilters() {
addFilter(AuthenticationFilter.class);
addFilter(RedirectFilter.class);
}
- final void addFilter(Class<? extends HeaderFilter> f) {
+ private void addFilter(Class<? extends HeaderFilter> f) {
filters.addFilter(f);
}
@@ -479,14 +495,14 @@
iter.previous();
break;
} else if (!iter.hasNext()) {
- event.delta = event.timeval - listval ;
+ event.delta = event.timeval - listval;
}
}
iter.add(event);
selmgr.wakeupSelector();
}
- synchronized void signalTimeouts(long then) {
+ private synchronized void signalTimeouts(long then) {
if (timeouts.isEmpty()) {
return;
}
@@ -532,12 +548,12 @@
// used for the connection window
int getReceiveBufferSize() {
return Utils.getIntegerNetProperty(
- "sun.net.httpclient.connectionWindowSize", 256 * 1024
+ "java.net.httpclient.connectionWindowSize", 256 * 1024
);
}
// returns 0 meaning block forever, or a number of millis to block for
- synchronized long getTimeoutValue() {
+ private synchronized long getTimeoutValue() {
if (timeouts.isEmpty()) {
return 0;
} else {