4
|
1 |
/*
|
5555
|
2 |
* Copyright (c) 2001, 2004, Oracle and/or its affiliates. All rights reserved.
|
4
|
3 |
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
|
|
4 |
*
|
|
5 |
* This code is free software; you can redistribute it and/or modify it
|
|
6 |
* under the terms of the GNU General Public License version 2 only, as
|
5555
|
7 |
* published by the Free Software Foundation. Oracle designates this
|
4
|
8 |
* particular file as subject to the "Classpath" exception as provided
|
5555
|
9 |
* by Oracle in the LICENSE file that accompanied this code.
|
4
|
10 |
*
|
|
11 |
* This code is distributed in the hope that it will be useful, but WITHOUT
|
|
12 |
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
13 |
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
14 |
* version 2 for more details (a copy is included in the LICENSE file that
|
|
15 |
* accompanied this code).
|
|
16 |
*
|
|
17 |
* You should have received a copy of the GNU General Public License version
|
|
18 |
* 2 along with this work; if not, write to the Free Software Foundation,
|
|
19 |
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
20 |
*
|
5555
|
21 |
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
|
|
22 |
* or visit www.oracle.com if you need additional information or have any
|
|
23 |
* questions.
|
4
|
24 |
*/
|
|
25 |
|
|
26 |
package com.sun.corba.se.impl.transport;
|
|
27 |
|
|
28 |
import java.util.Hashtable;
|
|
29 |
|
|
30 |
import org.omg.CORBA.CompletionStatus;
|
|
31 |
import org.omg.CORBA.SystemException;
|
|
32 |
|
|
33 |
import com.sun.corba.se.pept.encoding.InputObject;
|
|
34 |
import com.sun.corba.se.pept.encoding.OutputObject;
|
|
35 |
import com.sun.corba.se.pept.protocol.MessageMediator;
|
|
36 |
|
|
37 |
import com.sun.corba.se.spi.logging.CORBALogDomains;
|
|
38 |
import com.sun.corba.se.spi.orb.ORB;
|
|
39 |
import com.sun.corba.se.spi.protocol.CorbaMessageMediator;
|
|
40 |
import com.sun.corba.se.spi.transport.CorbaConnection;
|
|
41 |
import com.sun.corba.se.spi.transport.CorbaResponseWaitingRoom;
|
|
42 |
|
|
43 |
import com.sun.corba.se.impl.encoding.BufferManagerReadStream;
|
|
44 |
import com.sun.corba.se.impl.encoding.CDRInputObject;
|
|
45 |
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
|
|
46 |
import com.sun.corba.se.impl.orbutil.ORBUtility;
|
|
47 |
import com.sun.corba.se.impl.protocol.giopmsgheaders.LocateReplyOrReplyMessage;
|
|
48 |
import com.sun.corba.se.impl.protocol.giopmsgheaders.ReplyMessage;
|
|
49 |
|
|
50 |
/**
|
|
51 |
* @author Harold Carr
|
|
52 |
*/
|
|
53 |
public class CorbaResponseWaitingRoomImpl
|
|
54 |
implements
|
|
55 |
CorbaResponseWaitingRoom
|
|
56 |
{
|
|
57 |
final static class OutCallDesc
|
|
58 |
{
|
|
59 |
java.lang.Object done = new java.lang.Object();
|
|
60 |
Thread thread;
|
|
61 |
MessageMediator messageMediator;
|
|
62 |
SystemException exception;
|
|
63 |
InputObject inputObject;
|
|
64 |
}
|
|
65 |
|
|
66 |
private ORB orb;
|
|
67 |
private ORBUtilSystemException wrapper ;
|
|
68 |
|
|
69 |
private CorbaConnection connection;
|
|
70 |
// Maps requestId to an OutCallDesc.
|
|
71 |
private Hashtable out_calls = null; // REVISIT - use int hastable/map
|
|
72 |
|
|
73 |
public CorbaResponseWaitingRoomImpl(ORB orb, CorbaConnection connection)
|
|
74 |
{
|
|
75 |
this.orb = orb;
|
|
76 |
wrapper = ORBUtilSystemException.get( orb,
|
|
77 |
CORBALogDomains.RPC_TRANSPORT ) ;
|
|
78 |
this.connection = connection;
|
|
79 |
out_calls = new Hashtable();
|
|
80 |
}
|
|
81 |
|
|
82 |
////////////////////////////////////////////////////
|
|
83 |
//
|
|
84 |
// pept.transport.ResponseWaitingRoom
|
|
85 |
//
|
|
86 |
|
|
87 |
public void registerWaiter(MessageMediator mediator)
|
|
88 |
{
|
|
89 |
CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
|
|
90 |
|
|
91 |
if (orb.transportDebugFlag) {
|
|
92 |
dprint(".registerWaiter: " + opAndId(messageMediator));
|
|
93 |
}
|
|
94 |
|
|
95 |
Integer requestId = messageMediator.getRequestIdInteger();
|
|
96 |
|
|
97 |
OutCallDesc call = new OutCallDesc();
|
|
98 |
call.thread = Thread.currentThread();
|
|
99 |
call.messageMediator = messageMediator;
|
|
100 |
out_calls.put(requestId, call);
|
|
101 |
}
|
|
102 |
|
|
103 |
public void unregisterWaiter(MessageMediator mediator)
|
|
104 |
{
|
|
105 |
CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
|
|
106 |
|
|
107 |
if (orb.transportDebugFlag) {
|
|
108 |
dprint(".unregisterWaiter: " + opAndId(messageMediator));
|
|
109 |
}
|
|
110 |
|
|
111 |
Integer requestId = messageMediator.getRequestIdInteger();
|
|
112 |
|
|
113 |
out_calls.remove(requestId);
|
|
114 |
}
|
|
115 |
|
|
116 |
public InputObject waitForResponse(MessageMediator mediator)
|
|
117 |
{
|
|
118 |
CorbaMessageMediator messageMediator = (CorbaMessageMediator) mediator;
|
|
119 |
|
|
120 |
try {
|
|
121 |
|
|
122 |
InputObject returnStream = null;
|
|
123 |
|
|
124 |
if (orb.transportDebugFlag) {
|
|
125 |
dprint(".waitForResponse->: " + opAndId(messageMediator));
|
|
126 |
}
|
|
127 |
|
|
128 |
Integer requestId = messageMediator.getRequestIdInteger();
|
|
129 |
|
|
130 |
if (messageMediator.isOneWay()) {
|
|
131 |
// The waiter is removed in releaseReply in the same
|
|
132 |
// way as a normal request.
|
|
133 |
|
|
134 |
if (orb.transportDebugFlag) {
|
|
135 |
dprint(".waitForResponse: one way - not waiting: "
|
|
136 |
+ opAndId(messageMediator));
|
|
137 |
}
|
|
138 |
|
|
139 |
return null;
|
|
140 |
}
|
|
141 |
|
|
142 |
OutCallDesc call = (OutCallDesc)out_calls.get(requestId);
|
|
143 |
if (call == null) {
|
|
144 |
throw wrapper.nullOutCall(CompletionStatus.COMPLETED_MAYBE);
|
|
145 |
}
|
|
146 |
|
|
147 |
synchronized(call.done) {
|
|
148 |
|
|
149 |
while (call.inputObject == null && call.exception == null) {
|
|
150 |
// Wait for the reply from the server.
|
|
151 |
// The ReaderThread reads in the reply IIOP message
|
|
152 |
// and signals us.
|
|
153 |
try {
|
|
154 |
if (orb.transportDebugFlag) {
|
|
155 |
dprint(".waitForResponse: waiting: "
|
|
156 |
+ opAndId(messageMediator));
|
|
157 |
}
|
|
158 |
call.done.wait();
|
|
159 |
} catch (InterruptedException ie) {};
|
|
160 |
}
|
|
161 |
|
|
162 |
if (call.exception != null) {
|
|
163 |
if (orb.transportDebugFlag) {
|
|
164 |
dprint(".waitForResponse: exception: "
|
|
165 |
+ opAndId(messageMediator));
|
|
166 |
}
|
|
167 |
throw call.exception;
|
|
168 |
}
|
|
169 |
|
|
170 |
returnStream = call.inputObject;
|
|
171 |
}
|
|
172 |
|
|
173 |
// REVISIT -- exceptions from unmarshaling code will
|
|
174 |
// go up through this client thread!
|
|
175 |
|
|
176 |
if (returnStream != null) {
|
|
177 |
// On fragmented streams the header MUST be unmarshaled here
|
|
178 |
// (in the client thread) in case it blocks.
|
|
179 |
// If the header was already unmarshaled, this won't
|
|
180 |
// do anything
|
|
181 |
// REVISIT: cast - need interface method.
|
|
182 |
((CDRInputObject)returnStream).unmarshalHeader();
|
|
183 |
}
|
|
184 |
|
|
185 |
return returnStream;
|
|
186 |
|
|
187 |
} finally {
|
|
188 |
if (orb.transportDebugFlag) {
|
|
189 |
dprint(".waitForResponse<-: " + opAndId(messageMediator));
|
|
190 |
}
|
|
191 |
}
|
|
192 |
}
|
|
193 |
|
|
194 |
public void responseReceived(InputObject is)
|
|
195 |
{
|
|
196 |
CDRInputObject inputObject = (CDRInputObject) is;
|
|
197 |
LocateReplyOrReplyMessage header = (LocateReplyOrReplyMessage)
|
|
198 |
inputObject.getMessageHeader();
|
|
199 |
Integer requestId = new Integer(header.getRequestId());
|
|
200 |
OutCallDesc call = (OutCallDesc) out_calls.get(requestId);
|
|
201 |
|
|
202 |
if (orb.transportDebugFlag) {
|
|
203 |
dprint(".responseReceived: id/"
|
|
204 |
+ requestId + ": "
|
|
205 |
+ header);
|
|
206 |
}
|
|
207 |
|
|
208 |
// This is an interesting case. It could mean that someone sent us a
|
|
209 |
// reply message, but we don't know what request it was for. That
|
|
210 |
// would probably call for an error. However, there's another case
|
|
211 |
// that's normal and we should think about --
|
|
212 |
//
|
|
213 |
// If the unmarshaling thread does all of its work inbetween the time
|
|
214 |
// the ReaderThread gives it the last fragment and gets to the
|
|
215 |
// out_calls.get line, then it will also be null, so just return;
|
|
216 |
if (call == null) {
|
|
217 |
if (orb.transportDebugFlag) {
|
|
218 |
dprint(".responseReceived: id/"
|
|
219 |
+ requestId
|
|
220 |
+ ": no waiter: "
|
|
221 |
+ header);
|
|
222 |
}
|
|
223 |
return;
|
|
224 |
}
|
|
225 |
|
|
226 |
// Set the reply InputObject and signal the client thread
|
|
227 |
// that the reply has been received.
|
|
228 |
// The thread signalled will remove outcall descriptor if appropriate.
|
|
229 |
// Otherwise, it'll be removed when last fragment for it has been put on
|
|
230 |
// BufferManagerRead's queue.
|
|
231 |
synchronized (call.done) {
|
|
232 |
CorbaMessageMediator messageMediator = (CorbaMessageMediator)
|
|
233 |
call.messageMediator;
|
|
234 |
|
|
235 |
if (orb.transportDebugFlag) {
|
|
236 |
dprint(".responseReceived: "
|
|
237 |
+ opAndId(messageMediator)
|
|
238 |
+ ": notifying waiters");
|
|
239 |
}
|
|
240 |
|
|
241 |
messageMediator.setReplyHeader(header);
|
|
242 |
messageMediator.setInputObject(is);
|
|
243 |
inputObject.setMessageMediator(messageMediator);
|
|
244 |
call.inputObject = is;
|
|
245 |
call.done.notify();
|
|
246 |
}
|
|
247 |
}
|
|
248 |
|
|
249 |
public int numberRegistered()
|
|
250 |
{
|
|
251 |
// Note: Hashtable.size() is not synchronized
|
|
252 |
return out_calls.size();
|
|
253 |
}
|
|
254 |
|
|
255 |
//////////////////////////////////////////////////
|
|
256 |
//
|
|
257 |
// CorbaResponseWaitingRoom
|
|
258 |
//
|
|
259 |
|
|
260 |
public void signalExceptionToAllWaiters(SystemException systemException)
|
|
261 |
{
|
|
262 |
|
|
263 |
if (orb.transportDebugFlag) {
|
|
264 |
dprint(".signalExceptionToAllWaiters: " + systemException);
|
|
265 |
}
|
|
266 |
|
|
267 |
OutCallDesc call;
|
|
268 |
java.util.Enumeration e = out_calls.elements();
|
|
269 |
while(e.hasMoreElements()) {
|
|
270 |
call = (OutCallDesc) e.nextElement();
|
|
271 |
|
|
272 |
synchronized(call.done){
|
|
273 |
// anything waiting for BufferManagerRead's fragment queue
|
|
274 |
// needs to be cancelled
|
|
275 |
CorbaMessageMediator corbaMsgMediator =
|
|
276 |
(CorbaMessageMediator)call.messageMediator;
|
|
277 |
CDRInputObject inputObject =
|
|
278 |
(CDRInputObject)corbaMsgMediator.getInputObject();
|
|
279 |
// IMPORTANT: If inputObject is null, then no need to tell
|
|
280 |
// BufferManagerRead to cancel request processing.
|
|
281 |
if (inputObject != null) {
|
|
282 |
BufferManagerReadStream bufferManager =
|
|
283 |
(BufferManagerReadStream)inputObject.getBufferManager();
|
|
284 |
int requestId = corbaMsgMediator.getRequestId();
|
|
285 |
bufferManager.cancelProcessing(requestId);
|
|
286 |
}
|
|
287 |
call.inputObject = null;
|
|
288 |
call.exception = systemException;
|
|
289 |
call.done.notify();
|
|
290 |
}
|
|
291 |
}
|
|
292 |
}
|
|
293 |
|
|
294 |
public MessageMediator getMessageMediator(int requestId)
|
|
295 |
{
|
|
296 |
Integer id = new Integer(requestId);
|
|
297 |
OutCallDesc call = (OutCallDesc) out_calls.get(id);
|
|
298 |
if (call == null) {
|
|
299 |
// This can happen when getting early reply fragments for a
|
|
300 |
// request which has completed (e.g., client marshaling error).
|
|
301 |
return null;
|
|
302 |
}
|
|
303 |
return call.messageMediator;
|
|
304 |
}
|
|
305 |
|
|
306 |
////////////////////////////////////////////////////
|
|
307 |
//
|
|
308 |
// Implementation.
|
|
309 |
//
|
|
310 |
|
|
311 |
protected void dprint(String msg)
|
|
312 |
{
|
|
313 |
ORBUtility.dprint("CorbaResponseWaitingRoomImpl", msg);
|
|
314 |
}
|
|
315 |
|
|
316 |
protected String opAndId(CorbaMessageMediator mediator)
|
|
317 |
{
|
|
318 |
return ORBUtility.operationNameAndRequestId(mediator);
|
|
319 |
}
|
|
320 |
}
|
|
321 |
|
|
322 |
// End of file.
|