jdk/src/share/classes/java/util/concurrent/ExecutorCompletionService.java
author malenkov
Tue, 29 Oct 2013 17:01:06 +0400
changeset 21278 ef8a3a2a72f2
parent 9242 ef138d47df58
permissions -rw-r--r--
8022746: List of spelling errors in API doc Reviewed-by: alexsch, smarks
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     1
/*
90ce3da70b43 Initial load
duke
parents:
diff changeset
     2
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
90ce3da70b43 Initial load
duke
parents:
diff changeset
     3
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
     4
 * This code is free software; you can redistribute it and/or modify it
90ce3da70b43 Initial load
duke
parents:
diff changeset
     5
 * under the terms of the GNU General Public License version 2 only, as
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     6
 * published by the Free Software Foundation.  Oracle designates this
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     7
 * particular file as subject to the "Classpath" exception as provided
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
     8
 * by Oracle in the LICENSE file that accompanied this code.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
     9
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    10
 * This code is distributed in the hope that it will be useful, but WITHOUT
90ce3da70b43 Initial load
duke
parents:
diff changeset
    11
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
90ce3da70b43 Initial load
duke
parents:
diff changeset
    12
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
90ce3da70b43 Initial load
duke
parents:
diff changeset
    13
 * version 2 for more details (a copy is included in the LICENSE file that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    14
 * accompanied this code).
90ce3da70b43 Initial load
duke
parents:
diff changeset
    15
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    16
 * You should have received a copy of the GNU General Public License version
90ce3da70b43 Initial load
duke
parents:
diff changeset
    17
 * 2 along with this work; if not, write to the Free Software Foundation,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    18
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    19
 *
5506
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    20
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    21
 * or visit www.oracle.com if you need additional information or have any
202f599c92aa 6943119: Rebrand source copyright notices
ohair
parents: 2
diff changeset
    22
 * questions.
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    23
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    24
90ce3da70b43 Initial load
duke
parents:
diff changeset
    25
/*
90ce3da70b43 Initial load
duke
parents:
diff changeset
    26
 * This file is available under and governed by the GNU General Public
90ce3da70b43 Initial load
duke
parents:
diff changeset
    27
 * License version 2 only, as published by the Free Software Foundation.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    28
 * However, the following notice accompanied the original version of this
90ce3da70b43 Initial load
duke
parents:
diff changeset
    29
 * file:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    30
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    31
 * Written by Doug Lea with assistance from members of JCP JSR-166
90ce3da70b43 Initial load
duke
parents:
diff changeset
    32
 * Expert Group and released to the public domain, as explained at
9242
ef138d47df58 7034657: Update Creative Commons license URL in legal notices
dl
parents: 7518
diff changeset
    33
 * http://creativecommons.org/publicdomain/zero/1.0/
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
    34
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
    35
90ce3da70b43 Initial load
duke
parents:
diff changeset
    36
package java.util.concurrent;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    37
90ce3da70b43 Initial load
duke
parents:
diff changeset
    38
/**
90ce3da70b43 Initial load
duke
parents:
diff changeset
    39
 * A {@link CompletionService} that uses a supplied {@link Executor}
90ce3da70b43 Initial load
duke
parents:
diff changeset
    40
 * to execute tasks.  This class arranges that submitted tasks are,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    41
 * upon completion, placed on a queue accessible using {@code take}.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    42
 * The class is lightweight enough to be suitable for transient use
90ce3da70b43 Initial load
duke
parents:
diff changeset
    43
 * when processing groups of tasks.
90ce3da70b43 Initial load
duke
parents:
diff changeset
    44
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    45
 * <p>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    46
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    47
 * <b>Usage Examples.</b>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    48
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    49
 * Suppose you have a set of solvers for a certain problem, each
90ce3da70b43 Initial load
duke
parents:
diff changeset
    50
 * returning a value of some type {@code Result}, and would like to
90ce3da70b43 Initial load
duke
parents:
diff changeset
    51
 * run them concurrently, processing the results of each of them that
90ce3da70b43 Initial load
duke
parents:
diff changeset
    52
 * return a non-null value, in some method {@code use(Result r)}. You
90ce3da70b43 Initial load
duke
parents:
diff changeset
    53
 * could write this as:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    54
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    55
 * <pre> {@code
90ce3da70b43 Initial load
duke
parents:
diff changeset
    56
 * void solve(Executor e,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    57
 *            Collection<Callable<Result>> solvers)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    58
 *     throws InterruptedException, ExecutionException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    59
 *     CompletionService<Result> ecs
90ce3da70b43 Initial load
duke
parents:
diff changeset
    60
 *         = new ExecutorCompletionService<Result>(e);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    61
 *     for (Callable<Result> s : solvers)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    62
 *         ecs.submit(s);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    63
 *     int n = solvers.size();
90ce3da70b43 Initial load
duke
parents:
diff changeset
    64
 *     for (int i = 0; i < n; ++i) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    65
 *         Result r = ecs.take().get();
90ce3da70b43 Initial load
duke
parents:
diff changeset
    66
 *         if (r != null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    67
 *             use(r);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    68
 *     }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    69
 * }}</pre>
90ce3da70b43 Initial load
duke
parents:
diff changeset
    70
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    71
 * Suppose instead that you would like to use the first non-null result
90ce3da70b43 Initial load
duke
parents:
diff changeset
    72
 * of the set of tasks, ignoring any that encounter exceptions,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    73
 * and cancelling all other tasks when the first one is ready:
90ce3da70b43 Initial load
duke
parents:
diff changeset
    74
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
    75
 * <pre> {@code
90ce3da70b43 Initial load
duke
parents:
diff changeset
    76
 * void solve(Executor e,
90ce3da70b43 Initial load
duke
parents:
diff changeset
    77
 *            Collection<Callable<Result>> solvers)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    78
 *     throws InterruptedException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    79
 *     CompletionService<Result> ecs
90ce3da70b43 Initial load
duke
parents:
diff changeset
    80
 *         = new ExecutorCompletionService<Result>(e);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    81
 *     int n = solvers.size();
90ce3da70b43 Initial load
duke
parents:
diff changeset
    82
 *     List<Future<Result>> futures
90ce3da70b43 Initial load
duke
parents:
diff changeset
    83
 *         = new ArrayList<Future<Result>>(n);
90ce3da70b43 Initial load
duke
parents:
diff changeset
    84
 *     Result result = null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    85
 *     try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    86
 *         for (Callable<Result> s : solvers)
90ce3da70b43 Initial load
duke
parents:
diff changeset
    87
 *             futures.add(ecs.submit(s));
90ce3da70b43 Initial load
duke
parents:
diff changeset
    88
 *         for (int i = 0; i < n; ++i) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    89
 *             try {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    90
 *                 Result r = ecs.take().get();
90ce3da70b43 Initial load
duke
parents:
diff changeset
    91
 *                 if (r != null) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    92
 *                     result = r;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    93
 *                     break;
90ce3da70b43 Initial load
duke
parents:
diff changeset
    94
 *                 }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    95
 *             } catch (ExecutionException ignore) {}
90ce3da70b43 Initial load
duke
parents:
diff changeset
    96
 *         }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    97
 *     }
90ce3da70b43 Initial load
duke
parents:
diff changeset
    98
 *     finally {
90ce3da70b43 Initial load
duke
parents:
diff changeset
    99
 *         for (Future<Result> f : futures)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   100
 *             f.cancel(true);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   101
 *     }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   102
 *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   103
 *     if (result != null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   104
 *         use(result);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   105
 * }}</pre>
90ce3da70b43 Initial load
duke
parents:
diff changeset
   106
 */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   107
public class ExecutorCompletionService<V> implements CompletionService<V> {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   108
    private final Executor executor;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   109
    private final AbstractExecutorService aes;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   110
    private final BlockingQueue<Future<V>> completionQueue;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   111
90ce3da70b43 Initial load
duke
parents:
diff changeset
   112
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   113
     * FutureTask extension to enqueue upon completion
90ce3da70b43 Initial load
duke
parents:
diff changeset
   114
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   115
    private class QueueingFuture extends FutureTask<Void> {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   116
        QueueingFuture(RunnableFuture<V> task) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   117
            super(task, null);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   118
            this.task = task;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   119
        }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   120
        protected void done() { completionQueue.add(task); }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   121
        private final Future<V> task;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   122
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   123
90ce3da70b43 Initial load
duke
parents:
diff changeset
   124
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   125
        if (aes == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   126
            return new FutureTask<V>(task);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   127
        else
90ce3da70b43 Initial load
duke
parents:
diff changeset
   128
            return aes.newTaskFor(task);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   129
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   130
90ce3da70b43 Initial load
duke
parents:
diff changeset
   131
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   132
        if (aes == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   133
            return new FutureTask<V>(task, result);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   134
        else
90ce3da70b43 Initial load
duke
parents:
diff changeset
   135
            return aes.newTaskFor(task, result);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   136
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   137
90ce3da70b43 Initial load
duke
parents:
diff changeset
   138
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   139
     * Creates an ExecutorCompletionService using the supplied
90ce3da70b43 Initial load
duke
parents:
diff changeset
   140
     * executor for base task execution and a
90ce3da70b43 Initial load
duke
parents:
diff changeset
   141
     * {@link LinkedBlockingQueue} as a completion queue.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   142
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   143
     * @param executor the executor to use
90ce3da70b43 Initial load
duke
parents:
diff changeset
   144
     * @throws NullPointerException if executor is {@code null}
90ce3da70b43 Initial load
duke
parents:
diff changeset
   145
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   146
    public ExecutorCompletionService(Executor executor) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   147
        if (executor == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   148
            throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   149
        this.executor = executor;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   150
        this.aes = (executor instanceof AbstractExecutorService) ?
90ce3da70b43 Initial load
duke
parents:
diff changeset
   151
            (AbstractExecutorService) executor : null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   152
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   153
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   154
90ce3da70b43 Initial load
duke
parents:
diff changeset
   155
    /**
90ce3da70b43 Initial load
duke
parents:
diff changeset
   156
     * Creates an ExecutorCompletionService using the supplied
90ce3da70b43 Initial load
duke
parents:
diff changeset
   157
     * executor for base task execution and the supplied queue as its
90ce3da70b43 Initial load
duke
parents:
diff changeset
   158
     * completion queue.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   159
     *
90ce3da70b43 Initial load
duke
parents:
diff changeset
   160
     * @param executor the executor to use
90ce3da70b43 Initial load
duke
parents:
diff changeset
   161
     * @param completionQueue the queue to use as the completion queue
90ce3da70b43 Initial load
duke
parents:
diff changeset
   162
     *        normally one dedicated for use by this service. This
90ce3da70b43 Initial load
duke
parents:
diff changeset
   163
     *        queue is treated as unbounded -- failed attempted
21278
ef8a3a2a72f2 8022746: List of spelling errors in API doc
malenkov
parents: 9242
diff changeset
   164
     *        {@code Queue.add} operations for completed tasks cause
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   165
     *        them not to be retrievable.
90ce3da70b43 Initial load
duke
parents:
diff changeset
   166
     * @throws NullPointerException if executor or completionQueue are {@code null}
90ce3da70b43 Initial load
duke
parents:
diff changeset
   167
     */
90ce3da70b43 Initial load
duke
parents:
diff changeset
   168
    public ExecutorCompletionService(Executor executor,
90ce3da70b43 Initial load
duke
parents:
diff changeset
   169
                                     BlockingQueue<Future<V>> completionQueue) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   170
        if (executor == null || completionQueue == null)
90ce3da70b43 Initial load
duke
parents:
diff changeset
   171
            throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   172
        this.executor = executor;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   173
        this.aes = (executor instanceof AbstractExecutorService) ?
90ce3da70b43 Initial load
duke
parents:
diff changeset
   174
            (AbstractExecutorService) executor : null;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   175
        this.completionQueue = completionQueue;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   176
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   177
90ce3da70b43 Initial load
duke
parents:
diff changeset
   178
    public Future<V> submit(Callable<V> task) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   179
        if (task == null) throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   180
        RunnableFuture<V> f = newTaskFor(task);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   181
        executor.execute(new QueueingFuture(f));
90ce3da70b43 Initial load
duke
parents:
diff changeset
   182
        return f;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   183
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   184
90ce3da70b43 Initial load
duke
parents:
diff changeset
   185
    public Future<V> submit(Runnable task, V result) {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   186
        if (task == null) throw new NullPointerException();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   187
        RunnableFuture<V> f = newTaskFor(task, result);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   188
        executor.execute(new QueueingFuture(f));
90ce3da70b43 Initial load
duke
parents:
diff changeset
   189
        return f;
90ce3da70b43 Initial load
duke
parents:
diff changeset
   190
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   191
90ce3da70b43 Initial load
duke
parents:
diff changeset
   192
    public Future<V> take() throws InterruptedException {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   193
        return completionQueue.take();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   194
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   195
90ce3da70b43 Initial load
duke
parents:
diff changeset
   196
    public Future<V> poll() {
90ce3da70b43 Initial load
duke
parents:
diff changeset
   197
        return completionQueue.poll();
90ce3da70b43 Initial load
duke
parents:
diff changeset
   198
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   199
7518
0282db800fe1 7003745: Code style cleanups (sync from Dougs CVS)
dl
parents: 5506
diff changeset
   200
    public Future<V> poll(long timeout, TimeUnit unit)
0282db800fe1 7003745: Code style cleanups (sync from Dougs CVS)
dl
parents: 5506
diff changeset
   201
            throws InterruptedException {
2
90ce3da70b43 Initial load
duke
parents:
diff changeset
   202
        return completionQueue.poll(timeout, unit);
90ce3da70b43 Initial load
duke
parents:
diff changeset
   203
    }
90ce3da70b43 Initial load
duke
parents:
diff changeset
   204
90ce3da70b43 Initial load
duke
parents:
diff changeset
   205
}