8139965: Hang seen when using com.sun.jndi.ldap.search.replyQueueSize
Reviewed-by: dfuchs
--- a/src/java.naming/share/classes/com/sun/jndi/ldap/BerDecoder.java Wed Apr 24 14:03:20 2019 +0200
+++ b/src/java.naming/share/classes/com/sun/jndi/ldap/BerDecoder.java Thu Apr 25 05:54:54 2019 -0700
@@ -186,12 +186,16 @@
*</pre></blockquote>
*/
private int parseIntWithTag(int tag) throws DecodeException {
-
-
if (parseByte() != tag) {
+ // Ber could have been reset;
+ String s;
+ if (offset > 0) {
+ s = Integer.toString(buf[offset - 1] & 0xff);
+ } else {
+ s = "Empty tag";
+ }
throw new DecodeException("Encountered ASN.1 tag " +
- Integer.toString(buf[offset - 1] & 0xff) +
- " (expected tag " + Integer.toString(tag) + ")");
+ s + " (expected tag " + Integer.toString(tag) + ")");
}
int len = parseLength();
--- a/src/java.naming/share/classes/com/sun/jndi/ldap/Connection.java Wed Apr 24 14:03:20 2019 +0200
+++ b/src/java.naming/share/classes/com/sun/jndi/ldap/Connection.java Thu Apr 25 05:54:54 2019 -0700
@@ -408,65 +408,29 @@
/**
* Reads a reply; waits until one is ready.
*/
- BerDecoder readReply(LdapRequest ldr)
- throws IOException, NamingException {
+ BerDecoder readReply(LdapRequest ldr) throws IOException, NamingException {
BerDecoder rber;
- // Track down elapsed time to workaround spurious wakeups
- long elapsedMilli = 0;
- long elapsedNano = 0;
-
- while (((rber = ldr.getReplyBer()) == null) &&
- (readTimeout <= 0 || elapsedMilli < readTimeout))
- {
- try {
- // If socket closed, don't even try
- synchronized (this) {
- if (sock == null) {
- throw new ServiceUnavailableException(host + ":" + port +
- "; socket closed");
- }
- }
- synchronized (ldr) {
- // check if condition has changed since our last check
- rber = ldr.getReplyBer();
- if (rber == null) {
- if (readTimeout > 0) { // Socket read timeout is specified
- long beginNano = System.nanoTime();
-
- // will be woken up before readTimeout if reply is
- // available
- ldr.wait(readTimeout - elapsedMilli);
- elapsedNano += (System.nanoTime() - beginNano);
- elapsedMilli += elapsedNano / 1000_000;
- elapsedNano %= 1000_000;
-
- } else {
- // no timeout is set so we wait infinitely until
- // a response is received
- // http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP
- ldr.wait();
- }
- } else {
- break;
- }
- }
- } catch (InterruptedException ex) {
- throw new InterruptedNamingException(
- "Interrupted during LDAP operation");
- }
+ try {
+ // if no timeout is set so we wait infinitely until
+ // a response is received
+ // http://docs.oracle.com/javase/8/docs/technotes/guides/jndi/jndi-ldap.html#PROP
+ rber = ldr.getReplyBer(readTimeout);
+ } catch (InterruptedException ex) {
+ throw new InterruptedNamingException(
+ "Interrupted during LDAP operation");
}
- if ((rber == null) && (elapsedMilli >= readTimeout)) {
+ if (rber == null) {
abandonRequest(ldr, null);
- throw new NamingException("LDAP response read timed out, timeout used:"
+ throw new NamingException(
+ "LDAP response read timed out, timeout used:"
+ readTimeout + "ms." );
}
return rber;
}
-
////////////////////////////////////////////////////////////////////////////
//
// Methods to add, find, delete, and abandon requests made to server
@@ -660,14 +624,11 @@
if (nparent) {
LdapRequest ldr = pendingRequests;
while (ldr != null) {
-
- synchronized (ldr) {
- ldr.notify();
+ ldr.close();
ldr = ldr.next;
}
}
}
- }
if (nparent) {
parent.processConnectionClosure();
}
@@ -755,7 +716,7 @@
* the safest thing to do is to shut it down.
*/
- private Object pauseLock = new Object(); // lock for reader to wait on while paused
+ private final Object pauseLock = new Object(); // lock for reader to wait on while paused
private boolean paused = false; // paused state of reader
/*
--- a/src/java.naming/share/classes/com/sun/jndi/ldap/LdapRequest.java Wed Apr 24 14:03:20 2019 +0200
+++ b/src/java.naming/share/classes/com/sun/jndi/ldap/LdapRequest.java Thu Apr 25 05:54:54 2019 -0700
@@ -29,55 +29,52 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.naming.CommunicationException;
+import java.util.concurrent.TimeUnit;
final class LdapRequest {
+ private final static BerDecoder EOF = new BerDecoder(new byte[]{}, -1, 0);
+
LdapRequest next; // Set/read in synchronized Connection methods
- int msgId; // read-only
+ final int msgId; // read-only
- private int gotten = 0;
- private BlockingQueue<BerDecoder> replies;
- private int highWatermark = -1;
- private boolean cancelled = false;
- private boolean pauseAfterReceipt = false;
- private boolean completed = false;
-
- LdapRequest(int msgId, boolean pause) {
- this(msgId, pause, -1);
- }
+ private final BlockingQueue<BerDecoder> replies;
+ private volatile boolean cancelled;
+ private volatile boolean closed;
+ private volatile boolean completed;
+ private final boolean pauseAfterReceipt;
LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
this.msgId = msgId;
this.pauseAfterReceipt = pause;
if (replyQueueCapacity == -1) {
- this.replies = new LinkedBlockingQueue<BerDecoder>();
+ this.replies = new LinkedBlockingQueue<>();
} else {
- this.replies =
- new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
- highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
+ this.replies = new LinkedBlockingQueue<>(8 * replyQueueCapacity / 10);
}
}
- synchronized void cancel() {
+ void cancel() {
cancelled = true;
+ replies.offer(EOF);
+ }
- // Unblock reader of pending request
- // Should only ever have at most one waiter
- notify();
+ synchronized void close() {
+ closed = true;
+ replies.offer(EOF);
+ }
+
+ private boolean isClosed() {
+ return closed && (replies.size() == 0 || replies.peek() == EOF);
}
synchronized boolean addReplyBer(BerDecoder ber) {
- if (cancelled) {
+ // check the closed boolean value here as we don't want anything
+ // to be added to the queue after close() has been called.
+ if (cancelled || closed) {
return false;
}
- // Add a new reply to the queue of unprocessed replies.
- try {
- replies.put(ber);
- } catch (InterruptedException e) {
- // ignore
- }
-
// peek at the BER buffer to check if it is a SearchResultDone PDU
try {
ber.parseSeq(null);
@@ -88,33 +85,38 @@
}
ber.reset();
- notify(); // notify anyone waiting for reply
- /*
- * If a queue capacity has been set then trigger a pause when the
- * queue has filled to 80% capacity. Later, when the queue has drained
- * then the reader gets unpaused.
- */
- if (highWatermark != -1 && replies.size() >= highWatermark) {
- return true; // trigger the pause
+ // Add a new reply to the queue of unprocessed replies.
+ try {
+ replies.put(ber);
+ } catch (InterruptedException e) {
+ // ignore
}
+
return pauseAfterReceipt;
}
- synchronized BerDecoder getReplyBer() throws CommunicationException {
+ BerDecoder getReplyBer(long millis) throws CommunicationException,
+ InterruptedException {
+ if (cancelled) {
+ throw new CommunicationException("Request: " + msgId +
+ " cancelled");
+ }
+ if (isClosed()) {
+ return null;
+ }
+
+ BerDecoder result = millis > 0 ?
+ replies.poll(millis, TimeUnit.MILLISECONDS) : replies.take();
+
if (cancelled) {
throw new CommunicationException("Request: " + msgId +
" cancelled");
}
- /*
- * Remove a reply if the queue is not empty.
- * poll returns null if queue is empty.
- */
- BerDecoder reply = replies.poll();
- return reply;
+ return result == EOF ? null : result;
}
- synchronized boolean hasSearchCompleted() {
+ boolean hasSearchCompleted() {
return completed;
}
}
--- a/test/jdk/com/sun/jndi/ldap/LdapDnsProviderTest.java Wed Apr 24 14:03:20 2019 +0200
+++ b/test/jdk/com/sun/jndi/ldap/LdapDnsProviderTest.java Thu Apr 25 05:54:54 2019 -0700
@@ -194,8 +194,8 @@
// no SecurityManager
runTest("ldap:///dc=example,dc=com", "localhost:389");
runTest("ldap://localhost/dc=example,dc=com", "localhost:389");
- runTest("ldap://localhost:111/dc=example,dc=com", "localhost:111");
- runTest("ldaps://localhost:111/dc=example,dc=com", "localhost:111");
+ runTest("ldap://localhost:1111/dc=example,dc=com", "localhost:1111");
+ runTest("ldaps://localhost:1111/dc=example,dc=com", "localhost:1111");
runTest("ldaps://localhost/dc=example,dc=com", "localhost:636");
runTest(null, "localhost:389");
runTest("", "ConfigurationException");