src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SequentialScheduler.java
branchhttp-client-branch
changeset 56089 42208b2f224e
parent 56088 38fac6d0521d
child 56090 5c7fb702948a
equal deleted inserted replaced
56088:38fac6d0521d 56089:42208b2f224e
     1 /*
       
     2  * Copyright (c) 2016, 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal.common;
       
    27 
       
    28 import java.util.concurrent.Executor;
       
    29 import java.util.concurrent.atomic.AtomicInteger;
       
    30 
       
    31 import static java.util.Objects.requireNonNull;
       
    32 
       
    33 /**
       
    34  * A scheduler of ( repeatable ) tasks that MUST be run sequentially.
       
    35  *
       
    36  * <p> This class can be used as a synchronization aid that assists a number of
       
    37  * parties in running a task in a mutually exclusive fashion.
       
    38  *
       
    39  * <p> To run the task, a party invokes {@code runOrSchedule}. To permanently
       
    40  * prevent the task from subsequent runs, the party invokes {@code stop}.
       
    41  *
       
    42  * <p> The parties can, but do not have to, operate in different threads.
       
    43  *
       
    44  * <p> The task can be either synchronous ( completes when its {@code run}
       
    45  * method returns ), or asynchronous ( completed when its
       
    46  * {@code DeferredCompleter} is explicitly completed ).
       
    47  *
       
    48  * <p> The next run of the task will not begin until the previous run has
       
    49  * finished.
       
    50  *
       
    51  * <p> The task may invoke {@code runOrSchedule} itself, which may be a normal
       
    52  * situation.
       
    53  */
       
    54 public final class SequentialScheduler {
       
    55 
       
    56     /*
       
    57        Since the task is fixed and known beforehand, no blocking synchronization
       
    58        (locks, queues, etc.) is required. The job can be done solely using
       
    59        nonblocking primitives.
       
    60 
       
    61        The machinery below addresses two problems:
       
    62 
       
    63          1. Running the task in a sequential order (no concurrent runs):
       
    64 
       
    65                 begin, end, begin, end...
       
    66 
       
    67          2. Avoiding indefinite recursion:
       
    68 
       
    69                 begin
       
    70                   end
       
    71                     begin
       
    72                       end
       
    73                         ...
       
    74 
       
    75        Problem #1 is solved with a finite state machine with 4 states:
       
    76 
       
    77            BEGIN, AGAIN, END, and STOP.
       
    78 
       
    79        Problem #2 is solved with a "state modifier" OFFLOAD.
       
    80 
       
    81        Parties invoke `runOrSchedule()` to signal the task must run. A party
       
    82        that has invoked `runOrSchedule()` either begins the task or exploits the
       
    83        party that is either beginning the task or ending it.
       
    84 
       
    85        The party that is trying to end the task either ends it or begins it
       
    86        again.
       
    87 
       
    88        To avoid indefinite recursion, before re-running the task the
       
    89        TryEndDeferredCompleter sets the OFFLOAD bit, signalling to its "child"
       
    90        TryEndDeferredCompleter that this ("parent") TryEndDeferredCompleter is
       
    91        available and the "child" must offload the task on to the "parent". Then
       
    92        a race begins. Whichever invocation of TryEndDeferredCompleter.complete
       
    93        manages to unset OFFLOAD bit first does not do the work.
       
    94 
       
    95        There is at most 1 thread that is beginning the task and at most 2
       
    96        threads that are trying to end it: "parent" and "child". In case of a
       
    97        synchronous task "parent" and "child" are the same thread.
       
    98      */
       
    99 
       
   100     /**
       
   101      * An interface to signal the completion of a {@link RestartableTask}.
       
   102      *
       
   103      * <p> The invocation of {@code complete} completes the task. The invocation
       
   104      * of {@code complete} may restart the task, if an attempt has previously
       
   105      * been made to run the task while it was already running.
       
   106      *
       
   107      * @apiNote {@code DeferredCompleter} is useful when a task is not necessary
       
   108      * complete when its {@code run} method returns, but will complete at a
       
   109      * later time, and maybe in different thread. This type exists for
       
   110      * readability purposes at use-sites only.
       
   111      */
       
   112     public static abstract class DeferredCompleter {
       
   113 
       
   114         /** Extensible from this (outer) class ONLY. */
       
   115         private DeferredCompleter() { }
       
   116 
       
   117         /** Completes the task. Must be called once, and once only. */
       
   118         public abstract void complete();
       
   119     }
       
   120 
       
   121     /**
       
   122      * A restartable task.
       
   123      */
       
   124     @FunctionalInterface
       
   125     public interface RestartableTask {
       
   126 
       
   127         /**
       
   128          * The body of the task.
       
   129          *
       
   130          * @param taskCompleter
       
   131          *         A completer that must be invoked once, and only once,
       
   132          *         when this task is logically finished
       
   133          */
       
   134         void run(DeferredCompleter taskCompleter);
       
   135     }
       
   136 
       
   137     /**
       
   138      * A simple and self-contained task that completes once its {@code run}
       
   139      * method returns.
       
   140      */
       
   141     public static abstract class CompleteRestartableTask
       
   142         implements RestartableTask
       
   143     {
       
   144         @Override
       
   145         public final void run(DeferredCompleter taskCompleter) {
       
   146             try {
       
   147                 run();
       
   148             } finally {
       
   149                 taskCompleter.complete();
       
   150             }
       
   151         }
       
   152 
       
   153         /** The body of the task. */
       
   154         protected abstract void run();
       
   155     }
       
   156 
       
   157     /**
       
   158      * A task that runs its main loop within a synchronized block to provide
       
   159      * memory visibility between runs. Since the main loop can't run concurrently,
       
   160      * the lock shouldn't be contended and no deadlock should ever be possible.
       
   161      */
       
   162     public static final class SynchronizedRestartableTask
       
   163             extends CompleteRestartableTask {
       
   164 
       
   165         private final Runnable mainLoop;
       
   166         private final Object lock = new Object();
       
   167 
       
   168         public SynchronizedRestartableTask(Runnable mainLoop) {
       
   169             this.mainLoop = mainLoop;
       
   170         }
       
   171 
       
   172         @Override
       
   173         protected void run() {
       
   174             synchronized(lock) {
       
   175                 mainLoop.run();
       
   176             }
       
   177         }
       
   178     }
       
   179 
       
   180     private static final int OFFLOAD =  1;
       
   181     private static final int AGAIN   =  2;
       
   182     private static final int BEGIN   =  4;
       
   183     private static final int STOP    =  8;
       
   184     private static final int END     = 16;
       
   185 
       
   186     private final AtomicInteger state = new AtomicInteger(END);
       
   187     private final RestartableTask restartableTask;
       
   188     private final DeferredCompleter completer;
       
   189     private final SchedulableTask schedulableTask;
       
   190 
       
   191     /**
       
   192      * An auxiliary task that starts the restartable task:
       
   193      * {@code restartableTask.run(completer)}.
       
   194      */
       
   195     private final class SchedulableTask implements Runnable {
       
   196         @Override
       
   197         public void run() {
       
   198             restartableTask.run(completer);
       
   199         }
       
   200     }
       
   201 
       
   202     public SequentialScheduler(RestartableTask restartableTask) {
       
   203         this.restartableTask = requireNonNull(restartableTask);
       
   204         this.completer = new TryEndDeferredCompleter();
       
   205         this.schedulableTask = new SchedulableTask();
       
   206     }
       
   207 
       
   208     /**
       
   209      * Runs or schedules the task to be run.
       
   210      *
       
   211      * @implSpec The recursion which is possible here must be bounded:
       
   212      *
       
   213      *  <pre>{@code
       
   214      *     this.runOrSchedule()
       
   215      *         completer.complete()
       
   216      *             this.runOrSchedule()
       
   217      *                 ...
       
   218      * }</pre>
       
   219      *
       
   220      * @implNote The recursion in this implementation has the maximum
       
   221      * depth of 1.
       
   222      */
       
   223     public void runOrSchedule() {
       
   224         runOrSchedule(schedulableTask, null);
       
   225     }
       
   226 
       
   227     /**
       
   228      * Executes or schedules the task to be executed in the provided executor.
       
   229      *
       
   230      * <p> This method can be used when potential executing from a calling
       
   231      * thread is not desirable.
       
   232      *
       
   233      * @param executor
       
   234      *         An executor in which to execute the task, if the task needs
       
   235      *         to be executed.
       
   236      *
       
   237      * @apiNote The given executor can be {@code null} in which case calling
       
   238      * {@code runOrSchedule(null)} is strictly equivalent to calling
       
   239      * {@code runOrSchedule()}.
       
   240      */
       
   241     public void runOrSchedule(Executor executor) {
       
   242         runOrSchedule(schedulableTask, executor);
       
   243     }
       
   244 
       
   245     private void runOrSchedule(SchedulableTask task, Executor executor) {
       
   246         while (true) {
       
   247             int s = state.get();
       
   248             if (s == END) {
       
   249                 if (state.compareAndSet(END, BEGIN)) {
       
   250                     break;
       
   251                 }
       
   252             } else if ((s & BEGIN) != 0) {
       
   253                 // Tries to change the state to AGAIN, preserving OFFLOAD bit
       
   254                 if (state.compareAndSet(s, AGAIN | (s & OFFLOAD))) {
       
   255                     return;
       
   256                 }
       
   257             } else if ((s & AGAIN) != 0 || s == STOP) {
       
   258                 /* In the case of AGAIN the scheduler does not provide
       
   259                    happens-before relationship between actions prior to
       
   260                    runOrSchedule() and actions that happen in task.run().
       
   261                    The reason is that no volatile write is done in this case,
       
   262                    and the call piggybacks on the call that has actually set
       
   263                    AGAIN state. */
       
   264                 return;
       
   265             } else {
       
   266                 // Non-existent state, or the one that cannot be offloaded
       
   267                 throw new InternalError(String.valueOf(s));
       
   268             }
       
   269         }
       
   270         if (executor == null) {
       
   271             task.run();
       
   272         } else {
       
   273             executor.execute(task);
       
   274         }
       
   275     }
       
   276 
       
   277     /** The only concrete {@code DeferredCompleter} implementation. */
       
   278     private class TryEndDeferredCompleter extends DeferredCompleter {
       
   279 
       
   280         @Override
       
   281         public void complete() {
       
   282             while (true) {
       
   283                 int s;
       
   284                 while (((s = state.get()) & OFFLOAD) != 0) {
       
   285                     // Tries to offload ending of the task to the parent
       
   286                     if (state.compareAndSet(s, s & ~OFFLOAD)) {
       
   287                         return;
       
   288                     }
       
   289                 }
       
   290                 while (true) {
       
   291                     if ((s & OFFLOAD) != 0) {
       
   292                         /* OFFLOAD bit can never be observed here. Otherwise
       
   293                            it would mean there is another invocation of
       
   294                            "complete" that can run the task. */
       
   295                         throw new InternalError(String.valueOf(s));
       
   296                     }
       
   297                     if (s == BEGIN) {
       
   298                         if (state.compareAndSet(BEGIN, END)) {
       
   299                             return;
       
   300                         }
       
   301                     } else if (s == AGAIN) {
       
   302                         if (state.compareAndSet(AGAIN, BEGIN | OFFLOAD)) {
       
   303                             break;
       
   304                         }
       
   305                     } else if (s == STOP) {
       
   306                         return;
       
   307                     } else if (s == END) {
       
   308                         throw new IllegalStateException("Duplicate completion");
       
   309                     } else {
       
   310                         // Non-existent state
       
   311                         throw new InternalError(String.valueOf(s));
       
   312                     }
       
   313                     s = state.get();
       
   314                 }
       
   315                 restartableTask.run(completer);
       
   316             }
       
   317         }
       
   318     }
       
   319 
       
   320     /**
       
   321      * Tells whether, or not, this scheduler has been permanently stopped.
       
   322      *
       
   323      * <p> Should be used from inside the task to poll the status of the
       
   324      * scheduler, pretty much the same way as it is done for threads:
       
   325      * <pre>{@code
       
   326      *     if (!Thread.currentThread().isInterrupted()) {
       
   327      *         ...
       
   328      *     }
       
   329      * }</pre>
       
   330      */
       
   331     public boolean isStopped() {
       
   332         return state.get() == STOP;
       
   333     }
       
   334 
       
   335     /**
       
   336      * Stops this scheduler.  Subsequent invocations of {@code runOrSchedule}
       
   337      * are effectively no-ops.
       
   338      *
       
   339      * <p> If the task has already begun, this invocation will not affect it,
       
   340      * unless the task itself uses {@code isStopped()} method to check the state
       
   341      * of the handler.
       
   342      */
       
   343     public void stop() {
       
   344         state.set(STOP);
       
   345     }
       
   346 
       
   347     /**
       
   348      * Returns a new {@code SequentialScheduler} that executes the provided
       
   349      * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
       
   350      *
       
   351      * @apiNote This is equivalent to calling
       
   352      * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainLoop))}
       
   353      * The main loop must not perform any blocking operation.
       
   354      *
       
   355      * @param mainLoop The main loop of the new sequential scheduler
       
   356      * @return a new {@code SequentialScheduler} that executes the provided
       
   357      * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
       
   358      */
       
   359     public static SequentialScheduler synchronizedScheduler(Runnable mainLoop) {
       
   360         return new SequentialScheduler(new SynchronizedRestartableTask(mainLoop));
       
   361     }
       
   362 }