corba/src/share/classes/com/sun/corba/se/impl/encoding/BufferManagerReadStream.java
author tbell
Mon, 20 Apr 2009 00:12:19 -0700
changeset 2664 a0a22a8f16bd
parent 4 02bb8761fcce
child 3291 805a72a26925
permissions -rw-r--r--
6372405: Server thread hangs when fragments don't complete because of connection abort 5104239: Java: thread deadlock 6191561: JCK15: api/org_omg/PortableInterceptor/ClientRequestInfo/index.html#RIMethods sometime hang 6486322: org.omg.CORBA.ORB.init() thread safety issue 6420980: Security issue with the com.sun.corba.se.impl.orbutil.ORBUtility class 6465377: NullPointerException for RMI ORB in 1.5.0_08 6553303: Corba application fails w/ org.omg.CORBA.COMM_FAILURE: vmcid: SUN minor code: 203 completed: No 6438259: Wrong repository ID generated by IDLJ Reviewed-by: darcy
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
4
02bb8761fcce Initial load
duke
parents:
diff changeset
     1
/*
2664
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
     2
 * Copyright 2000-2008 Sun Microsystems, Inc.  All Rights Reserved.
4
02bb8761fcce Initial load
duke
parents:
diff changeset
     3
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
02bb8761fcce Initial load
duke
parents:
diff changeset
     4
 *
02bb8761fcce Initial load
duke
parents:
diff changeset
     5
 * This code is free software; you can redistribute it and/or modify it
02bb8761fcce Initial load
duke
parents:
diff changeset
     6
 * under the terms of the GNU General Public License version 2 only, as
02bb8761fcce Initial load
duke
parents:
diff changeset
     7
 * published by the Free Software Foundation.  Sun designates this
02bb8761fcce Initial load
duke
parents:
diff changeset
     8
 * particular file as subject to the "Classpath" exception as provided
02bb8761fcce Initial load
duke
parents:
diff changeset
     9
 * by Sun in the LICENSE file that accompanied this code.
02bb8761fcce Initial load
duke
parents:
diff changeset
    10
 *
02bb8761fcce Initial load
duke
parents:
diff changeset
    11
 * This code is distributed in the hope that it will be useful, but WITHOUT
02bb8761fcce Initial load
duke
parents:
diff changeset
    12
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
02bb8761fcce Initial load
duke
parents:
diff changeset
    13
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
02bb8761fcce Initial load
duke
parents:
diff changeset
    14
 * version 2 for more details (a copy is included in the LICENSE file that
02bb8761fcce Initial load
duke
parents:
diff changeset
    15
 * accompanied this code).
02bb8761fcce Initial load
duke
parents:
diff changeset
    16
 *
02bb8761fcce Initial load
duke
parents:
diff changeset
    17
 * You should have received a copy of the GNU General Public License version
02bb8761fcce Initial load
duke
parents:
diff changeset
    18
 * 2 along with this work; if not, write to the Free Software Foundation,
02bb8761fcce Initial load
duke
parents:
diff changeset
    19
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
02bb8761fcce Initial load
duke
parents:
diff changeset
    20
 *
02bb8761fcce Initial load
duke
parents:
diff changeset
    21
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
02bb8761fcce Initial load
duke
parents:
diff changeset
    22
 * CA 95054 USA or visit www.sun.com if you need additional information or
02bb8761fcce Initial load
duke
parents:
diff changeset
    23
 * have any questions.
02bb8761fcce Initial load
duke
parents:
diff changeset
    24
 */
02bb8761fcce Initial load
duke
parents:
diff changeset
    25
package com.sun.corba.se.impl.encoding;
02bb8761fcce Initial load
duke
parents:
diff changeset
    26
02bb8761fcce Initial load
duke
parents:
diff changeset
    27
import java.nio.ByteBuffer;
02bb8761fcce Initial load
duke
parents:
diff changeset
    28
import com.sun.corba.se.pept.transport.ByteBufferPool;
02bb8761fcce Initial load
duke
parents:
diff changeset
    29
import com.sun.corba.se.spi.logging.CORBALogDomains;
02bb8761fcce Initial load
duke
parents:
diff changeset
    30
import com.sun.corba.se.spi.orb.ORB;
02bb8761fcce Initial load
duke
parents:
diff changeset
    31
import com.sun.corba.se.impl.logging.ORBUtilSystemException;
02bb8761fcce Initial load
duke
parents:
diff changeset
    32
import com.sun.corba.se.impl.orbutil.ORBUtility;
02bb8761fcce Initial load
duke
parents:
diff changeset
    33
import com.sun.corba.se.impl.protocol.RequestCanceledException;
02bb8761fcce Initial load
duke
parents:
diff changeset
    34
import com.sun.corba.se.impl.protocol.giopmsgheaders.FragmentMessage;
02bb8761fcce Initial load
duke
parents:
diff changeset
    35
import com.sun.corba.se.impl.protocol.giopmsgheaders.Message;
02bb8761fcce Initial load
duke
parents:
diff changeset
    36
import java.util.*;
02bb8761fcce Initial load
duke
parents:
diff changeset
    37
02bb8761fcce Initial load
duke
parents:
diff changeset
    38
public class BufferManagerReadStream
02bb8761fcce Initial load
duke
parents:
diff changeset
    39
    implements BufferManagerRead, MarkAndResetHandler
02bb8761fcce Initial load
duke
parents:
diff changeset
    40
{
02bb8761fcce Initial load
duke
parents:
diff changeset
    41
    private boolean receivedCancel = false;
02bb8761fcce Initial load
duke
parents:
diff changeset
    42
    private int cancelReqId = 0;
02bb8761fcce Initial load
duke
parents:
diff changeset
    43
02bb8761fcce Initial load
duke
parents:
diff changeset
    44
    // We should convert endOfStream to a final static dummy end node
02bb8761fcce Initial load
duke
parents:
diff changeset
    45
    private boolean endOfStream = true;
02bb8761fcce Initial load
duke
parents:
diff changeset
    46
    private BufferQueue fragmentQueue = new BufferQueue();
2664
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
    47
    private long FRAGMENT_TIMEOUT = 60000;
4
02bb8761fcce Initial load
duke
parents:
diff changeset
    48
02bb8761fcce Initial load
duke
parents:
diff changeset
    49
    // REVISIT - This should go in BufferManagerRead. But, since
02bb8761fcce Initial load
duke
parents:
diff changeset
    50
    //           BufferManagerRead is an interface. BufferManagerRead
02bb8761fcce Initial load
duke
parents:
diff changeset
    51
    //           might ought to be an abstract class instead of an
02bb8761fcce Initial load
duke
parents:
diff changeset
    52
    //           interface.
02bb8761fcce Initial load
duke
parents:
diff changeset
    53
    private ORB orb ;
02bb8761fcce Initial load
duke
parents:
diff changeset
    54
    private ORBUtilSystemException wrapper ;
02bb8761fcce Initial load
duke
parents:
diff changeset
    55
    private boolean debug = false;
02bb8761fcce Initial load
duke
parents:
diff changeset
    56
02bb8761fcce Initial load
duke
parents:
diff changeset
    57
    BufferManagerReadStream( ORB orb )
02bb8761fcce Initial load
duke
parents:
diff changeset
    58
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
    59
        this.orb = orb ;
02bb8761fcce Initial load
duke
parents:
diff changeset
    60
        this.wrapper = ORBUtilSystemException.get( orb,
02bb8761fcce Initial load
duke
parents:
diff changeset
    61
            CORBALogDomains.RPC_ENCODING ) ;
02bb8761fcce Initial load
duke
parents:
diff changeset
    62
        debug = orb.transportDebugFlag;
02bb8761fcce Initial load
duke
parents:
diff changeset
    63
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
    64
02bb8761fcce Initial load
duke
parents:
diff changeset
    65
    public void cancelProcessing(int requestId) {
02bb8761fcce Initial load
duke
parents:
diff changeset
    66
        synchronized(fragmentQueue) {
02bb8761fcce Initial load
duke
parents:
diff changeset
    67
            receivedCancel = true;
02bb8761fcce Initial load
duke
parents:
diff changeset
    68
            cancelReqId = requestId;
02bb8761fcce Initial load
duke
parents:
diff changeset
    69
            fragmentQueue.notify();
02bb8761fcce Initial load
duke
parents:
diff changeset
    70
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
    71
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
    72
02bb8761fcce Initial load
duke
parents:
diff changeset
    73
    public void processFragment(ByteBuffer byteBuffer, FragmentMessage msg)
02bb8761fcce Initial load
duke
parents:
diff changeset
    74
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
    75
        ByteBufferWithInfo bbwi =
02bb8761fcce Initial load
duke
parents:
diff changeset
    76
            new ByteBufferWithInfo(orb, byteBuffer, msg.getHeaderLength());
02bb8761fcce Initial load
duke
parents:
diff changeset
    77
02bb8761fcce Initial load
duke
parents:
diff changeset
    78
        synchronized (fragmentQueue) {
02bb8761fcce Initial load
duke
parents:
diff changeset
    79
            if (debug)
02bb8761fcce Initial load
duke
parents:
diff changeset
    80
            {
02bb8761fcce Initial load
duke
parents:
diff changeset
    81
                // print address of ByteBuffer being queued
02bb8761fcce Initial load
duke
parents:
diff changeset
    82
                int bbAddress = System.identityHashCode(byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
    83
                StringBuffer sb = new StringBuffer(80);
02bb8761fcce Initial load
duke
parents:
diff changeset
    84
                sb.append("processFragment() - queueing ByteBuffer id (");
02bb8761fcce Initial load
duke
parents:
diff changeset
    85
                sb.append(bbAddress).append(") to fragment queue.");
02bb8761fcce Initial load
duke
parents:
diff changeset
    86
                String strMsg = sb.toString();
02bb8761fcce Initial load
duke
parents:
diff changeset
    87
                dprint(strMsg);
02bb8761fcce Initial load
duke
parents:
diff changeset
    88
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
    89
            fragmentQueue.enqueue(bbwi);
02bb8761fcce Initial load
duke
parents:
diff changeset
    90
            endOfStream = !msg.moreFragmentsToFollow();
02bb8761fcce Initial load
duke
parents:
diff changeset
    91
            fragmentQueue.notify();
02bb8761fcce Initial load
duke
parents:
diff changeset
    92
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
    93
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
    94
02bb8761fcce Initial load
duke
parents:
diff changeset
    95
    public ByteBufferWithInfo underflow (ByteBufferWithInfo bbwi)
02bb8761fcce Initial load
duke
parents:
diff changeset
    96
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
    97
02bb8761fcce Initial load
duke
parents:
diff changeset
    98
      ByteBufferWithInfo result = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
    99
02bb8761fcce Initial load
duke
parents:
diff changeset
   100
      try {
02bb8761fcce Initial load
duke
parents:
diff changeset
   101
          //System.out.println("ENTER underflow");
02bb8761fcce Initial load
duke
parents:
diff changeset
   102
02bb8761fcce Initial load
duke
parents:
diff changeset
   103
        synchronized (fragmentQueue) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   104
02bb8761fcce Initial load
duke
parents:
diff changeset
   105
            if (receivedCancel) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   106
                throw new RequestCanceledException(cancelReqId);
02bb8761fcce Initial load
duke
parents:
diff changeset
   107
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   108
02bb8761fcce Initial load
duke
parents:
diff changeset
   109
            while (fragmentQueue.size() == 0) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   110
02bb8761fcce Initial load
duke
parents:
diff changeset
   111
                if (endOfStream) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   112
                    throw wrapper.endOfStream() ;
02bb8761fcce Initial load
duke
parents:
diff changeset
   113
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   114
2664
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   115
                boolean interrupted = false;
4
02bb8761fcce Initial load
duke
parents:
diff changeset
   116
                try {
2664
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   117
                    fragmentQueue.wait(FRAGMENT_TIMEOUT);
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   118
                } catch (InterruptedException e) {
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   119
                    interrupted = true;
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   120
                }
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   121
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   122
                if (!interrupted && fragmentQueue.size() == 0) {
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   123
                    throw wrapper.bufferReadManagerTimeout();
a0a22a8f16bd 6372405: Server thread hangs when fragments don't complete because of connection abort
tbell
parents: 4
diff changeset
   124
                }
4
02bb8761fcce Initial load
duke
parents:
diff changeset
   125
02bb8761fcce Initial load
duke
parents:
diff changeset
   126
                if (receivedCancel) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   127
                    throw new RequestCanceledException(cancelReqId);
02bb8761fcce Initial load
duke
parents:
diff changeset
   128
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   129
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   130
02bb8761fcce Initial load
duke
parents:
diff changeset
   131
            result = fragmentQueue.dequeue();
02bb8761fcce Initial load
duke
parents:
diff changeset
   132
            result.fragmented = true;
02bb8761fcce Initial load
duke
parents:
diff changeset
   133
02bb8761fcce Initial load
duke
parents:
diff changeset
   134
            if (debug)
02bb8761fcce Initial load
duke
parents:
diff changeset
   135
            {
02bb8761fcce Initial load
duke
parents:
diff changeset
   136
                // print address of ByteBuffer being dequeued
02bb8761fcce Initial load
duke
parents:
diff changeset
   137
                int bbAddr = System.identityHashCode(result.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   138
                StringBuffer sb1 = new StringBuffer(80);
02bb8761fcce Initial load
duke
parents:
diff changeset
   139
                sb1.append("underflow() - dequeued ByteBuffer id (");
02bb8761fcce Initial load
duke
parents:
diff changeset
   140
                sb1.append(bbAddr).append(") from fragment queue.");
02bb8761fcce Initial load
duke
parents:
diff changeset
   141
                String msg1 = sb1.toString();
02bb8761fcce Initial load
duke
parents:
diff changeset
   142
                dprint(msg1);
02bb8761fcce Initial load
duke
parents:
diff changeset
   143
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   144
02bb8761fcce Initial load
duke
parents:
diff changeset
   145
            // VERY IMPORTANT
02bb8761fcce Initial load
duke
parents:
diff changeset
   146
            // Release bbwi.byteBuffer to the ByteBufferPool only if
02bb8761fcce Initial load
duke
parents:
diff changeset
   147
            // this BufferManagerStream is not marked for potential restore.
02bb8761fcce Initial load
duke
parents:
diff changeset
   148
            if (markEngaged == false && bbwi != null && bbwi.byteBuffer != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   149
            {
02bb8761fcce Initial load
duke
parents:
diff changeset
   150
                ByteBufferPool byteBufferPool = getByteBufferPool();
02bb8761fcce Initial load
duke
parents:
diff changeset
   151
02bb8761fcce Initial load
duke
parents:
diff changeset
   152
                if (debug)
02bb8761fcce Initial load
duke
parents:
diff changeset
   153
                {
02bb8761fcce Initial load
duke
parents:
diff changeset
   154
                    // print address of ByteBuffer being released
02bb8761fcce Initial load
duke
parents:
diff changeset
   155
                    int bbAddress = System.identityHashCode(bbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   156
                    StringBuffer sb = new StringBuffer(80);
02bb8761fcce Initial load
duke
parents:
diff changeset
   157
                    sb.append("underflow() - releasing ByteBuffer id (");
02bb8761fcce Initial load
duke
parents:
diff changeset
   158
                    sb.append(bbAddress).append(") to ByteBufferPool.");
02bb8761fcce Initial load
duke
parents:
diff changeset
   159
                    String msg = sb.toString();
02bb8761fcce Initial load
duke
parents:
diff changeset
   160
                    dprint(msg);
02bb8761fcce Initial load
duke
parents:
diff changeset
   161
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   162
02bb8761fcce Initial load
duke
parents:
diff changeset
   163
                byteBufferPool.releaseByteBuffer(bbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   164
                bbwi.byteBuffer = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   165
                bbwi = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   166
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   167
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   168
        return result;
02bb8761fcce Initial load
duke
parents:
diff changeset
   169
      } finally {
02bb8761fcce Initial load
duke
parents:
diff changeset
   170
          //System.out.println("EXIT underflow");
02bb8761fcce Initial load
duke
parents:
diff changeset
   171
      }
02bb8761fcce Initial load
duke
parents:
diff changeset
   172
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   173
02bb8761fcce Initial load
duke
parents:
diff changeset
   174
    public void init(Message msg) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   175
        if (msg != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   176
            endOfStream = !msg.moreFragmentsToFollow();
02bb8761fcce Initial load
duke
parents:
diff changeset
   177
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   178
02bb8761fcce Initial load
duke
parents:
diff changeset
   179
    // Release any queued ByteBufferWithInfo's byteBuffers to the
02bb8761fcce Initial load
duke
parents:
diff changeset
   180
    // ByteBufferPoool
02bb8761fcce Initial load
duke
parents:
diff changeset
   181
    public void close(ByteBufferWithInfo bbwi)
02bb8761fcce Initial load
duke
parents:
diff changeset
   182
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   183
        int inputBbAddress = 0;
02bb8761fcce Initial load
duke
parents:
diff changeset
   184
02bb8761fcce Initial load
duke
parents:
diff changeset
   185
        // release ByteBuffers on fragmentQueue
02bb8761fcce Initial load
duke
parents:
diff changeset
   186
        if (fragmentQueue != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   187
        {
02bb8761fcce Initial load
duke
parents:
diff changeset
   188
            synchronized (fragmentQueue)
02bb8761fcce Initial load
duke
parents:
diff changeset
   189
            {
02bb8761fcce Initial load
duke
parents:
diff changeset
   190
                // IMPORTANT: The fragment queue may have one ByteBuffer
02bb8761fcce Initial load
duke
parents:
diff changeset
   191
                //            on it that's also on the CDRInputStream if
02bb8761fcce Initial load
duke
parents:
diff changeset
   192
                //            this method is called when the stream is 'marked'.
02bb8761fcce Initial load
duke
parents:
diff changeset
   193
                //            Thus, we'll compare the ByteBuffer passed
02bb8761fcce Initial load
duke
parents:
diff changeset
   194
                //            in (from a CDRInputStream) with all ByteBuffers
02bb8761fcce Initial load
duke
parents:
diff changeset
   195
                //            on the stack. If one is found to equal, it will
02bb8761fcce Initial load
duke
parents:
diff changeset
   196
                //            not be released to the ByteBufferPool.
02bb8761fcce Initial load
duke
parents:
diff changeset
   197
                if (bbwi != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   198
                {
02bb8761fcce Initial load
duke
parents:
diff changeset
   199
                    inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   200
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   201
02bb8761fcce Initial load
duke
parents:
diff changeset
   202
                ByteBufferWithInfo abbwi = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   203
                ByteBufferPool byteBufferPool = getByteBufferPool();
02bb8761fcce Initial load
duke
parents:
diff changeset
   204
                while (fragmentQueue.size() != 0)
02bb8761fcce Initial load
duke
parents:
diff changeset
   205
                {
02bb8761fcce Initial load
duke
parents:
diff changeset
   206
                    abbwi = fragmentQueue.dequeue();
02bb8761fcce Initial load
duke
parents:
diff changeset
   207
                    if (abbwi != null && abbwi.byteBuffer != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   208
                    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   209
                        int bbAddress = System.identityHashCode(abbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   210
                        if (inputBbAddress != bbAddress)
02bb8761fcce Initial load
duke
parents:
diff changeset
   211
                        {
02bb8761fcce Initial load
duke
parents:
diff changeset
   212
                            if (debug)
02bb8761fcce Initial load
duke
parents:
diff changeset
   213
                            {
02bb8761fcce Initial load
duke
parents:
diff changeset
   214
                                 // print address of ByteBuffer released
02bb8761fcce Initial load
duke
parents:
diff changeset
   215
                                 StringBuffer sb = new StringBuffer(80);
02bb8761fcce Initial load
duke
parents:
diff changeset
   216
                                 sb.append("close() - fragmentQueue is ")
02bb8761fcce Initial load
duke
parents:
diff changeset
   217
                                   .append("releasing ByteBuffer id (")
02bb8761fcce Initial load
duke
parents:
diff changeset
   218
                                   .append(bbAddress).append(") to ")
02bb8761fcce Initial load
duke
parents:
diff changeset
   219
                                   .append("ByteBufferPool.");
02bb8761fcce Initial load
duke
parents:
diff changeset
   220
                                 String msg = sb.toString();
02bb8761fcce Initial load
duke
parents:
diff changeset
   221
                                 dprint(msg);
02bb8761fcce Initial load
duke
parents:
diff changeset
   222
                            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   223
                        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   224
                        byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   225
                    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   226
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   227
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   228
            fragmentQueue = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   229
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   230
02bb8761fcce Initial load
duke
parents:
diff changeset
   231
        // release ByteBuffers on fragmentStack
02bb8761fcce Initial load
duke
parents:
diff changeset
   232
        if (fragmentStack != null && fragmentStack.size() != 0)
02bb8761fcce Initial load
duke
parents:
diff changeset
   233
        {
02bb8761fcce Initial load
duke
parents:
diff changeset
   234
            // IMPORTANT: The fragment stack may have one ByteBuffer
02bb8761fcce Initial load
duke
parents:
diff changeset
   235
            //            on it that's also on the CDRInputStream if
02bb8761fcce Initial load
duke
parents:
diff changeset
   236
            //            this method is called when the stream is 'marked'.
02bb8761fcce Initial load
duke
parents:
diff changeset
   237
            //            Thus, we'll compare the ByteBuffer passed
02bb8761fcce Initial load
duke
parents:
diff changeset
   238
            //            in (from a CDRInputStream) with all ByteBuffers
02bb8761fcce Initial load
duke
parents:
diff changeset
   239
            //            on the stack. If one is found to equal, it will
02bb8761fcce Initial load
duke
parents:
diff changeset
   240
            //            not be released to the ByteBufferPool.
02bb8761fcce Initial load
duke
parents:
diff changeset
   241
            if (bbwi != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   242
            {
02bb8761fcce Initial load
duke
parents:
diff changeset
   243
                inputBbAddress = System.identityHashCode(bbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   244
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   245
02bb8761fcce Initial load
duke
parents:
diff changeset
   246
            ByteBufferWithInfo abbwi = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   247
            ByteBufferPool byteBufferPool = getByteBufferPool();
02bb8761fcce Initial load
duke
parents:
diff changeset
   248
            ListIterator itr = fragmentStack.listIterator();
02bb8761fcce Initial load
duke
parents:
diff changeset
   249
            while (itr.hasNext())
02bb8761fcce Initial load
duke
parents:
diff changeset
   250
            {
02bb8761fcce Initial load
duke
parents:
diff changeset
   251
                abbwi = (ByteBufferWithInfo)itr.next();
02bb8761fcce Initial load
duke
parents:
diff changeset
   252
02bb8761fcce Initial load
duke
parents:
diff changeset
   253
                if (abbwi != null && abbwi.byteBuffer != null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   254
                {
02bb8761fcce Initial load
duke
parents:
diff changeset
   255
                   int bbAddress = System.identityHashCode(abbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   256
                   if (inputBbAddress != bbAddress)
02bb8761fcce Initial load
duke
parents:
diff changeset
   257
                   {
02bb8761fcce Initial load
duke
parents:
diff changeset
   258
                       if (debug)
02bb8761fcce Initial load
duke
parents:
diff changeset
   259
                       {
02bb8761fcce Initial load
duke
parents:
diff changeset
   260
                            // print address of ByteBuffer being released
02bb8761fcce Initial load
duke
parents:
diff changeset
   261
                            StringBuffer sb = new StringBuffer(80);
02bb8761fcce Initial load
duke
parents:
diff changeset
   262
                            sb.append("close() - fragmentStack - releasing ")
02bb8761fcce Initial load
duke
parents:
diff changeset
   263
                              .append("ByteBuffer id (" + bbAddress + ") to ")
02bb8761fcce Initial load
duke
parents:
diff changeset
   264
                              .append("ByteBufferPool.");
02bb8761fcce Initial load
duke
parents:
diff changeset
   265
                            String msg = sb.toString();
02bb8761fcce Initial load
duke
parents:
diff changeset
   266
                            dprint(msg);
02bb8761fcce Initial load
duke
parents:
diff changeset
   267
                       }
02bb8761fcce Initial load
duke
parents:
diff changeset
   268
                       byteBufferPool.releaseByteBuffer(abbwi.byteBuffer);
02bb8761fcce Initial load
duke
parents:
diff changeset
   269
                   }
02bb8761fcce Initial load
duke
parents:
diff changeset
   270
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   271
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   272
            fragmentStack = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   273
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   274
02bb8761fcce Initial load
duke
parents:
diff changeset
   275
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   276
02bb8761fcce Initial load
duke
parents:
diff changeset
   277
    protected ByteBufferPool getByteBufferPool()
02bb8761fcce Initial load
duke
parents:
diff changeset
   278
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   279
        return orb.getByteBufferPool();
02bb8761fcce Initial load
duke
parents:
diff changeset
   280
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   281
02bb8761fcce Initial load
duke
parents:
diff changeset
   282
    private void dprint(String msg)
02bb8761fcce Initial load
duke
parents:
diff changeset
   283
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   284
        ORBUtility.dprint("BufferManagerReadStream", msg);
02bb8761fcce Initial load
duke
parents:
diff changeset
   285
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   286
02bb8761fcce Initial load
duke
parents:
diff changeset
   287
    // Mark and reset handler ----------------------------------------
02bb8761fcce Initial load
duke
parents:
diff changeset
   288
02bb8761fcce Initial load
duke
parents:
diff changeset
   289
    private boolean markEngaged = false;
02bb8761fcce Initial load
duke
parents:
diff changeset
   290
02bb8761fcce Initial load
duke
parents:
diff changeset
   291
    // List of fragment ByteBufferWithInfos received since
02bb8761fcce Initial load
duke
parents:
diff changeset
   292
    // the mark was engaged.
02bb8761fcce Initial load
duke
parents:
diff changeset
   293
    private LinkedList fragmentStack = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   294
    private RestorableInputStream inputStream = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   295
02bb8761fcce Initial load
duke
parents:
diff changeset
   296
    // Original state of the stream
02bb8761fcce Initial load
duke
parents:
diff changeset
   297
    private Object streamMemento = null;
02bb8761fcce Initial load
duke
parents:
diff changeset
   298
02bb8761fcce Initial load
duke
parents:
diff changeset
   299
    public void mark(RestorableInputStream inputStream)
02bb8761fcce Initial load
duke
parents:
diff changeset
   300
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   301
        this.inputStream = inputStream;
02bb8761fcce Initial load
duke
parents:
diff changeset
   302
        markEngaged = true;
02bb8761fcce Initial load
duke
parents:
diff changeset
   303
02bb8761fcce Initial load
duke
parents:
diff changeset
   304
        // Get the magic Object that the stream will use to
02bb8761fcce Initial load
duke
parents:
diff changeset
   305
        // reconstruct it's state when reset is called
02bb8761fcce Initial load
duke
parents:
diff changeset
   306
        streamMemento = inputStream.createStreamMemento();
02bb8761fcce Initial load
duke
parents:
diff changeset
   307
02bb8761fcce Initial load
duke
parents:
diff changeset
   308
        if (fragmentStack != null) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   309
            fragmentStack.clear();
02bb8761fcce Initial load
duke
parents:
diff changeset
   310
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   311
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   312
02bb8761fcce Initial load
duke
parents:
diff changeset
   313
    // Collects fragments received since the mark was engaged.
02bb8761fcce Initial load
duke
parents:
diff changeset
   314
    public void fragmentationOccured(ByteBufferWithInfo newFragment)
02bb8761fcce Initial load
duke
parents:
diff changeset
   315
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   316
        if (!markEngaged)
02bb8761fcce Initial load
duke
parents:
diff changeset
   317
            return;
02bb8761fcce Initial load
duke
parents:
diff changeset
   318
02bb8761fcce Initial load
duke
parents:
diff changeset
   319
        if (fragmentStack == null)
02bb8761fcce Initial load
duke
parents:
diff changeset
   320
            fragmentStack = new LinkedList();
02bb8761fcce Initial load
duke
parents:
diff changeset
   321
02bb8761fcce Initial load
duke
parents:
diff changeset
   322
        fragmentStack.addFirst(new ByteBufferWithInfo(newFragment));
02bb8761fcce Initial load
duke
parents:
diff changeset
   323
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   324
02bb8761fcce Initial load
duke
parents:
diff changeset
   325
    public void reset()
02bb8761fcce Initial load
duke
parents:
diff changeset
   326
    {
02bb8761fcce Initial load
duke
parents:
diff changeset
   327
        if (!markEngaged) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   328
            // REVISIT - call to reset without call to mark
02bb8761fcce Initial load
duke
parents:
diff changeset
   329
            return;
02bb8761fcce Initial load
duke
parents:
diff changeset
   330
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   331
02bb8761fcce Initial load
duke
parents:
diff changeset
   332
        markEngaged = false;
02bb8761fcce Initial load
duke
parents:
diff changeset
   333
02bb8761fcce Initial load
duke
parents:
diff changeset
   334
        // If we actually did peek across fragments, we need
02bb8761fcce Initial load
duke
parents:
diff changeset
   335
        // to push those fragments onto the front of the
02bb8761fcce Initial load
duke
parents:
diff changeset
   336
        // buffer queue.
02bb8761fcce Initial load
duke
parents:
diff changeset
   337
        if (fragmentStack != null && fragmentStack.size() != 0) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   338
            ListIterator iter = fragmentStack.listIterator();
02bb8761fcce Initial load
duke
parents:
diff changeset
   339
02bb8761fcce Initial load
duke
parents:
diff changeset
   340
            synchronized(fragmentQueue) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   341
                while (iter.hasNext()) {
02bb8761fcce Initial load
duke
parents:
diff changeset
   342
                    fragmentQueue.push((ByteBufferWithInfo)iter.next());
02bb8761fcce Initial load
duke
parents:
diff changeset
   343
                }
02bb8761fcce Initial load
duke
parents:
diff changeset
   344
            }
02bb8761fcce Initial load
duke
parents:
diff changeset
   345
02bb8761fcce Initial load
duke
parents:
diff changeset
   346
            fragmentStack.clear();
02bb8761fcce Initial load
duke
parents:
diff changeset
   347
        }
02bb8761fcce Initial load
duke
parents:
diff changeset
   348
02bb8761fcce Initial load
duke
parents:
diff changeset
   349
        // Give the stream the magic Object to restore
02bb8761fcce Initial load
duke
parents:
diff changeset
   350
        // it's state.
02bb8761fcce Initial load
duke
parents:
diff changeset
   351
        inputStream.restoreInternalState(streamMemento);
02bb8761fcce Initial load
duke
parents:
diff changeset
   352
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   353
02bb8761fcce Initial load
duke
parents:
diff changeset
   354
    public MarkAndResetHandler getMarkAndResetHandler() {
02bb8761fcce Initial load
duke
parents:
diff changeset
   355
        return this;
02bb8761fcce Initial load
duke
parents:
diff changeset
   356
    }
02bb8761fcce Initial load
duke
parents:
diff changeset
   357
}