1 /* |
1 /* |
2 * Copyright (c) 1999, 2002, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 1999, 2011, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
24 */ |
24 */ |
25 |
25 |
26 package com.sun.jndi.ldap; |
26 package com.sun.jndi.ldap; |
27 |
27 |
28 import java.io.IOException; |
28 import java.io.IOException; |
29 import java.util.Vector; |
29 import java.util.concurrent.BlockingQueue; |
|
30 import java.util.concurrent.LinkedBlockingQueue; |
30 import javax.naming.CommunicationException; |
31 import javax.naming.CommunicationException; |
31 |
32 |
32 final class LdapRequest { |
33 final class LdapRequest { |
33 |
34 |
34 LdapRequest next; // Set/read in synchronized Connection methods |
35 LdapRequest next; // Set/read in synchronized Connection methods |
35 int msgId; // read-only |
36 int msgId; // read-only |
36 |
37 |
37 private int gotten = 0; |
38 private int gotten = 0; |
38 private Vector replies = new Vector(3); |
39 private BlockingQueue<BerDecoder> replies; |
|
40 private int highWatermark = -1; |
39 private boolean cancelled = false; |
41 private boolean cancelled = false; |
40 private boolean pauseAfterReceipt = false; |
42 private boolean pauseAfterReceipt = false; |
41 private boolean completed = false; |
43 private boolean completed = false; |
42 |
44 |
43 LdapRequest(int msgId, boolean pause) { |
45 LdapRequest(int msgId, boolean pause) { |
|
46 this(msgId, pause, -1); |
|
47 } |
|
48 |
|
49 LdapRequest(int msgId, boolean pause, int replyQueueCapacity) { |
44 this.msgId = msgId; |
50 this.msgId = msgId; |
45 this.pauseAfterReceipt = pause; |
51 this.pauseAfterReceipt = pause; |
|
52 if (replyQueueCapacity == -1) { |
|
53 this.replies = new LinkedBlockingQueue<BerDecoder>(); |
|
54 } else { |
|
55 this.replies = |
|
56 new LinkedBlockingQueue<BerDecoder>(replyQueueCapacity); |
|
57 highWatermark = (replyQueueCapacity * 80) / 100; // 80% capacity |
|
58 } |
46 } |
59 } |
47 |
60 |
48 synchronized void cancel() { |
61 synchronized void cancel() { |
49 cancelled = true; |
62 cancelled = true; |
50 |
63 |
68 // ignore |
87 // ignore |
69 } |
88 } |
70 ber.reset(); |
89 ber.reset(); |
71 |
90 |
72 notify(); // notify anyone waiting for reply |
91 notify(); // notify anyone waiting for reply |
|
92 /* |
|
93 * If a queue capacity has been set, trigger a pause once the queue |

94 * has filled to 80% of that capacity. Later, once the queue has |

95 * drained, the reader is unpaused. |
|
96 */ |
|
97 if (highWatermark != -1 && replies.size() >= highWatermark) { |
|
98 return true; // trigger the pause |
|
99 } |
73 return pauseAfterReceipt; |
100 return pauseAfterReceipt; |
74 } |
101 } |
75 |
102 |
76 synchronized BerDecoder getReplyBer() throws CommunicationException { |
103 synchronized BerDecoder getReplyBer() throws CommunicationException { |
77 if (cancelled) { |
104 if (cancelled) { |
78 throw new CommunicationException("Request: " + msgId + |
105 throw new CommunicationException("Request: " + msgId + |
79 " cancelled"); |
106 " cancelled"); |
80 } |
107 } |
81 |
108 |
82 if (gotten < replies.size()) { |
109 /* |
83 BerDecoder answer = (BerDecoder)replies.elementAt(gotten); |
110 * Remove a reply if the queue is not empty. |
84 replies.setElementAt(null, gotten); // remove reference |
111 * poll() returns null if the queue is empty. |
85 ++gotten; // skip to next |
112 */ |
86 return answer; |
113 BerDecoder reply = replies.poll(); |
87 } else { |
114 return reply; |
88 return null; |
|
89 } |
|
90 } |
115 } |
91 |
116 |
92 synchronized boolean hasSearchCompleted() { |
117 synchronized boolean hasSearchCompleted() { |
93 return completed; |
118 return completed; |
94 } |
119 } |