src/java.net.http/share/classes/jdk/internal/net/http/RawChannelTube.java
branch: http-client-branch
changeset 56326 63422db47911
child 56423 ba64c30666cc
equal deleted inserted replaced
56325:195d2970d981 56326:63422db47911
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.internal.net.http;
       
    27 
       
    28 import jdk.internal.net.http.common.Demand;
       
    29 import jdk.internal.net.http.common.FlowTube;
       
    30 import jdk.internal.net.http.common.Utils;
       
    31 import jdk.internal.net.http.websocket.RawChannel;
       
    32 
       
    33 import java.io.EOFException;
       
    34 import java.io.IOException;
       
    35 import java.lang.ref.Cleaner;
       
    36 import java.nio.ByteBuffer;
       
    37 import java.nio.channels.ClosedChannelException;
       
    38 import java.nio.channels.SelectionKey;
       
    39 import java.util.ArrayList;
       
    40 import java.util.List;
       
    41 import java.util.concurrent.ConcurrentLinkedQueue;
       
    42 import java.util.concurrent.Flow;
       
    43 import java.util.concurrent.atomic.AtomicBoolean;
       
    44 import java.util.concurrent.atomic.AtomicReference;
       
    45 import java.util.function.Supplier;
       
    46 import java.lang.System.Logger.Level;
       
    47 
       
    48 /*
       
    49  * I/O abstraction used to implement WebSocket.
       
    50  *
       
    51  */
       
    52 public class RawChannelTube implements RawChannel {
       
    53 
       
    54     final HttpConnection connection;
       
    55     final FlowTube tube;
       
    56     final WritePublisher writePublisher;
       
    57     final ReadSubscriber readSubscriber;
       
    58     final Supplier<ByteBuffer> initial;
       
    59     final AtomicBoolean inited = new AtomicBoolean();
       
    60     final AtomicBoolean outputClosed = new AtomicBoolean();
       
    61     final AtomicBoolean inputClosed = new AtomicBoolean();
       
    62     final AtomicBoolean closed = new AtomicBoolean();
       
    63     final String dbgTag;
       
    64     final System.Logger debug;
       
    65     private static final Cleaner cleaner =
       
    66             Utils.ASSERTIONSENABLED  && Utils.DEBUG_WS ? Cleaner.create() : null;
       
    67 
       
    68     RawChannelTube(HttpConnection connection,
       
    69                    Supplier<ByteBuffer> initial) {
       
    70         this.connection = connection;
       
    71         this.tube = connection.getConnectionFlow();
       
    72         this.initial = initial;
       
    73         this.writePublisher = new WritePublisher();
       
    74         this.readSubscriber = new ReadSubscriber();
       
    75         dbgTag = "[WebSocket] RawChannelTube(" + tube.toString() +")";
       
    76         debug = Utils.getWebSocketLogger(dbgTag::toString, Utils.DEBUG_WS);
       
    77         connection.client().reference();
       
    78         connectFlows();
       
    79         if (Utils.ASSERTIONSENABLED && Utils.DEBUG_WS) {
       
    80             // this is just for debug...
       
    81             cleaner.register(this, new CleanupChecker(closed, debug));
       
    82         }
       
    83     }
       
    84 
       
    85     // Make sure no back reference to RawChannelTube can exist
       
    86     // from this class. In particular it would be dangerous
       
    87     // to reference connection, since connection has a reference
       
    88     // to SocketTube with which a RawChannelTube is registered.
       
    89     // Ditto for HttpClientImpl, which might have a back reference
       
    90     // to the connection.
       
    91     static final class CleanupChecker implements Runnable {
       
    92         final AtomicBoolean closed;
       
    93         final System.Logger debug;
       
    94         CleanupChecker(AtomicBoolean closed, System.Logger debug) {
       
    95             this.closed = closed;
       
    96             this.debug = debug;
       
    97         }
       
    98 
       
    99         @Override
       
   100         public void run() {
       
   101             if (!closed.get()) {
       
   102                 debug.log(Level.DEBUG,
       
   103                          "RawChannelTube was not closed before being released");
       
   104             }
       
   105         }
       
   106     }
       
   107 
       
   108     private void connectFlows() {
       
   109         debug.log(Level.DEBUG, "connectFlows");
       
   110         tube.connectFlows(writePublisher, readSubscriber);
       
   111     }
       
   112 
       
   113     class WriteSubscription implements Flow.Subscription {
       
   114         final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
       
   115         final Demand demand = new Demand();
       
   116         volatile boolean cancelled;
       
   117         WriteSubscription(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   118             this.subscriber = subscriber;
       
   119         }
       
   120         @Override
       
   121         public void request(long n) {
       
   122             debug.log(Level.DEBUG, "WriteSubscription::request %d", n);
       
   123             demand.increase(n);
       
   124             RawEvent event;
       
   125             while ((event = writePublisher.events.poll()) != null) {
       
   126                 debug.log(Level.DEBUG, "WriteSubscriber: handling event");
       
   127                 event.handle();
       
   128                 if (demand.isFulfilled()) break;
       
   129             }
       
   130         }
       
   131         @Override
       
   132         public void cancel() {
       
   133             cancelled = true;
       
   134             debug.log(Level.DEBUG, "WriteSubscription::cancel");
       
   135             shutdownOutput();
       
   136             RawEvent event;
       
   137             while ((event = writePublisher.events.poll()) != null) {
       
   138                 debug.log(Level.DEBUG, "WriteSubscriber: handling event");
       
   139                 event.handle();
       
   140             }
       
   141         }
       
   142     }
       
   143 
       
   144     class WritePublisher implements FlowTube.TubePublisher {
       
   145         final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
       
   146         volatile WriteSubscription writeSubscription;
       
   147         @Override
       
   148         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   149             debug.log(Level.DEBUG, "WritePublisher::subscribe");
       
   150             WriteSubscription subscription = new WriteSubscription(subscriber);
       
   151             subscriber.onSubscribe(subscription);
       
   152             writeSubscription = subscription;
       
   153         }
       
   154     }
       
   155 
       
   156     class ReadSubscriber implements  FlowTube.TubeSubscriber {
       
   157 
       
   158         volatile Flow.Subscription readSubscription;
       
   159         volatile boolean completed;
       
   160         long initialRequest;
       
   161         final ConcurrentLinkedQueue<RawEvent> events = new ConcurrentLinkedQueue<>();
       
   162         final ConcurrentLinkedQueue<ByteBuffer> buffers = new ConcurrentLinkedQueue<>();
       
   163         final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   164 
       
   165         void checkEvents() {
       
   166             Flow.Subscription subscription = readSubscription;
       
   167             if (subscription != null) {
       
   168                 Throwable error = errorRef.get();
       
   169                 while (!buffers.isEmpty() || error != null || closed.get() || completed) {
       
   170                     RawEvent event = events.poll();
       
   171                     if (event == null) break;
       
   172                     debug.log(Level.DEBUG, "ReadSubscriber: handling event");
       
   173                     event.handle();
       
   174                 }
       
   175             }
       
   176         }
       
   177 
       
   178         @Override
       
   179         public void onSubscribe(Flow.Subscription subscription) {
       
   180             //buffers.add(initial.get());
       
   181             long n;
       
   182             synchronized (this) {
       
   183                 readSubscription = subscription;
       
   184                 n = initialRequest;
       
   185                 initialRequest = 0;
       
   186             }
       
   187             debug.log(Level.DEBUG, "ReadSubscriber::onSubscribe");
       
   188             if (n > 0) {
       
   189                 Throwable error = errorRef.get();
       
   190                 if (error == null && !closed.get() && !completed) {
       
   191                     debug.log(Level.DEBUG, "readSubscription: requesting " + n);
       
   192                     subscription.request(n);
       
   193                 }
       
   194             }
       
   195             checkEvents();
       
   196         }
       
   197 
       
   198         @Override
       
   199         public void onNext(List<ByteBuffer> item) {
       
   200             debug.log(Level.DEBUG, () -> "ReadSubscriber::onNext "
       
   201                     + Utils.remaining(item) + " bytes");
       
   202             buffers.addAll(item);
       
   203             checkEvents();
       
   204         }
       
   205 
       
   206         @Override
       
   207         public void onError(Throwable throwable) {
       
   208             if (closed.get() || errorRef.compareAndSet(null, throwable)) {
       
   209                 debug.log(Level.DEBUG, "ReadSubscriber::onError", throwable);
       
   210                 if (buffers.isEmpty()) {
       
   211                     checkEvents();
       
   212                     shutdownInput();
       
   213                 }
       
   214             }
       
   215         }
       
   216 
       
   217         @Override
       
   218         public void onComplete() {
       
   219             debug.log(Level.DEBUG, "ReadSubscriber::onComplete");
       
   220             completed = true;
       
   221             if (buffers.isEmpty()) {
       
   222                 checkEvents();
       
   223                 shutdownInput();
       
   224             }
       
   225         }
       
   226     }
       
   227 
       
   228 
       
   229     /*
       
   230      * Registers given event whose callback will be called once only (i.e.
       
   231      * register new event for each callback).
       
   232      *
       
   233      * Memory consistency effects: actions in a thread calling registerEvent
       
   234      * happen-before any subsequent actions in the thread calling event.handle
       
   235      */
       
   236     public void registerEvent(RawEvent event) throws IOException {
       
   237         int interestOps = event.interestOps();
       
   238         if ((interestOps & SelectionKey.OP_WRITE) != 0) {
       
   239             debug.log(Level.DEBUG, "register write event");
       
   240             if (outputClosed.get()) throw new IOException("closed output");
       
   241             writePublisher.events.add(event);
       
   242             WriteSubscription writeSubscription = writePublisher.writeSubscription;
       
   243             if (writeSubscription != null) {
       
   244                 while (!writeSubscription.demand.isFulfilled()) {
       
   245                     event = writePublisher.events.poll();
       
   246                     if (event == null) break;
       
   247                     event.handle();
       
   248                 }
       
   249             }
       
   250         }
       
   251         if ((interestOps & SelectionKey.OP_READ) != 0) {
       
   252             debug.log(Level.DEBUG, "register read event");
       
   253             if (inputClosed.get()) throw new IOException("closed input");
       
   254             readSubscriber.events.add(event);
       
   255             readSubscriber.checkEvents();
       
   256             if (readSubscriber.buffers.isEmpty()
       
   257                     && !readSubscriber.events.isEmpty()) {
       
   258                 Flow.Subscription readSubscription =
       
   259                         readSubscriber.readSubscription;
       
   260                 if (readSubscription == null) {
       
   261                     synchronized (readSubscriber) {
       
   262                         readSubscription = readSubscriber.readSubscription;
       
   263                         if (readSubscription == null) {
       
   264                             readSubscriber.initialRequest = 1;
       
   265                             return;
       
   266                         }
       
   267                     }
       
   268                 }
       
   269                 assert  readSubscription != null;
       
   270                 debug.log(Level.DEBUG, "readSubscription: requesting 1");
       
   271                 readSubscription.request(1);
       
   272             }
       
   273         }
       
   274     }
       
   275 
       
   276     /**
       
   277      * Hands over the initial bytes. Once the bytes have been returned they are
       
   278      * no longer available and the method will throw an {@link
       
   279      * IllegalStateException} on each subsequent invocation.
       
   280      *
       
   281      * @return the initial bytes
       
   282      * @throws IllegalStateException
       
   283      *         if the method has been already invoked
       
   284      */
       
   285     public ByteBuffer initialByteBuffer() throws IllegalStateException {
       
   286         if (inited.compareAndSet(false, true)) {
       
   287             return initial.get();
       
   288         } else throw new IllegalStateException("initial buffer already drained");
       
   289     }
       
   290 
       
   291     /*
       
   292      * Returns a ByteBuffer with the data read or null if EOF is reached. Has no
       
   293      * remaining bytes if no data available at the moment.
       
   294      */
       
   295     public ByteBuffer read() throws IOException {
       
   296         debug.log(Level.DEBUG, "read");
       
   297         Flow.Subscription readSubscription = readSubscriber.readSubscription;
       
   298         if (readSubscription == null) return Utils.EMPTY_BYTEBUFFER;
       
   299         ByteBuffer buffer = readSubscriber.buffers.poll();
       
   300         if (buffer != null) {
       
   301             debug.log(Level.DEBUG, () -> "read: " + buffer.remaining());
       
   302             return buffer;
       
   303         }
       
   304         Throwable error = readSubscriber.errorRef.get();
       
   305         if (error != null) error = Utils.getIOException(error);
       
   306         if (error instanceof EOFException) {
       
   307             debug.log(Level.DEBUG, "read: EOFException");
       
   308             shutdownInput();
       
   309             return null;
       
   310         }
       
   311         if (error != null) {
       
   312             debug.log(Level.DEBUG, "read: " + error);
       
   313             if (closed.get()) {
       
   314                 return null;
       
   315             }
       
   316             shutdownInput();
       
   317             throw Utils.getIOException(error);
       
   318         }
       
   319         if (readSubscriber.completed) {
       
   320             debug.log(Level.DEBUG, "read: EOF");
       
   321             shutdownInput();
       
   322             return null;
       
   323         }
       
   324         if (inputClosed.get()) {
       
   325             debug.log(Level.DEBUG, "read: CLOSED");
       
   326             throw new IOException("closed output");
       
   327         }
       
   328         debug.log(Level.DEBUG, "read: nothing to read");
       
   329         return Utils.EMPTY_BYTEBUFFER;
       
   330     }
       
   331 
       
   332     /*
       
   333      * Writes a sequence of bytes to this channel from a subsequence of the
       
   334      * given buffers.
       
   335      */
       
   336     public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
       
   337         if (outputClosed.get()) {
       
   338             debug.log(Level.DEBUG, "write: CLOSED");
       
   339             throw new IOException("closed output");
       
   340         }
       
   341         WriteSubscription writeSubscription =  writePublisher.writeSubscription;
       
   342         if (writeSubscription == null) {
       
   343             debug.log(Level.DEBUG, "write: unsubscribed: 0");
       
   344             return 0;
       
   345         }
       
   346         if (writeSubscription.cancelled) {
       
   347             debug.log(Level.DEBUG, "write: CANCELLED");
       
   348             shutdownOutput();
       
   349             throw new IOException("closed output");
       
   350         }
       
   351         if (writeSubscription.demand.tryDecrement()) {
       
   352             List<ByteBuffer> buffers = copy(srcs, offset, length);
       
   353             long res = Utils.remaining(buffers);
       
   354             debug.log(Level.DEBUG, "write: writing %d", res);
       
   355             writeSubscription.subscriber.onNext(buffers);
       
   356             return res;
       
   357         } else {
       
   358             debug.log(Level.DEBUG, "write: no demand: 0");
       
   359             return 0;
       
   360         }
       
   361     }
       
   362 
       
   363     /**
       
   364      * Shutdown the connection for reading without closing the channel.
       
   365      *
       
   366      * <p> Once shutdown for reading then further reads on the channel will
       
   367      * return {@code null}, the end-of-stream indication. If the input side of
       
   368      * the connection is already shutdown then invoking this method has no
       
   369      * effect.
       
   370      *
       
   371      * @throws ClosedChannelException
       
   372      *         If this channel is closed
       
   373      * @throws IOException
       
   374      *         If some other I/O error occurs
       
   375      */
       
   376     public void shutdownInput() {
       
   377         if (inputClosed.compareAndSet(false, true)) {
       
   378             debug.log(Level.DEBUG, "shutdownInput");
       
   379             // TransportImpl will eventually call RawChannel::close.
       
   380             // We must not call it here as this would close the socket
       
   381             // and can cause an exception to back fire before
       
   382             // TransportImpl and WebSocketImpl have updated their state.
       
   383         }
       
   384     }
       
   385 
       
   386     /**
       
   387      * Shutdown the connection for writing without closing the channel.
       
   388      *
       
   389      * <p> Once shutdown for writing then further attempts to write to the
       
   390      * channel will throw {@link ClosedChannelException}. If the output side of
       
   391      * the connection is already shutdown then invoking this method has no
       
   392      * effect.
       
   393      *
       
   394      * @throws ClosedChannelException
       
   395      *         If this channel is closed
       
   396      * @throws IOException
       
   397      *         If some other I/O error occurs
       
   398      */
       
   399     public void shutdownOutput() {
       
   400         if (outputClosed.compareAndSet(false, true)) {
       
   401             debug.log(Level.DEBUG, "shutdownOutput");
       
   402             // TransportImpl will eventually call RawChannel::close.
       
   403             // We must not call it here as this would close the socket
       
   404             // and can cause an exception to back fire before
       
   405             // TransportImpl and WebSocketImpl have updated their state.
       
   406         }
       
   407     }
       
   408 
       
   409     /**
       
   410      * Closes this channel.
       
   411      *
       
   412      * @throws IOException
       
   413      *         If an I/O error occurs
       
   414      */
       
   415     @Override
       
   416     public void close() {
       
   417         if (closed.compareAndSet(false, true)) {
       
   418             debug.log(Level.DEBUG, "close");
       
   419             connection.client().unreference();
       
   420             connection.close();
       
   421         }
       
   422     }
       
   423 
       
   424     private static List<ByteBuffer> copy(ByteBuffer[] src, int offset, int len) {
       
   425         int count = Math.min(len, src.length - offset);
       
   426         if (count <= 0) return Utils.EMPTY_BB_LIST;
       
   427         if (count == 1) return List.of(Utils.copy(src[offset]));
       
   428         if (count == 2) return List.of(Utils.copy(src[offset]), Utils.copy(src[offset+1]));
       
   429         List<ByteBuffer> list = new ArrayList<>(count);
       
   430         for (int i = 0; i < count; i++) {
       
   431             list.add(Utils.copy(src[offset + i]));
       
   432         }
       
   433         return list;
       
   434     }
       
   435 }