src/java.base/share/classes/sun/net/www/http/KeepAliveStream.java
branchJDK-8229867-branch
changeset 57968 8595871a5446
parent 47216 71c04702a3d5
--- a/src/java.base/share/classes/sun/net/www/http/KeepAliveStream.java	Fri Aug 30 13:11:16 2019 +0100
+++ b/src/java.base/share/classes/sun/net/www/http/KeepAliveStream.java	Fri Aug 30 15:42:27 2019 +0100
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1996, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -26,7 +26,7 @@
 package sun.net.www.http;
 
 import java.io.*;
-
+import java.util.concurrent.locks.ReentrantLock;
 import sun.net.ProgressSource;
 import sun.net.www.MeteredStream;
 import jdk.internal.misc.InnocuousThread;
@@ -47,7 +47,7 @@
     boolean hurried;
 
     // has this KeepAliveStream been put on the queue for asynchronous cleanup.
-    protected boolean queuedForCleanup = false;
+    protected volatile boolean queuedForCleanup = false;
 
     private static final KeepAliveStreamCleaner queue = new KeepAliveStreamCleaner();
     private static Thread cleanerThread; // null
@@ -60,19 +60,17 @@
         this.hc = hc;
     }
 
+    final ReentrantLock readLock() {
+        return readLock;
+    }
+
     /**
      * Attempt to cache this connection
      */
     public void close() throws IOException  {
-        // If the inputstream is closed already, just return.
-        if (closed) {
-            return;
-        }
-
-        // If this stream has already been queued for cleanup.
-        if (queuedForCleanup) {
-            return;
-        }
+        // If the inputstream is closed already, or if this stream
+        // has already been queued for cleanup.just return.
+        if (closed || queuedForCleanup) return;
 
         // Skip past the data that's left in the Inputstream because
         // some sort of error may have occurred.
@@ -81,34 +79,43 @@
         // to hang around for nothing. So if we can't skip without blocking
         // we just close the socket and, therefore, terminate the keepAlive
         // NOTE: Don't close super class
+        // For consistency, access to `expected` and `count` should be
+        // protected by readLock
+        readLock.lock();
         try {
-            if (expected > count) {
-                long nskip = expected - count;
-                if (nskip <= available()) {
-                    do {} while ((nskip = (expected - count)) > 0L
-                                 && skip(Math.min(nskip, available())) > 0L);
-                } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
-                    //put this KeepAliveStream on the queue so that the data remaining
-                    //on the socket can be cleanup asyncronously.
-                    queueForCleanup(new KeepAliveCleanerEntry(this, hc));
-                } else {
-                    hc.closeServer();
+            if (closed || queuedForCleanup) return;
+            try {
+                if (expected > count) {
+                    long nskip = expected - count;
+                    if (nskip <= available()) {
+                        do {
+                        } while ((nskip = (expected - count)) > 0L
+                                && skip(Math.min(nskip, available())) > 0L);
+                    } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
+                        //put this KeepAliveStream on the queue so that the data remaining
+                        //on the socket can be cleanup asyncronously.
+                        queueForCleanup(new KeepAliveCleanerEntry(this, hc));
+                    } else {
+                        hc.closeServer();
+                    }
+                }
+                if (!closed && !hurried && !queuedForCleanup) {
+                    hc.finished();
+                }
+            } finally {
+                if (pi != null)
+                    pi.finishTracking();
+
+                if (!queuedForCleanup) {
+                    // nulling out the underlying inputstream as well as
+                    // httpClient to let gc collect the memories faster
+                    in = null;
+                    hc = null;
+                    closed = true;
                 }
             }
-            if (!closed && !hurried && !queuedForCleanup) {
-                hc.finished();
-            }
         } finally {
-            if (pi != null)
-                pi.finishTracking();
-
-            if (!queuedForCleanup) {
-                // nulling out the underlying inputstream as well as
-                // httpClient to let gc collect the memories faster
-                in = null;
-                hc = null;
-                closed = true;
-            }
+            readLock.unlock();
         }
     }
 
@@ -124,7 +131,8 @@
         throw new IOException("mark/reset not supported");
     }
 
-    public synchronized boolean hurry() {
+    public boolean hurry() {
+        readLock.lock();
         try {
             /* CASE 0: we're actually already done */
             if (closed || count >= expected) {
@@ -147,11 +155,14 @@
         } catch (IOException e) {
             // e.printStackTrace();
             return false;
+        } finally {
+            readLock.unlock();
         }
     }
 
     private static void queueForCleanup(KeepAliveCleanerEntry kace) {
-        synchronized(queue) {
+        queue.queueLock.lock();
+        try {
             if(!kace.getQueuedForCleanup()) {
                 if (!queue.offer(kace)) {
                     kace.getHttpClient().closeServer();
@@ -159,7 +170,7 @@
                 }
 
                 kace.setQueuedForCleanup();
-                queue.notifyAll();
+                queue.waiter.signalAll();
             }
 
             boolean startCleanupThread = (cleanerThread == null);
@@ -181,14 +192,18 @@
                     }
                 });
             }
-        } // queue
+        } finally {
+            queue.queueLock.unlock();
+        }
     }
 
     protected long remainingToRead() {
+        assert readLock.isHeldByCurrentThread();
         return expected - count;
     }
 
     protected void setClosed() {
+        assert readLock.isHeldByCurrentThread();
         in = null;
         hc = null;
         closed = true;