http-client-branch: (WebSocket) logging and closure timer http-client-branch
authorprappo
Tue, 13 Mar 2018 17:10:20 +0000
branchhttp-client-branch
changeset 56294 181bc33917e4
parent 56293 7e21161251dc
child 56295 898dfb226bd0
http-client-branch: (WebSocket) logging and closure timer
src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java
test/jdk/java/net/httpclient/websocket/WebSocketTest.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Mon Mar 12 12:47:29 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Tue Mar 13 17:10:20 2018 +0000
@@ -122,7 +122,7 @@
         long id;
         if (DEBUG) {
             id = counter.incrementAndGet();
-            System.out.printf("[Transport] %s: sendText message.length()=%s, last=%s%n",
+            System.out.printf("[Transport] enter send text %s message.length()=%s last=%s%n",
                               id, message.length(), isLast);
         }
         // TODO (optimization?):
@@ -142,7 +142,7 @@
             f.completeExceptionally(e);
         }
         if (DEBUG) {
-            System.out.printf("[Transport] %s: sendText returned %s%n", id, f);
+            System.out.printf("[Transport] exit send text %s returned %s%n", id, f);
         }
         return f;
     }
@@ -155,7 +155,7 @@
         long id;
         if (DEBUG) {
             id = counter.incrementAndGet();
-            System.out.printf("[Transport] %s: sendBinary message.remaining()=%s, last=%s%n",
+            System.out.printf("[Transport] enter send binary %s message.remaining()=%s last=%s%n",
                               id, message.remaining(), isLast);
         }
         MinimalFuture<T> f = new MinimalFuture<>();
@@ -166,7 +166,7 @@
             f.completeExceptionally(e);
         }
         if (DEBUG) {
-            System.out.printf("[Transport] %s: sendBinary returned %s%n", id, f);
+            System.out.printf("[Transport] exit send binary %s returned %s%n", id, f);
         }
         return f;
     }
@@ -178,7 +178,7 @@
         long id;
         if (DEBUG) {
             id = counter.incrementAndGet();
-            System.out.printf("[Transport] %s: sendPing message.remaining()=%s%n",
+            System.out.printf("[Transport] enter send ping %s message.remaining()=%s%n",
                               id, message.remaining());
         }
         MinimalFuture<T> f = new MinimalFuture<>();
@@ -189,7 +189,7 @@
             f.completeExceptionally(e);
         }
         if (DEBUG) {
-            System.out.printf("[Transport] %s: sendPing returned %s%n", id, f);
+            System.out.printf("[Transport] exit send ping %s returned %s%n", id, f);
         }
         return f;
     }
@@ -201,7 +201,7 @@
         long id;
         if (DEBUG) {
             id = counter.incrementAndGet();
-            System.out.printf("[Transport] %s: sendPong message.remaining()=%s%n",
+            System.out.printf("[Transport] enter send pong %s message.remaining()=%s%n",
                               id, message.remaining());
         }
         MinimalFuture<T> f = new MinimalFuture<>();
@@ -212,7 +212,7 @@
             f.completeExceptionally(e);
         }
         if (DEBUG) {
-            System.out.printf("[Transport] %s: sendPong returned %s%n", id, f);
+            System.out.printf("[Transport] exit send pong %s returned %s%n", id, f);
         }
         return f;
     }
@@ -225,7 +225,7 @@
         long id;
         if (DEBUG) {
             id = counter.incrementAndGet();
-            System.out.printf("[Transport] %s: sendClose statusCode=%s, reason.length()=%s%n",
+            System.out.printf("[Transport] enter send close %s statusCode=%s, reason.length()=%s%n",
                               id, statusCode, reason.length());
         }
         MinimalFuture<T> f = new MinimalFuture<>();
@@ -236,7 +236,7 @@
             f.completeExceptionally(e);
         }
         if (DEBUG) {
-            System.out.printf("[Transport] %s: sendClose returned %s%n", id, f);
+            System.out.printf("[Transport] exit send close %s returned %s%n", id, f);
         }
         return f;
     }
@@ -461,7 +461,7 @@
             //   (a) A message has been added to the queue
             //   (b) The channel is ready for writing
             if (DEBUG) {
-                System.out.printf("[Transport] begin send task%n");
+                System.out.printf("[Transport] enter send task%n");
             }
             while (!queue.isEmpty()) {
                 try {
@@ -507,13 +507,13 @@
                 }
             }
             if (DEBUG) {
-                System.out.printf("[Transport] end send task%n");
+                System.out.printf("[Transport] exit send task%n");
             }
         }
 
         private boolean tryCompleteWrite() throws IOException {
             if (DEBUG) {
-                System.out.printf("[Transport] begin writing%n");
+                System.out.printf("[Transport] enter writing%n");
             }
             boolean finished = false;
             loop:
@@ -554,7 +554,7 @@
                 }
             }
             if (DEBUG) {
-                System.out.printf("[Transport] end writing%n");
+                System.out.printf("[Transport] exit writing%n");
             }
             return finished;
         }
@@ -592,7 +592,7 @@
         @Override
         public void run() {
             if (DEBUG) {
-                System.out.printf("[Transport] begin receive task%n");
+                System.out.printf("[Transport] enter receive task%n");
             }
             loop:
             while (!receiveScheduler.isStopped()) {
@@ -655,7 +655,7 @@
                 }
             }
             if (DEBUG) {
-                System.out.printf("[Transport] end receive task%n");
+                System.out.printf("[Transport] exit receive task%n");
             }
         }
     }
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Mon Mar 12 12:47:29 2018 +0000
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/WebSocketImpl.java	Tue Mar 13 17:10:20 2018 +0000
@@ -33,6 +33,7 @@
 import jdk.internal.net.http.websocket.OpeningHandshake.Result;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.ref.Reference;
 import java.net.ProtocolException;
 import java.net.URI;
@@ -41,6 +42,7 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -49,6 +51,7 @@
 import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
+import static jdk.internal.net.http.common.MinimalFuture.completedFuture;
 import static jdk.internal.net.http.common.MinimalFuture.failedFuture;
 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY;
 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE;
@@ -69,7 +72,8 @@
 public final class WebSocketImpl implements WebSocket {
 
     private final static boolean DEBUG = true;
-    private final AtomicLong counter = new AtomicLong();
+    private final AtomicLong sendCounter = new AtomicLong();
+    private final AtomicLong receiveCounter = new AtomicLong();
 
     enum State {
         OPEN,
@@ -84,6 +88,7 @@
     }
 
     private final MinimalFuture<WebSocket> DONE = MinimalFuture.completedFuture(this);
+    private final long closeTimeout;
     private volatile boolean inputClosed;
     private volatile boolean outputClosed;
 
@@ -150,6 +155,28 @@
         this.listener = requireNonNull(listener);
         this.transport = transportFactory.createTransport(
                 new SignallingMessageConsumer());
+        closeTimeout = readCloseTimeout();
+    }
+
+    private static int readCloseTimeout() {
+        String property = "jdk.httpclient.websocket.closeTimeout";
+        int defaultValue = 30;
+        String value = Utils.getNetProperty(property);
+        int v;
+        if (value == null) {
+            v = defaultValue;
+        } else {
+            try {
+                v = Integer.parseUnsignedInt(value);
+            } catch (NumberFormatException ignored) {
+                v = defaultValue;
+            }
+        }
+        if (DEBUG) {
+            System.out.printf("[WebSocket] %s=%s, using value %s%n",
+                              property, value, v);
+        }
+        return v;
     }
 
     // FIXME: add to action handling of errors -> signalError()
@@ -160,7 +187,7 @@
         Objects.requireNonNull(message);
         long id;
         if (DEBUG) {
-            id = counter.incrementAndGet();
+            id = sendCounter.incrementAndGet();
             System.out.printf("[WebSocket] %s send text: payload length=%s last=%s%n",
                               id, message.length(), isLast);
         }
@@ -184,7 +211,7 @@
         Objects.requireNonNull(message);
         long id;
         if (DEBUG) {
-            id = counter.incrementAndGet();
+            id = sendCounter.incrementAndGet();
             System.out.printf("[WebSocket] %s send binary: payload=%s last=%s%n",
                               id, message, isLast);
         }
@@ -217,7 +244,7 @@
         Objects.requireNonNull(message);
         long id;
         if (DEBUG) {
-            id = counter.incrementAndGet();
+            id = sendCounter.incrementAndGet();
             System.out.printf("[WebSocket] %s send ping: payload=%s%n",
                               id, message);
         }
@@ -235,7 +262,7 @@
         Objects.requireNonNull(message);
         long id;
         if (DEBUG) {
-            id = counter.incrementAndGet();
+            id = sendCounter.incrementAndGet();
             System.out.printf("[WebSocket] %s send pong: payload=%s%n",
                               id, message);
         }
@@ -254,7 +281,7 @@
         Objects.requireNonNull(reason);
         long id;
         if (DEBUG) {
-            id = counter.incrementAndGet();
+            id = sendCounter.incrementAndGet();
             System.out.printf("[WebSocket] %s send close: statusCode=%s, reason.length=%s%n",
                               id, statusCode, reason);
         }
@@ -272,38 +299,50 @@
         return replaceNull(result);
     }
 
-    /*
-     * Sends a Close message, then shuts down the output since no more
-     * messages are expected to be sent at this point.
-     */
     private CompletableFuture<WebSocket> sendClose0(int statusCode,
                                                     String reason) {
         outputClosed = true;
-        BiConsumer<WebSocket, Throwable> closer = (r, e) -> {
-            Throwable cause = Utils.getCompletionCause(e);
-            if (cause instanceof IllegalArgumentException) {
-                // or pre=check it (isLegalToSendFromClient(statusCode))
-                return;
-            }
-            try {
-                transport.closeOutput();
-            } catch (IOException ex) {
-                Log.logError(ex);
+        CompletableFuture<WebSocket> cf
+                = transport.sendClose(statusCode, reason, this, (r, e) -> { });
+        CompletableFuture<WebSocket> closeOrTimeout
+                = replaceNull(cf).orTimeout(closeTimeout, TimeUnit.SECONDS);
+        // The snippet below, whose purpose might not be immediately obvious,
+        // is a trick used to complete a dependant stage with an IOException.
+        // A checked IOException cannot be thrown from inside the BiConsumer
+        // supplied to the handle method. Instead a CompletionStage completed
+        // exceptionally with this IOException is returned.
+        return closeOrTimeout.handle(this::processCloseOutcome)
+                             .thenCompose(Function.identity());
+    }
+
+    private CompletionStage<WebSocket> processCloseOutcome(WebSocket webSocket,
+                                                           Throwable e) {
+        if (DEBUG) {
+            System.out.printf("[WebSocket] send close completed, error=%s%n", e);
+            if (e != null) {
+                e.printStackTrace(System.out);
             }
-            if (cause instanceof TimeoutException) { // FIXME: it is not the case anymore
-                if (DEBUG) {
-                    System.out.println("[WebSocket] sendClose0 error: " + e);
-                }
-                try {
-                    transport.closeInput();
-                } catch (IOException ex) {
-                    Log.logError(ex);
-                }
-            }
-        };
-        CompletableFuture<WebSocket> cf
-                = transport.sendClose(statusCode, reason, this, closer);
-        return cf;
+        }
+        if (e == null) {
+            return completedFuture(webSocket);
+        }
+        Throwable cause = Utils.getCompletionCause(e);
+        if (cause instanceof IllegalArgumentException) {
+            return failedFuture(cause);
+        }
+        try {
+            transport.closeOutput();
+        } catch (IOException ignored) { }
+
+        if (cause instanceof TimeoutException) {
+            inputClosed = true;
+            try {
+                transport.closeInput();
+            } catch (IOException ignored) { }
+            return failedFuture(new InterruptedIOException(
+                    "Could not send close within a reasonable timeout"));
+        }
+        return failedFuture(cause);
     }
 
     @Override
@@ -372,8 +411,15 @@
 
         @Override
         public void run() {
+            if (DEBUG) {
+                System.out.printf("[WebSocket] enter receive task%n");
+            }
+            loop:
             while (true) {
                 State s = state.get();
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] receive state: %s%n", s);
+                }
                 try {
                     switch (s) {
                         case OPEN:
@@ -398,20 +444,20 @@
                             break;
                         case CLOSE:
                             processClose();
-                            return;
+                            break loop;
                         case ERROR:
                             processError();
-                            return;
+                            break loop;
                         case IDLE:
                             if (demand.tryDecrement()
                                     && tryChangeState(IDLE, WAITING)) {
                                 transport.request(1);
                             }
-                            return;
+                            break loop;
                         case WAITING:
                             // For debugging spurious signalling: when there was a
                             // signal, but apparently nothing has changed
-                            return;
+                            break loop;
                         default:
                             throw new InternalError(String.valueOf(s));
                     }
@@ -419,6 +465,9 @@
                     signalError(t);
                 }
             }
+            if (DEBUG) {
+                System.out.printf("[WebSocket] exit receive task%n");
+            }
         }
 
         private void processError() throws IOException {
@@ -431,7 +480,11 @@
             if (err instanceof FailWebSocketException) {
                 int code1 = ((FailWebSocketException) err).getStatusCode();
                 err = new ProtocolException().initCause(err);
-                sendClose0(code1, "")
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] failing %s with error=%s statusCode=%s%n",
+                                      WebSocketImpl.this, err, code1);
+                }
+                sendClose0(code1, "") // TODO handle errors from here
                         .whenComplete(
                                 (r, e) -> {
                                     if (e != null) {
@@ -439,7 +492,19 @@
                                     }
                                 });
             }
-            listener.onError(WebSocketImpl.this, err);
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onError %s error=%s%n",
+                                  id, err);
+            }
+            try {
+                listener.onError(WebSocketImpl.this, err);
+            } finally {
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] exit onError %s%n", id);
+                }
+            }
         }
 
         private void processClose() throws IOException {
@@ -448,10 +513,21 @@
             }
             transport.closeInput();
             receiveScheduler.stop();
-            CompletionStage<?> readyToClose;
-            readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason);
-            if (readyToClose == null) {
-                readyToClose = DONE;
+            CompletionStage<?> cs = null; // when the listener is ready to close
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onClose %s statusCode=%s reason.length=%s%n",
+                                  id, statusCode, reason.length());
+            }
+            try {
+                cs = listener.onClose(WebSocketImpl.this, statusCode, reason);
+            } finally {
+                System.out.printf("[WebSocket] exit onClose %s returned %s%n",
+                                  id, cs);
+            }
+            if (cs == null) {
+                cs = DONE;
             }
             int code;
             if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) {
@@ -463,8 +539,8 @@
             } else {
                 code = statusCode;
             }
-            readyToClose.whenComplete((r, e) -> {
-                sendClose0(code, "") // FIXME errors from here?
+            cs.whenComplete((r, e) -> { // TODO log
+                sendClose0(code, "") // TODO handle errors from here
                         .whenComplete((r1, e1) -> {
                             if (DEBUG) {
                                 if (e1 != null) {
@@ -476,37 +552,105 @@
         }
 
         private void processPong() {
-            listener.onPong(WebSocketImpl.this, binaryData);
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onPong %s payload=%s%n",
+                                  id, binaryData);
+            }
+            CompletionStage<?> cs = null;
+            try {
+                cs = listener.onPong(WebSocketImpl.this, binaryData);
+            } finally {
+                System.out.printf("[WebSocket] exit onPong %s returned %s%n",
+                                  id, cs);
+            }
         }
 
         private void processPing() {
-            // Let's make a full copy of this tiny data. What we want here
-            // is to rule out a possibility the shared data we send might be
-            // corrupted by processing in the listener.
+            if (DEBUG) {
+                System.out.printf("[WebSocket] processPing%n");
+            }
             ByteBuffer slice = binaryData.slice();
+            // A full copy of this (small) data is made. This way sending a
+            // replying Pong could be done in parallel with the listener
+            // handling this Ping.
             ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining())
                     .put(binaryData)
                     .flip();
             // Non-exclusive send;
             BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> {
-                if (e != null) { // Better error handing. What if already closed?
+                if (e != null) { // TODO: better error handing. What if already closed?
                     signalError(Utils.getCompletionCause(e));
                 }
             };
             transport.sendPong(copy, WebSocketImpl.this, reporter);
-            listener.onPing(WebSocketImpl.this, slice);
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onPing %s payload=%s%n",
+                                  id, slice);
+            }
+            CompletionStage<?> cs = null;
+            try {
+                cs = listener.onPing(WebSocketImpl.this, slice);
+            } finally {
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] exit onPing %s returned %s%n",
+                                      id, cs);
+                }
+            }
         }
 
         private void processBinary() {
-            listener.onBinary(WebSocketImpl.this, binaryData, part);
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onBinary %s payload=%s, part=%s%n",
+                                  id, binaryData, part);
+            }
+            CompletionStage<?> cs = null;
+            try {
+                cs = listener.onBinary(WebSocketImpl.this, binaryData, part);
+            } finally {
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] exit onBinary %s returned %s%n",
+                                      id, cs);
+                }
+            }
         }
 
         private void processText() {
-            listener.onText(WebSocketImpl.this, text, part);
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onText %s payload.length=%s part=%s%n",
+                                  id, text.length(), part);
+            }
+            CompletionStage<?> cs = null;
+            try {
+                cs = listener.onText(WebSocketImpl.this, text, part);
+            } finally {
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] exit onText %s returned %s%n",
+                                      id, cs);
+                }
+            }
         }
 
         private void processOpen() {
-            listener.onOpen(WebSocketImpl.this);
+            long id;
+            if (DEBUG) {
+                id = receiveCounter.incrementAndGet();
+                System.out.printf("[WebSocket] enter onOpen %s%n", id);
+            }
+            try {
+                listener.onOpen(WebSocketImpl.this);
+            } finally {
+                if (DEBUG) {
+                    System.out.printf("[WebSocket] exit onOpen %s%n", id);
+                }
+            }
         }
     }
 
--- a/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Mon Mar 12 12:47:29 2018 +0000
+++ b/test/jdk/java/net/httpclient/websocket/WebSocketTest.java	Tue Mar 13 17:10:20 2018 +0000
@@ -248,21 +248,29 @@
                     .newWebSocketBuilder()
                     .buildAsync(server.getURI(), new WebSocket.Listener() { })
                     .join();
-            ByteBuffer data = ByteBuffer.allocate(65536);
-            for (int i = 0; ; i++) { // fill up the send buffer
-                System.out.println("cycle #" + i);
-                try {
-                    ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
-                    data.clear();
-                } catch (TimeoutException e) {
-                    break;
+            try {
+                ByteBuffer data = ByteBuffer.allocate(65536);
+                for (int i = 0; ; i++) { // fill up the send buffer
+                    System.out.printf("begin cycle #%s at %s%n",
+                                      i, System.currentTimeMillis());
+                    try {
+                        ws.sendBinary(data, true).get(10, TimeUnit.SECONDS);
+                        data.clear();
+                    } catch (TimeoutException e) {
+                        break;
+                    } finally {
+                        System.out.printf("end cycle #%s at %s%n",
+                                          i, System.currentTimeMillis());
+                    }
                 }
+                CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
+                // The output closes even if the Close message has not been sent
+                assertFalse(cf.isDone());
+                assertTrue(ws.isOutputClosed());
+                assertEquals(ws.getSubprotocol(), "");
+            } finally {
+                ws.abort();
             }
-            CompletableFuture<WebSocket> cf = ws.sendClose(NORMAL_CLOSURE, "");
-            // The output closes even if the Close message has not been sent
-            assertFalse(cf.isDone());
-            assertTrue(ws.isOutputClosed());
-            assertEquals(ws.getSubprotocol(), "");
         }
     }
 
@@ -295,13 +303,17 @@
             ByteBuffer data = ByteBuffer.allocate(65536);
             CompletableFuture<WebSocket> cf = null;
             for (int i = 0; ; i++) {  // fill up the send buffer
-                System.out.println("cycle #" + i);
+                System.out.printf("begin cycle #%s at %s%n",
+                                  i, System.currentTimeMillis());
                 try {
                     cf = ws.sendBinary(data, true);
                     cf.get(10, TimeUnit.SECONDS);
                     data.clear();
                 } catch (TimeoutException e) {
                     break;
+                } finally {
+                    System.out.printf("end cycle #%s at %s%n",
+                                      i, System.currentTimeMillis());
                 }
             }
             ws.abort();
@@ -322,12 +334,16 @@
             String data = stringWith2NBytes(32768);
             CompletableFuture<WebSocket> cf = null;
             for (int i = 0; ; i++) {  // fill up the send buffer
-                System.out.println("cycle #" + i);
+                System.out.printf("begin cycle #%s at %s%n",
+                                  i, System.currentTimeMillis());
                 try {
                     cf = ws.sendText(data, true);
                     cf.get(10, TimeUnit.SECONDS);
                 } catch (TimeoutException e) {
                     break;
+                } finally {
+                    System.out.printf("end cycle #%s at %s%n",
+                                      i, System.currentTimeMillis());
                 }
             }
             ws.abort();
@@ -337,6 +353,43 @@
         }
     }
 
+    @Test
+    public void sendCloseTimeout() throws Exception {
+        try (DummyWebSocketServer server = notReadingServer()) {
+            server.open();
+            WebSocket ws = newHttpClient()
+                    .newWebSocketBuilder()
+                    .buildAsync(server.getURI(), new WebSocket.Listener() { })
+                    .join();
+            String data = stringWith2NBytes(32768);
+            CompletableFuture<WebSocket> cf = null;
+            for (int i = 0; ; i++) {  // fill up the send buffer
+                System.out.printf("begin cycle #%s at %s%n",
+                                  i, System.currentTimeMillis());
+                try {
+                    cf = ws.sendText(data, true);
+                    cf.get(10, TimeUnit.SECONDS);
+                } catch (TimeoutException e) {
+                    break;
+                } finally {
+                    System.out.printf("end cycle #%s at %s%n",
+                                      i, System.currentTimeMillis());
+                }
+            }
+            long before = System.currentTimeMillis();
+            assertCompletesExceptionally(IOException.class,
+                                         ws.sendClose(WebSocket.NORMAL_CLOSURE, "ok"));
+            long after = System.currentTimeMillis();
+            // default timeout should be 30 seconds
+            long elapsed = after - before;
+            System.out.printf("Elapsed %s ms%n", elapsed);
+            assertTrue(elapsed >= 29_000, String.valueOf(elapsed));
+            assertTrue(ws.isOutputClosed());
+            assertTrue(ws.isInputClosed());
+            assertCompletesExceptionally(IOException.class, cf);
+        }
+    }
+
     private static String stringWith2NBytes(int n) {
         // -- Russian Alphabet (33 characters, 2 bytes per char) --
         char[] abc = {
@@ -470,12 +523,16 @@
 
             CharBuffer data = CharBuffer.allocate(65536);
             for (int i = 0; ; i++) {  // fill up the send buffer
-                System.out.println("cycle #" + i);
+                System.out.printf("begin cycle #%s at %s%n",
+                                  i, System.currentTimeMillis());
                 try {
                     ws.sendText(data, true).get(10, TimeUnit.SECONDS);
                     data.clear();
                 } catch (TimeoutException e) {
                     break;
+                } finally {
+                    System.out.printf("end cycle #%s at %s%n",
+                                      i, System.currentTimeMillis());
                 }
             }
             assertCompletesExceptionally(ISE, ws.sendText("", true));