src/java.net.http/share/classes/java/net/http/internal/common/FlowTube.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56091 aedd6133e7a0
child 56093 22d94c4a3641
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
     1 /*
       
     2  * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package java.net.http.internal.common;
       
    27 
       
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
       
    31 
       
    32 /**
       
    33  * FlowTube is an I/O abstraction that allows reading from and writing to a
       
    34  * destination asynchronously.
       
    35  * This is not a {@link Flow.Processor
       
    36  * Flow.Processor<List<ByteBuffer>, List<ByteBuffer>>},
       
    37  * but rather models a publisher source and a subscriber sink in a bidirectional flow.
       
    38  * <p>
       
    39  * The {@code connectFlows} method should be called to connect the bidirectional
       
    40  * flow. A FlowTube supports handing over the same read subscription to different
       
    41  * sequential read subscribers over time. When {@code connectFlows(writePublisher,
       
    42  * readSubscriber} is called, the FlowTube will call {@code dropSubscription} on
       
    43  * its former readSubscriber, and {@code onSubscribe} on its new readSubscriber.
       
    44  */
       
    45 public interface FlowTube extends
       
    46        Flow.Publisher<List<ByteBuffer>>,
       
    47        Flow.Subscriber<List<ByteBuffer>> {
       
    48 
       
    49     /**
       
    50      * A subscriber for reading from the bidirectional flow.
       
    51      * A TubeSubscriber is a {@code Flow.Subscriber} that can be canceled
       
    52      * by calling {@code dropSubscription()}.
       
    53      * Once {@code dropSubscription()} is called, the {@code TubeSubscriber}
       
    54      * should stop calling any method on its subscription.
       
    55      */
       
    56     static interface TubeSubscriber extends Flow.Subscriber<List<ByteBuffer>> {
       
    57 
       
    58         /**
       
    59          * Called when the flow is connected again, and the subscription
       
    60          * is handed over to a new subscriber.
       
    61          * Once {@code dropSubscription()} is called, the {@code TubeSubscriber}
       
    62          * should stop calling any method on its subscription.
       
    63          */
       
    64         default void dropSubscription() { }
       
    65 
       
    66     }
       
    67 
       
    68     /**
       
    69      * A publisher for writing to the bidirectional flow.
       
    70      */
       
    71     static interface TubePublisher extends Flow.Publisher<List<ByteBuffer>> {
       
    72 
       
    73     }
       
    74 
       
    75     /**
       
    76      * Connects the bidirectional flows to a write {@code Publisher} and a
       
    77      * read {@code Subscriber}. This method can be called sequentially
       
    78      * several times to switch existing publishers and subscribers to a new
       
    79      * pair of write subscriber and read publisher.
       
    80      * @param writePublisher A new publisher for writing to the bidirectional flow.
       
    81      * @param readSubscriber A new subscriber for reading from the bidirectional
       
    82      *                       flow.
       
    83      */
       
    84     default void connectFlows(TubePublisher writePublisher,
       
    85                               TubeSubscriber readSubscriber) {
       
    86 
       
    87         this.subscribe(readSubscriber);
       
    88         writePublisher.subscribe(this);
       
    89     }
       
    90 
       
    91     /**
       
    92      * Returns true if this flow was completed, either exceptionally
       
    93      * or normally (EOF reached).
       
    94      * @return true if the flow is finished
       
    95      */
       
    96     boolean isFinished();
       
    97 
       
    98 
       
    99     /**
       
   100      * Returns {@code s} if {@code s} is a {@code TubeSubscriber}, otherwise
       
   101      * wraps it in a {@code TubeSubscriber}.
       
   102      * Using the wrapper is only appropriate in the case where
       
   103      * {@code dropSubscription} doesn't need to be implemented, and the
       
   104      * {@code TubeSubscriber} is subscribed only once.
       
   105      *
       
   106      * @param s a subscriber for reading.
       
   107      * @return a {@code TubeSubscriber}: either {@code s} if {@code s} is a
       
   108      *           {@code TubeSubscriber}, otherwise a {@code TubeSubscriber}
       
   109      *           wrapper that delegates to {@code s}
       
   110      */
       
   111     static TubeSubscriber asTubeSubscriber(Flow.Subscriber<? super List<ByteBuffer>> s) {
       
   112         if (s instanceof TubeSubscriber) {
       
   113             return (TubeSubscriber) s;
       
   114         }
       
   115         return new AbstractTubeSubscriber.TubeSubscriberWrapper(s);
       
   116     }
       
   117 
       
   118     /**
       
   119      * Returns {@code s} if {@code s} is a {@code  TubePublisher}, otherwise
       
   120      * wraps it in a {@code  TubePublisher}.
       
   121      *
       
   122      * @param p a publisher for writing.
       
   123      * @return a {@code TubePublisher}: either {@code s} if {@code s} is a
       
   124      *           {@code  TubePublisher}, otherwise a {@code TubePublisher}
       
   125      *           wrapper that delegates to {@code s}
       
   126      */
       
   127     static TubePublisher asTubePublisher(Flow.Publisher<List<ByteBuffer>> p) {
       
   128         if (p instanceof TubePublisher) {
       
   129             return (TubePublisher) p;
       
   130         }
       
   131         return new AbstractTubePublisher.TubePublisherWrapper(p);
       
   132     }
       
   133 
       
   134     /**
       
   135      * Convenience abstract class for {@code TubePublisher} implementations.
       
   136      * It is not required that a {@code TubePublisher} implementation extends
       
   137      * this class.
       
   138      */
       
   139     static abstract class AbstractTubePublisher implements TubePublisher {
       
   140         static final class TubePublisherWrapper extends AbstractTubePublisher {
       
   141             final Flow.Publisher<List<ByteBuffer>> delegate;
       
   142             public TubePublisherWrapper(Flow.Publisher<List<ByteBuffer>> delegate) {
       
   143                 this.delegate = delegate;
       
   144             }
       
   145             @Override
       
   146             public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
       
   147                 delegate.subscribe(subscriber);
       
   148             }
       
   149         }
       
   150     }
       
   151 
       
   152     /**
       
   153      * Convenience abstract class for {@code TubeSubscriber} implementations.
       
   154      * It is not required that a {@code TubeSubscriber} implementation extends
       
   155      * this class.
       
   156      */
       
   157     static abstract class AbstractTubeSubscriber implements TubeSubscriber {
       
   158         static final class TubeSubscriberWrapper extends  AbstractTubeSubscriber {
       
   159             final Flow.Subscriber<? super List<ByteBuffer>> delegate;
       
   160             TubeSubscriberWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
       
   161                 this.delegate = delegate;
       
   162             }
       
   163             @Override
       
   164             public void dropSubscription() {}
       
   165             @Override
       
   166             public void onSubscribe(Flow.Subscription subscription) {
       
   167                 delegate.onSubscribe(subscription);
       
   168             }
       
   169             @Override
       
   170             public void onNext(List<ByteBuffer> item) {
       
   171                 delegate.onNext(item);
       
   172             }
       
   173             @Override
       
   174             public void onError(Throwable throwable) {
       
   175                 delegate.onError(throwable);
       
   176             }
       
   177             @Override
       
   178             public void onComplete() {
       
   179                 delegate.onComplete();
       
   180             }
       
   181         }
       
   182 
       
   183     }
       
   184 
       
   185 }