equal
deleted
inserted
replaced
39 import java.util.function.Consumer; |
39 import java.util.function.Consumer; |
40 import java.util.function.Supplier; |
40 import java.util.function.Supplier; |
41 import jdk.internal.net.http.common.BufferSupplier; |
41 import jdk.internal.net.http.common.BufferSupplier; |
42 import jdk.internal.net.http.common.Demand; |
42 import jdk.internal.net.http.common.Demand; |
43 import jdk.internal.net.http.common.FlowTube; |
43 import jdk.internal.net.http.common.FlowTube; |
|
44 import jdk.internal.net.http.common.Log; |
44 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.SequentialScheduler; |
46 import jdk.internal.net.http.common.SequentialScheduler; |
46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
47 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
48 import jdk.internal.net.http.common.Utils; |
49 import jdk.internal.net.http.common.Utils; |
147 // ======================================================================// |
148 // ======================================================================// |
148 |
149 |
149 void signalClosed() { |
150 void signalClosed() { |
150 // Ensures that the subscriber will be terminated and that future |
151 // Ensures that the subscriber will be terminated and that future |
151 // subscribers will be notified when the connection is closed. |
152 // subscribers will be notified when the connection is closed. |
|
153 if (Log.channel()) { |
|
154 Log.logChannel("Connection close signalled: connection closed locally ({0})", |
|
155 channelDescr()); |
|
156 } |
152 readPublisher.subscriptionImpl.signalError( |
157 readPublisher.subscriptionImpl.signalError( |
153 new IOException("connection closed locally")); |
158 new IOException("connection closed locally")); |
154 } |
159 } |
155 |
160 |
156 /** |
161 /** |
362 // Kick off the initial request:1 that will start the writing side. |
367 // Kick off the initial request:1 that will start the writing side. |
363 // Invoked in the selector manager thread. |
368 // Invoked in the selector manager thread. |
364 void startSubscription() { |
369 void startSubscription() { |
365 try { |
370 try { |
366 if (debug.on()) debug.log("write: starting subscription"); |
371 if (debug.on()) debug.log("write: starting subscription"); |
|
372 if (Log.channel()) { |
|
373 Log.logChannel("Start requesting bytes for writing to channel: {0}", |
|
374 channelDescr()); |
|
375 } |
367 assert client.isSelectorThread(); |
376 assert client.isSelectorThread(); |
368 // make sure read registrations are handled before; |
377 // make sure read registrations are handled before; |
369 readPublisher.subscriptionImpl.handlePending(); |
378 readPublisher.subscriptionImpl.handlePending(); |
370 if (debug.on()) debug.log("write: offloading requestMore"); |
379 if (debug.on()) debug.log("write: offloading requestMore"); |
371 // start writing; |
380 // start writing; |
407 tryFlushCurrent(true); |
416 tryFlushCurrent(true); |
408 } |
417 } |
409 |
418 |
410 void signalError(Throwable error) { |
419 void signalError(Throwable error) { |
411 debug.log(() -> "write error: " + error); |
420 debug.log(() -> "write error: " + error); |
|
421 if (Log.channel()) { |
|
422 Log.logChannel("Failed to write to channel ({0}: {1})", |
|
423 channelDescr(), error); |
|
424 } |
412 completed = true; |
425 completed = true; |
413 readPublisher.signalError(error); |
426 readPublisher.signalError(error); |
414 } |
427 } |
415 |
428 |
416 // A repeatable WriteEvent which is paused after firing and can |
429 // A repeatable WriteEvent which is paused after firing and can |
453 upstreamSubscription.request(n); |
466 upstreamSubscription.request(n); |
454 } |
467 } |
455 |
468 |
456 @Override |
469 @Override |
457 public void cancel() { |
470 public void cancel() { |
|
471 if (debug.on()) debug.log("write: cancel"); |
458 dropSubscription(); |
472 dropSubscription(); |
459 upstreamSubscription.cancel(); |
473 upstreamSubscription.cancel(); |
460 } |
474 } |
461 |
475 |
462 void dropSubscription() { |
476 void dropSubscription() { |
556 void signalError(Throwable error) { |
570 void signalError(Throwable error) { |
557 if (debug.on()) debug.log("error signalled " + error); |
571 if (debug.on()) debug.log("error signalled " + error); |
558 if (!errorRef.compareAndSet(null, error)) { |
572 if (!errorRef.compareAndSet(null, error)) { |
559 return; |
573 return; |
560 } |
574 } |
|
575 if (Log.channel()) { |
|
576 Log.logChannel("Error signalled on channel {0}: {1}", |
|
577 channelDescr(), error); |
|
578 } |
561 subscriptionImpl.handleError(); |
579 subscriptionImpl.handleError(); |
562 } |
580 } |
563 |
581 |
564 final class ReadSubscription implements Flow.Subscription { |
582 final class ReadSubscription implements Flow.Subscription { |
565 final InternalReadSubscription impl; |
583 final InternalReadSubscription impl; |
663 } |
681 } |
664 |
682 |
665 final void handleSubscribeEvent() { |
683 final void handleSubscribeEvent() { |
666 assert client.isSelectorThread(); |
684 assert client.isSelectorThread(); |
667 debug.log("subscribe event raised"); |
685 debug.log("subscribe event raised"); |
|
686 if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr()); |
668 readScheduler.runOrSchedule(); |
687 readScheduler.runOrSchedule(); |
669 if (readScheduler.isStopped() || completed) { |
688 if (readScheduler.isStopped() || completed) { |
670 // if already completed or stopped we can handle any |
689 // if already completed or stopped we can handle any |
671 // pending connection directly from here. |
690 // pending connection directly from here. |
672 if (debug.on()) |
691 if (debug.on()) |
700 } |
719 } |
701 |
720 |
702 @Override |
721 @Override |
703 public final void cancel() { |
722 public final void cancel() { |
704 pauseReadEvent(); |
723 pauseReadEvent(); |
|
724 if (Log.channel()) { |
|
725 Log.logChannel("Read subscription cancelled for channel {0}", |
|
726 channelDescr()); |
|
727 } |
705 readScheduler.stop(); |
728 readScheduler.stop(); |
706 } |
729 } |
707 |
730 |
708 private void resumeReadEvent() { |
731 private void resumeReadEvent() { |
709 if (debug.on()) debug.log("resuming read event"); |
732 if (debug.on()) debug.log("resuming read event"); |
724 final void signalError(Throwable error) { |
747 final void signalError(Throwable error) { |
725 if (!errorRef.compareAndSet(null, error)) { |
748 if (!errorRef.compareAndSet(null, error)) { |
726 return; |
749 return; |
727 } |
750 } |
728 if (debug.on()) debug.log("got read error: " + error); |
751 if (debug.on()) debug.log("got read error: " + error); |
|
752 if (Log.channel()) { |
|
753 Log.logChannel("Read error signalled on channel {0}: {1}", |
|
754 channelDescr(), error); |
|
755 } |
729 readScheduler.runOrSchedule(); |
756 readScheduler.runOrSchedule(); |
730 } |
757 } |
731 |
758 |
732 final void signalReadable() { |
759 final void signalReadable() { |
733 readScheduler.runOrSchedule(); |
760 readScheduler.runOrSchedule(); |
770 // safe to pause here because we're finished anyway. |
797 // safe to pause here because we're finished anyway. |
771 pauseReadEvent(); |
798 pauseReadEvent(); |
772 if (debug.on()) |
799 if (debug.on()) |
773 debug.log("Sending error " + error |
800 debug.log("Sending error " + error |
774 + " to subscriber " + subscriber); |
801 + " to subscriber " + subscriber); |
|
802 if (Log.channel()) { |
|
803 Log.logChannel("Raising error with subscriber for {0}: {1}", |
|
804 channelDescr(), error); |
|
805 } |
775 current.errorRef.compareAndSet(null, error); |
806 current.errorRef.compareAndSet(null, error); |
776 current.signalCompletion(); |
807 current.signalCompletion(); |
777 readScheduler.stop(); |
808 readScheduler.stop(); |
778 debugState("leaving read() loop with error: "); |
809 debugState("leaving read() loop with error: "); |
779 return; |
810 return; |
786 try { |
817 try { |
787 List<ByteBuffer> bytes = readAvailable(current.bufferSource); |
818 List<ByteBuffer> bytes = readAvailable(current.bufferSource); |
788 if (bytes == EOF) { |
819 if (bytes == EOF) { |
789 if (!completed) { |
820 if (!completed) { |
790 if (debug.on()) debug.log("got read EOF"); |
821 if (debug.on()) debug.log("got read EOF"); |
|
822 if (Log.channel()) { |
|
823 Log.logChannel("EOF read from channel: {0}", |
|
824 channelDescr()); |
|
825 } |
791 completed = true; |
826 completed = true; |
792 // safe to pause here because we're finished |
827 // safe to pause here because we're finished |
793 // anyway. |
828 // anyway. |
794 pauseReadEvent(); |
829 pauseReadEvent(); |
795 current.signalCompletion(); |
830 current.signalCompletion(); |
847 } |
882 } |
848 } catch (Throwable t) { |
883 } catch (Throwable t) { |
849 if (debug.on()) debug.log("Unexpected exception in read loop", t); |
884 if (debug.on()) debug.log("Unexpected exception in read loop", t); |
850 signalError(t); |
885 signalError(t); |
851 } finally { |
886 } finally { |
|
887 if (readScheduler.isStopped()) { |
|
888 if (debug.on()) debug.log("Read scheduler stopped"); |
|
889 if (Log.channel()) { |
|
890 Log.logChannel("Stopped reading from channel {0}", channelDescr()); |
|
891 } |
|
892 } |
852 handlePending(); |
893 handlePending(); |
853 } |
894 } |
854 } |
895 } |
855 |
896 |
856 boolean handlePending() { |
897 boolean handlePending() { |
1236 } |
1277 } |
1237 |
1278 |
1238 final String dbgString() { |
1279 final String dbgString() { |
1239 return "SocketTube("+id+")"; |
1280 return "SocketTube("+id+")"; |
1240 } |
1281 } |
|
1282 |
|
1283 final String channelDescr() { |
|
1284 return String.valueOf(channel); |
|
1285 } |
1241 } |
1286 } |