http-client-branch: ensure ResponseSubscribers.HttpResponseInputStreamTest::read will throw an exception when onError is called
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Nov 07 15:36:25 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Tue Nov 07 19:46:59 2017 +0000
@@ -387,7 +387,15 @@
@Override
public void onError(Throwable thrwbl) {
subscription = null;
- failed = thrwbl;
+ failed = thrwbl == null ? new InternalError("illegal null Throwable") : thrwbl;
+ // The client process that reads the input stream might
+ // be blocked in queue.take().
+ // Tries to offer LAST_LIST to the queue. If the queue is
+ // full we don't care if we can't insert this buffer, as
+ // the client can't be blocked in queue.take() in that case.
+ // Adding LAST_LIST to the queue is harmless, as the client
+ // should find failed != null before handling LAST_LIST.
+ buffers.offer(LAST_LIST);
}
@Override
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/HttpResponseInputStreamTest.java Tue Nov 07 19:46:59 2017 +0000
@@ -0,0 +1,139 @@
+/*
+ * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.HttpResponse;
+import jdk.incubator.http.HttpResponse.BodySubscriber;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Flow;
+import org.testng.annotations.Test;
+
+/*
+ * @test
+ * @summary Simple smoke test for BodySubscriber.asInputStream();
+ * @run testng/othervm HttpResponseInputStreamTest
+ * @author daniel fuchs
+ */
+public class HttpResponseInputStreamTest {
+
+ static class TestException extends IOException {}
+
+ public static void main(String[] args) throws InterruptedException, ExecutionException {
+ testOnError();
+ }
+
+ /**
+ * Tests that a client will be unblocked and will
+ * @throws InterruptedException
+ * @throws ExecutionException
+ */
+ @Test
+ public static void testOnError() throws InterruptedException, ExecutionException {
+ CountDownLatch latch = new CountDownLatch(1);
+ BodySubscriber<InputStream> isb = BodySubscriber.asInputStream();
+ ErrorTestSubscription s = new ErrorTestSubscription(isb);
+ CompletionStage<Throwable> cs =
+ isb.getBody().thenApplyAsync((is) -> s.accept(latch, is));
+ latch.await();
+ isb.onSubscribe(s);
+ s.t.join();
+ Throwable result = cs.toCompletableFuture().get();
+ Throwable t = result;
+ if (!(t instanceof IOException)) {
+ throw new RuntimeException("Failed to receive excpected IOException", result);
+ } else {
+ System.out.println("Got expected exception: " + t);
+ }
+ while (t != null) {
+ if (t instanceof TestException) break;
+ t = t.getCause();
+ }
+ if (t instanceof TestException) {
+ System.out.println("Got expected cause: " + t);
+ } else {
+ throw new RuntimeException("Failed to receive excpected TestException", result);
+ }
+ }
+
+ static class ErrorTestSubscription implements Flow.Subscription {
+ final BodySubscriber<InputStream> isb;
+ final Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ // Give time to
+ System.out.println("waiting...");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+
+ }
+ System.out.println("Calling onError...");
+ isb.onError(new TestException());
+ }
+ };
+
+ ErrorTestSubscription(BodySubscriber<InputStream> isb) {
+ this.isb = isb;
+ }
+
+ int requested = 0;
+
+ @Override
+ public void request(long n) {
+ System.out.println("Got request: " + n);
+ if (requested == 0 && n > 0) {
+ //isb.onNext(List.of(java.nio.ByteBuffer.wrap(new byte[] {0x01})));
+ requested += n;
+ t.start();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ public Throwable accept(CountDownLatch latch, InputStream is) {
+ System.out.println("got " + is);
+ try {
+ latch.countDown();
+ System.out.println("reading all bytes");
+ is.readAllBytes();
+ System.out.println("all bytes read");
+ } catch (IOException e) {
+ return e;
+ } finally {
+ try {
+ is.close();
+ } catch (IOException e) {
+ return e;
+ }
+ }
+ return is == null ? new NullPointerException() : null;
+ }
+ }
+}