diff -r f39b316f10c9 -r 7f1916397463 src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:06:50 2018 +0100 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java Fri Apr 13 15:33:13 2018 +0100 @@ -60,7 +60,7 @@ private final SequentialScheduler sendScheduler = new SequentialScheduler(new SendTask()); - private final MessageQueue queue = new MessageQueue(); + private final MessageQueue queue; private final MessageEncoder encoder = new MessageEncoder(); /* A reusable buffer for writing, initially with no remaining bytes */ private final ByteBuffer dst = createWriteBuffer().position(0).limit(0); @@ -82,7 +82,10 @@ private volatile ChannelState readState = UNREGISTERED; private boolean inputClosed; private boolean outputClosed; - public TransportImpl(MessageStreamConsumer consumer, RawChannel channel) { + + public TransportImpl(MessageQueue queue, MessageStreamConsumer consumer, + RawChannel channel) { + this.queue = queue; this.messageConsumer = consumer; this.channel = channel; this.decoder = new MessageDecoder(this.messageConsumer); @@ -145,6 +148,7 @@ queue.addText(text, isLast, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -169,6 +173,7 @@ queue.addBinary(message, isLast, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -192,6 +197,7 @@ queue.addPing(message, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -215,6 +221,7 @@ queue.addPong(message, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -238,6 +245,7 @@ queue.addPong(message, attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) { @@ -262,6 +270,7 @@ queue.addClose(statusCode, CharBuffer.wrap(reason), attachment, action, f); sendScheduler.runOrSchedule(); } catch (IOException e) { + action.accept(null, e); f.completeExceptionally(e); } if (debug.isLoggable(Level.DEBUG)) {