# HG changeset patch # User prappo # Date 1513296316 -10800 # Node ID 76ac25076fdc3383c6ec89194fb5068a1cf43c7d # Parent 7f1e0cf933a6fdd3e274f19577a7600945736ccc http-client-branch: (WebSocket) refactoring diff -r 7f1e0cf933a6 -r 76ac25076fdc src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java Fri Dec 15 00:47:16 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/Transport.java Fri Dec 15 03:05:16 2017 +0300 @@ -30,6 +30,25 @@ import java.util.concurrent.CompletableFuture; /* + * Transport needs some way to asynchronously notify the send operation has been + * completed. It can have several different designs each of which has its own + * pros and cons: + * + * (1) void sendMessage(..., Callback) + * (2) CompletableFuture sendMessage(...) + * (3) CompletableFuture sendMessage(..., Callback) + * (4) boolean sendMessage(..., Callback) throws IOException + * ... + * + * If Transport's users use CFs, (1) forces these users to create CFs and pass + * them to the callback. If any additional (dependant) action needs to be + * attached to the returned CF, this means an extra object (CF) must be created + * in (2). (3) and (4) solves both issues, however (4) does not abstract out + * when exactly the operation has been performed. So the handling code needs to + * be repeated twice. And that leads to 2 different code paths (more bugs). + * Unless designed for this, the user should not assume any specific order of + * completion in (3) (e.g. callback first and then the returned CF). + * * The only parametrization of Transport used is Transport. The * type parameter T was introduced solely to avoid circular dependency between * Transport and WebSocket. After all, instances of T are used solely to diff -r 7f1e0cf933a6 -r 76ac25076fdc src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java Fri Dec 15 00:47:16 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/TransportImpl.java Fri Dec 15 03:05:16 2017 +0300 @@ -124,9 +124,9 @@ private void send0(OutgoingMessage message, Consumer handler) { boolean b = busy.get(); assert b; // Please don't inline this, as busy.get() has memory - // visibility effects and we don't want the program behaviour - // to depend on whether the assertions are turned on - // or turned off + // visibility effects and we don't want the program behaviour + // to depend on whether the assertions are turned on + // or turned off try { boolean sent = message.sendTo(channel); if (sent) { @@ -203,7 +203,7 @@ CompletableFuture cf = p.second; try { if (!message.contextualize(context)) { // Do not send the message - cf.complete(null); + cf.complete(resultSupplier.get()); repeat(taskCompleter); return; } @@ -256,9 +256,9 @@ @Override public void acknowledgeReception() { - long x = demand.decreaseAndGet(1); - if (x < 0) { - throw new InternalError(String.valueOf(x)); + boolean decremented = demand.tryDecrement(); + if (!decremented) { + throw new InternalError(); } } @@ -320,7 +320,7 @@ } /* - * Stops the machinery from reading and delivering messages permanently, + * Permanently stops reading from the channel and delivering messages * regardless of the current demand and data availability. */ @Override diff -r 7f1e0cf933a6 -r 76ac25076fdc src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Dec 15 00:47:16 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Fri Dec 15 03:05:16 2017 +0300 @@ -153,8 +153,7 @@ return failedFuture(new IllegalStateException("Send pending")); } CompletableFuture cf = transport.sendText(message, isLast); - cf.whenComplete((r, e) -> outstandingSend.set(false)); - return cf; + return cf.whenComplete((r, e) -> outstandingSend.set(false)); } @Override @@ -170,8 +169,7 @@ // } else { // cf.whenComplete((r, e) -> outstandingSend.set(false)); // } - cf.whenComplete((r, e) -> outstandingSend.set(false)); - return cf; + return cf.whenComplete((r, e) -> outstandingSend.set(false)); } @Override diff -r 7f1e0cf933a6 -r 76ac25076fdc test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java --- a/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java Fri Dec 15 00:47:16 2017 +0300 +++ b/test/jdk/java/net/httpclient/websocket/WebSocketImplDriver.java Fri Dec 15 03:05:16 2017 +0300 @@ -24,6 +24,6 @@ /* * @test * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open - * @run testng/othervm/timeout=30 --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest + * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.WebSocketImplTest */ public class WebSocketImplDriver { } diff -r 7f1e0cf933a6 -r 76ac25076fdc test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java Fri Dec 15 00:47:16 2017 +0300 +++ b/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/WebSocketImplTest.java Fri Dec 15 03:05:16 2017 +0300 @@ -72,6 +72,8 @@ // TODO: request in onClose/onError // TODO: throw exception in onClose/onError // TODO: exception is thrown from request() + // TODO: repeated sendClose complete normally + // TODO: default Close message is sent if IAE is thrown from sendClose @Test public void testNonPositiveRequest() throws Exception { @@ -208,6 +210,72 @@ ); } + // Tease out "java.lang.IllegalStateException: Send pending" due to possible + // race between sending a message and replenishing the permit + @Test + public void testManyTextMessages() { + WebSocketImpl ws = newInstance( + new MockListener(1), + new TransportFactory() { + @Override + public Transport createTransport(Supplier sendResultSupplier, + MessageStreamConsumer consumer) { + + final Random r = new Random(); + + return new MockTransport<>(sendResultSupplier, consumer) { + @Override + protected CompletableFuture defaultSend() { + return millis(r.nextInt(100), result()); + } + }; + } + }); + int NUM_MESSAGES = 512; + CompletableFuture current = CompletableFuture.completedFuture(ws); + for (int i = 0; i < NUM_MESSAGES; i++) { + current = current.thenCompose(w -> w.sendText(" ", true)); + } + current.join(); + MockTransport transport = (MockTransport) ws.transport(); + assertEquals(transport.invocations().size(), NUM_MESSAGES); + } + + @Test + public void testManyBinaryMessages() { + WebSocketImpl ws = newInstance( + new MockListener(1), + new TransportFactory() { + @Override + public Transport createTransport(Supplier sendResultSupplier, + MessageStreamConsumer consumer) { + + final Random r = new Random(); + + return new MockTransport<>(sendResultSupplier, consumer) { + @Override + protected CompletableFuture defaultSend() { + return millis(r.nextInt(150), result()); + } + }; + } + }); + CompletableFuture start = new CompletableFuture<>(); + + int NUM_MESSAGES = 512; + CompletableFuture current = start; + for (int i = 0; i < NUM_MESSAGES; i++) { + current = current.thenComposeAsync(w -> w.sendBinary(ByteBuffer.allocate(1), true)); + } + + start.completeAsync(() -> ws); + current.join(); + + MockTransport transport = (MockTransport) ws.transport(); + assertEquals(transport.invocations().size(), NUM_MESSAGES); + } + + @Test public void sendTextImmediately() { WebSocketImpl ws = newInstance( @@ -349,6 +417,11 @@ .completeOnTimeout(result, sec, TimeUnit.SECONDS); } + private static CompletableFuture millis(long sec, T result) { + return new CompletableFuture() + .completeOnTimeout(result, sec, TimeUnit.MILLISECONDS); + } + private static CompletableFuture now(T result) { return CompletableFuture.completedFuture(result); }