equal
deleted
inserted
replaced
39 import java.util.function.Consumer; |
39 import java.util.function.Consumer; |
40 import java.util.function.Supplier; |
40 import java.util.function.Supplier; |
41 import jdk.internal.net.http.common.BufferSupplier; |
41 import jdk.internal.net.http.common.BufferSupplier; |
42 import jdk.internal.net.http.common.Demand; |
42 import jdk.internal.net.http.common.Demand; |
43 import jdk.internal.net.http.common.FlowTube; |
43 import jdk.internal.net.http.common.FlowTube; |
|
44 import jdk.internal.net.http.common.Log; |
44 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.Logger; |
45 import jdk.internal.net.http.common.SequentialScheduler; |
46 import jdk.internal.net.http.common.SequentialScheduler; |
46 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
47 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter; |
47 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
48 import jdk.internal.net.http.common.SequentialScheduler.RestartableTask; |
48 import jdk.internal.net.http.common.Utils; |
49 import jdk.internal.net.http.common.Utils; |
147 // ======================================================================// |
148 // ======================================================================// |
148 |
149 |
149 void signalClosed() { |
150 void signalClosed() { |
150 // Ensures that the subscriber will be terminated and that future |
151 // Ensures that the subscriber will be terminated and that future |
151 // subscribers will be notified when the connection is closed. |
152 // subscribers will be notified when the connection is closed. |
|
153 if (Log.channel()) { |
|
154 Log.logChannel("Connection close signalled: connection closed locally ({0})", |
|
155 channelDescr()); |
|
156 } |
152 readPublisher.subscriptionImpl.signalError( |
157 readPublisher.subscriptionImpl.signalError( |
153 new IOException("connection closed locally")); |
158 new IOException("connection closed locally")); |
154 } |
159 } |
155 |
160 |
156 /** |
161 /** |
362 // Kick off the initial request:1 that will start the writing side. |
367 // Kick off the initial request:1 that will start the writing side. |
363 // Invoked in the selector manager thread. |
368 // Invoked in the selector manager thread. |
364 void startSubscription() { |
369 void startSubscription() { |
365 try { |
370 try { |
366 if (debug.on()) debug.log("write: starting subscription"); |
371 if (debug.on()) debug.log("write: starting subscription"); |
|
372 if (Log.channel()) { |
|
373 Log.logChannel("Start requesting bytes for writing to channel: {0}", |
|
374 channelDescr()); |
|
375 } |
367 assert client.isSelectorThread(); |
376 assert client.isSelectorThread(); |
368 // make sure read registrations are handled before; |
377 // make sure read registrations are handled before; |
369 readPublisher.subscriptionImpl.handlePending(); |
378 readPublisher.subscriptionImpl.handlePending(); |
370 if (debug.on()) debug.log("write: offloading requestMore"); |
379 if (debug.on()) debug.log("write: offloading requestMore"); |
371 // start writing; |
380 // start writing; |
407 tryFlushCurrent(true); |
416 tryFlushCurrent(true); |
408 } |
417 } |
409 |
418 |
410 void signalError(Throwable error) { |
419 void signalError(Throwable error) { |
411 debug.log(() -> "write error: " + error); |
420 debug.log(() -> "write error: " + error); |
|
421 if (Log.channel()) { |
|
422 Log.logChannel("Failed to write on channel ({0}: {1})", |
|
423 channelDescr(), error); |
|
424 } |
412 completed = true; |
425 completed = true; |
413 readPublisher.signalError(error); |
426 readPublisher.signalError(error); |
414 } |
427 } |
415 |
428 |
416 // A repeatable WriteEvent which is paused after firing and can |
429 // A repeatable WriteEvent which is paused after firing and can |
557 void signalError(Throwable error) { |
570 void signalError(Throwable error) { |
558 if (debug.on()) debug.log("error signalled " + error); |
571 if (debug.on()) debug.log("error signalled " + error); |
559 if (!errorRef.compareAndSet(null, error)) { |
572 if (!errorRef.compareAndSet(null, error)) { |
560 return; |
573 return; |
561 } |
574 } |
|
575 if (Log.channel()) { |
|
576 Log.logChannel("Error signalled on channel {0}: {1}", |
|
577 channelDescr(), error); |
|
578 } |
562 subscriptionImpl.handleError(); |
579 subscriptionImpl.handleError(); |
563 } |
580 } |
564 |
581 |
565 final class ReadSubscription implements Flow.Subscription { |
582 final class ReadSubscription implements Flow.Subscription { |
566 final InternalReadSubscription impl; |
583 final InternalReadSubscription impl; |
664 } |
681 } |
665 |
682 |
666 final void handleSubscribeEvent() { |
683 final void handleSubscribeEvent() { |
667 assert client.isSelectorThread(); |
684 assert client.isSelectorThread(); |
668 debug.log("subscribe event raised"); |
685 debug.log("subscribe event raised"); |
|
686 if (Log.channel()) Log.logChannel("Start reading from {0}", channelDescr()); |
669 readScheduler.runOrSchedule(); |
687 readScheduler.runOrSchedule(); |
670 if (readScheduler.isStopped() || completed) { |
688 if (readScheduler.isStopped() || completed) { |
671 // if already completed or stopped we can handle any |
689 // if already completed or stopped we can handle any |
672 // pending connection directly from here. |
690 // pending connection directly from here. |
673 if (debug.on()) |
691 if (debug.on()) |
787 try { |
805 try { |
788 List<ByteBuffer> bytes = readAvailable(current.bufferSource); |
806 List<ByteBuffer> bytes = readAvailable(current.bufferSource); |
789 if (bytes == EOF) { |
807 if (bytes == EOF) { |
790 if (!completed) { |
808 if (!completed) { |
791 if (debug.on()) debug.log("got read EOF"); |
809 if (debug.on()) debug.log("got read EOF"); |
|
810 if (Log.channel()) { |
|
811 Log.logChannel("EOF reached from channel: {0}", |
|
812 channelDescr()); |
|
813 } |
792 completed = true; |
814 completed = true; |
793 // safe to pause here because we're finished |
815 // safe to pause here because we're finished |
794 // anyway. |
816 // anyway. |
795 pauseReadEvent(); |
817 pauseReadEvent(); |
796 current.signalCompletion(); |
818 current.signalCompletion(); |
1237 } |
1259 } |
1238 |
1260 |
1239 final String dbgString() { |
1261 final String dbgString() { |
1240 return "SocketTube("+id+")"; |
1262 return "SocketTube("+id+")"; |
1241 } |
1263 } |
|
1264 |
|
1265 final String channelDescr() { |
|
1266 return String.valueOf(channel); |
|
1267 } |
1242 } |
1268 } |