--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1AsyncReceiver.java Thu Apr 19 16:47:52 2018 +0100
@@ -356,6 +356,7 @@
// be left over in the stream.
try {
setRetryOnError(false);
+ pending.close(null);
onReadError(new IOException("subscription cancelled"));
unsubscribe(pending);
} finally {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java Thu Apr 19 16:47:52 2018 +0100
@@ -375,9 +375,12 @@
(t) -> {
try {
if (t != null) {
- subscriber.onError(t);
- connection.close();
- cf.completeExceptionally(t);
+ try {
+ subscriber.onError(t);
+ } finally {
+ cf.completeExceptionally(t);
+ connection.close();
+ }
}
} finally {
bodyReader.onComplete(t);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Thu Apr 19 16:47:52 2018 +0100
@@ -1160,7 +1160,8 @@
// used for the connection window
int getReceiveBufferSize() {
return Utils.getIntegerNetProperty(
- "jdk.httpclient.receiveBufferSize", 2 * 1024 * 1024
+ "jdk.httpclient.receiveBufferSize",
+ 0 // only set the size if > 0
);
}
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java Thu Apr 19 16:47:52 2018 +0100
@@ -145,31 +145,39 @@
try {
this.chan = SocketChannel.open();
chan.configureBlocking(false);
- int bufsize = client.getReceiveBufferSize();
- if (!trySetReceiveBufferSize(bufsize)) {
- trySetReceiveBufferSize(256*1024);
+ trySetReceiveBufferSize(client.getReceiveBufferSize());
+ if (debug.on()) {
+ int bufsize = getInitialBufferSize();
+ debug.log("Initial receive buffer size is: %d", bufsize);
}
chan.setOption(StandardSocketOptions.TCP_NODELAY, true);
- // wrap the connected channel in a Tube for async reading and writing
+ // wrap the channel in a Tube for async reading and writing
tube = new SocketTube(client(), chan, Utils::getBuffer);
} catch (IOException e) {
throw new InternalError(e);
}
}
- private boolean trySetReceiveBufferSize(int bufsize) {
+ private int getInitialBufferSize() {
try {
- chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
+ return chan.getOption(StandardSocketOptions.SO_RCVBUF);
+ } catch(IOException x) {
if (debug.on())
- debug.log("Receive buffer size is %s",
- chan.getOption(StandardSocketOptions.SO_RCVBUF));
- return true;
+ debug.log("Failed to get initial receive buffer size on %s", chan);
+ }
+ return 0;
+ }
+
+ private void trySetReceiveBufferSize(int bufsize) {
+ try {
+ if (bufsize > 0) {
+ chan.setOption(StandardSocketOptions.SO_RCVBUF, bufsize);
+ }
} catch(IOException x) {
if (debug.on())
debug.log("Failed to set receive buffer size to %d on %s",
bufsize, chan);
}
- return false;
}
@Override
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Thu Apr 19 16:47:52 2018 +0100
@@ -992,7 +992,7 @@
case 1: return List.of(list.get(0), item);
case 2: return List.of(list.get(0), list.get(1), item);
default: // slow path if MAX_BUFFERS > 3
- ArrayList<T> res = new ArrayList<>(list);
+ List<T> res = list instanceof ArrayList ? list : new ArrayList<>(list);
res.add(item);
return res;
}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Thu Apr 19 16:47:52 2018 +0100
@@ -33,7 +33,6 @@
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
import java.io.IOException;
-import java.lang.System.Logger.Level;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -214,8 +213,13 @@
* TARGET_BUFSIZE bytes in readBuf
*/
class Reader extends SubscriberWrapper {
+ // Maximum record size is 16k.
+ // Because SocketTube can feeds us up to 3 16K buffers,
+ // then setting this size to 16K means that the readBuf
+ // can store up to 64K-1 (16K-1 + 3*16K)
+ static final int TARGET_BUFSIZE = 16 * 1024;
+
final SequentialScheduler scheduler;
- static final int TARGET_BUFSIZE = 16 * 1024;
volatile ByteBuffer readBuf;
volatile boolean completing;
final Object readBufferLock = new Object();
@@ -250,7 +254,7 @@
debugr.log("Adding %d bytes to read buffer",
Utils.remaining(buffers));
addToReadBuf(buffers, complete);
- scheduler.runOrSchedule();
+ scheduler.runOrSchedule(exec);
}
@Override
@@ -270,6 +274,9 @@
@Override
protected long upstreamWindowUpdate(long currentWindow, long downstreamQsize) {
if (readBuf.remaining() > TARGET_BUFSIZE) {
+ if (debugr.on())
+ debugr.log("readBuf has more than TARGET_BUFSIZE: %d",
+ readBuf.remaining());
return 0;
} else {
return super.upstreamWindowUpdate(currentWindow, downstreamQsize);
@@ -293,7 +300,7 @@
}
void schedule() {
- scheduler.runOrSchedule();
+ scheduler.runOrSchedule(exec);
}
void stop() {
@@ -303,6 +310,11 @@
AtomicInteger count = new AtomicInteger(0);
+ // minimum number of bytes required to call unwrap.
+ // Usually this is 0, unless there was a buffer underflow.
+ // In this case we need to wait for more bytes than what
+ // we had before calling unwrap() again.
+ volatile int minBytesRequired;
// work function where it all happens
void processData() {
try {
@@ -313,15 +325,23 @@
+ ", engine handshake status:" + engine.getHandshakeStatus());
int len;
boolean complete = false;
- while ((len = readBuf.remaining()) > 0) {
+ while (readBuf.remaining() > (len = minBytesRequired)) {
boolean handshaking = false;
try {
EngineResult result;
synchronized (readBufferLock) {
complete = this.completing;
+ if (debugr.on()) debugr.log("Unwrapping: %s", readBuf.remaining());
+ // Unless there is a BUFFER_UNDERFLOW, we should try to
+ // unwrap any number of bytes. Set minBytesRequired to 0:
+ // we only need to do that if minBytesRequired is not already 0.
+ len = len > 0 ? minBytesRequired = 0 : len;
result = unwrapBuffer(readBuf);
- if (debugr.on())
- debugr.log("Unwrapped: %s", result.result);
+ len = readBuf.remaining();
+ if (debugr.on()) {
+ debugr.log("Unwrapped: result: %s", result.result);
+ debugr.log("Unwrapped: consumed: %s", result.bytesConsumed());
+ }
}
if (result.bytesProduced() > 0) {
if (debugr.on())
@@ -332,12 +352,19 @@
if (result.status() == Status.BUFFER_UNDERFLOW) {
if (debugr.on()) debugr.log("BUFFER_UNDERFLOW");
// not enough data in the read buffer...
- requestMore();
+ // no need to try to unwrap again unless we get more bytes
+ // than minBytesRequired = len in the read buffer.
+ minBytesRequired = len;
synchronized (readBufferLock) {
- // check if we have received some data
+ // more bytes could already have been added...
+ assert readBuf.remaining() >= len;
+ // check if we have received some data, and if so
+ // we can just re-spin the loop
if (readBuf.remaining() > len) continue;
- return;
}
+ // request more data and return.
+ requestMore();
+ return;
}
if (complete && result.status() == Status.CLOSED) {
if (debugr.on()) debugr.log("Closed: completing");
--- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Wed Apr 18 12:55:11 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Thu Apr 19 16:47:52 2018 +0100
@@ -306,14 +306,16 @@
downstreamSubscription);
}
+ boolean datasent = false;
while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
List<ByteBuffer> b = outputQ.poll();
if (debug.on())
debug.log("DownstreamPusher: Pushing %d bytes downstream",
Utils.remaining(b));
downstreamSubscriber.onNext(b);
+ datasent = true;
}
- upstreamWindowUpdate();
+ if (datasent) upstreamWindowUpdate();
checkCompletion();
}
}
--- a/test/jdk/java/net/httpclient/TimeoutOrdering.java Wed Apr 18 12:55:11 2018 +0100
+++ b/test/jdk/java/net/httpclient/TimeoutOrdering.java Thu Apr 19 16:47:52 2018 +0100
@@ -77,21 +77,22 @@
.build();
final HttpRequest req = requests[i];
+ final int j = i;
CompletableFuture<HttpResponse<Object>> response = client
.sendAsync(req, BodyHandlers.replacing(null))
.whenComplete((HttpResponse<Object> r, Throwable t) -> {
if (r != null) {
- out.println("Unexpected response: " + r);
+ out.println("Unexpected response for r" + j + ": " + r);
error = true;
}
if (t != null) {
if (!(t.getCause() instanceof HttpTimeoutException)) {
- out.println("Wrong exception type:" + t.toString());
+ out.println("Wrong exception type for r" + j + ": " + t.toString());
Throwable c = t.getCause() == null ? t : t.getCause();
c.printStackTrace();
error = true;
} else {
- out.println("Caught expected timeout: " + t.getCause());
+ out.println("Caught expected timeout for r" + j + ": " + t.getCause());
}
}
queue.add(req);
@@ -117,16 +118,21 @@
.build();
final HttpRequest req = requests[i];
+ final int j = i;
executor.execute(() -> {
try {
- client.send(req, BodyHandlers.replacing(null));
+ HttpResponse<?> r = client.send(req, BodyHandlers.replacing(null));
+ out.println("Unexpected response for r" + j + ": " + r);
+ error = true;
} catch (HttpTimeoutException e) {
- out.println("Caught expected timeout: " + e);
- queue.offer(req);
+ out.println("Caught expected timeout for r" + j +": " + e);
} catch (IOException | InterruptedException ee) {
Throwable c = ee.getCause() == null ? ee : ee.getCause();
+ out.println("Wrong exception type for r" + j + ": " + c.toString());
c.printStackTrace();
error = true;
+ } finally {
+ queue.offer(req);
}
});
}