jdk/src/share/classes/com/sun/jndi/ldap/LdapRequest.java
changeset 8564 d99f879a35ab
parent 5506 202f599c92aa
child 25808 e113d0a0fde0
equal deleted inserted replaced
8561:ca8d6ccdd9dc 8564:d99f879a35ab
     1 /*
     1 /*
     2  * Copyright (c) 1999, 2002, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    24  */
    24  */
    25 
    25 
    26 package com.sun.jndi.ldap;
    26 package com.sun.jndi.ldap;
    27 
    27 
    28 import java.io.IOException;
    28 import java.io.IOException;
    29 import java.util.Vector;
    29 import java.util.concurrent.BlockingQueue;
       
    30 import java.util.concurrent.LinkedBlockingQueue;
    30 import javax.naming.CommunicationException;
    31 import javax.naming.CommunicationException;
    31 
    32 
    32 final class LdapRequest {
    33 final class LdapRequest {
    33 
    34 
     34     LdapRequest next;   // Set/read in synchronized Connection methods
     35     LdapRequest next;   // Set/read in synchronized Connection methods
     35     int msgId;          // read-only
     36     int msgId;          // read-only
     36 
     37 
     37     private int gotten = 0;
     38     private int gotten = 0;
            // NOTE(review): no inserted/replaced line visible in this listing reads
            // or writes 'gotten'; it looks unused once replies became a queue.
            // Confirm against the elided lines before removing it.
     38     private Vector replies = new Vector(3);
     39     private BlockingQueue<BerDecoder> replies;
            // Queue of replies not yet handed out by getReplyBer(); created in the
            // three-argument constructor.
       
     40     private int highWatermark = -1;
            // -1 means no threshold is configured. Otherwise it is the reply count
            // (80% of the queue capacity) at which addReplyBer() returns true.
     39     private boolean cancelled = false;
     41     private boolean cancelled = false;
     40     private boolean pauseAfterReceipt = false;
     42     private boolean pauseAfterReceipt = false;
     41     private boolean completed = false;
     43     private boolean completed = false;
    42 
    44 
     43     LdapRequest(int msgId, boolean pause) {
     45     LdapRequest(int msgId, boolean pause) {
                // Convenience form: delegates with -1, which the three-argument
                // constructor treats as "no capacity limit".
       
     46         this(msgId, pause, -1);
       
     47     }
       
     48 
       
     49     LdapRequest(int msgId, boolean pause, int replyQueueCapacity) {
                // Records the message id and the pause flag, then creates the reply
                // queue: unbounded when replyQueueCapacity is -1, otherwise bounded
                // to replyQueueCapacity with a pause threshold at 80% of it.
     44         this.msgId = msgId;
     50         this.msgId = msgId;
     45         this.pauseAfterReceipt = pause;
     51         this.pauseAfterReceipt = pause;
       
     52         if (replyQueueCapacity == -1) {
       
     53             this.replies = new LinkedBlockingQueue<BerDecoder>();
       
     54         } else {
       
                    // NOTE(review): the capacity is not validated here. Per the
                    // LinkedBlockingQueue(int) documentation, a value that is not
                    // greater than zero (0 or anything below -1) throws
                    // IllegalArgumentException.
     55             this.replies =
       
     56                 new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity);
       
                    // NOTE(review): 'replyQueueCapacity * 80' is int arithmetic and
                    // overflows for capacities above Integer.MAX_VALUE / 80; a
                    // long intermediate would avoid that. A capacity of 1 yields a
                    // watermark of 0, so the 'size() >= highWatermark' check in
                    // addReplyBer() is then always true.
     57             highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity
       
     58         }
     46     }
     59     }
    47 
    60 
    48     synchronized void cancel() {
    61     synchronized void cancel() {
    49         cancelled = true;
    62         cancelled = true;
    50 
    63 
    55 
    68 
     56     synchronized boolean addReplyBer(BerDecoder ber) {
     69     synchronized boolean addReplyBer(BerDecoder ber) {
                // Queues one reply for this request and wakes a waiter.
                // Returns false if the request was cancelled. Otherwise returns
                // true when the high watermark is reached, else pauseAfterReceipt
                // (the inline comments call a true result "trigger the pause").
     57         if (cancelled) {
     70         if (cancelled) {
     58             return false;
     71             return false;
     59         }
     72         }
     60         replies.addElement(ber);
     73 
       
     74         // Add a new reply to the queue of unprocessed replies.
       
                // NOTE(review): put() blocks while a bounded queue is full, and it
                // is called here with this object's monitor held. getReplyBer() is
                // synchronized on the same monitor, so a consumer could not drain
                // the queue while this call is blocked. Presumably the 80% pause
                // below is meant to keep the queue from ever filling; confirm
                // against the caller.
     75         try {
       
     76             replies.put(ber);
       
     77         } catch (InterruptedException e) {
       
                    // NOTE(review): if put() is interrupted the reply is not queued,
                    // yet execution continues as if it had been, and the thread's
                    // interrupt status is not restored
                    // (Thread.currentThread().interrupt()).
     78             // ignore
       
     79         }
     61 
     80 
     62         // peek at the BER buffer to check if it is a SearchResultDone PDU
     81         // peek at the BER buffer to check if it is a SearchResultDone PDU
     63         try {
     82         try {
     64             ber.parseSeq(null);
     83             ber.parseSeq(null);
     65             ber.parseInt();
     84             ber.parseInt();
                    // NOTE(review): this listing omits the lines between parseInt()
                    // and the catch body below (old 66-67, new 85-86).
     68             // ignore
     87             // ignore
     69         }
     88         }
                // Reset the decoder after peeking, before the reply is consumed.
     70         ber.reset();
     89         ber.reset();
     71 
     90 
     72         notify(); // notify anyone waiting for reply
     91         notify(); // notify anyone waiting for reply
       
     92         /*
       
     93          * If a queue capacity has been set then trigger a pause when the
       
     94          * queue has filled to 80% capacity. Later, when the queue has drained
       
     95          * then the reader gets unpaused.
       
     96          */
       
     97         if (highWatermark != -1 && replies.size() >= highWatermark) {
       
     98             return true; // trigger the pause
       
     99         }
     73         return pauseAfterReceipt;
    100         return pauseAfterReceipt;
     74     }
    101     }
    75 
   102 
     76     synchronized BerDecoder getReplyBer() throws CommunicationException {
    103     synchronized BerDecoder getReplyBer() throws CommunicationException {
                // Hands out the next unprocessed reply, or null when none is
                // available. Throws CommunicationException if the request has been
                // cancelled.
     77         if (cancelled) {
    104         if (cancelled) {
     78             throw new CommunicationException("Request: " + msgId +
    105             throw new CommunicationException("Request: " + msgId +
     79                 " cancelled");
    106                 " cancelled");
     80         }
    107         }
     81 
    108 
     82         if (gotten < replies.size()) {
    109         /*
     83             BerDecoder answer = (BerDecoder)replies.elementAt(gotten);
    110          * Remove a reply if the queue is not empty.
     84             replies.setElementAt(null, gotten); // remove reference
    111          * poll returns null if queue is empty.
     85             ++gotten; // skip to next
    112          */
     86             return answer;
    113         BerDecoder reply = replies.poll();
                // poll() does not block: it removes and returns the head of the
                // queue, or returns null immediately if the queue is empty.
     87         } else {
    114         return reply;
     88             return null;
       
     89         }
       
     90     }
    115     }
    91 
   116 
     92     synchronized boolean hasSearchCompleted() {
    117     synchronized boolean hasSearchCompleted() {
                // Returns the 'completed' flag, read under this object's monitor.
                // NOTE(review): no visible line assigns 'completed'; presumably it
                // is set in the elided part of addReplyBer() when a
                // SearchResultDone PDU is seen. Confirm against the full file.
     93         return completed;
    118         return completed;
     94     }
    119     }