--- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:06:50 2018 +0100
+++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:33:13 2018 +0100
@@ -60,7 +60,7 @@
private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask());
- private final MessageQueue queue = new MessageQueue();
+ private final MessageQueue queue;
private final MessageEncoder encoder = new MessageEncoder();
/* A reusable buffer for writing, initially with no remaining bytes */
private final ByteBuffer dst = createWriteBuffer().position(0).limit(0);
@@ -82,7 +82,10 @@
private volatile ChannelState readState = UNREGISTERED;
private boolean inputClosed;
private boolean outputClosed;
- public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) {
+
+ public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer,
+ RawChannel channel) {
+ this.queue = queue;
this.messageConsumer = consumer;
this.channel = channel;
this.decoder = new MessageDecoder(this.messageConsumer);
@@ -145,6 +148,7 @@
queue.addText(text, isLast, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -169,6 +173,7 @@
queue.addBinary(message, isLast, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -192,6 +197,7 @@
queue.addPing(message, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -215,6 +221,7 @@
queue.addPong(message, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -238,6 +245,7 @@
queue.addPong(message, attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {
@@ -262,6 +270,7 @@
queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f);
sendScheduler.runOrSchedule();
} catch (IOException e) {
+ action.accept(null, e);
f.completeExceptionally(e);
}
if (debug.isLoggable(Level.DEBUG)) {