src/java.net.http/share/classes/jdk/internal/net/http/HttpClientImpl.java
branch http-client-branch
changeset 56165 8a6065d830b9
parent 56092 fd85b2bf2b0d
child 56235 6218673d7fa0
equal deleted inserted replaced
56164:4db4bec0e5bb 56165:8a6065d830b9
   518         }
   518         }
   519 
   519 
   520         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
   520         void eventUpdated(AsyncEvent e) throws ClosedChannelException {
   521             if (Thread.currentThread() == this) {
   521             if (Thread.currentThread() == this) {
   522                 SelectionKey key = e.channel().keyFor(selector);
   522                 SelectionKey key = e.channel().keyFor(selector);
   523                 if (key != null) {
   523                 if (key != null && key.isValid()) {
   524                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
   524                     SelectorAttachment sa = (SelectorAttachment) key.attachment();
   525                     if (sa != null) sa.register(e);
   525                     sa.register(e);
       
   526                 } else if (e.interestOps() != 0){
       
   527                     // We don't care about paused events.
       
   528                     // These are actually handled by
       
   529                     // SelectorAttachment::resetInterestOps later on.
       
   530                     // But if we reach here when trying to resume an
       
   531                     // event then it's better to fail fast.
       
   532                     debug.log(Level.DEBUG, "No key for channel");
       
   533                     e.abort(new IOException("No key for channel"));
   526                 }
   534                 }
   527             } else {
   535             } else {
   528                 register(e);
   536                 register(e);
   529             }
   537             }
   530         }
   538         }
   561 
   569 
   562         @Override
   570         @Override
   563         public void run() {
   571         public void run() {
   564             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
   572             List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>();
   565             List<AsyncEvent> readyList = new ArrayList<>();
   573             List<AsyncEvent> readyList = new ArrayList<>();
       
   574             List<Runnable> resetList = new ArrayList<>();
   566             try {
   575             try {
   567                 while (!Thread.currentThread().isInterrupted()) {
   576                 while (!Thread.currentThread().isInterrupted()) {
   568                     synchronized (this) {
   577                     synchronized (this) {
   569                         assert errorList.isEmpty();
   578                         assert errorList.isEmpty();
   570                         assert readyList.isEmpty();
   579                         assert readyList.isEmpty();
       
   580                         assert resetList.isEmpty();
   571                         for (AsyncEvent event : registrations) {
   581                         for (AsyncEvent event : registrations) {
   572                             if (event instanceof AsyncTriggerEvent) {
   582                             if (event instanceof AsyncTriggerEvent) {
   573                                 readyList.add(event);
   583                                 readyList.add(event);
   574                                 continue;
   584                                 continue;
   575                             }
   585                             }
   671                             return;
   681                             return;
   672                         }
   682                         }
   673                         owner.purgeTimeoutsAndReturnNextDeadline();
   683                         owner.purgeTimeoutsAndReturnNextDeadline();
   674                         continue;
   684                         continue;
   675                     }
   685                     }
       
   686 
   676                     Set<SelectionKey> keys = selector.selectedKeys();
   687                     Set<SelectionKey> keys = selector.selectedKeys();
   677 
       
   678                     assert errorList.isEmpty();
   688                     assert errorList.isEmpty();
       
   689 
   679                     for (SelectionKey key : keys) {
   690                     for (SelectionKey key : keys) {
   680                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
   691                         SelectorAttachment sa = (SelectorAttachment) key.attachment();
   681                         if (!key.isValid()) {
   692                         if (!key.isValid()) {
   682                             IOException ex = sa.chan.isOpen()
   693                             IOException ex = sa.chan.isOpen()
   683                                     ? new IOException("Invalid key")
   694                                     ? new IOException("Invalid key")
   695                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
   706                             sa.pending.forEach(e -> errorList.add(new Pair<>(e,io)));
   696                             sa.pending.clear();
   707                             sa.pending.clear();
   697                             continue;
   708                             continue;
   698                         }
   709                         }
   699                         sa.events(eventsOccurred).forEach(readyList::add);
   710                         sa.events(eventsOccurred).forEach(readyList::add);
   700                         sa.resetInterestOps(eventsOccurred);
   711                         resetList.add(() -> sa.resetInterestOps(eventsOccurred));
   701                     }
   712                     }
       
   713 
   702                     selector.selectNow(); // complete cancellation
   714                     selector.selectNow(); // complete cancellation
   703                     selector.selectedKeys().clear();
   715                     selector.selectedKeys().clear();
   704 
   716 
   705                     for (AsyncEvent event : readyList) {
   717                     // handle selected events
   706                         handleEvent(event, null); // will be delegated to executor
   718                     readyList.forEach((e) -> handleEvent(e, null));
   707                     }
       
   708                     readyList.clear();
   719                     readyList.clear();
       
   720 
       
   721                     // handle errors (closed channels etc...)
   709                     errorList.forEach((p) -> handleEvent(p.first, p.second));
   722                     errorList.forEach((p) -> handleEvent(p.first, p.second));
   710                     errorList.clear();
   723                     errorList.clear();
       
   724 
       
   725                     // reset interest ops for selected channels
       
   726                     resetList.forEach(r -> r.run());
       
   727                     resetList.clear();
       
   728 
   711                 }
   729                 }
   712             } catch (Throwable e) {
   730             } catch (Throwable e) {
   713                 //e.printStackTrace();
   731                 //e.printStackTrace();
   714                 if (!closed) {
   732                 if (!closed) {
   715                     // This terminates thread. So, better just print stack trace
   733                     // This terminates thread. So, better just print stack trace
   741             if (closed || ioe != null) {
   759             if (closed || ioe != null) {
   742                 event.abort(ioe);
   760                 event.abort(ioe);
   743             } else {
   761             } else {
   744                 event.handle();
   762                 event.handle();
   745             }
   763             }
       
   764         }
       
   765     }
       
   766 
       
   767     final String debugInterestOps(SelectableChannel channel) {
       
   768         try {
       
   769             SelectionKey key = channel.keyFor(selmgr.selector);
       
   770             if (key == null) return "channel not registered with selector";
       
   771             String keyInterestOps = key.isValid()
       
   772                     ? "key.interestOps=" + key.interestOps() : "invalid key";
       
   773             return String.format("channel registered with selector, %s, sa.interestOps=%s",
       
   774                                  keyInterestOps,
       
   775                                  ((SelectorAttachment)key.attachment()).interestOps);
       
   776         } catch (Throwable t) {
       
   777             return String.valueOf(t);
   746         }
   778         }
   747     }
   779     }
   748 
   780 
   749     /**
   781     /**
   750      * Tracks multiple user level registrations associated with one NIO
   782      * Tracks multiple user level registrations associated with one NIO
   770             this.selector = selector;
   802             this.selector = selector;
   771         }
   803         }
   772 
   804 
   773         void register(AsyncEvent e) throws ClosedChannelException {
   805         void register(AsyncEvent e) throws ClosedChannelException {
   774             int newOps = e.interestOps();
   806             int newOps = e.interestOps();
       
   807             // re register interest if we are not already interested
       
   808             // in the event. If the event is paused, then the pause will
       
   809             // be taken into account later when resetInterestOps is called.
   775             boolean reRegister = (interestOps & newOps) != newOps;
   810             boolean reRegister = (interestOps & newOps) != newOps;
   776             interestOps |= newOps;
   811             interestOps |= newOps;
   777             pending.add(e);
   812             pending.add(e);
   778             if (reRegister) {
   813             if (reRegister) {
   779                 // first time registration happens here also
   814                 // first time registration happens here also
   780                 try {
   815                 try {
   781                     chan.register(selector, interestOps, this);
   816                     chan.register(selector, interestOps, this);
   782                 } catch (CancelledKeyException x) {
   817                 } catch (Throwable x) {
   783                     abortPending(x);
   818                     abortPending(x);
   784                 }
   819                 }
       
   820             } else if (!chan.isOpen()) {
       
   821                 abortPending(new ClosedChannelException());
   785             }
   822             }
   786         }
   823         }
   787 
   824 
   788         /**
   825         /**
   789          * Returns a Stream<AsyncEvents> containing only events that are
   826          * Returns a Stream<AsyncEvents> containing only events that are
   816                 }
   853                 }
   817             }
   854             }
   818 
   855 
   819             this.interestOps = newOps;
   856             this.interestOps = newOps;
   820             SelectionKey key = chan.keyFor(selector);
   857             SelectionKey key = chan.keyFor(selector);
   821             if (newOps == 0 && pending.isEmpty()) {
   858             if (newOps == 0 && key != null && pending.isEmpty()) {
   822                 key.cancel();
   859                 key.cancel();
   823             } else {
   860             } else {
   824                 try {
   861                 try {
       
   862                     if (key == null || !key.isValid()) {
       
   863                         throw new CancelledKeyException();
       
   864                     }
   825                     key.interestOps(newOps);
   865                     key.interestOps(newOps);
       
   866                     // double check after
       
   867                     if (!chan.isOpen()) {
       
   868                         abortPending(new ClosedChannelException());
       
   869                         return;
       
   870                     }
       
   871                     assert key.interestOps() == newOps;
   826                 } catch (CancelledKeyException x) {
   872                 } catch (CancelledKeyException x) {
   827                     // channel may have been closed
   873                     // channel may have been closed
   828                     debug.log(Level.DEBUG, "key cancelled for " + chan);
   874                     debug.log(Level.DEBUG, "key cancelled for " + chan);
   829                     abortPending(x);
   875                     abortPending(x);
   830                 }
   876                 }