jdk/src/share/classes/com/sun/jndi/ldap/LdapRequest.java
changeset 8564 d99f879a35ab
parent 5506 202f599c92aa
child 25808 e113d0a0fde0
--- a/jdk/src/share/classes/com/sun/jndi/ldap/LdapRequest.java	Thu Mar 03 15:34:09 2011 +0000
+++ b/jdk/src/share/classes/com/sun/jndi/ldap/LdapRequest.java	Thu Mar 03 16:51:03 2011 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 1999, 2002, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -26,7 +26,8 @@
 package com.sun.jndi.ldap;
 
 import java.io.IOException;
-import java.util.Vector;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import javax.naming.CommunicationException;
 
 final class LdapRequest {
@@ -35,14 +36,26 @@
     int msgId;          // read-only
 
     private int gotten = 0;
-    private Vector replies = new Vector(3);
+    private BlockingQueue<BerDecoder> replies;
+    private int highWatermark = -1;
     private boolean cancelled = false;
     private boolean pauseAfterReceipt = false;
     private boolean completed = false;
 
     LdapRequest(int msgId, boolean pause) {
+        this(msgId, pause, -1);
+    }
+
+    LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
         this.msgId = msgId;
         this.pauseAfterReceipt = pause;
+        if (replyQueueCapacity == -1) {
+            this.replies = new LinkedBlockingQueue<BerDecoder>();
+        } else {
+            this.replies =
+                new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
+            highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
+        }
     }
 
     synchronized void cancel() {
@@ -57,7 +70,13 @@
         if (cancelled) {
             return false;
         }
-        replies.addElement(ber);
+
+        // Add a new reply to the queue of unprocessed replies.
+        try {
+            replies.put(ber);
+        } catch (InterruptedException e) {
+            // ignore
+        }
 
         // peek at the BER buffer to check if it is a SearchResultDone PDU
         try {
@@ -70,6 +89,14 @@
         ber.reset();
 
         notify(); // notify anyone waiting for reply
+        /*
+         * If a queue capacity has been set then trigger a pause when the
+         * queue has filled to 80% capacity. Later, when the queue has drained
+         * then the reader gets unpaused.
+         */
+        if (highWatermark != -1 && replies.size() >= highWatermark) {
+            return true; // trigger the pause
+        }
         return pauseAfterReceipt;
     }
 
@@ -79,14 +106,12 @@
                 " cancelled");
         }
 
-        if (gotten < replies.size()) {
-            BerDecoder answer = (BerDecoder)replies.elementAt(gotten);
-            replies.setElementAt(null, gotten); // remove reference
-            ++gotten; // skip to next
-            return answer;
-        } else {
-            return null;
-        }
+        /*
+         * Remove a reply if the queue is not empty.
+         * poll returns null if queue is empty.
+         */
+        BerDecoder reply = replies.poll();
+        return reply;
     }
 
     synchronized boolean hasSearchCompleted() {