518 } |
518 } |
519 |
519 |
520 void eventUpdated(AsyncEvent e) throws ClosedChannelException { |
520 void eventUpdated(AsyncEvent e) throws ClosedChannelException { |
521 if (Thread.currentThread() == this) { |
521 if (Thread.currentThread() == this) { |
522 SelectionKey key = e.channel().keyFor(selector); |
522 SelectionKey key = e.channel().keyFor(selector); |
523 if (key != null) { |
523 if (key != null && key.isValid()) { |
524 SelectorAttachment sa = (SelectorAttachment) key.attachment(); |
524 SelectorAttachment sa = (SelectorAttachment) key.attachment(); |
525 if (sa != null) sa.register(e); |
525 sa.register(e); |
|
526 } else if (e.interestOps() != 0){ |
|
527 // We don't care about paused events. |
|
528 // These are actually handled by |
|
529 // SelectorAttachment::resetInterestOps later on. |
|
530 // But if we reach here when trying to resume an |
|
531 // event then it's better to fail fast. |
|
532 debug.log(Level.DEBUG, "No key for channel"); |
|
533 e.abort(new IOException("No key for channel")); |
526 } |
534 } |
527 } else { |
535 } else { |
528 register(e); |
536 register(e); |
529 } |
537 } |
530 } |
538 } |
561 |
569 |
562 @Override |
570 @Override |
563 public void run() { |
571 public void run() { |
564 List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); |
572 List<Pair<AsyncEvent,IOException>> errorList = new ArrayList<>(); |
565 List<AsyncEvent> readyList = new ArrayList<>(); |
573 List<AsyncEvent> readyList = new ArrayList<>(); |
|
574 List<Runnable> resetList = new ArrayList<>(); |
566 try { |
575 try { |
567 while (!Thread.currentThread().isInterrupted()) { |
576 while (!Thread.currentThread().isInterrupted()) { |
568 synchronized (this) { |
577 synchronized (this) { |
569 assert errorList.isEmpty(); |
578 assert errorList.isEmpty(); |
570 assert readyList.isEmpty(); |
579 assert readyList.isEmpty(); |
|
580 assert resetList.isEmpty(); |
571 for (AsyncEvent event : registrations) { |
581 for (AsyncEvent event : registrations) { |
572 if (event instanceof AsyncTriggerEvent) { |
582 if (event instanceof AsyncTriggerEvent) { |
573 readyList.add(event); |
583 readyList.add(event); |
574 continue; |
584 continue; |
575 } |
585 } |
671 return; |
681 return; |
672 } |
682 } |
673 owner.purgeTimeoutsAndReturnNextDeadline(); |
683 owner.purgeTimeoutsAndReturnNextDeadline(); |
674 continue; |
684 continue; |
675 } |
685 } |
|
686 |
676 Set<SelectionKey> keys = selector.selectedKeys(); |
687 Set<SelectionKey> keys = selector.selectedKeys(); |
677 |
|
678 assert errorList.isEmpty(); |
688 assert errorList.isEmpty(); |
|
689 |
679 for (SelectionKey key : keys) { |
690 for (SelectionKey key : keys) { |
680 SelectorAttachment sa = (SelectorAttachment) key.attachment(); |
691 SelectorAttachment sa = (SelectorAttachment) key.attachment(); |
681 if (!key.isValid()) { |
692 if (!key.isValid()) { |
682 IOException ex = sa.chan.isOpen() |
693 IOException ex = sa.chan.isOpen() |
683 ? new IOException("Invalid key") |
694 ? new IOException("Invalid key") |
695 sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); |
706 sa.pending.forEach(e -> errorList.add(new Pair<>(e,io))); |
696 sa.pending.clear(); |
707 sa.pending.clear(); |
697 continue; |
708 continue; |
698 } |
709 } |
699 sa.events(eventsOccurred).forEach(readyList::add); |
710 sa.events(eventsOccurred).forEach(readyList::add); |
700 sa.resetInterestOps(eventsOccurred); |
711 resetList.add(() -> sa.resetInterestOps(eventsOccurred)); |
701 } |
712 } |
|
713 |
702 selector.selectNow(); // complete cancellation |
714 selector.selectNow(); // complete cancellation |
703 selector.selectedKeys().clear(); |
715 selector.selectedKeys().clear(); |
704 |
716 |
705 for (AsyncEvent event : readyList) { |
717 // handle selected events |
706 handleEvent(event, null); // will be delegated to executor |
718 readyList.forEach((e) -> handleEvent(e, null)); |
707 } |
|
708 readyList.clear(); |
719 readyList.clear(); |
|
720 |
|
721 // handle errors (closed channels etc...) |
709 errorList.forEach((p) -> handleEvent(p.first, p.second)); |
722 errorList.forEach((p) -> handleEvent(p.first, p.second)); |
710 errorList.clear(); |
723 errorList.clear(); |
|
724 |
|
725 // reset interest ops for selected channels |
|
726 resetList.forEach(r -> r.run()); |
|
727 resetList.clear(); |
|
728 |
711 } |
729 } |
712 } catch (Throwable e) { |
730 } catch (Throwable e) { |
713 //e.printStackTrace(); |
731 //e.printStackTrace(); |
714 if (!closed) { |
732 if (!closed) { |
715 // This terminates thread. So, better just print stack trace |
733 // This terminates thread. So, better just print stack trace |
741 if (closed || ioe != null) { |
759 if (closed || ioe != null) { |
742 event.abort(ioe); |
760 event.abort(ioe); |
743 } else { |
761 } else { |
744 event.handle(); |
762 event.handle(); |
745 } |
763 } |
|
764 } |
|
765 } |
|
766 |
|
767 final String debugInterestOps(SelectableChannel channel) { |
|
768 try { |
|
769 SelectionKey key = channel.keyFor(selmgr.selector); |
|
770 if (key == null) return "channel not registered with selector"; |
|
771 String keyInterestOps = key.isValid() |
|
772 ? "key.interestOps=" + key.interestOps() : "invalid key"; |
|
773 return String.format("channel registered with selector, %s, sa.interestOps=%s", |
|
774 keyInterestOps, |
|
775 ((SelectorAttachment)key.attachment()).interestOps); |
|
776 } catch (Throwable t) { |
|
777 return String.valueOf(t); |
746 } |
778 } |
747 } |
779 } |
748 |
780 |
749 /** |
781 /** |
750 * Tracks multiple user level registrations associated with one NIO |
782 * Tracks multiple user level registrations associated with one NIO |
770 this.selector = selector; |
802 this.selector = selector; |
771 } |
803 } |
772 |
804 |
773 void register(AsyncEvent e) throws ClosedChannelException { |
805 void register(AsyncEvent e) throws ClosedChannelException { |
774 int newOps = e.interestOps(); |
806 int newOps = e.interestOps(); |
|
807 // re register interest if we are not already interested |
|
808 // in the event. If the event is paused, then the pause will |
|
809 // be taken into account later when resetInterestOps is called. |
775 boolean reRegister = (interestOps & newOps) != newOps; |
810 boolean reRegister = (interestOps & newOps) != newOps; |
776 interestOps |= newOps; |
811 interestOps |= newOps; |
777 pending.add(e); |
812 pending.add(e); |
778 if (reRegister) { |
813 if (reRegister) { |
779 // first time registration happens here also |
814 // first time registration happens here also |
780 try { |
815 try { |
781 chan.register(selector, interestOps, this); |
816 chan.register(selector, interestOps, this); |
782 } catch (CancelledKeyException x) { |
817 } catch (Throwable x) { |
783 abortPending(x); |
818 abortPending(x); |
784 } |
819 } |
|
820 } else if (!chan.isOpen()) { |
|
821 abortPending(new ClosedChannelException()); |
785 } |
822 } |
786 } |
823 } |
787 |
824 |
788 /** |
825 /** |
789 * Returns a Stream<AsyncEvents> containing only events that are |
826 * Returns a Stream<AsyncEvents> containing only events that are |
816 } |
853 } |
817 } |
854 } |
818 |
855 |
819 this.interestOps = newOps; |
856 this.interestOps = newOps; |
820 SelectionKey key = chan.keyFor(selector); |
857 SelectionKey key = chan.keyFor(selector); |
821 if (newOps == 0 && pending.isEmpty()) { |
858 if (newOps == 0 && key != null && pending.isEmpty()) { |
822 key.cancel(); |
859 key.cancel(); |
823 } else { |
860 } else { |
824 try { |
861 try { |
|
862 if (key == null || !key.isValid()) { |
|
863 throw new CancelledKeyException(); |
|
864 } |
825 key.interestOps(newOps); |
865 key.interestOps(newOps); |
|
866 // double check after |
|
867 if (!chan.isOpen()) { |
|
868 abortPending(new ClosedChannelException()); |
|
869 return; |
|
870 } |
|
871 assert key.interestOps() == newOps; |
826 } catch (CancelledKeyException x) { |
872 } catch (CancelledKeyException x) { |
827 // channel may have been closed |
873 // channel may have been closed |
828 debug.log(Level.DEBUG, "key cancelled for " + chan); |
874 debug.log(Level.DEBUG, "key cancelled for " + chan); |
829 abortPending(x); |
875 abortPending(x); |
830 } |
876 } |