src/jdk.incubator.adba/share/classes/jdk/incubator/sql2/RowProcessorOperation.java
branch JDK-8188051-branch
changeset 56797 fb523d4d9185
parent 56796 69b384805d61
child 56798 3b438b3fef46
equal deleted inserted replaced
56796:69b384805d61 56797:fb523d4d9185
     1 /*
       
     2  * Copyright (c)  2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  * 
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  * 
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  * 
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  * 
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 package jdk.incubator.sql2;
       
    26 
       
    27 import java.time.Duration;
       
    28 import java.util.concurrent.CompletionStage;
       
    29 import java.util.concurrent.Flow;
       
    30 import java.util.function.Consumer;
       
    31 
       
    32 /**
       
    33  * A RowProcessorOperation handles a database action that returns a sequence of 
       
    34  * rows. The rows are handled by a java.util.concurrent.Flow.Processor. A 
       
    35  * RowProcessorOperation is conceptually a Row Publisher and a result Subscriber.
       
    36  * The configured Processor transforms the Row stream into a result stream.
       
    37  * Configuring a Processor causes the RowProcessorOperation to subscribe the
       
    38  * Processor to the stream of rows and for the RowProcessorOperation itself to 
       
    39  * subscribe to the stream of results published by the Processor. The last
       
    40  * result produced is the result of the RowProcessorOperation.
       
    41  * 
       
    42  * The RowProcessorOperation will insure that the demand for results is positive.
       
    43  * 
       
    44  * Calling Submission.cancel will call cancel on the result Subscription. The
       
    45  * Processor should call cancel on the row Subscription when cancel is called on
       
    46  * the result Subscription.
       
    47  * 
       
    48  * @param <T> the type of the result of the {@link Operation}
       
    49  */
       
    50 public interface RowProcessorOperation<T> extends ParameterizedOperation<T> {
       
    51 
       
    52   /** DRAFT
       
    53    * Accepts a Processor that subscribes to a stream of Rows and publishes
       
    54    * a stream of result values. This Operation will subscribe to the stream of
       
    55    * results. The last published result value is the result of the
       
    56    * Operation.
       
    57    * 
       
    58    * This Operation will insure result demand is eventually positive until 
       
    59    * resultSubscriber.onComplete or resultSubscriber.onError is called. This 
       
    60    * Operation will call resultSubscription.cancel if this Operation is canceled.
       
    61    * 
       
    62    * While there are more rows and row demand is positive and rowSubscription.cancel
       
    63    * has not been called, this Operation will eventually call rowToResult.onNext
       
    64    * with the next row in the row sequence. The Result.Row argument to onNext is
       
    65    * only valid for the duration of that call. When there are no more Rows this 
       
    66    * Operation will call rowToResult.onComplete. If there is an error this
       
    67    * Operation will call rowToResult.onError with the appropriate Exception whether
       
    68    * or not any rows were published.
       
    69    *
       
    70    * If resultSubscriber.onError is called this Operation completes
       
    71    * exceptionally with the passed exception. After all rows are published if
       
    72    * neither resultSubscriber.onComplete or resultSubscriber.onError is
       
    73    * called this Operation will complete exceptionally after the inactivity
       
    74    * timeout expires.
       
    75    * 
       
    76    * If this Operation is skipped it will be completed exceptionally with
       
    77    * SqlSkippedException but no calls will be made to rowToResult.
       
    78    * 
       
    79    * Calling Row.cancel is the same as calling Subscription.cancel on the Row
       
    80    * Subscription.
       
    81    *
       
    82    * @param rowToResult subscribes to a stream of Result.Rows and publishes a
       
    83    * stream of results of type T
       
    84    * @return this RowProcessorOperation
       
    85    */
       
    86   public RowProcessorOperation<T> rowProcessor(Flow.Processor<Result.Row, ? extends T> rowToResult);
       
    87   
       
    88   /** DRAFT 
       
    89    * Subscribe to the stream of Rows returned by this Operation. The result of 
       
    90    * this Operation is null. This is a convenience method.
       
    91    * 
       
    92    * @param rowSubscriber subscribes to a stream of Result.Rows 
       
    93    * @return this RowProcessorOperation
       
    94    */
       
    95   public default RowProcessorOperation<T> subscribe(Flow.Subscriber<Result.Row> rowSubscriber) {
       
    96 
       
    97     // create a Row to result Processor that passes the Rows to rowSubscriber
       
    98     // and publishes a single null as its only result.
       
    99     Flow.Processor<Result.Row, T> rowToResult
       
   100             = new Flow.Processor<Result.Row, T>() {
       
   101 
       
   102       protected boolean isResultPending = false;
       
   103       protected long resultDemand = 0;
       
   104 
       
   105       protected Flow.Subscription rowSubscription;
       
   106 
       
   107       protected Flow.Subscriber<? super T> resultSubscriber;
       
   108 
       
   109       protected Flow.Subscription resultSubscription = new Flow.Subscription() {
       
   110         @Override
       
   111         public void request(long n) {
       
   112           resultDemand += n;
       
   113           if (isResultPending && resultDemand > 0) {
       
   114             resultSubscriber.onNext(null);
       
   115             resultDemand--;
       
   116             resultSubscriber.onComplete();
       
   117             isResultPending = false;
       
   118           }
       
   119         }
       
   120 
       
   121         @Override
       
   122         public void cancel() {
       
   123           rowSubscription.cancel();
       
   124         }
       
   125       };
       
   126 
       
   127       @Override
       
   128       public void onSubscribe(Flow.Subscription subscription) {
       
   129         rowSubscription = subscription;
       
   130         rowSubscriber.onSubscribe(rowSubscription);
       
   131 
       
   132       }
       
   133 
       
   134       @Override
       
   135       public void onNext(Result.Row item) {
       
   136         rowSubscriber.onNext(item);
       
   137       }
       
   138 
       
   139       @Override
       
   140       public void onError(Throwable throwable) {
       
   141         rowSubscriber.onError(throwable);
       
   142         resultSubscriber.onError(throwable);
       
   143       }
       
   144 
       
   145       @Override
       
   146       public void onComplete() {
       
   147         rowSubscriber.onComplete();
       
   148         if (resultDemand > 0) {
       
   149           resultSubscriber.onNext(null);
       
   150           resultSubscriber.onComplete();
       
   151         } else {
       
   152           isResultPending = true;
       
   153         }
       
   154       }
       
   155 
       
   156       @Override
       
   157       public void subscribe(Flow.Subscriber<? super T> subscriber) {
       
   158         resultSubscriber = subscriber;
       
   159         resultSubscriber.onSubscribe(resultSubscription);
       
   160       }
       
   161     };
       
   162 
       
   163     return rowProcessor(rowToResult);
       
   164   }
       
   165 
       
   166   /** DRAFT
       
   167    * Sets the minimum time the Operation will wait for Processor activity before
       
   168    * terminating. If all of the following hold for some time exceeding minTime,
       
   169    * this Operation will be completed exceptionally with 
       
   170    * {@link java.util.concurrent.TimeoutException}.
       
   171    * <ul>
       
   172    * <li>no calls to the onNext, onComplete, or onError methods of the result
       
   173    * Subscriber, ie the Subscriber passed to rowToResult.subscribe</li>
       
   174    * <li>the demand for Rows is zero or all rows have been published</li>
       
   175    * </ul>
       
   176    * If the Operation can publish no more rows either because all rows have
       
   177    * been published or because the demand for rows is 0 and rowToResult
       
   178    * has neither published a result nor terminated the stream and this state has
       
   179    * continued for at least minTime, the Operation is completed exceptionally.
       
   180    * 
       
   181    * The default value is one minute.
       
   182    * 
       
   183    * Note: The minTime parameter value must be small to guarantee that the 
       
   184    * Connection does not hang for long periods. The default is large enough
       
   185    * that it most likely is insignificant for most apps, but small enough to
       
   186    * kick loose a hung Connection in semi-reasonable time.
       
   187    * 
       
   188    * @param minTime minimum time with the Processor making no progress before the
       
   189    * Operation is terminated.
       
   190    * @return this RowProcessorOperation
       
   191    */
       
   192   public RowProcessorOperation<T> inactivityTimeout(Duration minTime);
       
   193   
       
   194   
       
   195   // Covariant overrides
       
   196   
       
   197   @Override
       
   198   public RowProcessorOperation<T> onError(Consumer<Throwable> handler);
       
   199   
       
   200   @Override
       
   201   public RowProcessorOperation<T> set(String id, Object value, SqlType type);
       
   202 
       
   203   @Override
       
   204   public RowProcessorOperation<T> set(String id, CompletionStage<?> source, SqlType type);
       
   205 
       
   206   @Override
       
   207   public RowProcessorOperation<T> set(String id, CompletionStage<?> source);
       
   208 
       
   209   @Override
       
   210   public RowProcessorOperation<T> set(String id, Object value);
       
   211 
       
   212   @Override
       
   213   public RowProcessorOperation<T> timeout(Duration minTime);
       
   214 
       
   215 }