jdk/src/java.base/share/classes/java/util/concurrent/ExecutorCompletionService.java
author dl
Wed, 21 Dec 2016 14:26:52 -0800
changeset 42927 1d31e540bfcb
parent 42322 c3474fef4fe4
child 45434 4582657c7260
permissions -rw-r--r--
8170484: Miscellaneous changes imported from jsr166 CVS 2016-12 Reviewed-by: martin, smarks, psandoz
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     1
/*
90ce3da70b43 Initial load
duke
parents:
diff changeset
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
90ce3da70b43 Initial load
duke
parents:
diff changeset
     3
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
     4
 * This code is free software; you can redistribute it and/or modify it
90ce3da70b43 Initial load
duke
parents:
diff changeset
     5
 * under the terms of the GNU General Public License version 2 only, as
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     6
 * published by the Free Software Foundation.  Oracle designates this
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     7
 * particular file as subject to the "Classpath" exception as provided
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     8
 * by Oracle in the LICENSE file that accompanied this code.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     9
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
90ce3da70b43 Initial load
duke
parents:
diff changeset
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
90ce3da70b43 Initial load
duke
parents:
diff changeset
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
90ce3da70b43 Initial load
duke
parents:
diff changeset
    13
 * version 2 for more details (a copy is included in the LICENSE file that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    14
 * accompanied this code).
90ce3da70b43 Initial load
duke
parents:
diff changeset
    15
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    16
 * You should have received a copy of the GNU General Public License version
90ce3da70b43 Initial load
duke
parents:
diff changeset
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    19
 *
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    21
 * or visit www.oracle.com if you need additional information or have any
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    22
 * questions.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    23
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    24
90ce3da70b43 Initial load
duke
parents:
diff changeset
    25
/*
90ce3da70b43 Initial load
duke
parents:
diff changeset
    26
 * This file is available under and governed by the GNU General Public
90ce3da70b43 Initial load
duke
parents:
diff changeset
    27
 * License version 2 only, as published by the Free Software Foundation.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    28
 * However, the following notice accompanied the original version of this
90ce3da70b43 Initial load
duke
parents:
diff changeset
    29
 * file:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    30
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    31
 * Written by Doug Lea with assistance from members of JCP JSR-166
90ce3da70b43 Initial load
duke
parents:
diff changeset
    32
 * Expert Group and released to the public domain, as explained at
9242
ef138d47df58 7034657: Update Creative Commons license URL in legal notices
dl
parents: 7518
diff changeset
    33
 * http://creativecommons.org/publicdomain/zero/1.0/
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    34
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    35
90ce3da70b43 Initial load
duke
parents:
diff changeset
    36
package java.util.concurrent;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    37
90ce3da70b43 Initial load
duke
parents:
diff changeset
    38
/**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    39
 * A {@link CompletionService} that uses a supplied {@link Executor}
90ce3da70b43 Initial load
duke
parents:
diff changeset
    40
 * to execute tasks.  This class arranges that submitted tasks are,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    41
 * upon completion, placed on a queue accessible using {@code take}.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    42
 * The class is lightweight enough to be suitable for transient use
90ce3da70b43 Initial load
duke
parents:
diff changeset
    43
 * when processing groups of tasks.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    44
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    45
 * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    46
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    47
 * <b>Usage Examples.</b>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    48
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    49
 * Suppose you have a set of solvers for a certain problem, each
90ce3da70b43 Initial load
duke
parents:
diff changeset
    50
 * returning a value of some type {@code Result}, and would like to
90ce3da70b43 Initial load
duke
parents:
diff changeset
    51
 * run them concurrently, processing the results of each of them that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    52
 * return a non-null value, in some method {@code use(Result r)}. You
90ce3da70b43 Initial load
duke
parents:
diff changeset
    53
 * could write this as:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    54
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    55
 * <pre> {@code
90ce3da70b43 Initial load
duke
parents:
diff changeset
    56
 * void solve(Executor e,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    57
 *            Collection<Callable<Result>> solvers)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    58
 *     throws InterruptedException, ExecutionException {
38551
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    59
 *   CompletionService<Result> cs
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    60
 *       = new ExecutorCompletionService<>(e);
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    61
 *   solvers.forEach(cs::submit);
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    62
 *   for (int i = solvers.size(); i > 0; i--) {
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    63
 *     Result r = cs.take().get();
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    64
 *     if (r != null)
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    65
 *       use(r);
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    66
 *   }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    67
 * }}</pre>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    68
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    69
 * Suppose instead that you would like to use the first non-null result
90ce3da70b43 Initial load
duke
parents:
diff changeset
    70
 * of the set of tasks, ignoring any that encounter exceptions,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    71
 * and cancelling all other tasks when the first one is ready:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    72
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    73
 * <pre> {@code
90ce3da70b43 Initial load
duke
parents:
diff changeset
    74
 * void solve(Executor e,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    75
 *            Collection<Callable<Result>> solvers)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    76
 *     throws InterruptedException {
38551
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    77
 *   CompletionService<Result> cs
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    78
 *       = new ExecutorCompletionService<>(e);
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    79
 *   int n = solvers.size();
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    80
 *   List<Future<Result>> futures = new ArrayList<>(n);
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    81
 *   Result result = null;
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    82
 *   try {
42322
c3474fef4fe4 8166646: Miscellaneous changes imported from jsr166 CVS 2016-10
dl
parents: 38551
diff changeset
    83
 *     solvers.forEach(solver -> futures.add(cs.submit(solver)));
38551
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    84
 *     for (int i = n; i > 0; i--) {
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    85
 *       try {
38551
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    86
 *         Result r = cs.take().get();
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    87
 *         if (r != null) {
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    88
 *           result = r;
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    89
 *           break;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    90
 *         }
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    91
 *       } catch (ExecutionException ignore) {}
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    92
 *     }
38551
82c48058acc2 8153768: Miscellaneous changes imported from jsr166 CVS 2016-05
dl
parents: 32991
diff changeset
    93
 *   } finally {
42322
c3474fef4fe4 8166646: Miscellaneous changes imported from jsr166 CVS 2016-10
dl
parents: 38551
diff changeset
    94
 *     futures.forEach(future -> future.cancel(true));
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    95
 *   }
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    96
 *
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    97
 *   if (result != null)
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
    98
 *     use(result);
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    99
 * }}</pre>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   100
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   101
public class ExecutorCompletionService<V> implements CompletionService<V> {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   102
    private final Executor executor;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   103
    private final AbstractExecutorService aes;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   104
    private final BlockingQueue<Future<V>> completionQueue;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   105
90ce3da70b43 Initial load
duke
parents:
diff changeset
   106
    /**
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   107
     * FutureTask extension to enqueue upon completion.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   108
     */
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   109
    private static class QueueingFuture<V> extends FutureTask<Void> {
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   110
        QueueingFuture(RunnableFuture<V> task,
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   111
                       BlockingQueue<Future<V>> completionQueue) {
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   112
            super(task, null);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   113
            this.task = task;
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   114
            this.completionQueue = completionQueue;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   115
        }
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   116
        private final Future<V> task;
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   117
        private final BlockingQueue<Future<V>> completionQueue;
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   118
        protected void done() { completionQueue.add(task); }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   119
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   120
90ce3da70b43 Initial load
duke
parents:
diff changeset
   121
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   122
        if (aes == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   123
            return new FutureTask<V>(task);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   124
        else
90ce3da70b43 Initial load
duke
parents:
diff changeset
   125
            return aes.newTaskFor(task);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   126
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   127
90ce3da70b43 Initial load
duke
parents:
diff changeset
   128
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   129
        if (aes == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   130
            return new FutureTask<V>(task, result);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   131
        else
90ce3da70b43 Initial load
duke
parents:
diff changeset
   132
            return aes.newTaskFor(task, result);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   133
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   134
90ce3da70b43 Initial load
duke
parents:
diff changeset
   135
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   136
     * Creates an ExecutorCompletionService using the supplied
90ce3da70b43 Initial load
duke
parents:
diff changeset
   137
     * executor for base task execution and a
90ce3da70b43 Initial load
duke
parents:
diff changeset
   138
     * {@link LinkedBlockingQueue} as a completion queue.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   139
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   140
     * @param executor the executor to use
90ce3da70b43 Initial load
duke
parents:
diff changeset
   141
     * @throws NullPointerException if executor is {@code null}
90ce3da70b43 Initial load
duke
parents:
diff changeset
   142
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   143
    public ExecutorCompletionService(Executor executor) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   144
        if (executor == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   145
            throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   146
        this.executor = executor;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   147
        this.aes = (executor instanceof AbstractExecutorService) ?
90ce3da70b43 Initial load
duke
parents:
diff changeset
   148
            (AbstractExecutorService) executor : null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   149
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   150
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   151
90ce3da70b43 Initial load
duke
parents:
diff changeset
   152
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   153
     * Creates an ExecutorCompletionService using the supplied
90ce3da70b43 Initial load
duke
parents:
diff changeset
   154
     * executor for base task execution and the supplied queue as its
90ce3da70b43 Initial load
duke
parents:
diff changeset
   155
     * completion queue.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   156
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   157
     * @param executor the executor to use
90ce3da70b43 Initial load
duke
parents:
diff changeset
   158
     * @param completionQueue the queue to use as the completion queue
90ce3da70b43 Initial load
duke
parents:
diff changeset
   159
     *        normally one dedicated for use by this service. This
90ce3da70b43 Initial load
duke
parents:
diff changeset
   160
     *        queue is treated as unbounded -- failed attempted
21278
ef8a3a2a72f2 8022746: List of spelling errors in API doc
malenkov
parents: 9242
diff changeset
   161
     *        {@code Queue.add} operations for completed tasks cause
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   162
     *        them not to be retrievable.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   163
     * @throws NullPointerException if executor or completionQueue are {@code null}
90ce3da70b43 Initial load
duke
parents:
diff changeset
   164
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   165
    public ExecutorCompletionService(Executor executor,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   166
                                     BlockingQueue<Future<V>> completionQueue) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   167
        if (executor == null || completionQueue == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   168
            throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   169
        this.executor = executor;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   170
        this.aes = (executor instanceof AbstractExecutorService) ?
90ce3da70b43 Initial load
duke
parents:
diff changeset
   171
            (AbstractExecutorService) executor : null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   172
        this.completionQueue = completionQueue;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   173
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   174
90ce3da70b43 Initial load
duke
parents:
diff changeset
   175
    public Future<V> submit(Callable<V> task) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   176
        if (task == null) throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   177
        RunnableFuture<V> f = newTaskFor(task);
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   178
        executor.execute(new QueueingFuture<V>(f, completionQueue));
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   179
        return f;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   180
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   181
90ce3da70b43 Initial load
duke
parents:
diff changeset
   182
    public Future<V> submit(Runnable task, V result) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   183
        if (task == null) throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   184
        RunnableFuture<V> f = newTaskFor(task, result);
32991
b27c76b82713 8134853: Bulk integration of java.util.concurrent classes
dl
parents: 25859
diff changeset
   185
        executor.execute(new QueueingFuture<V>(f, completionQueue));
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   186
        return f;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   187
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   188
90ce3da70b43 Initial load
duke
parents:
diff changeset
   189
    public Future<V> take() throws InterruptedException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   190
        return completionQueue.take();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   191
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   192
90ce3da70b43 Initial load
duke
parents:
diff changeset
   193
    public Future<V> poll() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   194
        return completionQueue.poll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   195
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   196
7518
0282db800fe1 7003745: Code style cleanups (sync from Dougs CVS)
dl
parents: 5506
diff changeset
   197
    public Future<V> poll(long timeout, TimeUnit unit)
0282db800fe1 7003745: Code style cleanups (sync from Dougs CVS)
dl
parents: 5506
diff changeset
   198
            throws InterruptedException {
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   199
        return completionQueue.poll(timeout, unit);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   200
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   201
90ce3da70b43 Initial load
duke
parents:
diff changeset
   202
}