src/java.base/unix/classes/sun/nio/ch/PollSelectorImpl.java
changeset 49493 814bd31f8da0
parent 49417 1d3139252c1c
child 49526 cad4c844902a
equal deleted inserted replaced
49492:f1a8ec1a6972 49493:814bd31f8da0
    24  */
    24  */
    25 package sun.nio.ch;
    25 package sun.nio.ch;
    26 
    26 
    27 import java.io.IOException;
    27 import java.io.IOException;
    28 import java.nio.channels.ClosedSelectorException;
    28 import java.nio.channels.ClosedSelectorException;
    29 import java.nio.channels.SelectableChannel;
       
    30 import java.nio.channels.SelectionKey;
       
    31 import java.nio.channels.Selector;
    29 import java.nio.channels.Selector;
    32 import java.nio.channels.spi.SelectorProvider;
    30 import java.nio.channels.spi.SelectorProvider;
    33 import java.util.ArrayDeque;
    31 import java.util.ArrayDeque;
    34 import java.util.ArrayList;
    32 import java.util.ArrayList;
    35 import java.util.Deque;
    33 import java.util.Deque;
    36 import java.util.Iterator;
       
    37 import java.util.List;
    34 import java.util.List;
    38 import java.util.concurrent.TimeUnit;
    35 import java.util.concurrent.TimeUnit;
    39 
    36 
    40 import jdk.internal.misc.Unsafe;
    37 import jdk.internal.misc.Unsafe;
    41 
    38 
    61     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
    58     private final List<SelectionKeyImpl> pollKeys = new ArrayList<>();
    62 
    59 
    63     // pending updates, queued by putEventOps
    60     // pending updates, queued by putEventOps
    64     private final Object updateLock = new Object();
    61     private final Object updateLock = new Object();
    65     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    62     private final Deque<SelectionKeyImpl> updateKeys = new ArrayDeque<>();
    66     private final Deque<Integer> updateOps = new ArrayDeque<>();
    63     private final Deque<Integer> updateEvents = new ArrayDeque<>();
    67 
    64 
    68     // interrupt triggering and clearing
    65     // interrupt triggering and clearing
    69     private final Object interruptLock = new Object();
    66     private final Object interruptLock = new Object();
    70     private boolean interruptTriggered;
    67     private boolean interruptTriggered;
    71 
    68 
    97 
    94 
    98     @Override
    95     @Override
    99     protected int doSelect(long timeout) throws IOException {
    96     protected int doSelect(long timeout) throws IOException {
   100         assert Thread.holdsLock(this);
    97         assert Thread.holdsLock(this);
   101 
    98 
       
    99         int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
       
   100         boolean blocking = (to != 0);
       
   101         boolean timedPoll = (to > 0);
       
   102 
   102         processUpdateQueue();
   103         processUpdateQueue();
   103         processDeregisterQueue();
   104         processDeregisterQueue();
   104         try {
   105         try {
   105             begin();
   106             begin(blocking);
   106 
   107 
   107             int to = (int) Math.min(timeout, Integer.MAX_VALUE); // max poll timeout
       
   108             boolean timedPoll = (to > 0);
       
   109             int numPolled;
   108             int numPolled;
   110             do {
   109             do {
   111                 long startTime = timedPoll ? System.nanoTime() : 0;
   110                 long startTime = timedPoll ? System.nanoTime() : 0;
   112                 numPolled = poll(pollArray.address(), pollArraySize, to);
   111                 numPolled = poll(pollArray.address(), pollArraySize, to);
   113                 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
   112                 if (numPolled == IOStatus.INTERRUPTED && timedPoll) {
   121                 }
   120                 }
   122             } while (numPolled == IOStatus.INTERRUPTED);
   121             } while (numPolled == IOStatus.INTERRUPTED);
   123             assert numPolled <= pollArraySize;
   122             assert numPolled <= pollArraySize;
   124 
   123 
   125         } finally {
   124         } finally {
   126             end();
   125             end(blocking);
   127         }
   126         }
   128 
   127 
   129         processDeregisterQueue();
   128         processDeregisterQueue();
   130         return updateSelectedKeys();
   129         return updateSelectedKeys();
   131     }
   130     }
   135      */
   134      */
   136     private void processUpdateQueue() {
   135     private void processUpdateQueue() {
   137         assert Thread.holdsLock(this);
   136         assert Thread.holdsLock(this);
   138 
   137 
   139         synchronized (updateLock) {
   138         synchronized (updateLock) {
   140             assert updateKeys.size() == updateOps.size();
   139             assert updateKeys.size() == updateEvents.size();
   141 
       
   142             SelectionKeyImpl ski;
   140             SelectionKeyImpl ski;
   143             while ((ski = updateKeys.pollFirst()) != null) {
   141             while ((ski = updateKeys.pollFirst()) != null) {
   144                 int ops = updateOps.pollFirst();
   142                 int newEvents = updateEvents.pollFirst();
   145                 if (ski.isValid()) {
   143                 if (ski.isValid()) {
   146                     int index = ski.getIndex();
   144                     int index = ski.getIndex();
   147                     assert index >= 0 && index < pollArraySize;
   145                     assert index >= 0 && index < pollArraySize;
   148                     if (index > 0) {
   146                     if (index > 0) {
   149                         assert pollKeys.get(index) == ski;
   147                         assert pollKeys.get(index) == ski;
   150                         if (ops == 0) {
   148                         if (newEvents == 0) {
   151                             remove(ski);
   149                             remove(ski);
   152                         } else {
   150                         } else {
   153                             update(ski, ops);
   151                             update(ski, newEvents);
   154                         }
   152                         }
   155                     } else if (ops != 0) {
   153                     } else if (newEvents != 0) {
   156                         add(ski, ops);
   154                         add(ski, newEvents);
   157                     }
   155                     }
   158                 }
   156                 }
   159             }
   157             }
   160         }
   158         }
   161     }
   159     }
   162 
   160 
   163     /**
   161     /**
   164      * Update the keys whose fd's have been selected by kqueue.
   162      * Update the keys of file descriptors that were polled and add them to
   165      * Add the ready keys to the selected key set.
   163      * the selected-key set.
   166      * If the interrupt fd has been selected, drain it and clear the interrupt.
   164      * If the interrupt fd has been selected, drain it and clear the interrupt.
   167      */
   165      */
   168     private int updateSelectedKeys() throws IOException {
   166     private int updateSelectedKeys() throws IOException {
   169         assert Thread.holdsLock(this);
   167         assert Thread.holdsLock(this);
   170         assert Thread.holdsLock(nioSelectedKeys());
   168         assert Thread.holdsLock(nioSelectedKeys());
   203 
   201 
   204     @Override
   202     @Override
   205     protected void implClose() throws IOException {
   203     protected void implClose() throws IOException {
   206         assert !isOpen();
   204         assert !isOpen();
   207         assert Thread.holdsLock(this);
   205         assert Thread.holdsLock(this);
   208         assert Thread.holdsLock(nioKeys());
       
   209 
   206 
   210         // prevent further wakeup
   207         // prevent further wakeup
   211         synchronized (interruptLock) {
   208         synchronized (interruptLock) {
   212             interruptTriggered = true;
   209             interruptTriggered = true;
   213         }
   210         }
   214 
   211 
   215         pollArray.free();
   212         pollArray.free();
   216         FileDispatcherImpl.closeIntFD(fd0);
   213         FileDispatcherImpl.closeIntFD(fd0);
   217         FileDispatcherImpl.closeIntFD(fd1);
   214         FileDispatcherImpl.closeIntFD(fd1);
   218 
       
   219         // Deregister channels
       
   220         Iterator<SelectionKey> i = keys.iterator();
       
   221         while (i.hasNext()) {
       
   222             SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
       
   223             ski.setIndex(-1);
       
   224             deregister(ski);
       
   225             SelectableChannel selch = ski.channel();
       
   226             if (!selch.isOpen() && !selch.isRegistered())
       
   227                 ((SelChImpl)selch).kill();
       
   228             i.remove();
       
   229         }
       
   230     }
   215     }
   231 
   216 
   232     @Override
   217     @Override
   233     protected void implRegister(SelectionKeyImpl ski) {
   218     protected void implRegister(SelectionKeyImpl ski) {
   234         assert ski.getIndex() == 0;
   219         assert ski.getIndex() == 0;
   235         assert Thread.holdsLock(nioKeys());
       
   236 
       
   237         ensureOpen();
   220         ensureOpen();
   238         keys.add(ski);
       
   239     }
   221     }
   240 
   222 
   241     @Override
   223     @Override
   242     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   224     protected void implDereg(SelectionKeyImpl ski) throws IOException {
   243         assert !ski.isValid();
   225         assert !ski.isValid();
   244         assert Thread.holdsLock(this);
   226         assert Thread.holdsLock(this);
   245         assert Thread.holdsLock(nioKeys());
       
   246         assert Thread.holdsLock(nioSelectedKeys());
       
   247 
   227 
   248         // remove from poll array
   228         // remove from poll array
   249         int index = ski.getIndex();
   229         int index = ski.getIndex();
   250         if (index > 0) {
   230         if (index > 0) {
   251             remove(ski);
   231             remove(ski);
   252         }
   232         }
   253 
   233     }
   254         // remove from selected-key and key set
   234 
   255         selectedKeys.remove(ski);
   235     @Override
   256         keys.remove(ski);
   236     public void putEventOps(SelectionKeyImpl ski, int events) {
   257 
       
   258         // remove from channel's key set
       
   259         deregister(ski);
       
   260 
       
   261         SelectableChannel selch = ski.channel();
       
   262         if (!selch.isOpen() && !selch.isRegistered())
       
   263             ((SelChImpl) selch).kill();
       
   264     }
       
   265 
       
   266     @Override
       
   267     public void putEventOps(SelectionKeyImpl ski, int ops) {
       
   268         ensureOpen();
   237         ensureOpen();
   269         synchronized (updateLock) {
   238         synchronized (updateLock) {
   270             updateOps.addLast(ops);   // ops first in case adding the key fails
   239             updateEvents.addLast(events);  // events first in case adding key fails
   271             updateKeys.addLast(ski);
   240             updateKeys.addLast(ski);
   272         }
   241         }
   273     }
   242     }
   274 
   243 
   275     @Override
   244     @Override