src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
branchhttp-client-branch
changeset 56427 7f1916397463
parent 56389 0ba90c4f1e3f
child 56437 f8b3f053cfbb
--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java	Fri Apr 13 15:33:13 2018 +0100
@@ -60,7 +60,7 @@
 
     private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
 
-    private final MessageQueue queue = new MessageQueue();
+    private final MessageQueue queue;
     private final MessageEncoder encoder = new MessageEncoder();
     /* A reusable buffer for writing, initially with no remaining bytes */
     private final ByteBuffer dst = createWriteBuffer().position(0).limit(0);
@@ -82,7 +82,10 @@
     private volatile ChannelState readState = UNREGISTERED;
     private boolean inputClosed;
     private boolean outputClosed;
-    public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) {
+
+    public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer,
+                         RawChannel channel) {
+        this.queue = queue;
         this.messageConsumer = consumer;
         this.channel = channel;
         this.decoder = new MessageDecoder(this.messageConsumer);
@@ -145,6 +148,7 @@
             queue.addText(text, isLast, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -169,6 +173,7 @@
             queue.addBinary(message, isLast, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -192,6 +197,7 @@
             queue.addPing(message, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -215,6 +221,7 @@
             queue.addPong(message, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -238,6 +245,7 @@
             queue.addPong(message, attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {
@@ -262,6 +270,7 @@
             queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f);
             sendScheduler.runOrSchedule();
         } catch (IOException e) {
+            action.accept(null, e);
             f.completeExceptionally(e);
         }
         if (debug.isLoggable(Level.DEBUG)) {