http-client-branch: improved logging and fixed race condition with throwing subscribers in onComplete http-client-branch
authordfuchs
Fri, 15 Jun 2018 12:43:10 +0100
branchhttp-client-branch
changeset 56763 25821dd1d917
parent 56762 79e371a6462c
child 56766 713f08212a7e
http-client-branch: improved logging and fixed race condition with throwing subscribers in onComplete
src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java
src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java
src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java
test/jdk/java/net/httpclient/EncodedCharsInURI.java
test/jdk/java/net/httpclient/ThrowingSubscribers.java
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Fri Jun 15 11:53:32 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Exchange.java	Fri Jun 15 12:43:10 2018 +0100
@@ -285,6 +285,15 @@
             if (debug.on()) debug.log("asyncReceiver finished (failed=%s)", t);
             if (t != null) {
                 s.cancel();
+                // Don't complete exceptionally here as 't'
+                // might not be the right exception: it will
+                // not have been decorated yet.
+                // t is an exception raised by the read side,
+                // an EOFException or Broken Pipe...
+                // We are cancelling the BodyPublisher subscription
+                // and completing bodySentCF to allow the next step
+                // to flow and call readHeaderAsync, which will
+                // get the right exception from the asyncReceiver.
                 bodySentCF.complete(this);
             }
         }, executor);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Fri Jun 15 11:53:32 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/Http1Response.java	Fri Jun 15 12:43:10 2018 +0100
@@ -313,13 +313,19 @@
                     try {
                         userSubscriber.onComplete();
                     } catch (Throwable x) {
-                        propagateError(t = withError = Utils.getCompletionCause(x));
-                        // rethrow and let the caller deal with it.
+                        // Simply propagate the error by calling
+                        // onError on the user subscriber, and let the
+                        // connection be reused since we should have received
+                        // and parsed all the bytes when we reach here.
+                        // If onError throws in turn, then we will simply
+                        // let that new exception flow up to the caller
+                        // and let it deal with it.
                         // (i.e: log and close the connection)
-                        // arguably we could decide to not throw and let the
-                        // connection be reused since we should have received and
-                        // parsed all the bytes when we reach here.
-                        throw x;
+                        // Note that rethrowing here could introduce a
+                        // race that might cause the next send() operation to
+                        // fail as the connection has already been put back
+                        // into the cache when we reach here.
+                        propagateError(t = withError = Utils.getCompletionCause(x));
                     }
                 } else {
                     propagateError(t);
--- a/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Fri Jun 15 11:53:32 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java	Fri Jun 15 12:43:10 2018 +0100
@@ -720,6 +720,7 @@
         }
 
         synchronized void shutdown() {
+            Log.logTrace("{0}: shutting down", getName());
             if (debug.on()) debug.log("SelectorManager shutting down");
             closed = true;
             try {
@@ -736,6 +737,7 @@
             List<AsyncEvent> readyList = new ArrayList<>();
             List<Runnable> resetList = new ArrayList<>();
             try {
+                if (Log.trace()) Log.logTrace(getName() + ": starting");
                 while (!Thread.currentThread().isInterrupted()) {
                     synchronized (this) {
                         assert errorList.isEmpty();
@@ -772,7 +774,7 @@
                                     throw new IOException("Channel closed");
                                 }
                             } catch (IOException e) {
-                                Log.logTrace("HttpClientImpl: " + e);
+                                Log.logTrace("{0}: {1}", getName(), e);
                                 if (debug.on())
                                     debug.log("Got " + e.getClass().getName()
                                               + " while handling registration events");
@@ -804,7 +806,9 @@
                     // Check whether client is still alive, and if not,
                     // gracefully stop this thread
                     if (!owner.isReferenced()) {
-                        Log.logTrace("HttpClient no longer referenced. Exiting...");
+                        Log.logTrace("{0}: {1}",
+                                getName(),
+                                "HttpClient no longer referenced. Exiting...");
                         return;
                     }
 
@@ -846,7 +850,9 @@
                         // Check whether client is still alive, and if not,
                         // gracefully stop this thread
                         if (!owner.isReferenced()) {
-                            Log.logTrace("HttpClient no longer referenced. Exiting...");
+                            Log.logTrace("{0}: {1}",
+                                    getName(),
+                                    "HttpClient no longer referenced. Exiting...");
                             return;
                         }
                         owner.purgeTimeoutsAndReturnNextDeadline();
@@ -897,11 +903,11 @@
 
                 }
             } catch (Throwable e) {
-                //e.printStackTrace();
                 if (!closed) {
                     // This terminates thread. So, better just print stack trace
                     String err = Utils.stackTrace(e);
-                    Log.logError("HttpClientImpl: fatal error: " + err);
+                    Log.logError("{0}: {1}: {2}", getName(),
+                            "HttpClientImpl shutting down due to fatal error", err);
                 }
                 if (debug.on()) debug.log("shutting down", e);
                 if (Utils.ASSERTIONSENABLED && !debug.on()) {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Fri Jun 15 11:53:32 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/PlainHttpConnection.java	Fri Jun 15 12:43:10 2018 +0100
@@ -126,7 +126,12 @@
             }
         } catch (Throwable throwable) {
             cf.completeExceptionally(Utils.toConnectException(throwable));
-            close();
+            try {
+                close();
+            } catch (Exception x) {
+                if (debug.on())
+                    debug.log("Failed to close channel after unsuccessful connect");
+            }
         }
         return cf;
     }
--- a/test/jdk/java/net/httpclient/EncodedCharsInURI.java	Fri Jun 15 11:53:32 2018 +0100
+++ b/test/jdk/java/net/httpclient/EncodedCharsInURI.java	Fri Jun 15 12:43:10 2018 +0100
@@ -33,7 +33,8 @@
  *          java.net.http/jdk.internal.net.http.frame
  *          java.net.http/jdk.internal.net.http.hpack
  * @run testng/othervm
- *        -Djdk.httpclient.HttpClient.log=headers EncodedCharsInURI
+ *        -Djdk.internal.httpclient.debug=true
+ *        -Djdk.httpclient.HttpClient.log=headers,errors EncodedCharsInURI
  */
 //*        -Djdk.internal.httpclient.debug=true
 
@@ -48,21 +49,14 @@
 import org.testng.annotations.Test;
 
 import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
 import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLServerSocket;
-import javax.net.ssl.SSLServerSocketFactory;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.io.Writer;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketAddress;
 import java.net.URI;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
@@ -71,8 +65,6 @@
 import java.net.http.HttpResponse;
 import java.net.http.HttpResponse.BodyHandler;
 import java.net.http.HttpResponse.BodyHandlers;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Locale;
 import java.util.StringTokenizer;
@@ -87,6 +79,7 @@
 import static java.lang.String.format;
 import static java.lang.System.in;
 import static java.lang.System.out;
+import static java.nio.charset.StandardCharsets.US_ASCII;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.net.http.HttpClient.Builder.NO_PROXY;
 import static org.testng.Assert.assertEquals;
@@ -402,9 +395,6 @@
                     Socket targetConnection = null;
                     InputStream  ccis = clientConnection.getInputStream();
                     OutputStream ccos = clientConnection.getOutputStream();
-                    Writer w = new OutputStreamWriter(
-                            clientConnection.getOutputStream(), "UTF-8");
-                    PrintWriter pw = new PrintWriter(w);
                     System.out.println(now() + getName() + ": Reading request line");
                     String requestLine = readLine(ccis);
                     System.out.println(now() + getName() + ": Request line: " + requestLine);
@@ -459,11 +449,13 @@
                     // Then send the 200 OK response to the client
                     System.out.println(now() + getName() + ": Sending "
                             + response);
-                    pw.print(response);
-                    pw.flush();
+                    ccos.write(response.toString().getBytes(UTF_8));
+                    ccos.flush();
+                    System.out.println(now() + getName() + ": sent response headers");
                     ccos.write(b);
                     ccos.flush();
                     ccos.close();
+                    System.out.println(now() + getName() + ": sent " + b.length + " body bytes");
                     connections.remove(clientConnection);
                     clientConnection.close();
                 }
--- a/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Fri Jun 15 11:53:32 2018 +0100
+++ b/test/jdk/java/net/httpclient/ThrowingSubscribers.java	Fri Jun 15 12:43:10 2018 +0100
@@ -179,6 +179,8 @@
         };
     }
 
+    private static AtomicLong URICOUNT = new AtomicLong();
+
     @DataProvider(name = "noThrows")
     public Object[][] noThrows() {
         String[] uris = uris();
@@ -268,19 +270,20 @@
     public void testNoThrows(String uri, boolean sameClient)
             throws Exception {
         HttpClient client = null;
-        out.printf("%ntestNoThrows(%s, %b)%n", uri, sameClient);
+        String uri2 = uri + "-" + URICOUNT.incrementAndGet() + "/noThrows";
+        out.printf("%ntestNoThrows(%s, %b)%n", uri2, sameClient);
         for (int i=0; i< ITERATION_COUNT; i++) {
             if (!sameClient || client == null)
                 client = newHttpClient(sameClient);
 
-            HttpRequest req = HttpRequest.newBuilder(URI.create(uri))
+            HttpRequest req = HttpRequest.newBuilder(URI.create(uri2))
                     .build();
             BodyHandler<String> handler =
                     new ThrowingBodyHandler((w) -> {},
                                             BodyHandlers.ofString());
             HttpResponse<String> response = client.send(req, handler);
             String body = response.body();
-            assertEquals(URI.create(body).getPath(), URI.create(uri).getPath());
+            assertEquals(URI.create(body).getPath(), URI.create(uri2).getPath());
         }
     }
 
@@ -290,6 +293,7 @@
                                      Thrower thrower)
             throws Exception
     {
+        uri = uri + "-" + URICOUNT.incrementAndGet();
         String test = format("testThrowingAsString(%s, %b, %s)",
                              uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
@@ -303,6 +307,7 @@
                                     Thrower thrower)
             throws Exception
     {
+        uri = uri + "-" + URICOUNT.incrementAndGet();
         String test =  format("testThrowingAsLines(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
@@ -316,6 +321,7 @@
                                           Thrower thrower)
             throws Exception
     {
+        uri = uri + "-" + URICOUNT.incrementAndGet();
         String test = format("testThrowingAsInputStream(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
@@ -329,6 +335,7 @@
                                           Thrower thrower)
             throws Exception
     {
+        uri = uri + "-" + URICOUNT.incrementAndGet();
         String test = format("testThrowingAsStringAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofString,
@@ -342,6 +349,7 @@
                                          Thrower thrower)
             throws Exception
     {
+        uri = uri + "-" + URICOUNT.incrementAndGet();
         String test = format("testThrowingAsLinesAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofLines,
@@ -355,6 +363,7 @@
                                                Thrower thrower)
             throws Exception
     {
+        uri = uri + "-" + URICOUNT.incrementAndGet();
         String test = format("testThrowingAsInputStreamAsync(%s, %b, %s)",
                 uri, sameClient, thrower);
         testThrowing(test, uri, sameClient, BodyHandlers::ofInputStream,
@@ -389,9 +398,9 @@
 
             if (!sameClient || client == null)
                 client = newHttpClient(sameClient);
-
+            String uri2 = uri + "-" + where;
             HttpRequest req = HttpRequest.
-                    newBuilder(URI.create(uri))
+                    newBuilder(URI.create(uri2))
                     .build();
             BodyHandler<T> handler =
                     new ThrowingBodyHandler(where.select(thrower), handlers.get());