--- a/src/java.base/share/classes/sun/net/www/http/KeepAliveStream.java Fri Aug 30 13:11:16 2019 +0100
+++ b/src/java.base/share/classes/sun/net/www/http/KeepAliveStream.java Fri Aug 30 15:42:27 2019 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1996, 2019, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -26,7 +26,7 @@
package sun.net.www.http;
import java.io.*;
-
+import java.util.concurrent.locks.ReentrantLock;
import sun.net.ProgressSource;
import sun.net.www.MeteredStream;
import jdk.internal.misc.InnocuousThread;
@@ -47,7 +47,7 @@
boolean hurried;
// has this KeepAliveStream been put on the queue for asynchronous cleanup.
- protected boolean queuedForCleanup = false;
+ protected volatile boolean queuedForCleanup = false;
private static final KeepAliveStreamCleaner queue = new KeepAliveStreamCleaner();
private static Thread cleanerThread; // null
@@ -60,19 +60,17 @@
this.hc = hc;
}
+ final ReentrantLock readLock() {
+ return readLock;
+ }
+
/**
* Attempt to cache this connection
*/
public void close() throws IOException {
- // If the inputstream is closed already, just return.
- if (closed) {
- return;
- }
-
- // If this stream has already been queued for cleanup.
- if (queuedForCleanup) {
- return;
- }
+ // If the inputstream is closed already, or if this stream
+ // has already been queued for cleanup.just return.
+ if (closed || queuedForCleanup) return;
// Skip past the data that's left in the Inputstream because
// some sort of error may have occurred.
@@ -81,34 +79,43 @@
// to hang around for nothing. So if we can't skip without blocking
// we just close the socket and, therefore, terminate the keepAlive
// NOTE: Don't close super class
+ // For consistency, access to `expected` and `count` should be
+ // protected by readLock
+ readLock.lock();
try {
- if (expected > count) {
- long nskip = expected - count;
- if (nskip <= available()) {
- do {} while ((nskip = (expected - count)) > 0L
- && skip(Math.min(nskip, available())) > 0L);
- } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
- //put this KeepAliveStream on the queue so that the data remaining
- //on the socket can be cleanup asyncronously.
- queueForCleanup(new KeepAliveCleanerEntry(this, hc));
- } else {
- hc.closeServer();
+ if (closed || queuedForCleanup) return;
+ try {
+ if (expected > count) {
+ long nskip = expected - count;
+ if (nskip <= available()) {
+ do {
+ } while ((nskip = (expected - count)) > 0L
+ && skip(Math.min(nskip, available())) > 0L);
+ } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
+ //put this KeepAliveStream on the queue so that the data remaining
+ //on the socket can be cleanup asyncronously.
+ queueForCleanup(new KeepAliveCleanerEntry(this, hc));
+ } else {
+ hc.closeServer();
+ }
+ }
+ if (!closed && !hurried && !queuedForCleanup) {
+ hc.finished();
+ }
+ } finally {
+ if (pi != null)
+ pi.finishTracking();
+
+ if (!queuedForCleanup) {
+ // nulling out the underlying inputstream as well as
+ // httpClient to let gc collect the memories faster
+ in = null;
+ hc = null;
+ closed = true;
}
}
- if (!closed && !hurried && !queuedForCleanup) {
- hc.finished();
- }
} finally {
- if (pi != null)
- pi.finishTracking();
-
- if (!queuedForCleanup) {
- // nulling out the underlying inputstream as well as
- // httpClient to let gc collect the memories faster
- in = null;
- hc = null;
- closed = true;
- }
+ readLock.unlock();
}
}
@@ -124,7 +131,8 @@
throw new IOException("mark/reset not supported");
}
- public synchronized boolean hurry() {
+ public boolean hurry() {
+ readLock.lock();
try {
/* CASE 0: we're actually already done */
if (closed || count >= expected) {
@@ -147,11 +155,14 @@
} catch (IOException e) {
// e.printStackTrace();
return false;
+ } finally {
+ readLock.unlock();
}
}
private static void queueForCleanup(KeepAliveCleanerEntry kace) {
- synchronized(queue) {
+ queue.queueLock.lock();
+ try {
if(!kace.getQueuedForCleanup()) {
if (!queue.offer(kace)) {
kace.getHttpClient().closeServer();
@@ -159,7 +170,7 @@
}
kace.setQueuedForCleanup();
- queue.notifyAll();
+ queue.waiter.signalAll();
}
boolean startCleanupThread = (cleanerThread == null);
@@ -181,14 +192,18 @@
}
});
}
- } // queue
+ } finally {
+ queue.queueLock.unlock();
+ }
}
protected long remainingToRead() {
+ assert readLock.isHeldByCurrentThread();
return expected - count;
}
protected void setClosed() {
+ assert readLock.isHeldByCurrentThread();
in = null;
hc = null;
closed = true;