--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue Mar 20 10:19:00 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java Tue Mar 20 13:10:09 2018 +0000
@@ -864,6 +864,9 @@
boolean reRegister = (interestOps & newOps) != newOps;
interestOps |= newOps;
pending.add(e);
+ debug.log(Level.DEBUG,
+ "Registering %s for %d (%s)",
+ e, newOps, reRegister);
if (reRegister) {
// first time registration happens here also
try {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpResponseImpl.java Tue Mar 20 10:19:00 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpResponseImpl.java Tue Mar 20 13:10:09 2018 +0000
@@ -155,7 +155,7 @@
// Http1Exchange may have some remaining bytes in its
// internal buffer.
Supplier<ByteBuffer> initial = ((Http1Exchange<?>)exchImpl)::drainLeftOverBytes;
- rawchan = new RawChannelImpl(exchange.client(), connection, initial);
+ rawchan = new RawChannelTube(connection, initial);
}
return rawchan;
}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/RawChannelTube.java Tue Mar 20 13:10:09 2018 +0000
@@ -0,0 +1,435 @@
+/*
+ * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+package jdk.internal.net.http;
+
+import jdk.internal.net.http.common.Demand;
+import jdk.internal.net.http.common.FlowTube;
+import jdk.internal.net.http.common.Utils;
+import jdk.internal.net.http.websocket.RawChannel;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.lang.ref.Cleaner;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Flow;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import java.lang.System.Logger.Level;
+
+/*
+ * I/O abstraction used to implement WebSocket.
+ *
+ */
+public class RawChannelTube implements RawChannel {
+
+ final HttpConnection connection;
+ final FlowTube tube;
+ final WritePublisher writePublisher;
+ final ReadSubscriber readSubscriber;
+ final Supplier<ByteBuffer> initial;
+ final AtomicBoolean inited = new AtomicBoolean();
+ final AtomicBoolean outputClosed = new AtomicBoolean();
+ final AtomicBoolean inputClosed = new AtomicBoolean();
+ final AtomicBoolean closed = new AtomicBoolean();
+ final String dbgTag;
+ final System.Logger debug;
+ private static final Cleaner cleaner =
+ Utils.ASSERTIONSENABLED && Utils.DEBUG_WS ? Cleaner.create() : null;
+
+ RawChannelTube(HttpConnection connection,
+ Supplier<ByteBuffer> initial) {
+ this.connection = connection;
+ this.tube = connection.getConnectionFlow();
+ this.initial = initial;
+ this.writePublisher = new WritePublisher();
+ this.readSubscriber = new ReadSubscriber();
+ dbgTag = "[WebSocket] RawChannelTube(" + tube.toString() +")";
+ debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
+ connection.client().reference();
+ connectFlows();
+ if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
+ // this is just for debug...
+ cleaner.register(this, new CleanupChecker(closed, debug));
+ }
+ }
+
+ // Make sure no back reference to RawChannelTube can exist
+ // from this class. In particular it would be dangerous
+ // to reference connection, since connection has a reference
+ // to SocketTube with which a RawChannelTube is registered.
+ // Ditto for HttpClientImpl, which might have a back reference
+ // to the connection.
+ static final class CleanupChecker implements Runnable {
+ final AtomicBoolean closed;
+ final System.Logger debug;
+ CleanupChecker(AtomicBoolean closed, System.Logger debug) {
+ this.closed = closed;
+ this.debug = debug;
+ }
+
+ @Override
+ public void run() {
+ if (!closed.get()) {
+ debug.log(Level.DEBUG,
+ "RawChannelTube was not closed before being released");
+ }
+ }
+ }
+
+ private void connectFlows() {
+ debug.log(Level.DEBUG, "connectFlows");
+ tube.connectFlows(writePublisher, readSubscriber);
+ }
+
+ class WriteSubscription implements Flow.Subscription {
+ final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+ final Demand demand = new Demand();
+ volatile boolean cancelled;
+ WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ this.subscriber = subscriber;
+ }
+ @Override
+ public void request(long n) {
+ debug.log(Level.DEBUG, "WriteSubscription::request %d", n);
+ demand.increase(n);
+ RawEvent event;
+ while ((event = writePublisher.events.poll()) != null) {
+ debug.log(Level.DEBUG, "WriteSubscriber: handling event");
+ event.handle();
+ if (demand.isFulfilled()) break;
+ }
+ }
+ @Override
+ public void cancel() {
+ cancelled = true;
+ debug.log(Level.DEBUG, "WriteSubscription::cancel");
+ shutdownOutput();
+ RawEvent event;
+ while ((event = writePublisher.events.poll()) != null) {
+ debug.log(Level.DEBUG, "WriteSubscriber: handling event");
+ event.handle();
+ }
+ }
+ }
+
+ class WritePublisher implements FlowTube.TubePublisher {
+ final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
+ volatile WriteSubscription writeSubscription;
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ debug.log(Level.DEBUG, "WritePublisher::subscribe");
+ WriteSubscription subscription = new WriteSubscription(subscriber);
+ subscriber.onSubscribe(subscription);
+ writeSubscription = subscription;
+ }
+ }
+
+ class ReadSubscriber implements FlowTube.TubeSubscriber {
+
+ volatile Flow.Subscription readSubscription;
+ volatile boolean completed;
+ long initialRequest;
+ final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
+ final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
+ final AtomicReference<Throwable> errorRef = new AtomicReference<>();
+
+ void checkEvents() {
+ Flow.Subscription subscription = readSubscription;
+ if (subscription != null) {
+ Throwable error = errorRef.get();
+ while (!buffers.isEmpty() || error != null || closed.get() || completed) {
+ RawEvent event = events.poll();
+ if (event == null) break;
+ debug.log(Level.DEBUG, "ReadSubscriber: handling event");
+ event.handle();
+ }
+ }
+ }
+
+ @Override
+ public void onSubscribe(Flow.Subscription subscription) {
+ //buffers.add(initial.get());
+ long n;
+ synchronized (this) {
+ readSubscription = subscription;
+ n = initialRequest;
+ initialRequest = 0;
+ }
+ debug.log(Level.DEBUG, "ReadSubscriber::onSubscribe");
+ if (n > 0) {
+ Throwable error = errorRef.get();
+ if (error == null && !closed.get() && !completed) {
+ debug.log(Level.DEBUG, "readSubscription: requesting " + n);
+ subscription.request(n);
+ }
+ }
+ checkEvents();
+ }
+
+ @Override
+ public void onNext(List<ByteBuffer> item) {
+ debug.log(Level.DEBUG, () -> "ReadSubscriber::onNext "
+ + Utils.remaining(item) + " bytes");
+ buffers.addAll(item);
+ checkEvents();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (closed.get() || errorRef.compareAndSet(null, throwable)) {
+ debug.log(Level.DEBUG, "ReadSubscriber::onError", throwable);
+ if (buffers.isEmpty()) {
+ checkEvents();
+ shutdownInput();
+ }
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ debug.log(Level.DEBUG, "ReadSubscriber::onComplete");
+ completed = true;
+ if (buffers.isEmpty()) {
+ checkEvents();
+ shutdownInput();
+ }
+ }
+ }
+
+
+ /*
+ * Registers given event whose callback will be called once only (i.e.
+ * register new event for each callback).
+ *
+ * Memory consistency effects: actions in a thread calling registerEvent
+ * happen-before any subsequent actions in the thread calling event.handle
+ */
+ public void registerEvent(RawEvent event) throws IOException {
+ int interestOps = event.interestOps();
+ if ((interestOps & SelectionKey.OP_WRITE) != 0) {
+ debug.log(Level.DEBUG, "register write event");
+ if (outputClosed.get()) throw new IOException("closed output");
+ writePublisher.events.add(event);
+ WriteSubscription writeSubscription = writePublisher.writeSubscription;
+ if (writeSubscription != null) {
+ while (!writeSubscription.demand.isFulfilled()) {
+ event = writePublisher.events.poll();
+ if (event == null) break;
+ event.handle();
+ }
+ }
+ }
+ if ((interestOps & SelectionKey.OP_READ) != 0) {
+ debug.log(Level.DEBUG, "register read event");
+ if (inputClosed.get()) throw new IOException("closed input");
+ readSubscriber.events.add(event);
+ readSubscriber.checkEvents();
+ if (readSubscriber.buffers.isEmpty()
+ && !readSubscriber.events.isEmpty()) {
+ Flow.Subscription readSubscription =
+ readSubscriber.readSubscription;
+ if (readSubscription == null) {
+ synchronized (readSubscriber) {
+ readSubscription = readSubscriber.readSubscription;
+ if (readSubscription == null) {
+ readSubscriber.initialRequest = 1;
+ return;
+ }
+ }
+ }
+ assert readSubscription != null;
+ debug.log(Level.DEBUG, "readSubscription: requesting 1");
+ readSubscription.request(1);
+ }
+ }
+ }
+
+ /**
+ * Hands over the initial bytes. Once the bytes have been returned they are
+ * no longer available and the method will throw an {@link
+ * IllegalStateException} on each subsequent invocation.
+ *
+ * @return the initial bytes
+ * @throws IllegalStateException
+ * if the method has been already invoked
+ */
+ public ByteBuffer initialByteBuffer() throws IllegalStateException {
+ if (inited.compareAndSet(false, true)) {
+ return initial.get();
+ } else throw new IllegalStateException("initial buffer already drained");
+ }
+
+ /*
+ * Returns a ByteBuffer with the data read or null if EOF is reached. Has no
+ * remaining bytes if no data available at the moment.
+ */
+ public ByteBuffer read() throws IOException {
+ debug.log(Level.DEBUG, "read");
+ Flow.Subscription readSubscription = readSubscriber.readSubscription;
+ if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;
+ ByteBuffer buffer = readSubscriber.buffers.poll();
+ if (buffer != null) {
+ debug.log(Level.DEBUG, () -> "read: " + buffer.remaining());
+ return buffer;
+ }
+ Throwable error = readSubscriber.errorRef.get();
+ if (error != null) error = Utils.getIOException(error);
+ if (error instanceof EOFException) {
+ debug.log(Level.DEBUG, "read: EOFException");
+ shutdownInput();
+ return null;
+ }
+ if (error != null) {
+ debug.log(Level.DEBUG, "read: " + error);
+ if (closed.get()) {
+ return null;
+ }
+ shutdownInput();
+ throw Utils.getIOException(error);
+ }
+ if (readSubscriber.completed) {
+ debug.log(Level.DEBUG, "read: EOF");
+ shutdownInput();
+ return null;
+ }
+ if (inputClosed.get()) {
+ debug.log(Level.DEBUG, "read: CLOSED");
+ throw new IOException("closed output");
+ }
+ debug.log(Level.DEBUG, "read: nothing to read");
+ return Utils.EMPTY_BYTEBUFFER;
+ }
+
+ /*
+ * Writes a sequence of bytes to this channel from a subsequence of the
+ * given buffers.
+ */
+ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
+ if (outputClosed.get()) {
+ debug.log(Level.DEBUG, "write: CLOSED");
+ throw new IOException("closed output");
+ }
+ WriteSubscription writeSubscription = writePublisher.writeSubscription;
+ if (writeSubscription == null) {
+ debug.log(Level.DEBUG, "write: unsubscribed: 0");
+ return 0;
+ }
+ if (writeSubscription.cancelled) {
+ debug.log(Level.DEBUG, "write: CANCELLED");
+ shutdownOutput();
+ throw new IOException("closed output");
+ }
+ if (writeSubscription.demand.tryDecrement()) {
+ List<ByteBuffer> buffers = copy(srcs, offset, length);
+ long res = Utils.remaining(buffers);
+ debug.log(Level.DEBUG, "write: writing %d", res);
+ writeSubscription.subscriber.onNext(buffers);
+ return res;
+ } else {
+ debug.log(Level.DEBUG, "write: no demand: 0");
+ return 0;
+ }
+ }
+
+ /**
+ * Shutdown the connection for reading without closing the channel.
+ *
+ * <p> Once shutdown for reading then further reads on the channel will
+ * return {@code null}, the end-of-stream indication. If the input side of
+ * the connection is already shutdown then invoking this method has no
+ * effect.
+ *
+ * @throws ClosedChannelException
+ * If this channel is closed
+ * @throws IOException
+ * If some other I/O error occurs
+ */
+ public void shutdownInput() {
+ if (inputClosed.compareAndSet(false, true)) {
+ debug.log(Level.DEBUG, "shutdownInput");
+ // TransportImpl will eventually call RawChannel::close.
+ // We must not call it here as this would close the socket
+ // and can cause an exception to back fire before
+ // TransportImpl and WebSocketImpl have updated their state.
+ }
+ }
+
+ /**
+ * Shutdown the connection for writing without closing the channel.
+ *
+ * <p> Once shutdown for writing then further attempts to write to the
+ * channel will throw {@link ClosedChannelException}. If the output side of
+ * the connection is already shutdown then invoking this method has no
+ * effect.
+ *
+ * @throws ClosedChannelException
+ * If this channel is closed
+ * @throws IOException
+ * If some other I/O error occurs
+ */
+ public void shutdownOutput() {
+ if (outputClosed.compareAndSet(false, true)) {
+ debug.log(Level.DEBUG, "shutdownOutput");
+ // TransportImpl will eventually call RawChannel::close.
+ // We must not call it here as this would close the socket
+ // and can cause an exception to back fire before
+ // TransportImpl and WebSocketImpl have updated their state.
+ }
+ }
+
+ /**
+ * Closes this channel.
+ *
+ * @throws IOException
+ * If an I/O error occurs
+ */
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ debug.log(Level.DEBUG, "close");
+ connection.client().unreference();
+ connection.close();
+ }
+ }
+
+ private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {
+ int count = Math.min(len, src.length - offset);
+ if (count <= 0) return Utils.EMPTY_BB_LIST;
+ if (count == 1) return List.of(Utils.copy(src[offset]));
+ if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));
+ List<ByteBuffer> list = new ArrayList<>(count);
+ for (int i = 0; i < count; i++) {
+ list.add(Utils.copy(src[offset + i]));
+ }
+ return list;
+ }
+}
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java Tue Mar 20 10:19:00 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseContent.java Tue Mar 20 13:10:09 2018 +0000
@@ -447,6 +447,7 @@
}
if (unfulfilled == 0) {
// We're done! All data has been received.
+ debug.log(Level.DEBUG, "Parser got all expected bytes: completing");
assert closedExceptionally == null;
onFinished.run();
pusher.onComplete();
--- a/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Tue Mar 20 10:19:00 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/SocketTube.java Tue Mar 20 13:10:09 2018 +0000
@@ -359,7 +359,7 @@
long remaining = Utils.remaining(bufs);
debug.log(Level.DEBUG, "trying to write: %d", remaining);
long written = writeAvailable(bufs);
- debug.log(Level.DEBUG, "wrote: %d", remaining);
+ debug.log(Level.DEBUG, "wrote: %d", written);
if (written == -1) {
signalError(new EOFException("EOF reached while writing"));
return;
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Tue Mar 20 10:19:00 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Tue Mar 20 13:10:09 2018 +0000
@@ -594,6 +594,7 @@
debug.log(Level.DEBUG, "enter receive task");
loop:
while (!receiveScheduler.isStopped()) {
+ ChannelState rs = readState;
if (data.hasRemaining()) {
debug.log(Level.DEBUG, "remaining bytes received %s",
data.remaining());
@@ -608,18 +609,20 @@
receiveScheduler.stop();
messageConsumer.onError(e);
}
+ if (!data.hasRemaining()) {
+ rs = readState = UNREGISTERED;
+ }
continue;
}
break loop;
}
- final ChannelState rs = readState;
debug.log(Level.DEBUG, "receive state: %s", rs);
switch (rs) {
case WAITING:
break loop;
case UNREGISTERED:
try {
- readState = WAITING;
+ rs = readState = WAITING;
channel.registerEvent(readEvent);
} catch (Throwable e) {
receiveScheduler.stop();
@@ -641,7 +644,7 @@
} else if (!data.hasRemaining()) {
// No data at the moment. Pretty much a "goto",
// reusing the existing code path for registration
- readState = UNREGISTERED;
+ rs = readState = UNREGISTERED;
}
continue loop;
default:
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Tue Mar 20 10:19:00 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java Tue Mar 20 13:10:09 2018 +0000
@@ -721,6 +721,7 @@
inputClosed = true;
outputClosed.set(true);
if (!this.error.compareAndSet(null, error) || !trySetState(ERROR)) {
+ debug.log(Level.DEBUG, "signalError", error);
Log.logError(error);
} else {
close();
--- a/test/jdk/java/net/httpclient/whitebox/Driver.java Tue Mar 20 10:19:00 2018 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,30 +0,0 @@
-/*
- * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-/*
- * @test
- * @bug 8151299 8164704
- * @modules java.net.http/jdk.internal.net.http
- * @run testng java.net.http/jdk.internal.net.http.SelectorTest
- * @run testng java.net.http/jdk.internal.net.http.RawChannelTest
- */
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/RawChannelTestDriver.java Tue Mar 20 13:10:09 2018 +0000
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8151299 8164704
+ * @modules java.net.http/jdk.internal.net.http
+ * @run testng java.net.http/jdk.internal.net.http.RawChannelTest
+ */
+//-Djdk.internal.httpclient.websocket.debug=true
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/SelectorTestDriver.java Tue Mar 20 13:10:09 2018 +0000
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/*
+ * @test
+ * @bug 8151299 8164704
+ * @modules java.net.http/jdk.internal.net.http
+ * @run testng java.net.http/jdk.internal.net.http.SelectorTest
+ */
--- a/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/RawChannelTest.java Tue Mar 20 10:19:00 2018 +0000
+++ b/test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/RawChannelTest.java Tue Mar 20 13:10:09 2018 +0000
@@ -35,6 +35,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +60,8 @@
private final AtomicLong serverWritten = new AtomicLong();
private final AtomicLong clientRead = new AtomicLong();
private final AtomicLong serverRead = new AtomicLong();
+ private CompletableFuture<Void> outputCompleted = new CompletableFuture<>();
+ private CompletableFuture<Void> inputCompleted = new CompletableFuture<>();
/*
* Since at this level we don't have any control over the low level socket
@@ -120,7 +123,9 @@
if (i > 3) { // Fill up the send buffer not more than 3 times
try {
chan.shutdownOutput();
+ outputCompleted.complete(null);
} catch (IOException e) {
+ outputCompleted.completeExceptionally(e);
e.printStackTrace();
}
return;
@@ -160,10 +165,12 @@
try {
read = chan.read();
} catch (IOException e) {
+ inputCompleted.completeExceptionally(e);
e.printStackTrace();
}
if (read == null) {
print("OP_READ EOF");
+ inputCompleted.complete(null);
break;
} else if (!read.hasRemaining()) {
print("OP_READ stall");
@@ -182,6 +189,15 @@
print("OP_READ read %s bytes (%s total)", total, clientRead.get());
}
});
+ CompletableFuture.allOf(outputCompleted,inputCompleted)
+ .whenComplete((r,t) -> {
+ try {
+ print("closing channel");
+ chan.close();
+ } catch (IOException x) {
+ x.printStackTrace();
+ }
+ });
exit.await(); // All done, we need to compare results:
assertEquals(clientRead.get(), serverWritten.get());
assertEquals(serverRead.get(), clientWritten.get());
@@ -229,7 +245,6 @@
try {
long n = readSlowly(is);
print("Server read %s bytes", n);
- serverRead.addAndGet(n);
s.shutdownInput();
} catch (Exception e) {
e.printStackTrace();
@@ -240,7 +255,6 @@
try {
long n = writeSlowly(os);
print("Server written %s bytes", n);
- serverWritten.addAndGet(n);
s.shutdownOutput();
} catch (Exception e) {
e.printStackTrace();
@@ -290,8 +304,10 @@
long total = first.length;
os.write(first);
os.flush();
+ serverWritten.addAndGet(first.length);
// wait until initial bytes were read
+ print("Server wrote total %d: awaiting initialReadStall", total);
initialReadStall.await();
// make sure there is something to read, otherwise readStall
@@ -300,15 +316,20 @@
os.write(first);
os.flush();
total += first.length;
+ serverWritten.addAndGet(first.length);
// Let's wait for the signal from the raw channel that its read has
// stalled, and then continue sending a bit more stuff
+ print("Server wrote total %d: awaiting readStall", total);
readStall.await();
+ print("readStall unblocked, writing 32k");
for (int i = 0; i < 32; i++) {
byte[] b = byteArrayOfSize(1024);
os.write(b);
os.flush();
+ serverWritten.addAndGet(b.length);
total += b.length;
+ print("Server wrote total %d", total);
TimeUnit.MILLISECONDS.sleep(1);
}
return total;
@@ -317,11 +338,14 @@
private long readSlowly(InputStream is) throws Exception {
// Wait for the raw channel to fill up its send buffer
writeStall.await();
+ print("writingStall unblocked, start reading");
long overall = 0;
byte[] array = new byte[1024];
for (int n = 0; n != -1; n = is.read(array)) {
+ serverRead.addAndGet(n);
TimeUnit.MILLISECONDS.sleep(1);
overall += n;
+ print("Server read total: %d", overall);
}
return overall;
}