http-client-branch: work around issue with SP::close in BufferingSubscriberTest http-client-branch
authordfuchs
Wed, 06 Dec 2017 12:44:05 +0000
branchhttp-client-branch
changeset 55965 2911b5fbc229
parent 55964 b2e6f2945ac3
child 55968 11a97b370db0
http-client-branch: work around issue with SP::close in BufferingSubscriberTest
test/jdk/java/net/httpclient/BufferingSubscriberTest.java
--- a/test/jdk/java/net/httpclient/BufferingSubscriberTest.java	Wed Dec 06 12:39:37 2017 +0000
+++ b/test/jdk/java/net/httpclient/BufferingSubscriberTest.java	Wed Dec 06 12:44:05 2017 +0000
@@ -123,6 +123,7 @@
             { 3,              0,          4,         5006,      1000,         50  },
             { 20,             0,          100,       4888,      1000,         100 },
             { 16,             10,         1000,      50 ,       1000,         100 },
+            { 16,             10,         1000,      50 ,       657,          657 },
         };
     }
 
@@ -191,8 +192,8 @@
         }
     }
 
-    static int accumulatedDataSize(List<ByteBuffer> bufs) {
-        return bufs.stream().mapToInt(ByteBuffer::remaining).sum();
+    static long accumulatedDataSize(List<ByteBuffer> bufs) {
+        return bufs.stream().mapToLong(ByteBuffer::remaining).sum();
     }
 
     /** Returns a new BB with its contents set to monotonically increasing
@@ -229,7 +230,7 @@
         /**
          * Example of a factory method which would decorate a buffering
          * subscriber to create a new subscriber dependent on buffering capability.
-         *
+         * <p>
          * The integer type parameter simulates the body just by counting the
          * number of bytes in the body.
          */
@@ -244,7 +245,9 @@
             return BodySubscriber.buffering(s, bufferSize);
         }
 
-        private void requestMore() { subscription.request(requestAmount); }
+        private void requestMore() {
+            subscription.request(requestAmount);
+        }
 
         @Override
         public void onSubscribe(Subscription subscription) {
@@ -259,51 +262,59 @@
         volatile int wrongSizes;
         volatile int totalBytesReceived;
         volatile int onNextInvocations;
-        volatile int lastSeenSize = -1;
+        volatile long lastSeenSize = -1;
         volatile boolean noMoreOnNext; // false
         volatile int index; // 0
         volatile long count;
 
         @Override
         public void onNext(List<ByteBuffer> items) {
-            long sz = accumulatedDataSize(items);
-            boolean printStamp = delayMillis > 0
-                            && requestAmount < Long.MAX_VALUE
-                            && count % 20 == 0;
-            if (printStamp) {
-                printStamp("stamp",  "count=%d sz=%d accumulated=%d",
-                        count, sz, (totalBytesReceived + sz));
-            }
-            count++;
-            onNextInvocations++;
-            assertNotEquals(sz, 0L, "Unexpected empty buffers");
-            items.stream().forEach(b -> assertEquals(b.position(), 0));
-            assertFalse(noMoreOnNext);
+            try {
+                long sz = accumulatedDataSize(items);
+                boolean printStamp = delayMillis > 0
+                        && requestAmount < Long.MAX_VALUE
+                        && count % 20 == 0;
+                if (printStamp) {
+                    printStamp("stamp", "count=%d sz=%d accumulated=%d",
+                            count, sz, (totalBytesReceived + sz));
+                }
+                count++;
+                onNextInvocations++;
+                assertNotEquals(sz, 0L, "Unexpected empty buffers");
+                items.stream().forEach(b -> assertEquals(b.position(), 0));
+                assertFalse(noMoreOnNext);
 
-            if (sz != bufferSize) {
-                String msg = sz + ", should be less than bufferSize, " + bufferSize;
-                assertTrue(sz < bufferSize, msg);
-                assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
-                noMoreOnNext = true;
-                wrongSizes++;
-            } else {
-                assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
-            }
+                if (sz != bufferSize) {
+                    String msg = sz + ", should be less than bufferSize, " + bufferSize;
+                    assertTrue(sz < bufferSize, msg);
+                    assertTrue(lastSeenSize == -1 || lastSeenSize == bufferSize);
+                    noMoreOnNext = true;
+                    wrongSizes++;
+                    printStamp("onNext",
+                            "Possibly received last buffer: sz=%d, accumulated=%d, total=%d",
+                            sz, totalBytesReceived, totalBytesReceived + sz);
+                } else {
+                    assertEquals(sz, bufferSize, "Expected to receive exactly bufferSize");
+                }
+                lastSeenSize = sz;
 
-            // Ensure expected contents
-            for (ByteBuffer b : items) {
-                while (b.hasRemaining()) {
-                    assertEquals(b.get(), (byte) (index % 100));
-                    index++;
+                // Ensure expected contents
+                for (ByteBuffer b : items) {
+                    while (b.hasRemaining()) {
+                        assertEquals(b.get(), (byte) (index % 100));
+                        index++;
+                    }
                 }
+
+                totalBytesReceived += sz;
+                assertEquals(totalBytesReceived, index);
+                if (delayMillis > 0 && ((expectedTotalSize - totalBytesReceived) > bufferSize))
+                    delayedExecutor.execute(this::requestMore);
+                else
+                    requestMore();
+            } catch (Throwable t) {
+                completion.completeExceptionally(t);
             }
-
-            totalBytesReceived += sz;
-            assertEquals(totalBytesReceived, index );
-            if (delayMillis > 0)
-                delayedExecutor.execute(this::requestMore);
-            else
-                requestMore();
         }
 
         @Override
@@ -326,7 +337,9 @@
         }
 
         @Override
-        public CompletionStage<Integer> getBody() { return completion; }
+        public CompletionStage<Integer> getBody() {
+            return completion;
+        }
 
         @Override
         public String toString() {
@@ -414,8 +427,9 @@
                                 int expectedTotalSize,
                                 long requestAmount) {
         assert minBufferSize > 0 && maxBufferSize > 0 && requestAmount > 0;
-        int bufferSize = random.nextInt(maxBufferSize - minBufferSize)
-                + minBufferSize;
+        int bufferSize = maxBufferSize == minBufferSize ? maxBufferSize :
+                (random.nextInt(maxBufferSize - minBufferSize)
+                    + minBufferSize);
         if (requestAmount == Long.MAX_VALUE) return bufferSize;
         long minDelay = (((long)delaysMillis * expectedTotalSize) / maxBufferSize)
                 / requestAmount;