--- a/test/jdk/java/net/httpclient/BufferingSubscriberTest.java Wed Dec 06 12:39:37 2017 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberTest.java Wed Dec 06 12:44:05 2017 +0000
@@ -123,6 +123,7 @@
{ 3, 0, 4, 5006, 1000, 50 },
{ 20, 0, 100, 4888, 1000, 100 },
{ 16, 10, 1000, 50 , 1000, 100 },
+ { 16, 10, 1000, 50 , 657, 657 },
};
}
@@ -191,8 +192,8 @@
}
}
- static int accumulatedDataSize(List<ByteBuffer> bufs) {
- return bufs.stream().mapToInt(ByteBuffer::remaining).sum();
+ static long accumulatedDataSize(List<ByteBuffer> bufs) {
+ return bufs.stream().mapToLong(ByteBuffer::remaining).sum();
}
/** Returns a new BB with its contents set to monotonically increasing
@@ -229,7 +230,7 @@
/**
* Example of a factory method which would decorate a buffering
* subscriber to create a new subscriber dependent on buffering capability.
- *
+ * <p>
* The integer type parameter simulates the body just by counting the
* number of bytes in the body.
*/
@@ -244,7 +245,9 @@
return BodySubscriber.buffering(s, bufferSize);
}
- private void requestMore() { subscription.request(requestAmount); }
+ private void requestMore() {
+ subscription.request(requestAmount);
+ }
@Override
public void onSubscribe(Subscription subscription) {
@@ -259,51 +262,59 @@
volatile int wrongSizes;
volatile int totalBytesReceived;
volatile int onNextInvocations;
- volatile int lastSeenSize = -1;
+ volatile long lastSeenSize = -1;
volatile boolean noMoreOnNext; // false
volatile int index; // 0
volatile long count;
@Override
public void onNext(List<ByteBuffer> items) {
- long sz = accumulatedDataSize(items);
- boolean printStamp = delayMillis > 0
- && requestAmount < Long.MAX_VALUE
- && count % 20 == 0;
- if (printStamp) {
- printStamp("stamp", "count=%d sz=%d accumulated=%d",
- count, sz, (totalBytesReceived + sz));
- }
- count++;
- onNextInvocations++;
- assertNotEquals(sz, 0L, "Unexpected empty buffers");
- items.stream().forEach(b -> assertEquals(b.position(), 0));
- assertFalse(noMoreOnNext);
+ try {
+ long sz = accumulatedDataSize(items);
+ boolean printStamp = delayMillis > 0
+ && requestAmount < Long.MAX_VALUE
+ && count % 20 == 0;
+ if (printStamp) {
+ printStamp("stamp", "count=%d sz=%d accumulated=%d",
+ count, sz, (totalBytesReceived + sz));
+ }
+ count++;
+ onNextInvocations++;
+ assertNotEquals(sz, 0L, "Unexpected empty buffers");
+ items.stream().forEach(b -> assertEquals(b.position(), 0));
+ assertFalse(noMoreOnNext);
- if (sz != bufferSize) {
- String msg = sz + ", should be less than bufferSize, " + bufferSize;
- assertTrue(sz < bufferSize, msg);
- assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
- noMoreOnNext = true;
- wrongSizes++;
- } else {
- assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
- }
+ if (sz != bufferSize) {
+ String msg = sz + ", should be less than bufferSize, " + bufferSize;
+ assertTrue(sz < bufferSize, msg);
+ assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
+ noMoreOnNext = true;
+ wrongSizes++;
+ printStamp("onNext",
+ "Possibly received last buffer: sz=%d, accumulated=%d, total=%d",
+ sz, totalBytesReceived, totalBytesReceived + sz);
+ } else {
+ assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
+ }
+ lastSeenSize = sz;
- // Ensure expected contents
- for (ByteBuffer b : items) {
- while (b.hasRemaining()) {
- assertEquals(b.get(), (byte) (index % 100));
- index++;
+ // Ensure expected contents
+ for (ByteBuffer b : items) {
+ while (b.hasRemaining()) {
+ assertEquals(b.get(), (byte) (index % 100));
+ index++;
+ }
}
+
+ totalBytesReceived += sz;
+ assertEquals(totalBytesReceived, index);
+ if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize))
+ delayedExecutor.execute(this::requestMore);
+ else
+ requestMore();
+ } catch (Throwable t) {
+ completion.completeExceptionally(t);
}
-
- totalBytesReceived += sz;
- assertEquals(totalBytesReceived, index );
- if (delayMillis > 0)
- delayedExecutor.execute(this::requestMore);
- else
- requestMore();
}
@Override
@@ -326,7 +337,9 @@
}
@Override
- public CompletionStage<Integer> getBody() { return completion; }
+ public CompletionStage<Integer> getBody() {
+ return completion;
+ }
@Override
public String toString() {
@@ -414,8 +427,9 @@
int expectedTotalSize,
long requestAmount) {
assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;
- int bufferSize = random.nextInt(maxBufferSize - minBufferSize)
- + minBufferSize;
+ int bufferSize = maxBufferSize == minBufferSize ? maxBufferSize :
+ (random.nextInt(maxBufferSize - minBufferSize)
+ + minBufferSize);
if (requestAmount == Long.MAX_VALUE) return bufferSize;
long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)
/ requestAmount;