--- a/jdk/src/share/classes/com/sun/jndi/ldap/LdapRequest.java Thu Mar 03 15:34:09 2011 +0000
+++ b/jdk/src/share/classes/com/sun/jndi/ldap/LdapRequest.java Thu Mar 03 16:51:03 2011 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 1999, 2002, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -26,7 +26,8 @@
package com.sun.jndi.ldap;
import java.io.IOException;
-import java.util.Vector;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import javax.naming.CommunicationException;
final class LdapRequest {
@@ -35,14 +36,26 @@
int msgId; // read-only
private int gotten = 0;
- private Vector replies = new Vector(3);
+ private BlockingQueue<BerDecoder> replies;
+ private int highWatermark = -1;
private boolean cancelled = false;
private boolean pauseAfterReceipt = false;
private boolean completed = false;
LdapRequest(int msgId, boolean pause) {
+ this(msgId, pause, -1);
+ }
+
+ LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
this.msgId = msgId;
this.pauseAfterReceipt = pause;
+ if (replyQueueCapacity == -1) {
+ this.replies = new LinkedBlockingQueue<BerDecoder>();
+ } else {
+ this.replies =
+ new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
+ highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
+ }
}
synchronized void cancel() {
@@ -57,7 +70,13 @@
if (cancelled) {
return false;
}
- replies.addElement(ber);
+
+ // Add a new reply to the queue of unprocessed replies.
+ try {
+ replies.put(ber);
+ } catch (InterruptedException e) {
+ // ignore
+ }
// peek at the BER buffer to check if it is a SearchResultDone PDU
try {
@@ -70,6 +89,14 @@
ber.reset();
notify(); // notify anyone waiting for reply
+ /*
+ * If a queue capacity has been set then trigger a pause when the
+ * queue has filled to 80% capacity. Later, when the queue has drained
+ * then the reader gets unpaused.
+ */
+ if (highWatermark != -1 && replies.size() >= highWatermark) {
+ return true; // trigger the pause
+ }
return pauseAfterReceipt;
}
@@ -79,14 +106,12 @@
" cancelled");
}
- if (gotten < replies.size()) {
- BerDecoder answer = (BerDecoder)replies.elementAt(gotten);
- replies.setElementAt(null, gotten); // remove reference
- ++gotten; // skip to next
- return answer;
- } else {
- return null;
- }
+ /*
+ * Remove a reply if the queue is not empty.
+ * poll returns null if queue is empty.
+ */
+ BerDecoder reply = replies.poll();
+ return reply;
}
synchronized boolean hasSearchCompleted() {