8012647: Add Arrays.parallelPrefix (prefix sum, scan, cumulative sum)
authorchegar
Wed, 26 Jun 2013 15:30:39 +0100
changeset 18555 d69e2a644e38
parent 18554 d2f655022d2d
child 18556 0fe397a41c18
8012647: Add Arrays.parallelPrefix (prefix sum, scan, cumulative sum) Reviewed-by: chegar, alanb, psandoz Contributed-by: Doug Lea <dl@cs.oswego.edu>, Tristan Yan <tristan.yan@oracle.com>, Chris Hegarty <chris.hegarty@oracle.com>
jdk/src/share/classes/java/util/ArrayPrefixHelpers.java
jdk/src/share/classes/java/util/Arrays.java
jdk/test/java/util/Arrays/ParallelPrefix.java
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/share/classes/java/util/ArrayPrefixHelpers.java	Wed Jun 26 15:30:39 2013 +0100
@@ -0,0 +1,697 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.util;
+
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ */
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.CountedCompleter;
+import java.util.function.BinaryOperator;
+import java.util.function.IntBinaryOperator;
+import java.util.function.LongBinaryOperator;
+import java.util.function.DoubleBinaryOperator;
+
+/**
+ * ForkJoin tasks to perform Arrays.parallelPrefix operations.
+ *
+ * @author Doug Lea
+ * @since 1.8
+ */
+class ArrayPrefixHelpers {
+    private ArrayPrefixHelpers() {}; // non-instantiable
+
+    /*
+     * Parallel prefix (aka cumulate, scan) task classes
+     * are based loosely on Guy Blelloch's original
+     * algorithm (http://www.cs.cmu.edu/~scandal/alg/scan.html):
+     *  Keep dividing by two to threshold segment size, and then:
+     *   Pass 1: Create tree of partial sums for each segment
+     *   Pass 2: For each segment, cumulate with offset of left sibling
+     *
+     * This version improves performance within FJ framework mainly by
+     * allowing the second pass of ready left-hand sides to proceed
+     * even if some right-hand side first passes are still executing.
+     * It also combines first and second pass for leftmost segment,
+     * and skips the first pass for rightmost segment (whose result is
+     * not needed for second pass).  It similarly manages to avoid
+     * requiring that users supply an identity basis for accumulations
+     * by tracking those segments/subtasks for which the first
+     * existing element is used as base.
+     *
+     * Managing this relies on ORing some bits in the pendingCount for
+     * phases/states: CUMULATE, SUMMED, and FINISHED. CUMULATE is the
+     * main phase bit. When false, segments compute only their sum.
+     * When true, they cumulate array elements. CUMULATE is set at
+     * root at beginning of second pass and then propagated down. But
+     * it may also be set earlier for subtrees with lo==0 (the left
+     * spine of tree). SUMMED is a one bit join count. For leafs, it
+     * is set when summed. For internal nodes, it becomes true when
+     * one child is summed.  When the second child finishes summing,
+     * we then moves up tree to trigger the cumulate phase. FINISHED
+     * is also a one bit join count. For leafs, it is set when
+     * cumulated. For internal nodes, it becomes true when one child
+     * is cumulated.  When the second child finishes cumulating, it
+     * then moves up tree, completing at the root.
+     *
+     * To better exploit locality and reduce overhead, the compute
+     * method loops starting with the current task, moving if possible
+     * to one of its subtasks rather than forking.
+     *
+     * As usual for this sort of utility, there are 4 versions, that
+     * are simple copy/paste/adapt variants of each other.  (The
+     * double and int versions differ from long version soley by
+     * replacing "long" (with case-matching)).
+     */
+
+    // see above
+    static final int CUMULATE = 1;
+    static final int SUMMED   = 2;
+    static final int FINISHED = 4;
+
+    /** The smallest subtask array partition size to use as threshold */
+    static final int MIN_PARTITION = 16;
+
+    static final class CumulateTask<T> extends CountedCompleter<Void> {
+        final T[] array;
+        final BinaryOperator<T> function;
+        CumulateTask<T> left, right;
+        T in, out;
+        final int lo, hi, origin, fence, threshold;
+
+        /** Root task constructor */
+        public CumulateTask(CumulateTask<T> parent,
+                            BinaryOperator<T> function,
+                            T[] array, int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.lo = this.origin = lo; this.hi = this.fence = hi;
+            int p;
+            this.threshold =
+                    (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
+                    <= MIN_PARTITION ? MIN_PARTITION : p;
+        }
+
+        /** Subtask constructor */
+        CumulateTask(CumulateTask<T> parent, BinaryOperator<T> function,
+                     T[] array, int origin, int fence, int threshold,
+                     int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.origin = origin; this.fence = fence;
+            this.threshold = threshold;
+            this.lo = lo; this.hi = hi;
+        }
+
+        public final void compute() {
+            final BinaryOperator<T> fn;
+            final T[] a;
+            if ((fn = this.function) == null || (a = this.array) == null)
+                throw new NullPointerException();    // hoist checks
+            int th = threshold, org = origin, fnc = fence, l, h;
+            CumulateTask<T> t = this;
+            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
+                if (h - l > th) {
+                    CumulateTask<T> lt = t.left, rt = t.right, f;
+                    if (lt == null) {                // first pass
+                        int mid = (l + h) >>> 1;
+                        f = rt = t.right =
+                                new CumulateTask<T>(t, fn, a, org, fnc, th, mid, h);
+                        t = lt = t.left  =
+                                new CumulateTask<T>(t, fn, a, org, fnc, th, l, mid);
+                    }
+                    else {                           // possibly refork
+                        T pin = t.in;
+                        lt.in = pin;
+                        f = t = null;
+                        if (rt != null) {
+                            T lout = lt.out;
+                            rt.in = (l == org ? lout :
+                                     fn.apply(pin, lout));
+                            for (int c;;) {
+                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
+                                    break;
+                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
+                                    t = rt;
+                                    break;
+                                }
+                            }
+                        }
+                        for (int c;;) {
+                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
+                                break;
+                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
+                                if (t != null)
+                                    f = t;
+                                t = lt;
+                                break;
+                            }
+                        }
+                        if (t == null)
+                            break;
+                    }
+                    if (f != null)
+                        f.fork();
+                }
+                else {
+                    int state; // Transition to sum, cumulate, or both
+                    for (int b;;) {
+                        if (((b = t.getPendingCount()) & FINISHED) != 0)
+                            break outer;                      // already done
+                        state = ((b & CUMULATE) != 0? FINISHED :
+                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
+                        if (t.compareAndSetPendingCount(b, b|state))
+                            break;
+                    }
+
+                    T sum;
+                    if (state != SUMMED) {
+                        int first;
+                        if (l == org) {                       // leftmost; no in
+                            sum = a[org];
+                            first = org + 1;
+                        }
+                        else {
+                            sum = t.in;
+                            first = l;
+                        }
+                        for (int i = first; i < h; ++i)       // cumulate
+                            a[i] = sum = fn.apply(sum, a[i]);
+                    }
+                    else if (h < fnc) {                       // skip rightmost
+                        sum = a[l];
+                        for (int i = l + 1; i < h; ++i)       // sum only
+                            sum = fn.apply(sum, a[i]);
+                    }
+                    else
+                        sum = t.in;
+                    t.out = sum;
+                    for (CumulateTask<T> par;;) {             // propagate
+                        if ((par = (CumulateTask<T>)t.getCompleter()) == null) {
+                            if ((state & FINISHED) != 0)      // enable join
+                                t.quietlyComplete();
+                            break outer;
+                        }
+                        int b = par.getPendingCount();
+                        if ((b & state & FINISHED) != 0)
+                            t = par;                          // both done
+                        else if ((b & state & SUMMED) != 0) { // both summed
+                            int nextState; CumulateTask<T> lt, rt;
+                            if ((lt = par.left) != null &&
+                                (rt = par.right) != null) {
+                                T lout = lt.out;
+                                par.out = (rt.hi == fnc ? lout :
+                                           fn.apply(lout, rt.out));
+                            }
+                            int refork = (((b & CUMULATE) == 0 &&
+                                           par.lo == org) ? CUMULATE : 0);
+                            if ((nextState = b|state|refork) == b ||
+                                par.compareAndSetPendingCount(b, nextState)) {
+                                state = SUMMED;               // drop finished
+                                t = par;
+                                if (refork != 0)
+                                    par.fork();
+                            }
+                        }
+                        else if (par.compareAndSetPendingCount(b, b|state))
+                            break outer;                      // sib not ready
+                    }
+                }
+            }
+        }
+    }
+
+    static final class LongCumulateTask extends CountedCompleter<Void> {
+        final long[] array;
+        final LongBinaryOperator function;
+        LongCumulateTask left, right;
+        long in, out;
+        final int lo, hi, origin, fence, threshold;
+
+        /** Root task constructor */
+        public LongCumulateTask(LongCumulateTask parent,
+                                LongBinaryOperator function,
+                                long[] array, int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.lo = this.origin = lo; this.hi = this.fence = hi;
+            int p;
+            this.threshold =
+                    (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
+                    <= MIN_PARTITION ? MIN_PARTITION : p;
+        }
+
+        /** Subtask constructor */
+        LongCumulateTask(LongCumulateTask parent, LongBinaryOperator function,
+                         long[] array, int origin, int fence, int threshold,
+                         int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.origin = origin; this.fence = fence;
+            this.threshold = threshold;
+            this.lo = lo; this.hi = hi;
+        }
+
+        public final void compute() {
+            final LongBinaryOperator fn;
+            final long[] a;
+            if ((fn = this.function) == null || (a = this.array) == null)
+                throw new NullPointerException();    // hoist checks
+            int th = threshold, org = origin, fnc = fence, l, h;
+            LongCumulateTask t = this;
+            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
+                if (h - l > th) {
+                    LongCumulateTask lt = t.left, rt = t.right, f;
+                    if (lt == null) {                // first pass
+                        int mid = (l + h) >>> 1;
+                        f = rt = t.right =
+                                new LongCumulateTask(t, fn, a, org, fnc, th, mid, h);
+                        t = lt = t.left  =
+                                new LongCumulateTask(t, fn, a, org, fnc, th, l, mid);
+                    }
+                    else {                           // possibly refork
+                        long pin = t.in;
+                        lt.in = pin;
+                        f = t = null;
+                        if (rt != null) {
+                            long lout = lt.out;
+                            rt.in = (l == org ? lout :
+                                     fn.applyAsLong(pin, lout));
+                            for (int c;;) {
+                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
+                                    break;
+                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
+                                    t = rt;
+                                    break;
+                                }
+                            }
+                        }
+                        for (int c;;) {
+                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
+                                break;
+                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
+                                if (t != null)
+                                    f = t;
+                                t = lt;
+                                break;
+                            }
+                        }
+                        if (t == null)
+                            break;
+                    }
+                    if (f != null)
+                        f.fork();
+                }
+                else {
+                    int state; // Transition to sum, cumulate, or both
+                    for (int b;;) {
+                        if (((b = t.getPendingCount()) & FINISHED) != 0)
+                            break outer;                      // already done
+                        state = ((b & CUMULATE) != 0? FINISHED :
+                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
+                        if (t.compareAndSetPendingCount(b, b|state))
+                            break;
+                    }
+
+                    long sum;
+                    if (state != SUMMED) {
+                        int first;
+                        if (l == org) {                       // leftmost; no in
+                            sum = a[org];
+                            first = org + 1;
+                        }
+                        else {
+                            sum = t.in;
+                            first = l;
+                        }
+                        for (int i = first; i < h; ++i)       // cumulate
+                            a[i] = sum = fn.applyAsLong(sum, a[i]);
+                    }
+                    else if (h < fnc) {                       // skip rightmost
+                        sum = a[l];
+                        for (int i = l + 1; i < h; ++i)       // sum only
+                            sum = fn.applyAsLong(sum, a[i]);
+                    }
+                    else
+                        sum = t.in;
+                    t.out = sum;
+                    for (LongCumulateTask par;;) {            // propagate
+                        if ((par = (LongCumulateTask)t.getCompleter()) == null) {
+                            if ((state & FINISHED) != 0)      // enable join
+                                t.quietlyComplete();
+                            break outer;
+                        }
+                        int b = par.getPendingCount();
+                        if ((b & state & FINISHED) != 0)
+                            t = par;                          // both done
+                        else if ((b & state & SUMMED) != 0) { // both summed
+                            int nextState; LongCumulateTask lt, rt;
+                            if ((lt = par.left) != null &&
+                                (rt = par.right) != null) {
+                                long lout = lt.out;
+                                par.out = (rt.hi == fnc ? lout :
+                                           fn.applyAsLong(lout, rt.out));
+                            }
+                            int refork = (((b & CUMULATE) == 0 &&
+                                           par.lo == org) ? CUMULATE : 0);
+                            if ((nextState = b|state|refork) == b ||
+                                par.compareAndSetPendingCount(b, nextState)) {
+                                state = SUMMED;               // drop finished
+                                t = par;
+                                if (refork != 0)
+                                    par.fork();
+                            }
+                        }
+                        else if (par.compareAndSetPendingCount(b, b|state))
+                            break outer;                      // sib not ready
+                    }
+                }
+            }
+        }
+    }
+
+    static final class DoubleCumulateTask extends CountedCompleter<Void> {
+        final double[] array;
+        final DoubleBinaryOperator function;
+        DoubleCumulateTask left, right;
+        double in, out;
+        final int lo, hi, origin, fence, threshold;
+
+        /** Root task constructor */
+        public DoubleCumulateTask(DoubleCumulateTask parent,
+                                  DoubleBinaryOperator function,
+                                  double[] array, int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.lo = this.origin = lo; this.hi = this.fence = hi;
+            int p;
+            this.threshold =
+                    (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
+                    <= MIN_PARTITION ? MIN_PARTITION : p;
+        }
+
+        /** Subtask constructor */
+        DoubleCumulateTask(DoubleCumulateTask parent, DoubleBinaryOperator function,
+                           double[] array, int origin, int fence, int threshold,
+                           int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.origin = origin; this.fence = fence;
+            this.threshold = threshold;
+            this.lo = lo; this.hi = hi;
+        }
+
+        public final void compute() {
+            final DoubleBinaryOperator fn;
+            final double[] a;
+            if ((fn = this.function) == null || (a = this.array) == null)
+                throw new NullPointerException();    // hoist checks
+            int th = threshold, org = origin, fnc = fence, l, h;
+            DoubleCumulateTask t = this;
+            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
+                if (h - l > th) {
+                    DoubleCumulateTask lt = t.left, rt = t.right, f;
+                    if (lt == null) {                // first pass
+                        int mid = (l + h) >>> 1;
+                        f = rt = t.right =
+                                new DoubleCumulateTask(t, fn, a, org, fnc, th, mid, h);
+                        t = lt = t.left  =
+                                new DoubleCumulateTask(t, fn, a, org, fnc, th, l, mid);
+                    }
+                    else {                           // possibly refork
+                        double pin = t.in;
+                        lt.in = pin;
+                        f = t = null;
+                        if (rt != null) {
+                            double lout = lt.out;
+                            rt.in = (l == org ? lout :
+                                     fn.applyAsDouble(pin, lout));
+                            for (int c;;) {
+                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
+                                    break;
+                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
+                                    t = rt;
+                                    break;
+                                }
+                            }
+                        }
+                        for (int c;;) {
+                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
+                                break;
+                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
+                                if (t != null)
+                                    f = t;
+                                t = lt;
+                                break;
+                            }
+                        }
+                        if (t == null)
+                            break;
+                    }
+                    if (f != null)
+                        f.fork();
+                }
+                else {
+                    int state; // Transition to sum, cumulate, or both
+                    for (int b;;) {
+                        if (((b = t.getPendingCount()) & FINISHED) != 0)
+                            break outer;                      // already done
+                        state = ((b & CUMULATE) != 0? FINISHED :
+                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
+                        if (t.compareAndSetPendingCount(b, b|state))
+                            break;
+                    }
+
+                    double sum;
+                    if (state != SUMMED) {
+                        int first;
+                        if (l == org) {                       // leftmost; no in
+                            sum = a[org];
+                            first = org + 1;
+                        }
+                        else {
+                            sum = t.in;
+                            first = l;
+                        }
+                        for (int i = first; i < h; ++i)       // cumulate
+                            a[i] = sum = fn.applyAsDouble(sum, a[i]);
+                    }
+                    else if (h < fnc) {                       // skip rightmost
+                        sum = a[l];
+                        for (int i = l + 1; i < h; ++i)       // sum only
+                            sum = fn.applyAsDouble(sum, a[i]);
+                    }
+                    else
+                        sum = t.in;
+                    t.out = sum;
+                    for (DoubleCumulateTask par;;) {            // propagate
+                        if ((par = (DoubleCumulateTask)t.getCompleter()) == null) {
+                            if ((state & FINISHED) != 0)      // enable join
+                                t.quietlyComplete();
+                            break outer;
+                        }
+                        int b = par.getPendingCount();
+                        if ((b & state & FINISHED) != 0)
+                            t = par;                          // both done
+                        else if ((b & state & SUMMED) != 0) { // both summed
+                            int nextState; DoubleCumulateTask lt, rt;
+                            if ((lt = par.left) != null &&
+                                (rt = par.right) != null) {
+                                double lout = lt.out;
+                                par.out = (rt.hi == fnc ? lout :
+                                           fn.applyAsDouble(lout, rt.out));
+                            }
+                            int refork = (((b & CUMULATE) == 0 &&
+                                           par.lo == org) ? CUMULATE : 0);
+                            if ((nextState = b|state|refork) == b ||
+                                par.compareAndSetPendingCount(b, nextState)) {
+                                state = SUMMED;               // drop finished
+                                t = par;
+                                if (refork != 0)
+                                    par.fork();
+                            }
+                        }
+                        else if (par.compareAndSetPendingCount(b, b|state))
+                            break outer;                      // sib not ready
+                    }
+                }
+            }
+        }
+    }
+
+    static final class IntCumulateTask extends CountedCompleter<Void> {
+        final int[] array;
+        final IntBinaryOperator function;
+        IntCumulateTask left, right;
+        int in, out;
+        final int lo, hi, origin, fence, threshold;
+
+        /** Root task constructor */
+        public IntCumulateTask(IntCumulateTask parent,
+                               IntBinaryOperator function,
+                               int[] array, int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.lo = this.origin = lo; this.hi = this.fence = hi;
+            int p;
+            this.threshold =
+                    (p = (hi - lo) / (ForkJoinPool.getCommonPoolParallelism() << 3))
+                    <= MIN_PARTITION ? MIN_PARTITION : p;
+        }
+
+        /** Subtask constructor */
+        IntCumulateTask(IntCumulateTask parent, IntBinaryOperator function,
+                        int[] array, int origin, int fence, int threshold,
+                        int lo, int hi) {
+            super(parent);
+            this.function = function; this.array = array;
+            this.origin = origin; this.fence = fence;
+            this.threshold = threshold;
+            this.lo = lo; this.hi = hi;
+        }
+
+        public final void compute() {
+            final IntBinaryOperator fn;
+            final int[] a;
+            if ((fn = this.function) == null || (a = this.array) == null)
+                throw new NullPointerException();    // hoist checks
+            int th = threshold, org = origin, fnc = fence, l, h;
+            IntCumulateTask t = this;
+            outer: while ((l = t.lo) >= 0 && (h = t.hi) <= a.length) {
+                if (h - l > th) {
+                    IntCumulateTask lt = t.left, rt = t.right, f;
+                    if (lt == null) {                // first pass
+                        int mid = (l + h) >>> 1;
+                        f = rt = t.right =
+                                new IntCumulateTask(t, fn, a, org, fnc, th, mid, h);
+                        t = lt = t.left  =
+                                new IntCumulateTask(t, fn, a, org, fnc, th, l, mid);
+                    }
+                    else {                           // possibly refork
+                        int pin = t.in;
+                        lt.in = pin;
+                        f = t = null;
+                        if (rt != null) {
+                            int lout = lt.out;
+                            rt.in = (l == org ? lout :
+                                     fn.applyAsInt(pin, lout));
+                            for (int c;;) {
+                                if (((c = rt.getPendingCount()) & CUMULATE) != 0)
+                                    break;
+                                if (rt.compareAndSetPendingCount(c, c|CUMULATE)){
+                                    t = rt;
+                                    break;
+                                }
+                            }
+                        }
+                        for (int c;;) {
+                            if (((c = lt.getPendingCount()) & CUMULATE) != 0)
+                                break;
+                            if (lt.compareAndSetPendingCount(c, c|CUMULATE)) {
+                                if (t != null)
+                                    f = t;
+                                t = lt;
+                                break;
+                            }
+                        }
+                        if (t == null)
+                            break;
+                    }
+                    if (f != null)
+                        f.fork();
+                }
+                else {
+                    int state; // Transition to sum, cumulate, or both
+                    for (int b;;) {
+                        if (((b = t.getPendingCount()) & FINISHED) != 0)
+                            break outer;                      // already done
+                        state = ((b & CUMULATE) != 0? FINISHED :
+                                 (l > org) ? SUMMED : (SUMMED|FINISHED));
+                        if (t.compareAndSetPendingCount(b, b|state))
+                            break;
+                    }
+
+                    int sum;
+                    if (state != SUMMED) {
+                        int first;
+                        if (l == org) {                       // leftmost; no in
+                            sum = a[org];
+                            first = org + 1;
+                        }
+                        else {
+                            sum = t.in;
+                            first = l;
+                        }
+                        for (int i = first; i < h; ++i)       // cumulate
+                            a[i] = sum = fn.applyAsInt(sum, a[i]);
+                    }
+                    else if (h < fnc) {                       // skip rightmost
+                        sum = a[l];
+                        for (int i = l + 1; i < h; ++i)       // sum only
+                            sum = fn.applyAsInt(sum, a[i]);
+                    }
+                    else
+                        sum = t.in;
+                    t.out = sum;
+                    for (IntCumulateTask par;;) {            // propagate
+                        if ((par = (IntCumulateTask)t.getCompleter()) == null) {
+                            if ((state & FINISHED) != 0)      // enable join
+                                t.quietlyComplete();
+                            break outer;
+                        }
+                        int b = par.getPendingCount();
+                        if ((b & state & FINISHED) != 0)
+                            t = par;                          // both done
+                        else if ((b & state & SUMMED) != 0) { // both summed
+                            int nextState; IntCumulateTask lt, rt;
+                            if ((lt = par.left) != null &&
+                                (rt = par.right) != null) {
+                                int lout = lt.out;
+                                par.out = (rt.hi == fnc ? lout :
+                                           fn.applyAsInt(lout, rt.out));
+                            }
+                            int refork = (((b & CUMULATE) == 0 &&
+                                           par.lo == org) ? CUMULATE : 0);
+                            if ((nextState = b|state|refork) == b ||
+                                par.compareAndSetPendingCount(b, nextState)) {
+                                state = SUMMED;               // drop finished
+                                t = par;
+                                if (refork != 0)
+                                    par.fork();
+                            }
+                        }
+                        else if (par.compareAndSetPendingCount(b, b|state))
+                            break outer;                      // sib not ready
+                    }
+                }
+            }
+        }
+    }
+
+
+}
\ No newline at end of file
--- a/jdk/src/share/classes/java/util/Arrays.java	Wed Jun 26 06:32:40 2013 -0700
+++ b/jdk/src/share/classes/java/util/Arrays.java	Wed Jun 26 15:30:39 2013 +0100
@@ -1559,6 +1559,183 @@
         }
     }
 
+    // Parallel prefix
+
+    /**
+     * Cumulates, in parallel, each element of the given array in place,
+     * using the supplied function. For example if the array initially
+     * holds {@code [2, 1, 0, 3]} and the operation performs addition,
+     * then upon return the array holds {@code [2, 3, 3, 6]}.
+     * Parallel prefix computation is usually more efficient than
+     * sequential loops for large arrays.
+     *
+     * @param array the array, which is modified in-place by this method
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static <T> void parallelPrefix(T[] array, BinaryOperator<T> op) {
+        if (array.length > 0)
+            new ArrayPrefixHelpers.CumulateTask<>
+                    (null, op, array, 0, array.length).invoke();
+    }
+
+    /**
+     * Performs {@link #parallelPrefix(Object[], BinaryOperator)}
+     * for the given subrange of the array.
+     *
+     * @param array the array
+     * @param fromIndex the index of the first element, inclusive
+     * @param toIndex the index of the last element, exclusive
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws IllegalArgumentException if {@code fromIndex > toIndex}
+     * @throws ArrayIndexOutOfBoundsException
+     *     if {@code fromIndex < 0} or {@code toIndex > array.length}
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static <T> void parallelPrefix(T[] array, int fromIndex,
+                                          int toIndex, BinaryOperator<T> op) {
+        rangeCheck(array.length, fromIndex, toIndex);
+        if (fromIndex < toIndex)
+            new ArrayPrefixHelpers.CumulateTask<>
+                    (null, op, array, fromIndex, toIndex).invoke();
+    }
+
+    /**
+     * Cumulates, in parallel, each element of the given array in place,
+     * using the supplied function. For example if the array initially
+     * holds {@code [2, 1, 0, 3]} and the operation performs addition,
+     * then upon return the array holds {@code [2, 3, 3, 6]}.
+     * Parallel prefix computation is usually more efficient than
+     * sequential loops for large arrays.
+     *
+     * @param array the array, which is modified in-place by this method
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static void parallelPrefix(long[] array, LongBinaryOperator op) {
+        if (array.length > 0)
+            new ArrayPrefixHelpers.LongCumulateTask
+                    (null, op, array, 0, array.length).invoke();
+    }
+
+    /**
+     * Performs {@link #parallelPrefix(long[], LongBinaryOperator)}
+     * for the given subrange of the array.
+     *
+     * @param array the array
+     * @param fromIndex the index of the first element, inclusive
+     * @param toIndex the index of the last element, exclusive
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws IllegalArgumentException if {@code fromIndex > toIndex}
+     * @throws ArrayIndexOutOfBoundsException
+     *     if {@code fromIndex < 0} or {@code toIndex > array.length}
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static void parallelPrefix(long[] array, int fromIndex,
+                                      int toIndex, LongBinaryOperator op) {
+        rangeCheck(array.length, fromIndex, toIndex);
+        if (fromIndex < toIndex)
+            new ArrayPrefixHelpers.LongCumulateTask
+                    (null, op, array, fromIndex, toIndex).invoke();
+    }
+
+    /**
+     * Cumulates, in parallel, each element of the given array in place,
+     * using the supplied function. For example if the array initially
+     * holds {@code [2.0, 1.0, 0.0, 3.0]} and the operation performs addition,
+     * then upon return the array holds {@code [2.0, 3.0, 3.0, 6.0]}.
+     * Parallel prefix computation is usually more efficient than
+     * sequential loops for large arrays.
+     *
+     * <p> Because floating-point operations may not be strictly associative,
+     * the returned result may not be identical to the value that would be
+     * obtained if the operation was performed sequentially.
+     *
+     * @param array the array, which is modified in-place by this method
+     * @param op a side-effect-free function to perform the cumulation
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static void parallelPrefix(double[] array, DoubleBinaryOperator op) {
+        if (array.length > 0)
+            new ArrayPrefixHelpers.DoubleCumulateTask
+                    (null, op, array, 0, array.length).invoke();
+    }
+
+    /**
+     * Performs {@link #parallelPrefix(double[], DoubleBinaryOperator)}
+     * for the given subrange of the array.
+     *
+     * @param array the array
+     * @param fromIndex the index of the first element, inclusive
+     * @param toIndex the index of the last element, exclusive
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws IllegalArgumentException if {@code fromIndex > toIndex}
+     * @throws ArrayIndexOutOfBoundsException
+     *     if {@code fromIndex < 0} or {@code toIndex > array.length}
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static void parallelPrefix(double[] array, int fromIndex,
+                                      int toIndex, DoubleBinaryOperator op) {
+        rangeCheck(array.length, fromIndex, toIndex);
+        if (fromIndex < toIndex)
+            new ArrayPrefixHelpers.DoubleCumulateTask
+                    (null, op, array, fromIndex, toIndex).invoke();
+    }
+
+    /**
+     * Cumulates, in parallel, each element of the given array in place,
+     * using the supplied function. For example if the array initially
+     * holds {@code [2, 1, 0, 3]} and the operation performs addition,
+     * then upon return the array holds {@code [2, 3, 3, 6]}.
+     * Parallel prefix computation is usually more efficient than
+     * sequential loops for large arrays.
+     *
+     * @param array the array, which is modified in-place by this method
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static void parallelPrefix(int[] array, IntBinaryOperator op) {
+        if (array.length > 0)
+            new ArrayPrefixHelpers.IntCumulateTask
+                    (null, op, array, 0, array.length).invoke();
+    }
+
+    /**
+     * Performs {@link #parallelPrefix(int[], IntBinaryOperator)}
+     * for the given subrange of the array.
+     *
+     * @param array the array
+     * @param fromIndex the index of the first element, inclusive
+     * @param toIndex the index of the last element, exclusive
+     * @param op a side-effect-free, associative function to perform the
+     * cumulation
+     * @throws IllegalArgumentException if {@code fromIndex > toIndex}
+     * @throws ArrayIndexOutOfBoundsException
+     *     if {@code fromIndex < 0} or {@code toIndex > array.length}
+     * @throws NullPointerException if the specified array or function is null
+     * @since 1.8
+     */
+    public static void parallelPrefix(int[] array, int fromIndex,
+                                      int toIndex, IntBinaryOperator op) {
+        rangeCheck(array.length, fromIndex, toIndex);
+        if (fromIndex < toIndex)
+            new ArrayPrefixHelpers.IntCumulateTask
+                    (null, op, array, fromIndex, toIndex).invoke();
+    }
+
     // Searching
 
     /**
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/util/Arrays/ParallelPrefix.java	Wed Jun 26 15:30:39 2013 +0100
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/**
+ * @test
+ * @summary unit test for Arrays.ParallelPrefix().
+ * @author Tristan Yan
+ * @run testng ParallelPrefix
+ */
+
+import java.util.Arrays;
+import java.util.function.BinaryOperator;
+import java.util.function.DoubleBinaryOperator;
+import java.util.function.Function;
+import java.util.function.IntBinaryOperator;
+import java.util.function.LongBinaryOperator;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import static org.testng.Assert.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ParallelPrefix {
+    //Array size less than MIN_PARTITION
+    private final static int SMALL_ARRAY_SIZE = 1 << 3;
+
+    //Array size equals MIN_PARTITION
+    private final static int THRESHOLD_ARRAY_SIZE = 1 << 4;
+
+    //Array size greater than MIN_PARTITION
+    private final static int MEDIUM_ARRAY_SIZE = 1 << 8;
+
+    //Array size much greater than MIN_PARTITION
+    private final static int LARGE_ARRAY_SIZE = 1 << 12;
+
+    private final static int[] ARRAY_SIZE_COLLECTION  = new int[]{
+        SMALL_ARRAY_SIZE, THRESHOLD_ARRAY_SIZE,MEDIUM_ARRAY_SIZE, LARGE_ARRAY_SIZE};
+
+    @DataProvider
+    public static Object[][] intSet(){
+        return genericData(size -> IntStream.range(0, size).toArray(), new IntBinaryOperator[]{Integer::sum, Integer::min});
+    }
+
+    @DataProvider
+    public static Object[][] longSet(){
+        return genericData(size -> LongStream.range(0, size).toArray(), new LongBinaryOperator[]{Long::sum, Long::min});
+    }
+
+    @DataProvider
+    public static Object[][] doubleSet(){
+        return genericData(size -> IntStream.range(0, size).mapToDouble(i -> (double)i).toArray(),
+                new DoubleBinaryOperator[]{Double::sum, Double::min});
+    }
+
+    @DataProvider
+    public static Object[][] stringSet(){
+        Function<Integer, String[]> stringsFunc = size ->
+                IntStream.range(0, size).mapToObj(Integer::toString).toArray(String[]::new);
+        BinaryOperator<String> cancatBop = String::concat;
+        return genericData(stringsFunc,  new BinaryOperator[]{cancatBop});
+    }
+
+    private static <T, OPS> Object[][] genericData(Function<Integer, T> generateFunc, OPS[] ops) {
+        //test arrays which size is equals n-1, n, n+1, test random data
+        Object[][] data = new Object[ARRAY_SIZE_COLLECTION.length * 3 * ops.length][4];
+        for(int n = 0; n < ARRAY_SIZE_COLLECTION.length; n++ ) {
+            for(int testValue = -1 ; testValue <= 1; testValue++) {
+                int array_size = ARRAY_SIZE_COLLECTION[n] + testValue;
+                for(int opsN = 0; opsN < ops.length; opsN++) {
+                    int index = n * 3 * ops.length + (testValue + 1) * ops.length + opsN;
+                    data[index][0] = generateFunc.apply(array_size);
+                    data[index][1] = array_size / 3;
+                    data[index][2] = 2 * array_size / 3;
+                    data[index][3] = ops[opsN];
+                }
+            }
+        }
+        return data;
+    }
+
+    @Test(dataProvider="intSet")
+    public void testParallelPrefixForInt(int[] data, int fromIndex, int toIndex, IntBinaryOperator op) {
+        int[] sequentialResult = data.clone();
+        for (int index = fromIndex + 1; index < toIndex; index++) {
+            sequentialResult[index ] = op.applyAsInt(sequentialResult[index  - 1], sequentialResult[index]);
+        }
+
+        int[] parallelResult = data.clone();
+        Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
+        assertEquals(parallelResult, sequentialResult);
+
+        int[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
+        Arrays.parallelPrefix(parallelRangeResult, op);
+        assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
+    }
+
+    @Test(dataProvider="longSet")
+    public void testParallelPrefixForLong(long[] data, int fromIndex, int toIndex, LongBinaryOperator op) {
+        long[] sequentialResult = data.clone();
+        for (int index = fromIndex + 1; index < toIndex; index++) {
+            sequentialResult[index ] = op.applyAsLong(sequentialResult[index  - 1], sequentialResult[index]);
+        }
+
+        long[] parallelResult = data.clone();
+        Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
+        assertEquals(parallelResult, sequentialResult);
+
+        long[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
+        Arrays.parallelPrefix(parallelRangeResult, op);
+        assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
+    }
+
+    @Test(dataProvider="doubleSet")
+    public void testParallelPrefixForDouble(double[] data, int fromIndex, int toIndex, DoubleBinaryOperator op) {
+        double[] sequentialResult = data.clone();
+        for (int index = fromIndex + 1; index < toIndex; index++) {
+            sequentialResult[index ] = op.applyAsDouble(sequentialResult[index  - 1], sequentialResult[index]);
+        }
+
+        double[] parallelResult = data.clone();
+        Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
+        assertEquals(parallelResult, sequentialResult);
+
+        double[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
+        Arrays.parallelPrefix(parallelRangeResult, op);
+        assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
+    }
+
+    @Test(dataProvider="stringSet")
+    public void testParallelPrefixForStringr(String[] data , int fromIndex, int toIndex, BinaryOperator<String> op) {
+        String[] sequentialResult = data.clone();
+        for (int index = fromIndex + 1; index < toIndex; index++) {
+            sequentialResult[index ] = op.apply(sequentialResult[index  - 1], sequentialResult[index]);
+        }
+
+        String[] parallelResult = data.clone();
+        Arrays.parallelPrefix(parallelResult, fromIndex, toIndex, op);
+        assertEquals(parallelResult, sequentialResult);
+
+        String[] parallelRangeResult = Arrays.copyOfRange(data, fromIndex, toIndex);
+        Arrays.parallelPrefix(parallelRangeResult, op);
+        assertEquals(parallelRangeResult, Arrays.copyOfRange(sequentialResult, fromIndex, toIndex));
+    }
+}
+