# HG changeset patch # User dfuchs # Date 1512564245 0 # Node ID 2911b5fbc229547e66d35981081d8b3a9a111e26 # Parent b2e6f2945ac3386014e676149df7cfaf0ed010a9 http-client-branch: work around issue with SP::close in BufferingSubscriberTest diff -r b2e6f2945ac3 -r 2911b5fbc229 test/jdk/java/net/httpclient/BufferingSubscriberTest.java --- a/test/jdk/java/net/httpclient/BufferingSubscriberTest.java Wed Dec 06 12:39:37 2017 +0000 +++ b/test/jdk/java/net/httpclient/BufferingSubscriberTest.java Wed Dec 06 12:44:05 2017 +0000 @@ -123,6 +123,7 @@ { 3, 0, 4, 5006, 1000, 50 }, { 20, 0, 100, 4888, 1000, 100 }, { 16, 10, 1000, 50 , 1000, 100 }, + { 16, 10, 1000, 50 , 657, 657 }, }; } @@ -191,8 +192,8 @@ } } - static int accumulatedDataSize(List bufs) { - return bufs.stream().mapToInt(ByteBuffer::remaining).sum(); + static long accumulatedDataSize(List bufs) { + return bufs.stream().mapToLong(ByteBuffer::remaining).sum(); } /** Returns a new BB with its contents set to monotonically increasing @@ -229,7 +230,7 @@ /** * Example of a factory method which would decorate a buffering * subscriber to create a new subscriber dependent on buffering capability. - * + *

* The integer type parameter simulates the body just by counting the * number of bytes in the body. */ @@ -244,7 +245,9 @@ return BodySubscriber.buffering(s, bufferSize); } - private void requestMore() { subscription.request(requestAmount); } + private void requestMore() { + subscription.request(requestAmount); + } @Override public void onSubscribe(Subscription subscription) { @@ -259,51 +262,59 @@ volatile int wrongSizes; volatile int totalBytesReceived; volatile int onNextInvocations; - volatile int lastSeenSize = -1; + volatile long lastSeenSize = -1; volatile boolean noMoreOnNext; // false volatile int index; // 0 volatile long count; @Override public void onNext(List items) { - long sz = accumulatedDataSize(items); - boolean printStamp = delayMillis > 0 - && requestAmount < Long.MAX_VALUE - && count % 20 == 0; - if (printStamp) { - printStamp("stamp", "count=%d sz=%d accumulated=%d", - count, sz, (totalBytesReceived + sz)); - } - count++; - onNextInvocations++; - assertNotEquals(sz, 0L, "Unexpected empty buffers"); - items.stream().forEach(b -> assertEquals(b.position(), 0)); - assertFalse(noMoreOnNext); + try { + long sz = accumulatedDataSize(items); + boolean printStamp = delayMillis > 0 + && requestAmount < Long.MAX_VALUE + && count % 20 == 0; + if (printStamp) { + printStamp("stamp", "count=%d sz=%d accumulated=%d", + count, sz, (totalBytesReceived + sz)); + } + count++; + onNextInvocations++; + assertNotEquals(sz, 0L, "Unexpected empty buffers"); + items.stream().forEach(b -> assertEquals(b.position(), 0)); + assertFalse(noMoreOnNext); - if (sz != bufferSize) { - String msg = sz + ", should be less than bufferSize, " + bufferSize; - assertTrue(sz < bufferSize, msg); - assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize); - noMoreOnNext = true; - wrongSizes++; - } else { - assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize"); - } + if (sz != bufferSize) { + String msg = sz + ", should be less than bufferSize, " + bufferSize; + assertTrue(sz < bufferSize, msg); + assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize); + noMoreOnNext = true; + wrongSizes++; + printStamp("onNext", + "Possibly received last buffer: sz=%d, accumulated=%d, total=%d", + sz, totalBytesReceived, totalBytesReceived + sz); + } else { + assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize"); + } + lastSeenSize = sz; - // Ensure expected contents - for (ByteBuffer b : items) { - while (b.hasRemaining()) { - assertEquals(b.get(), (byte) (index % 100)); - index++; + // Ensure expected contents + for (ByteBuffer b : items) { + while (b.hasRemaining()) { + assertEquals(b.get(), (byte) (index % 100)); + index++; + } } + + totalBytesReceived += sz; + assertEquals(totalBytesReceived, index); + if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize)) + delayedExecutor.execute(this::requestMore); + else + requestMore(); + } catch (Throwable t) { + completion.completeExceptionally(t); } - - totalBytesReceived += sz; - assertEquals(totalBytesReceived, index ); - if (delayMillis > 0) - delayedExecutor.execute(this::requestMore); - else - requestMore(); } @Override @@ -326,7 +337,9 @@ } @Override - public CompletionStage getBody() { return completion; } + public CompletionStage getBody() { + return completion; + } @Override public String toString() { @@ -414,8 +427,9 @@ int expectedTotalSize, long requestAmount) { assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0; - int bufferSize = random.nextInt(maxBufferSize - minBufferSize) - + minBufferSize; + int bufferSize = maxBufferSize == minBufferSize ? maxBufferSize : + (random.nextInt(maxBufferSize - minBufferSize) + + minBufferSize); if (requestAmount == Long.MAX_VALUE) return bufferSize; long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize) / requestAmount;