src/java.net.http/share/classes/jdk/internal/net/http/websocket/TransportImpl.java
branch http-client-branch
changeset 56306 88c1f551d45e
parent 56304 065641767a75
child 56314 f92e7a8a189f
equal deleted inserted replaced
56305:9027d1747dd0 56306:88c1f551d45e
   101         // TODO (optimization?): allocateDirect if SSL?
   101         // TODO (optimization?): allocateDirect if SSL?
   102         return ByteBuffer.allocate(capacity);
   102         return ByteBuffer.allocate(capacity);
   103     }
   103     }
   104 
   104 
   105     private boolean write() throws IOException {
   105     private boolean write() throws IOException {
   106         debug.log(Level.DEBUG, "writing to the channel%n");
   106         debug.log(Level.DEBUG, "writing to the channel");
   107         long count = channel.write(dstArray, 0, dstArray.length);
   107         long count = channel.write(dstArray, 0, dstArray.length);
   108         debug.log(Level.DEBUG, "%s bytes written%n", count);
   108         debug.log(Level.DEBUG, "%s bytes written", count);
   109         for (ByteBuffer b : dstArray) {
   109         for (ByteBuffer b : dstArray) {
   110             if (b.hasRemaining()) {
   110             if (b.hasRemaining()) {
   111                 return false;
   111                 return false;
   112             }
   112             }
   113         }
   113         }
   120                                              T attachment,
   120                                              T attachment,
   121                                              BiConsumer<? super T, ? super Throwable> action) {
   121                                              BiConsumer<? super T, ? super Throwable> action) {
   122         long id = 0;
   122         long id = 0;
   123         if (debug.isLoggable(Level.DEBUG)) {
   123         if (debug.isLoggable(Level.DEBUG)) {
   124             id = counter.incrementAndGet();
   124             id = counter.incrementAndGet();
   125             debug.log(Level.DEBUG, "enter send text %s message.length()=%s last=%s%n",
   125             debug.log(Level.DEBUG, "enter send text %s message.length()=%s last=%s",
   126                               id, message.length(), isLast);
   126                               id, message.length(), isLast);
   127         }
   127         }
   128         // TODO (optimization?):
   128         // TODO (optimization?):
   129         // These sendXXX methods might be a good place to decide whether or not
   129         // These sendXXX methods might be a good place to decide whether or not
   130         // we can write straight ahead, possibly returning null instead of
   130         // we can write straight ahead, possibly returning null instead of
   139             queue.addText(text, isLast, attachment, action, f);
   139             queue.addText(text, isLast, attachment, action, f);
   140             sendScheduler.runOrSchedule();
   140             sendScheduler.runOrSchedule();
   141         } catch (IOException e) {
   141         } catch (IOException e) {
   142             f.completeExceptionally(e);
   142             f.completeExceptionally(e);
   143         }
   143         }
   144         debug.log(Level.DEBUG, "exit send text %s returned %s%n", id, f);
   144         debug.log(Level.DEBUG, "exit send text %s returned %s", id, f);
   145         return f;
   145         return f;
   146     }
   146     }
   147 
   147 
   148     @Override
   148     @Override
   149     public <T> CompletableFuture<T> sendBinary(ByteBuffer message,
   149     public <T> CompletableFuture<T> sendBinary(ByteBuffer message,
   151                                                T attachment,
   151                                                T attachment,
   152                                                BiConsumer<? super T, ? super Throwable> action) {
   152                                                BiConsumer<? super T, ? super Throwable> action) {
   153         long id = 0;
   153         long id = 0;
   154         if (debug.isLoggable(Level.DEBUG)) {
   154         if (debug.isLoggable(Level.DEBUG)) {
   155             id = counter.incrementAndGet();
   155             id = counter.incrementAndGet();
   156             debug.log(Level.DEBUG, "enter send binary %s message.remaining()=%s last=%s%n",
   156             debug.log(Level.DEBUG, "enter send binary %s message.remaining()=%s last=%s",
   157                               id, message.remaining(), isLast);
   157                               id, message.remaining(), isLast);
   158         }
   158         }
   159         MinimalFuture<T> f = new MinimalFuture<>();
   159         MinimalFuture<T> f = new MinimalFuture<>();
   160         try {
   160         try {
   161             queue.addBinary(message, isLast, attachment, action, f);
   161             queue.addBinary(message, isLast, attachment, action, f);
   162             sendScheduler.runOrSchedule();
   162             sendScheduler.runOrSchedule();
   163         } catch (IOException e) {
   163         } catch (IOException e) {
   164             f.completeExceptionally(e);
   164             f.completeExceptionally(e);
   165         }
   165         }
   166         debug.log(Level.DEBUG, "exit send binary %s returned %s%n", id, f);
   166         debug.log(Level.DEBUG, "exit send binary %s returned %s", id, f);
   167         return f;
   167         return f;
   168     }
   168     }
   169 
   169 
   170     @Override
   170     @Override
   171     public <T> CompletableFuture<T> sendPing(ByteBuffer message,
   171     public <T> CompletableFuture<T> sendPing(ByteBuffer message,
   172                                              T attachment,
   172                                              T attachment,
   173                                              BiConsumer<? super T, ? super Throwable> action) {
   173                                              BiConsumer<? super T, ? super Throwable> action) {
   174         long id = 0;
   174         long id = 0;
   175         if (debug.isLoggable(Level.DEBUG)) {
   175         if (debug.isLoggable(Level.DEBUG)) {
   176             id = counter.incrementAndGet();
   176             id = counter.incrementAndGet();
   177             debug.log(Level.DEBUG, "enter send ping %s message.remaining()=%s%n",
   177             debug.log(Level.DEBUG, "enter send ping %s message.remaining()=%s",
   178                               id, message.remaining());
   178                               id, message.remaining());
   179         }
   179         }
   180         MinimalFuture<T> f = new MinimalFuture<>();
   180         MinimalFuture<T> f = new MinimalFuture<>();
   181         try {
   181         try {
   182             queue.addPing(message, attachment, action, f);
   182             queue.addPing(message, attachment, action, f);
   183             sendScheduler.runOrSchedule();
   183             sendScheduler.runOrSchedule();
   184         } catch (IOException e) {
   184         } catch (IOException e) {
   185             f.completeExceptionally(e);
   185             f.completeExceptionally(e);
   186         }
   186         }
   187         debug.log(Level.DEBUG, "exit send ping %s returned %s%n", id, f);
   187         debug.log(Level.DEBUG, "exit send ping %s returned %s", id, f);
   188         return f;
   188         return f;
   189     }
   189     }
   190 
   190 
   191     @Override
   191     @Override
   192     public <T> CompletableFuture<T> sendPong(ByteBuffer message,
   192     public <T> CompletableFuture<T> sendPong(ByteBuffer message,
   193                                              T attachment,
   193                                              T attachment,
   194                                              BiConsumer<? super T, ? super Throwable> action) {
   194                                              BiConsumer<? super T, ? super Throwable> action) {
   195         long id = 0;
   195         long id = 0;
   196         if (debug.isLoggable(Level.DEBUG)) {
   196         if (debug.isLoggable(Level.DEBUG)) {
   197             id = counter.incrementAndGet();
   197             id = counter.incrementAndGet();
   198             debug.log(Level.DEBUG, "enter send pong %s message.remaining()=%s%n",
   198             debug.log(Level.DEBUG, "enter send pong %s message.remaining()=%s",
   199                               id, message.remaining());
   199                               id, message.remaining());
   200         }
   200         }
   201         MinimalFuture<T> f = new MinimalFuture<>();
   201         MinimalFuture<T> f = new MinimalFuture<>();
   202         try {
   202         try {
   203             queue.addPong(message, attachment, action, f);
   203             queue.addPong(message, attachment, action, f);
   204             sendScheduler.runOrSchedule();
   204             sendScheduler.runOrSchedule();
   205         } catch (IOException e) {
   205         } catch (IOException e) {
   206             f.completeExceptionally(e);
   206             f.completeExceptionally(e);
   207         }
   207         }
   208         debug.log(Level.DEBUG, "exit send pong %s returned %s%n", id, f);
   208         debug.log(Level.DEBUG, "exit send pong %s returned %s", id, f);
   209         return f;
   209         return f;
   210     }
   210     }
   211 
   211 
   212     @Override
   212     @Override
   213     public <T> CompletableFuture<T> sendPong(Supplier<? extends ByteBuffer> message,
   213     public <T> CompletableFuture<T> sendPong(Supplier<? extends ByteBuffer> message,
   214                                              T attachment,
   214                                              T attachment,
   215                                              BiConsumer<? super T, ? super Throwable> action) {
   215                                              BiConsumer<? super T, ? super Throwable> action) {
   216         long id = 0;
   216         long id = 0;
   217         if (debug.isLoggable(Level.DEBUG)) {
   217         if (debug.isLoggable(Level.DEBUG)) {
   218             id = counter.incrementAndGet();
   218             id = counter.incrementAndGet();
   219             debug.log(Level.DEBUG, "enter send pong %s supplier=%s%n",
   219             debug.log(Level.DEBUG, "enter send pong %s supplier=%s",
   220                       id, message);
   220                       id, message);
   221         }
   221         }
   222         MinimalFuture<T> f = new MinimalFuture<>();
   222         MinimalFuture<T> f = new MinimalFuture<>();
   223         try {
   223         try {
   224             queue.addPong(message, attachment, action, f);
   224             queue.addPong(message, attachment, action, f);
   225             sendScheduler.runOrSchedule();
   225             sendScheduler.runOrSchedule();
   226         } catch (IOException e) {
   226         } catch (IOException e) {
   227             f.completeExceptionally(e);
   227             f.completeExceptionally(e);
   228         }
   228         }
   229         debug.log(Level.DEBUG, "exit send pong %s returned %s%n", id, f);
   229         debug.log(Level.DEBUG, "exit send pong %s returned %s", id, f);
   230         return f;
   230         return f;
   231     }
   231     }
   232 
   232 
   233     @Override
   233     @Override
   234     public <T> CompletableFuture<T> sendClose(int statusCode,
   234     public <T> CompletableFuture<T> sendClose(int statusCode,
   512                         // All done, remove and complete
   512                         // All done, remove and complete
   513                         encoder.reset();
   513                         encoder.reset();
   514                         removeAndComplete(null);
   514                         removeAndComplete(null);
   515                     }
   515                     }
   516                 } catch (Throwable t) {
   516                 } catch (Throwable t) {
   517                     debug.log(Level.DEBUG, "send task exception %s", (Object)t);
   517                     debug.log(Level.DEBUG, "send task exception %s", (Object) t);
   518                     // buffer cleanup: if there is an exception, the buffer
   518                     // buffer cleanup: if there is an exception, the buffer
   519                     // should appear empty for the next write as there is
   519                     // should appear empty for the next write as there is
   520                     // nothing to write
   520                     // nothing to write
   521                     dst.position(dst.limit());
   521                     dst.position(dst.limit());
   522                     encoder.reset();
   522                     encoder.reset();
   563         }
   563         }
   564 
   564 
   565         @SuppressWarnings("unchecked")
   565         @SuppressWarnings("unchecked")
   566         private void removeAndComplete(Throwable error) {
   566         private void removeAndComplete(Throwable error) {
   567             debug.log(Level.DEBUG, "removeAndComplete error=%s",
   567             debug.log(Level.DEBUG, "removeAndComplete error=%s",
   568                     (Object)error);
   568                     (Object) error);
   569             queue.remove();
   569             queue.remove();
   570             if (error != null) {
   570             if (error != null) {
   571                 try {
   571                 try {
   572                     action.accept(null, error);
   572                     action.accept(null, error);
   573                 } finally {
   573                 } finally {