src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java
changeset 54754 193a8f1a4f3b
parent 54620 13b67c1420b8
equal deleted inserted replaced
54753:a8535f04b465 54754:193a8f1a4f3b
    33 import java.nio.channels.NotYetConnectedException;
    33 import java.nio.channels.NotYetConnectedException;
    34 import java.nio.channels.Pipe;
    34 import java.nio.channels.Pipe;
    35 import java.nio.channels.SelectionKey;
    35 import java.nio.channels.SelectionKey;
    36 import java.nio.channels.spi.SelectorProvider;
    36 import java.nio.channels.spi.SelectorProvider;
    37 import java.util.Objects;
    37 import java.util.Objects;
    38 import java.util.concurrent.locks.Condition;
       
    39 import java.util.concurrent.locks.ReentrantLock;
    38 import java.util.concurrent.locks.ReentrantLock;
    40 
    39 
    41 class SourceChannelImpl
    40 class SourceChannelImpl
    42     extends Pipe.SourceChannel
    41     extends Pipe.SourceChannel
    43     implements SelChImpl
    42     implements SelChImpl
    52     // Lock held by current reading thread
    51     // Lock held by current reading thread
    53     private final ReentrantLock readLock = new ReentrantLock();
    52     private final ReentrantLock readLock = new ReentrantLock();
    54 
    53 
    55     // Lock held by any thread that modifies the state fields declared below
    54     // Lock held by any thread that modifies the state fields declared below
    56     // DO NOT invoke a blocking I/O operation while holding this lock!
    55     // DO NOT invoke a blocking I/O operation while holding this lock!
    57     private final ReentrantLock stateLock = new ReentrantLock();
    56     private final Object stateLock = new Object();
    58     private final Condition stateCondition = stateLock.newCondition();
       
    59 
    57 
    60     // -- The following fields are protected by stateLock
    58     // -- The following fields are protected by stateLock
    61 
    59 
    62     // Channel state
    60     // Channel state
    63     private static final int ST_INUSE = 0;
    61     private static final int ST_INUSE = 0;
    64     private static final int ST_CLOSING = 1;
    62     private static final int ST_CLOSING = 1;
    65     private static final int ST_KILLPENDING = 2;
    63     private static final int ST_CLOSED = 2;
    66     private static final int ST_KILLED = 3;
       
    67     private int state;
    64     private int state;
    68 
    65 
    69     // ID of native thread doing read, for signalling
    66     // ID of native thread doing read, for signalling
    70     private long thread;
    67     private long thread;
    71 
    68 
    85         this.fd = fd;
    82         this.fd = fd;
    86         this.fdVal = IOUtil.fdVal(fd);
    83         this.fdVal = IOUtil.fdVal(fd);
    87     }
    84     }
    88 
    85 
    89     /**
    86     /**
    90      * Invoked by implCloseChannel to close the channel.
    87      * Closes the read end of the pipe if there are no read operations in
    91      */
    88      * progress and the channel is not registered with a Selector.
    92     @Override
    89      */
    93     protected void implCloseSelectableChannel() throws IOException {
    90     private boolean tryClose() throws IOException {
    94         assert !isOpen();
    91         assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
    95 
    92         if (thread == 0 && !isRegistered()) {
    96         boolean interrupted = false;
    93             state = ST_CLOSED;
    97         boolean blocking;
    94             nd.close(fd);
    98 
    95             return true;
    99         // set state to ST_CLOSING
    96         } else {
   100         stateLock.lock();
    97             return false;
       
    98         }
       
    99     }
       
   100 
       
   101     /**
       
   102      * Invokes tryClose to attempt to close the read end of the pipe.
       
   103      *
       
   104      * This method is used for deferred closing by I/O and Selector operations.
       
   105      */
       
   106     private void tryFinishClose() {
   101         try {
   107         try {
       
   108             tryClose();
       
   109         } catch (IOException ignore) { }
       
   110     }
       
   111 
       
   112     /**
       
   113      * Closes this channel when configured in blocking mode.
       
   114      *
       
   115      * If there is a read operation in progress then the read-end of the pipe
       
   116      * is pre-closed and the reader is signalled, in which case the final close
       
   117      * is deferred until the reader aborts.
       
   118      */
       
   119     private void implCloseBlockingMode() throws IOException {
       
   120         synchronized (stateLock) {
   102             assert state < ST_CLOSING;
   121             assert state < ST_CLOSING;
   103             state = ST_CLOSING;
   122             state = ST_CLOSING;
   104             blocking = isBlocking();
   123             if (!tryClose()) {
   105         } finally {
       
   106             stateLock.unlock();
       
   107         }
       
   108 
       
   109         // wait for any outstanding read to complete
       
   110         if (blocking) {
       
   111             stateLock.lock();
       
   112             try {
       
   113                 assert state == ST_CLOSING;
       
   114                 long th = thread;
   124                 long th = thread;
   115                 if (th != 0) {
   125                 if (th != 0) {
   116                     nd.preClose(fd);
   126                     nd.preClose(fd);
   117                     NativeThread.signal(th);
   127                     NativeThread.signal(th);
   118 
       
   119                     // wait for read operation to end
       
   120                     while (thread != 0) {
       
   121                         try {
       
   122                             stateCondition.await();
       
   123                         } catch (InterruptedException e) {
       
   124                             interrupted = true;
       
   125                         }
       
   126                     }
       
   127                 }
   128                 }
   128             } finally {
   129             }
   129                 stateLock.unlock();
   130         }
   130             }
   131     }
       
   132 
       
   133     /**
       
   134      * Closes this channel when configured in non-blocking mode.
       
   135      *
       
   136      * If the channel is registered with a Selector then the close is deferred
       
   137      * until the channel is flushed from all Selectors.
       
   138      */
       
   139     private void implCloseNonBlockingMode() throws IOException {
       
   140         synchronized (stateLock) {
       
   141             assert state < ST_CLOSING;
       
   142             state = ST_CLOSING;
       
   143         }
       
   144         // wait for any read operation to complete before trying to close
       
   145         readLock.lock();
       
   146         readLock.unlock();
       
   147         synchronized (stateLock) {
       
   148             if (state == ST_CLOSING) {
       
   149                 tryClose();
       
   150             }
       
   151         }
       
   152     }
       
   153 
       
   154     /**
       
   155      * Invoked by implCloseChannel to close the channel.
       
   156      */
       
   157     @Override
       
   158     protected void implCloseSelectableChannel() throws IOException {
       
   159         assert !isOpen();
       
   160         if (isBlocking()) {
       
   161             implCloseBlockingMode();
   131         } else {
   162         } else {
   132             // non-blocking mode: wait for read to complete
   163             implCloseNonBlockingMode();
   133             readLock.lock();
   164         }
   134             readLock.unlock();
   165     }
   135         }
   166     @Override
   136 
   167     public void kill() {
   137         // set state to ST_KILLPENDING
   168         synchronized (stateLock) {
   138         stateLock.lock();
   169             assert !isOpen();
   139         try {
   170             if (state == ST_CLOSING) {
   140             assert state == ST_CLOSING;
   171                 tryFinishClose();
   141             state = ST_KILLPENDING;
   172             }
   142         } finally {
       
   143             stateLock.unlock();
       
   144         }
       
   145 
       
   146         // close the read end of the pipe if not registered with Selector
       
   147         if (!isRegistered())
       
   148             kill();
       
   149 
       
   150         // restore interrupt status
       
   151         if (interrupted)
       
   152             Thread.currentThread().interrupt();
       
   153     }
       
   154 
       
   155     @Override
       
   156     public void kill() throws IOException {
       
   157         stateLock.lock();
       
   158         try {
       
   159             assert thread == 0;
       
   160             if (state == ST_KILLPENDING) {
       
   161                 state = ST_KILLED;
       
   162                 nd.close(fd);
       
   163             }
       
   164         } finally {
       
   165             stateLock.unlock();
       
   166         }
   173         }
   167     }
   174     }
   168 
   175 
   169     @Override
   176     @Override
   170     protected void implConfigureBlocking(boolean block) throws IOException {
   177     protected void implConfigureBlocking(boolean block) throws IOException {
   171         readLock.lock();
   178         readLock.lock();
   172         try {
   179         try {
   173             stateLock.lock();
   180             synchronized (stateLock) {
   174             try {
   181                 if (!isOpen())
       
   182                     throw new ClosedChannelException();
   175                 IOUtil.configureBlocking(fd, block);
   183                 IOUtil.configureBlocking(fd, block);
   176             } finally {
       
   177                 stateLock.unlock();
       
   178             }
   184             }
   179         } finally {
   185         } finally {
   180             readLock.unlock();
   186             readLock.unlock();
   181         }
   187         }
   182     }
   188     }
   227     private void beginRead(boolean blocking) throws ClosedChannelException {
   233     private void beginRead(boolean blocking) throws ClosedChannelException {
   228         if (blocking) {
   234         if (blocking) {
   229             // set hook for Thread.interrupt
   235             // set hook for Thread.interrupt
   230             begin();
   236             begin();
   231         }
   237         }
   232         stateLock.lock();
   238         synchronized (stateLock) {
   233         try {
       
   234             if (!isOpen())
   239             if (!isOpen())
   235                 throw new ClosedChannelException();
   240                 throw new ClosedChannelException();
   236             if (blocking)
   241             if (blocking)
   237                 thread = NativeThread.current();
   242                 thread = NativeThread.current();
   238         } finally {
       
   239             stateLock.unlock();
       
   240         }
   243         }
   241     }
   244     }
   242 
   245 
   243     /**
   246     /**
   244      * Marks the end of a read operation that may have blocked.
   247      * Marks the end of a read operation that may have blocked.
   248      */
   251      */
   249     private void endRead(boolean blocking, boolean completed)
   252     private void endRead(boolean blocking, boolean completed)
   250         throws AsynchronousCloseException
   253         throws AsynchronousCloseException
   251     {
   254     {
   252         if (blocking) {
   255         if (blocking) {
   253             stateLock.lock();
   256             synchronized (stateLock) {
   254             try {
       
   255                 thread = 0;
   257                 thread = 0;
   256                 // notify any thread waiting in implCloseSelectableChannel
       
   257                 if (state == ST_CLOSING) {
   258                 if (state == ST_CLOSING) {
   258                     stateCondition.signalAll();
   259                     tryFinishClose();
   259                 }
   260                 }
   260             } finally {
       
   261                 stateLock.unlock();
       
   262             }
   261             }
   263             // remove hook for Thread.interrupt
   262             // remove hook for Thread.interrupt
   264             end(completed);
   263             end(completed);
   265         }
   264         }
   266     }
   265     }