src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java
changeset 49001 ce06058197a4
parent 48750 ffbb784a8873
child 49493 814bd31f8da0
equal deleted inserted replaced
49000:a406a9c451a0 49001:ce06058197a4
    26 package sun.nio.ch;
    26 package sun.nio.ch;
    27 
    27 
    28 import java.io.FileDescriptor;
    28 import java.io.FileDescriptor;
    29 import java.io.IOException;
    29 import java.io.IOException;
    30 import java.nio.ByteBuffer;
    30 import java.nio.ByteBuffer;
       
    31 import java.nio.channels.AsynchronousCloseException;
    31 import java.nio.channels.ClosedChannelException;
    32 import java.nio.channels.ClosedChannelException;
       
    33 import java.nio.channels.NotYetConnectedException;
    32 import java.nio.channels.Pipe;
    34 import java.nio.channels.Pipe;
    33 import java.nio.channels.SelectionKey;
    35 import java.nio.channels.SelectionKey;
    34 import java.nio.channels.spi.SelectorProvider;
    36 import java.nio.channels.spi.SelectorProvider;
       
    37 import java.util.Objects;
    35 import java.util.concurrent.locks.ReentrantLock;
    38 import java.util.concurrent.locks.ReentrantLock;
    36 
       
    37 
    39 
    38 class SourceChannelImpl
    40 class SourceChannelImpl
    39     extends Pipe.SourceChannel
    41     extends Pipe.SourceChannel
    40     implements SelChImpl
    42     implements SelChImpl
    41 {
    43 {
    42 
       
    43     // Used to make native read and write calls
    44     // Used to make native read and write calls
    44     private static final NativeDispatcher nd = new FileDispatcherImpl();
    45     private static final NativeDispatcher nd = new FileDispatcherImpl();
    45 
    46 
    46     // The file descriptor associated with this channel
    47     // The file descriptor associated with this channel
    47     private final FileDescriptor fd;
    48     private final FileDescriptor fd;
    48 
       
    49     // fd value needed for dev/poll. This value will remain valid
       
    50     // even after the value in the file descriptor object has been set to -1
       
    51     private final int fdVal;
    49     private final int fdVal;
    52 
       
    53     // ID of native thread doing read, for signalling
       
    54     private volatile long thread;
       
    55 
    50 
    56     // Lock held by current reading thread
    51     // Lock held by current reading thread
    57     private final ReentrantLock readLock = new ReentrantLock();
    52     private final ReentrantLock readLock = new ReentrantLock();
    58 
    53 
    59     // Lock held by any thread that modifies the state fields declared below
    54     // Lock held by any thread that modifies the state fields declared below
    61     private final Object stateLock = new Object();
    56     private final Object stateLock = new Object();
    62 
    57 
    63     // -- The following fields are protected by stateLock
    58     // -- The following fields are protected by stateLock
    64 
    59 
    65     // Channel state
    60     // Channel state
    66     private static final int ST_UNINITIALIZED = -1;
       
    67     private static final int ST_INUSE = 0;
    61     private static final int ST_INUSE = 0;
    68     private static final int ST_KILLED = 1;
    62     private static final int ST_CLOSING = 1;
    69     private volatile int state = ST_UNINITIALIZED;
    63     private static final int ST_KILLPENDING = 2;
       
    64     private static final int ST_KILLED = 3;
       
    65     private int state;
       
    66 
       
    67     // ID of native thread doing read, for signalling
       
    68     private long thread;
    70 
    69 
    71     // -- End of fields protected by stateLock
    70     // -- End of fields protected by stateLock
    72 
    71 
    73 
    72 
    74     public FileDescriptor getFD() {
    73     public FileDescriptor getFD() {
    81 
    80 
    82     SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) {
    81     SourceChannelImpl(SelectorProvider sp, FileDescriptor fd) {
    83         super(sp);
    82         super(sp);
    84         this.fd = fd;
    83         this.fd = fd;
    85         this.fdVal = IOUtil.fdVal(fd);
    84         this.fdVal = IOUtil.fdVal(fd);
    86         this.state = ST_INUSE;
    85     }
    87     }
    86 
    88 
    87     /**
       
    88      * Invoked by implCloseChannel to close the channel.
       
    89      */
       
    90     @Override
    89     protected void implCloseSelectableChannel() throws IOException {
    91     protected void implCloseSelectableChannel() throws IOException {
    90         synchronized (stateLock) {
    92         assert !isOpen();
    91             if (state != ST_KILLED)
    93 
    92                 nd.preClose(fd);
    94         boolean interrupted = false;
    93             long th = thread;
    95         boolean blocking;
    94             if (th != 0)
    96 
    95                 NativeThread.signal(th);
    97         // set state to ST_CLOSING
    96             if (!isRegistered())
    98         synchronized (stateLock) {
    97                 kill();
    99             assert state < ST_CLOSING;
    98         }
   100             state = ST_CLOSING;
    99     }
   101             blocking = isBlocking();
   100 
   102         }
       
   103 
       
   104         // wait for any outstanding read to complete
       
   105         if (blocking) {
       
   106             synchronized (stateLock) {
       
   107                 assert state == ST_CLOSING;
       
   108                 long th = thread;
       
   109                 if (th != 0) {
       
   110                     nd.preClose(fd);
       
   111                     NativeThread.signal(th);
       
   112 
       
   113                     // wait for read operation to end
       
   114                     while (thread != 0) {
       
   115                         try {
       
   116                             stateLock.wait();
       
   117                         } catch (InterruptedException e) {
       
   118                             interrupted = true;
       
   119                         }
       
   120                     }
       
   121                 }
       
   122             }
       
   123         } else {
       
   124             // non-blocking mode: wait for read to complete
       
   125             readLock.lock();
       
   126             readLock.unlock();
       
   127         }
       
   128 
       
   129         // set state to ST_KILLPENDING
       
   130         synchronized (stateLock) {
       
   131             assert state == ST_CLOSING;
       
   132             state = ST_KILLPENDING;
       
   133         }
       
   134 
       
    135         // close the file descriptor if not registered with a Selector
       
   136         if (!isRegistered())
       
   137             kill();
       
   138 
       
   139         // restore interrupt status
       
   140         if (interrupted)
       
   141             Thread.currentThread().interrupt();
       
   142     }
       
   143 
       
   144     @Override
   101     public void kill() throws IOException {
   145     public void kill() throws IOException {
   102         synchronized (stateLock) {
   146         synchronized (stateLock) {
   103             if (state == ST_KILLED)
   147             assert thread == 0;
   104                 return;
   148             if (state == ST_KILLPENDING) {
   105             if (state == ST_UNINITIALIZED) {
       
   106                 state = ST_KILLED;
   149                 state = ST_KILLED;
   107                 return;
   150                 nd.close(fd);
   108             }
   151             }
   109             assert !isOpen() && !isRegistered();
   152         }
   110             nd.close(fd);
   153     }
   111             state = ST_KILLED;
   154 
   112         }
   155     @Override
   113     }
       
   114 
       
   115     protected void implConfigureBlocking(boolean block) throws IOException {
   156     protected void implConfigureBlocking(boolean block) throws IOException {
   116         IOUtil.configureBlocking(fd, block);
   157         readLock.lock();
       
   158         try {
       
   159             synchronized (stateLock) {
       
   160                 IOUtil.configureBlocking(fd, block);
       
   161             }
       
   162         } finally {
       
   163             readLock.unlock();
       
   164         }
   117     }
   165     }
   118 
   166 
   119     public boolean translateReadyOps(int ops, int initialOps,
   167     public boolean translateReadyOps(int ops, int initialOps,
   120                                      SelectionKeyImpl sk) {
   168                                      SelectionKeyImpl sk) {
   121         int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
   169         int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
   151         if (ops == SelectionKey.OP_READ)
   199         if (ops == SelectionKey.OP_READ)
   152             ops = Net.POLLIN;
   200             ops = Net.POLLIN;
   153         sk.selector.putEventOps(sk, ops);
   201         sk.selector.putEventOps(sk, ops);
   154     }
   202     }
   155 
   203 
   156     private void ensureOpen() throws IOException {
   204     /**
   157         if (!isOpen())
   205      * Marks the beginning of a read operation that might block.
   158             throw new ClosedChannelException();
   206      *
   159     }
   207      * @throws ClosedChannelException if the channel is closed
   160 
    208      * @param blocking {@code true} if the channel is in blocking mode
       
   209      */
       
   210     private void beginRead(boolean blocking) throws ClosedChannelException {
       
   211         if (blocking) {
       
   212             // set hook for Thread.interrupt
       
   213             begin();
       
   214         }
       
   215         synchronized (stateLock) {
       
   216             if (!isOpen())
       
   217                 throw new ClosedChannelException();
       
   218             if (blocking)
       
   219                 thread = NativeThread.current();
       
   220         }
       
   221     }
       
   222 
       
   223     /**
       
   224      * Marks the end of a read operation that may have blocked.
       
   225      *
       
   226      * @throws AsynchronousCloseException if the channel was closed due to this
       
   227      * thread being interrupted on a blocking read operation.
       
   228      */
       
   229     private void endRead(boolean blocking, boolean completed)
       
   230         throws AsynchronousCloseException
       
   231     {
       
   232         if (blocking) {
       
   233             synchronized (stateLock) {
       
   234                 thread = 0;
       
   235                 // notify any thread waiting in implCloseSelectableChannel
       
   236                 if (state == ST_CLOSING) {
       
   237                     stateLock.notifyAll();
       
   238                 }
       
   239             }
       
   240             // remove hook for Thread.interrupt
       
   241             end(completed);
       
   242         }
       
   243     }
       
   244 
       
   245     @Override
   161     public int read(ByteBuffer dst) throws IOException {
   246     public int read(ByteBuffer dst) throws IOException {
       
   247         Objects.requireNonNull(dst);
   162 
   248 
   163         readLock.lock();
   249         readLock.lock();
   164         try {
   250         try {
   165             ensureOpen();
   251             boolean blocking = isBlocking();
   166             int n = 0;
   252             int n = 0;
   167             try {
   253             try {
   168                 begin();
   254                 beginRead(blocking);
   169                 if (!isOpen())
       
   170                     return 0;
       
   171                 thread = NativeThread.current();
       
   172                 do {
   255                 do {
   173                     n = IOUtil.read(fd, dst, -1, nd);
   256                     n = IOUtil.read(fd, dst, -1, nd);
   174                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   257                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   175                 return IOStatus.normalize(n);
       
   176             } finally {
   258             } finally {
   177                 thread = 0;
   259                 endRead(blocking, n > 0);
   178                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   179                 assert IOStatus.check(n);
   260                 assert IOStatus.check(n);
   180             }
   261             }
       
   262             return IOStatus.normalize(n);
   181         } finally {
   263         } finally {
   182             readLock.unlock();
   264             readLock.unlock();
   183         }
   265         }
   184     }
   266     }
   185 
   267 
   186     public long read(ByteBuffer[] dsts, int offset, int length)
   268     @Override
   187         throws IOException
   269     public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
   188     {
   270         Objects.checkFromIndexSize(offset, length, dsts.length);
   189         if ((offset < 0) || (length < 0) || (offset > dsts.length - length))
       
   190            throw new IndexOutOfBoundsException();
       
   191         return read(Util.subsequence(dsts, offset, length));
       
   192     }
       
   193 
       
   194     public long read(ByteBuffer[] dsts) throws IOException {
       
   195         if (dsts == null)
       
   196             throw new NullPointerException();
       
   197 
   271 
   198         readLock.lock();
   272         readLock.lock();
   199         try {
   273         try {
   200             ensureOpen();
   274             boolean blocking = isBlocking();
   201             long n = 0;
   275             long n = 0;
   202             try {
   276             try {
   203                 begin();
   277                 beginRead(blocking);
   204                 if (!isOpen())
       
   205                     return 0;
       
   206                 thread = NativeThread.current();
       
   207                 do {
   278                 do {
   208                     n = IOUtil.read(fd, dsts, nd);
   279                     n = IOUtil.read(fd, dsts, offset, length, nd);
   209                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   280                 } while ((n == IOStatus.INTERRUPTED) && isOpen());
   210                 return IOStatus.normalize(n);
       
   211             } finally {
   281             } finally {
   212                 thread = 0;
   282                 endRead(blocking, n > 0);
   213                 end((n > 0) || (n == IOStatus.UNAVAILABLE));
       
   214                 assert IOStatus.check(n);
   283                 assert IOStatus.check(n);
   215             }
   284             }
       
   285             return IOStatus.normalize(n);
   216         } finally {
   286         } finally {
   217             readLock.unlock();
   287             readLock.unlock();
   218         }
   288         }
   219     }
   289     }
       
   290 
       
   291     @Override
       
   292     public long read(ByteBuffer[] dsts) throws IOException {
       
   293         return read(dsts, 0, dsts.length);
       
   294     }
   220 }
   295 }