8072773: (fs) Files.lines needs a better splitting implementation for stream source
authorpsandoz
Wed, 24 Jun 2015 12:05:30 +0200
changeset 31258 8e44e5e2563e
parent 31257 ae0369eaf0dc
child 31259 1ea46564f04d
8072773: (fs) Files.lines needs a better splitting implementation for stream source Reviewed-by: alanb
jdk/src/java.base/share/classes/java/nio/file/FileChannelLinesSpliterator.java
jdk/src/java.base/share/classes/java/nio/file/Files.java
jdk/test/java/nio/file/Files/StreamLinesTest.java
jdk/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java
jdk/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java
jdk/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java
jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java
jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/src/java.base/share/classes/java/nio/file/FileChannelLinesSpliterator.java	Wed Jun 24 12:05:30 2015 +0200
@@ -0,0 +1,267 @@
+/*
+ * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+package java.nio.file;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.function.Consumer;
+
+/**
+ * A file-based lines spliterator, leveraging a shared mapped byte buffer and
+ * associated file channel, covering lines of a file for character encodings
+ * where line feed characters can be easily identified from character encoded
+ * bytes.
+ *
+ * <p>
+ * When the root spliterator is first split a mapped byte buffer will be created
+ * over the file for it's size that was observed when the stream was created.
+ * Thus a mapped byte buffer is only required for parallel stream execution.
+ * Sub-spliterators will share that mapped byte buffer.  Splitting will use the
+ * mapped byte buffer to find the closest line feed characters(s) to the left or
+ * right of the mid-point of covered range of bytes of the file.  If a line feed
+ * is found then the spliterator is split with returned spliterator containing
+ * the identified line feed characters(s) at the end of it's covered range of
+ * bytes.
+ *
+ * <p>
+ * Traversing will create a buffered reader, derived from the file channel, for
+ * the range of bytes of the file.  The lines are then read from that buffered
+ * reader.  Once traversing commences no further splitting can be performed and
+ * the reference to the mapped byte buffer will be set to null.
+ */
+final class FileChannelLinesSpliterator implements Spliterator<String> {
+
+    static final Set<String> SUPPORTED_CHARSET_NAMES;
+    static {
+        SUPPORTED_CHARSET_NAMES = new HashSet<>();
+        SUPPORTED_CHARSET_NAMES.add(StandardCharsets.UTF_8.name());
+        SUPPORTED_CHARSET_NAMES.add(StandardCharsets.ISO_8859_1.name());
+        SUPPORTED_CHARSET_NAMES.add(StandardCharsets.US_ASCII.name());
+    }
+
+    private final FileChannel fc;
+    private final Charset cs;
+    private int index;
+    private final int fence;
+
+    // Null before first split, non-null when splitting, null when traversing
+    private ByteBuffer buffer;
+    // Non-null when traversing
+    private BufferedReader reader;
+
+    FileChannelLinesSpliterator(FileChannel fc, Charset cs, int index, int fence) {
+        this.fc = fc;
+        this.cs = cs;
+        this.index = index;
+        this.fence = fence;
+    }
+
+    private FileChannelLinesSpliterator(FileChannel fc, Charset cs, int index, int fence, ByteBuffer buffer) {
+        this.fc = fc;
+        this.buffer = buffer;
+        this.cs = cs;
+        this.index = index;
+        this.fence = fence;
+    }
+
+    @Override
+    public boolean tryAdvance(Consumer<? super String> action) {
+        String line = readLine();
+        if (line != null) {
+            action.accept(line);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void forEachRemaining(Consumer<? super String> action) {
+        String line;
+        while ((line = readLine()) != null) {
+            action.accept(line);
+        }
+    }
+
+    private BufferedReader getBufferedReader() {
+        /**
+         * A readable byte channel that reads bytes from an underlying
+         * file channel over a specified range.
+         */
+        ReadableByteChannel rrbc = new ReadableByteChannel() {
+            @Override
+            public int read(ByteBuffer dst) throws IOException {
+                int bytesToRead = fence - index;
+                if (bytesToRead == 0)
+                    return -1;
+
+                int bytesRead;
+                if (bytesToRead < dst.remaining()) {
+                    // The number of bytes to read is less than remaining
+                    // bytes in the buffer
+                    // Snapshot the limit, reduce it, read, then restore
+                    int oldLimit = dst.limit();
+                    dst.limit(dst.position() + bytesToRead);
+                    bytesRead = fc.read(dst, index);
+                    dst.limit(oldLimit);
+                } else {
+                    bytesRead = fc.read(dst, index);
+                }
+                if (bytesRead == -1) {
+                    index = fence;
+                    return bytesRead;
+                }
+
+                index += bytesRead;
+                return bytesRead;
+            }
+
+            @Override
+            public boolean isOpen() {
+                return fc.isOpen();
+            }
+
+            @Override
+            public void close() throws IOException {
+                fc.close();
+            }
+        };
+        return new BufferedReader(Channels.newReader(rrbc, cs.newDecoder(), -1));
+    }
+
+    private String readLine() {
+        if (reader == null) {
+            reader = getBufferedReader();
+            buffer = null;
+        }
+
+        try {
+            return reader.readLine();
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    private ByteBuffer getMappedByteBuffer() {
+        // TODO can the mapped byte buffer be explicitly unmapped?
+        // It's possible, via a shared-secret mechanism, when either
+        // 1) the spliterator starts traversing, although traversal can
+        //    happen concurrently for mulitple spliterators, so care is
+        //    needed in this case; or
+        // 2) when the stream is closed using some shared holder to pass
+        //    the mapped byte buffer when it is created.
+        try {
+            return fc.map(FileChannel.MapMode.READ_ONLY, 0, fence);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public Spliterator<String> trySplit() {
+        // Cannot split after partial traverse
+        if (reader != null)
+            return null;
+
+        ByteBuffer b;
+        if ((b = buffer) == null) {
+            b = buffer = getMappedByteBuffer();
+        }
+
+        final int hi = fence, lo = index;
+
+        // Check if line separator hits the mid point
+        int mid = (lo + hi) >>> 1;
+        int c =  b.get(mid);
+        if (c == '\n') {
+            mid++;
+        } else if (c == '\r') {
+            // Check if a line separator of "\r\n"
+            if (++mid < hi && b.get(mid) == '\n') {
+                mid++;
+            }
+        } else {
+            // TODO give up after a certain distance from the mid point?
+            // Scan to the left and right of the mid point
+            int midL = mid - 1;
+            int midR = mid + 1;
+            mid = 0;
+            while (midL > lo && midR < hi) {
+                // Sample to the left
+                c = b.get(midL--);
+                if (c == '\n' || c == '\r') {
+                    // If c is "\r" then no need to check for "\r\n"
+                    // since the subsequent value was previously checked
+                    mid = midL + 2;
+                    break;
+                }
+
+                // Sample to the right
+                c = b.get(midR++);
+                if (c == '\n' || c == '\r') {
+                    mid = midR;
+                    // Check if line-separator is "\r\n"
+                    if (c == '\r' && mid < hi && b.get(mid) == '\n') {
+                        mid++;
+                    }
+                    break;
+                }
+            }
+        }
+
+        // The left spliterator will have the line-separator at the end
+        return (mid > lo && mid < hi)
+               ? new FileChannelLinesSpliterator(fc, cs, lo, index = mid, b)
+               : null;
+    }
+
+    @Override
+    public long estimateSize() {
+        // Use the number of bytes as an estimate.
+        // We could divide by a constant that is the average number of
+        // characters per-line, but that constant will be factored out.
+        return fence - index;
+    }
+
+    @Override
+    public long getExactSizeIfKnown() {
+        return -1;
+    }
+
+    @Override
+    public int characteristics() {
+        return Spliterator.ORDERED | Spliterator.NONNULL;
+    }
+}
--- a/jdk/src/java.base/share/classes/java/nio/file/Files.java	Tue Jun 23 14:20:59 2015 -0700
+++ b/jdk/src/java.base/share/classes/java/nio/file/Files.java	Wed Jun 24 12:05:30 2015 +0200
@@ -38,6 +38,7 @@
 import java.io.UncheckedIOException;
 import java.io.Writer;
 import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
@@ -3735,6 +3736,7 @@
         }
     }
 
+
     /**
      * Read all lines from a file as a {@code Stream}. Unlike {@link
      * #readAllLines(Path, Charset) readAllLines}, this method does not read
@@ -3748,6 +3750,10 @@
      * <p> The returned stream contains a reference to an open file. The file
      * is closed by closing the stream.
      *
+     * <p> The file contents should not be modified during the execution of the
+     * terminal stream operation. Otherwise, the result of the terminal stream
+     * operation is undefined.
+     *
      * <p> After this method returns, then any subsequent I/O exception that
      * occurs while reading from the file or when a malformed or unmappable byte
      * sequence is read, is wrapped in an {@link UncheckedIOException} that will
@@ -3761,6 +3767,30 @@
      * control structure to ensure that the stream's open file is closed promptly
      * after the stream's operations have completed.
      *
+     * @implNote
+     * This implementation supports good parallel stream performance for the
+     * standard charsets {@link StandardCharsets#UTF_8 UTF-8},
+     * {@link StandardCharsets#US_ASCII US-ASCII} and
+     * {@link StandardCharsets#ISO_8859_1 ISO-8859-1}.  Such
+     * <em>line-optimal</em> charsets have the property that the encoded bytes
+     * of a line feed ('\n') or a carriage return ('\r') are efficiently
+     * identifiable from other encoded characters when randomly accessing the
+     * bytes of the file.
+     *
+     * <p> For non-<em>line-optimal</em> charsets the stream source's
+     * spliterator has poor splitting properties, similar to that of a
+     * spliterator associated with an iterator or that associated with a stream
+     * returned from {@link BufferedReader#lines()}.  Poor splitting properties
+     * can result in poor parallel stream performance.
+     *
+     * <p> For <em>line-optimal</em> charsets the stream source's spliterator
+     * has good splitting properties, assuming the file contains a regular
+     * sequence of lines.  Good splitting properties can result in good parallel
+     * stream performance.  The spliterator for a <em>line-optimal</em> charset
+     * takes advantage of the charset properties (a line feed or a carriage
+     * return being efficient identifiable) such that when splitting it can
+     * approximately divide the number of covered lines in half.
+     *
      * @param   path
      *          the path to the file
      * @param   cs
@@ -3781,7 +3811,50 @@
      * @since   1.8
      */
     public static Stream<String> lines(Path path, Charset cs) throws IOException {
-        BufferedReader br = Files.newBufferedReader(path, cs);
+        // Use the good splitting spliterator if:
+        // 1) the path is associated with the default file system;
+        // 2) the character set is supported; and
+        // 3) the file size is such that all bytes can be indexed by int values
+        //    (this limitation is imposed by ByteBuffer)
+        if (path.getFileSystem() == FileSystems.getDefault() &&
+            FileChannelLinesSpliterator.SUPPORTED_CHARSET_NAMES.contains(cs.name())) {
+            FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
+
+            Stream<String> fcls = createFileChannelLinesStream(fc, cs);
+            if (fcls != null) {
+                return fcls;
+            }
+            fc.close();
+        }
+
+        return createBufferedReaderLinesStream(Files.newBufferedReader(path, cs));
+    }
+
+    private static Stream<String> createFileChannelLinesStream(FileChannel fc, Charset cs) throws IOException {
+        try {
+            // Obtaining the size from the FileChannel is much faster
+            // than obtaining using path.toFile().length()
+            long length = fc.size();
+            if (length <= Integer.MAX_VALUE) {
+                Spliterator<String> s = new FileChannelLinesSpliterator(fc, cs, 0, (int) length);
+                return StreamSupport.stream(s, false)
+                        .onClose(Files.asUncheckedRunnable(fc));
+            }
+        } catch (Error|RuntimeException|IOException e) {
+            try {
+                fc.close();
+            } catch (IOException ex) {
+                try {
+                    e.addSuppressed(ex);
+                } catch (Throwable ignore) {
+                }
+            }
+            throw e;
+        }
+        return null;
+    }
+
+    private static Stream<String> createBufferedReaderLinesStream(BufferedReader br) {
         try {
             return br.lines().onClose(asUncheckedRunnable(br));
         } catch (Error|RuntimeException e) {
@@ -3790,7 +3863,8 @@
             } catch (IOException ex) {
                 try {
                     e.addSuppressed(ex);
-                } catch (Throwable ignore) {}
+                } catch (Throwable ignore) {
+                }
             }
             throw e;
         }
@@ -3804,6 +3878,10 @@
      * <p> The returned stream contains a reference to an open file. The file
      * is closed by closing the stream.
      *
+     * <p> The file contents should not be modified during the execution of the
+     * terminal stream operation. Otherwise, the result of the terminal stream
+     * operation is undefined.
+     *
      * <p> This method works as if invoking it were equivalent to evaluating the
      * expression:
      * <pre>{@code
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/jdk/test/java/nio/file/Files/StreamLinesTest.java	Wed Jun 24 12:05:30 2015 +0200
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2015, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+/* @test
+ * @bug 8072773
+ * @library /lib/testlibrary/ ../../../util/stream/bootlib
+ * @build java.util.stream.OpTestCase
+ * @build jdk.testlibrary.RandomFactory
+ * @run testng/othervm StreamLinesTest
+ * @summary Tests streams returned from Files.lines, primarily focused on
+ *          testing the file-channel-based stream stream with supported
+ *          character sets
+ * @key randomness
+ */
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import java.util.stream.OpTestCase;
+import java.util.stream.Stream;
+import java.util.stream.TestData;
+import jdk.testlibrary.RandomFactory;
+
+public class StreamLinesTest extends OpTestCase {
+
+    enum LineSeparator {
+        NONE(""),
+        N("\n"),
+        R("\r"),
+        RN("\r\n");
+
+        public final String value;
+
+        LineSeparator(String value) {
+            this.value = value;
+        }
+
+        public String toString() {
+            return name();
+        }
+    }
+
+    static Path generateTempFileWithLines(IntFunction<String> lineGenerator,
+                                          IntFunction<LineSeparator> lineSeparatorGenerator,
+                                          int lines, Charset cs, boolean endLineSep) throws IOException {
+        Path p = Files.createTempFile("lines", null);
+        BufferedWriter bw = Files.newBufferedWriter(p, cs);
+
+        for (int i = 0; i < lines - 1; i++) {
+            bw.write(lineGenerator.apply(i));
+            bw.write(lineSeparatorGenerator.apply(i).value);
+        }
+        if (lines > 0) {
+            bw.write(lineGenerator.apply(lines - 1));
+            if (endLineSep)
+                bw.write(lineSeparatorGenerator.apply(lines - 1).value);
+        }
+
+        bw.flush();
+        bw.close();
+        p.toFile().deleteOnExit();
+
+        return p;
+    }
+
+    static void writeLineSeparator(Path p,
+                                   IntFunction<LineSeparator> lineSeparatorGenerator,
+                                   int lines, Charset cs) throws IOException {
+        BufferedWriter bw = Files.newBufferedWriter(p, cs, StandardOpenOption.APPEND);
+        bw.write(lineSeparatorGenerator.apply(lines - 1).value);
+        bw.flush();
+        bw.close();
+    }
+
+    static List<String> readAllLines(Path path, Charset cs) throws IOException {
+        try (BufferedReader reader = Files.newBufferedReader(path, cs)) {
+            List<String> result = new ArrayList<>();
+            for (; ; ) {
+                String line = reader.readLine();
+                if (line == null)
+                    break;
+                result.add(line);
+            }
+            return result;
+        }
+    }
+
+    static Object[] of(String description, IntFunction<String> lineGenerator,
+                       IntFunction<LineSeparator> separatorGenerator, int n, Charset cs) {
+        return new Object[]{description, lineGenerator, separatorGenerator, n, cs};
+    }
+
+    private static final Random random = RandomFactory.getRandom();
+
+    @DataProvider
+    public static Object[][] lines() {
+        List<Object[]> l = new ArrayList<>();
+
+        // Include the three supported optimal-line charsets and one
+        // which does not
+        List<Charset> charsets = Arrays.asList(StandardCharsets.UTF_8,
+                                               StandardCharsets.US_ASCII,
+                                               StandardCharsets.ISO_8859_1,
+                                               StandardCharsets.UTF_16);
+        String[] lines = {"", "A", "AB", "ABC", "ABCD"};
+        int[] linesSizes = {1, 2, 3, 4, 16, 256, 1024};
+
+        for (Charset charset : charsets) {
+            for (String line : lines) {
+                for (int linesSize : linesSizes) {
+                    for (LineSeparator ls : EnumSet.complementOf(EnumSet.of(LineSeparator.NONE))) {
+                        String description = String.format("%d lines of \"%s\" with separator %s", linesSize, line, ls);
+                        l.add(of(description,
+                                 i -> line,
+                                 i -> ls,
+                                 linesSize, charset));
+                    }
+                }
+            }
+        }
+
+        for (Charset charset : charsets) {
+            l.add(of("A maximum of 1024 random lines and separators",
+                     i -> lines[1 + random.nextInt(lines.length - 1)],
+                     i -> LineSeparator.values()[random.nextInt(LineSeparator.values().length)],
+                     1024, charset));
+        }
+
+        for (Charset charset : charsets) {
+            l.add(of("One large line with no separators",
+                     i -> "ABCD",
+                     i -> LineSeparator.NONE,
+                     1024, charset));
+        }
+
+        return l.toArray(new Object[][]{});
+    }
+
+    @Test(dataProvider = "lines")
+    public void test(String description,
+                     IntFunction<String> lineGenerator, IntFunction<LineSeparator> separatorGenerator,
+                     int lines, Charset cs) throws IOException {
+        Path p = generateTempFileWithLines(lineGenerator, separatorGenerator, lines, cs, false);
+
+        Supplier<Stream<String>> ss = () -> {
+            try {
+                return Files.lines(p, cs);
+            }
+            catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        };
+
+        // Test without a separator at the end
+        List<String> expected = readAllLines(p, cs);
+        withData(TestData.Factory.ofSupplier("Lines with no separator at end", ss))
+                .stream(s -> s)
+                .expectedResult(expected)
+                .exercise();
+
+        // Test with a separator at the end
+        writeLineSeparator(p, separatorGenerator, lines, cs);
+        expected = readAllLines(p, cs);
+        withData(TestData.Factory.ofSupplier("Lines with separator at end", ss))
+                .stream(s -> s)
+                .expectedResult(expected)
+                .exercise();
+    }
+
+}
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java	Tue Jun 23 14:20:59 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/DoubleStreamTestScenario.java	Wed Jun 24 12:05:30 2015 +0200
@@ -43,22 +43,21 @@
 @SuppressWarnings({"rawtypes", "unchecked"})
 public enum DoubleStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
 
-    STREAM_FOR_EACH_WITH_CLOSE(false) {
+    STREAM_FOR_EACH(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            DoubleStream s = m.apply(data.stream());
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            DoubleStream s = m.apply(source);
             if (s.isParallel()) {
                 s = s.sequential();
             }
             s.forEach(b);
-            s.close();
         }
     },
 
     STREAM_TO_ARRAY(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            for (double t : m.apply(data.stream()).toArray()) {
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            for (double t : m.apply(source).toArray()) {
                 b.accept(t);
             }
         }
@@ -66,8 +65,8 @@
 
     STREAM_ITERATOR(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            for (PrimitiveIterator.OfDouble seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            for (PrimitiveIterator.OfDouble seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
                 b.accept(seqIter.nextDouble());
         }
     },
@@ -75,8 +74,8 @@
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            for (Spliterator.OfDouble spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            for (Spliterator.OfDouble spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
             }
         }
     },
@@ -84,40 +83,40 @@
     // Wrap as stream, spliterate, then split a few times mixing advances with forEach
     STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
         }
     },
 
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR_FOREACH(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            m.apply(data.stream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            m.apply(data.parallelStream()).sequential().forEach(b);
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            m.apply(source).sequential().forEach(b);
         }
     },
 
     // Wrap as parallel stream + forEachOrdered
     PAR_STREAM_FOR_EACH_ORDERED(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
             // @@@ Want to explicitly select ordered equalator
-            m.apply(data.parallelStream()).forEachOrdered(b);
+            m.apply(source).forEachOrdered(b);
         }
     },
 
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            for (Spliterator.OfDouble spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            for (Spliterator.OfDouble spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
             }
         }
     },
@@ -125,15 +124,15 @@
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR_FOREACH(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     PAR_STREAM_TO_ARRAY(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            for (double t : m.apply(data.parallelStream()).toArray())
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            for (double t : m.apply(source).toArray())
                 b.accept(t);
         }
     },
@@ -141,8 +140,8 @@
     // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
     PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            DoubleStream s = m.apply(data.parallelStream());
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            DoubleStream s = m.apply(source);
             Spliterator.OfDouble sp = s.spliterator();
             DoubleStream ss = StreamSupport.doubleStream(() -> sp,
                                                          StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
@@ -154,8 +153,8 @@
 
     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             DoubleStream pipe2 = m.apply(pipe1);
 
@@ -167,8 +166,8 @@
     // Wrap as parallel stream + forEach synchronizing
     PAR_STREAM_FOR_EACH(true, false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            m.apply(data.parallelStream()).forEach(e -> {
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            m.apply(source).forEach(e -> {
                 synchronized (data) {
                     b.accept(e);
                 }
@@ -179,8 +178,8 @@
     // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
     PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             m.apply(pipe1).forEach(e -> {
                 synchronized (data) {
@@ -222,10 +221,12 @@
 
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
-        _run(data, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
+        try (S_IN source = getStream(data)) {
+            run(data, source, (DoubleConsumer) b, (Function<S_IN, DoubleStream>) m);
+        }
     }
 
     abstract <T, S_IN extends BaseStream<T, S_IN>>
-    void _run(TestData<T, S_IN> data, DoubleConsumer b, Function<S_IN, DoubleStream> m);
+    void run(TestData<T, S_IN> data, S_IN source, DoubleConsumer b, Function<S_IN, DoubleStream> m);
 
 }
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java	Tue Jun 23 14:20:59 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/IntStreamTestScenario.java	Wed Jun 24 12:05:30 2015 +0200
@@ -43,22 +43,21 @@
 @SuppressWarnings({"rawtypes", "unchecked"})
 public enum IntStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
 
-    STREAM_FOR_EACH_WITH_CLOSE(false) {
+    STREAM_FOR_EACH(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            IntStream s = m.apply(data.stream());
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            IntStream s = m.apply(source);
             if (s.isParallel()) {
                 s = s.sequential();
             }
             s.forEach(b);
-            s.close();
         }
     },
 
     STREAM_TO_ARRAY(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            for (int t : m.apply(data.stream()).toArray()) {
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            for (int t : m.apply(source).toArray()) {
                 b.accept(t);
             }
         }
@@ -66,8 +65,8 @@
 
     STREAM_ITERATOR(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            for (PrimitiveIterator.OfInt seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            for (PrimitiveIterator.OfInt seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
                 b.accept(seqIter.nextInt());
         }
     },
@@ -75,8 +74,8 @@
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            for (Spliterator.OfInt spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            for (Spliterator.OfInt spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
             }
         }
     },
@@ -84,40 +83,40 @@
     // Wrap as stream, spliterate, then split a few times mixing advances with forEach
     STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
         }
     },
 
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR_FOREACH(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            m.apply(data.stream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            m.apply(data.parallelStream()).sequential().forEach(b);
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            m.apply(source).sequential().forEach(b);
         }
     },
 
     // Wrap as parallel stream + forEachOrdered
     PAR_STREAM_FOR_EACH_ORDERED(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
             // @@@ Want to explicitly select ordered equalator
-            m.apply(data.parallelStream()).forEachOrdered(b);
+            m.apply(source).forEachOrdered(b);
         }
     },
 
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            for (Spliterator.OfInt spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            for (Spliterator.OfInt spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
             }
         }
     },
@@ -125,15 +124,15 @@
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR_FOREACH(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     PAR_STREAM_TO_ARRAY(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            for (int t : m.apply(data.parallelStream()).toArray())
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            for (int t : m.apply(source).toArray())
                 b.accept(t);
         }
     },
@@ -141,8 +140,8 @@
     // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
     PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            IntStream s = m.apply(data.parallelStream());
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            IntStream s = m.apply(source);
             Spliterator.OfInt sp = s.spliterator();
             IntStream ss = StreamSupport.intStream(() -> sp,
                                                    StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
@@ -155,8 +154,8 @@
 
     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             IntStream pipe2 = m.apply(pipe1);
 
@@ -168,8 +167,8 @@
     // Wrap as parallel stream + forEach synchronizing
     PAR_STREAM_FOR_EACH(true, false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            m.apply(data.parallelStream()).forEach(e -> {
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            m.apply(source).forEach(e -> {
                 synchronized (data) {
                     b.accept(e);
                 }
@@ -180,8 +179,8 @@
     // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
     PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             m.apply(pipe1).forEach(e -> {
                 synchronized (data) {
@@ -223,10 +222,12 @@
 
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
-        _run(data, (IntConsumer) b, (Function<S_IN, IntStream>) m);
+        try (S_IN source = getStream(data)) {
+            run(data, source, (IntConsumer) b, (Function<S_IN, IntStream>) m);
+        }
     }
 
     abstract <T, S_IN extends BaseStream<T, S_IN>>
-    void _run(TestData<T, S_IN> data, IntConsumer b, Function<S_IN, IntStream> m);
+    void run(TestData<T, S_IN> data, S_IN source, IntConsumer b, Function<S_IN, IntStream> m);
 
 }
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java	Tue Jun 23 14:20:59 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/LongStreamTestScenario.java	Wed Jun 24 12:05:30 2015 +0200
@@ -43,22 +43,21 @@
 @SuppressWarnings({"rawtypes", "unchecked"})
 public enum LongStreamTestScenario implements OpTestCase.BaseStreamTestScenario {
 
-    STREAM_FOR_EACH_WITH_CLOSE(false) {
+    STREAM_FOR_EACH(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            LongStream s = m.apply(data.stream());
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            LongStream s = m.apply(source);
             if (s.isParallel()) {
                 s = s.sequential();
             }
             s.forEach(b);
-            s.close();
         }
     },
 
     STREAM_TO_ARRAY(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            for (long t : m.apply(data.stream()).toArray()) {
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            for (long t : m.apply(source).toArray()) {
                 b.accept(t);
             }
         }
@@ -66,8 +65,8 @@
 
     STREAM_ITERATOR(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            for (PrimitiveIterator.OfLong seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            for (PrimitiveIterator.OfLong seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
                 b.accept(seqIter.nextLong());
         }
     },
@@ -75,8 +74,8 @@
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            for (Spliterator.OfLong spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) {
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            for (Spliterator.OfLong spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
             }
         }
     },
@@ -84,40 +83,40 @@
     // Wrap as stream, spliterate, then split a few times mixing advances with forEach
     STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
         }
     },
 
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR_FOREACH(false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            m.apply(data.stream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            m.apply(data.parallelStream()).sequential().forEach(b);
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            m.apply(source).sequential().forEach(b);
         }
     },
 
     // Wrap as parallel stream + forEachOrdered
     PAR_STREAM_FOR_EACH_ORDERED(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
             // @@@ Want to explicitly select ordered equalator
-            m.apply(data.parallelStream()).forEachOrdered(b);
+            m.apply(source).forEachOrdered(b);
         }
     },
 
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            for (Spliterator.OfLong spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) {
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            for (Spliterator.OfLong spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
             }
         }
     },
@@ -125,15 +124,15 @@
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR_FOREACH(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     PAR_STREAM_TO_ARRAY(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            for (long t : m.apply(data.parallelStream()).toArray())
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            for (long t : m.apply(source).toArray())
                 b.accept(t);
         }
     },
@@ -141,8 +140,8 @@
     // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
     PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            LongStream s = m.apply(data.parallelStream());
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            LongStream s = m.apply(source);
             Spliterator.OfLong sp = s.spliterator();
             LongStream ss = StreamSupport.longStream(() -> sp,
                                                      StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
@@ -154,8 +153,8 @@
 
     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             LongStream pipe2 = m.apply(pipe1);
 
@@ -167,8 +166,8 @@
     // Wrap as parallel stream + forEach synchronizing
     PAR_STREAM_FOR_EACH(true, false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            m.apply(data.parallelStream()).forEach(e -> {
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            m.apply(source).forEach(e -> {
                 synchronized (data) {
                     b.accept(e);
                 }
@@ -179,8 +178,8 @@
     // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
     PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
         <T, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             m.apply(pipe1).forEach(e -> {
                 synchronized (data) {
@@ -222,10 +221,12 @@
 
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
-        _run(data, (LongConsumer) b, (Function<S_IN, LongStream>) m);
+        try (S_IN source = getStream(data)) {
+            run(data, source, (LongConsumer) b, (Function<S_IN, LongStream>) m);
+        }
     }
 
     abstract <T, S_IN extends BaseStream<T, S_IN>>
-    void _run(TestData<T, S_IN> data, LongConsumer b, Function<S_IN, LongStream> m);
+    void run(TestData<T, S_IN> data, S_IN source, LongConsumer b, Function<S_IN, LongStream> m);
 
 }
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java	Tue Jun 23 14:20:59 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/OpTestCase.java	Wed Jun 24 12:05:30 2015 +0200
@@ -94,6 +94,13 @@
 
         boolean isOrdered();
 
+        default <T, S_IN extends BaseStream<T, S_IN>>
+        S_IN getStream(TestData<T, S_IN> data) {
+            return isParallel()
+                   ? data.parallelStream()
+                   : data.stream();
+        }
+
         <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
         void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m);
     }
@@ -375,15 +382,17 @@
             if (refResult == null) {
                 // Induce the reference result
                 before.accept(data);
-                S_OUT sOut = m.apply(data.stream());
-                isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
-                Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
-                refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
+                try (S_OUT sOut = m.apply(data.stream())) {
+                    isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+                    Node<U> refNodeResult = ((AbstractPipeline<?, U, ?>) sOut).evaluateToArrayNode(size -> (U[]) new Object[size]);
+                    refResult = LambdaTestHelpers.toBoxedList(refNodeResult.spliterator());
+                }
                 after.accept(data);
             }
             else {
-                S_OUT sOut = m.apply(data.stream());
-                isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+                try (S_OUT sOut = m.apply(data.stream())) {
+                    isStreamOrdered = StreamOpFlag.ORDERED.isKnown(((AbstractPipeline) sOut).getStreamFlags());
+                }
             }
 
             List<Error> errors = new ArrayList<>();
@@ -541,14 +550,18 @@
         // Build method
 
         public R exercise() {
-            S_OUT out = streamF.apply(data.stream()).sequential();
-            AbstractPipeline ap = (AbstractPipeline) out;
-            boolean isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
-            StreamShape shape = ap.getOutputShape();
+            boolean isOrdered;
+            StreamShape shape;
+            Node<U> node;
+            try (S_OUT out = streamF.apply(data.stream()).sequential()) {
+                AbstractPipeline ap = (AbstractPipeline) out;
+                isOrdered = StreamOpFlag.ORDERED.isKnown(ap.getStreamFlags());
+                shape = ap.getOutputShape();
+                // Sequentially collect the output that will be input to the terminal op
+                node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
+            }
 
             EnumSet<TerminalTestScenario> tests = EnumSet.allOf(TerminalTestScenario.class);
-            // Sequentially collect the output that will be input to the terminal op
-            Node<U> node = ap.evaluateToArrayNode(size -> (U[]) new Object[size]);
             if (refResult == null) {
                 // Induce the reference result
                 S_OUT source = (S_OUT) createPipeline(shape, node.spliterator(),
@@ -571,8 +584,10 @@
                                            ? data.parallelStream() : data.stream());
                 }
 
-                R result = (R) test.run(terminalF, source, shape);
-
+                R result;
+                try (source) {
+                    result = (R) test.run(terminalF, source, shape);
+                }
                 LambdaTestHelpers.launderAssertion(
                         () -> resultAsserter.assertResult(result, refResult, isOrdered, test.requiresParallelSource()),
                         () -> String.format("%s: %s != %s", test, refResult, result));
--- a/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java	Tue Jun 23 14:20:59 2015 -0700
+++ b/jdk/test/java/util/stream/bootlib/java/util/stream/StreamTestScenario.java	Wed Jun 24 12:05:30 2015 +0200
@@ -42,23 +42,22 @@
 @SuppressWarnings({"rawtypes", "unchecked"})
 public enum StreamTestScenario implements OpTestCase.BaseStreamTestScenario {
 
-    STREAM_FOR_EACH_WITH_CLOSE(false) {
+    STREAM_FOR_EACH(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            Stream<U> s = m.apply(data.stream());
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            Stream<U> s = m.apply(source);
             if (s.isParallel()) {
                 s = s.sequential();
             }
             s.forEach(b);
-            s.close();
         }
     },
 
     // Collec to list
     STREAM_COLLECT(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (U t : m.apply(data.stream()).collect(Collectors.toList())) {
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (U t : m.apply(source).collect(Collectors.toList())) {
                 b.accept(t);
             }
         }
@@ -67,8 +66,8 @@
     // To array
     STREAM_TO_ARRAY(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (Object t : m.apply(data.stream()).toArray()) {
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (Object t : m.apply(source).toArray()) {
                 b.accept((U) t);
             }
         }
@@ -77,8 +76,8 @@
     // Wrap as stream, and iterate in pull mode
     STREAM_ITERATOR(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (Iterator<U> seqIter = m.apply(data.stream()).iterator(); seqIter.hasNext(); )
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (Iterator<U> seqIter = m.apply(source).iterator(); seqIter.hasNext(); )
                 b.accept(seqIter.next());
         }
     },
@@ -86,65 +85,67 @@
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (Spliterator<U> spl = m.apply(data.stream()).spliterator(); spl.tryAdvance(b); ) { }
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
+            }
         }
     },
 
     // Wrap as stream, spliterate, then split a few times mixing advances with forEach
     STREAM_SPLITERATOR_WITH_MIXED_TRAVERSE_AND_SPLIT(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(data.stream()).spliterator());
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            SpliteratorTestHelper.mixedTraverseAndSplit(b, m.apply(source).spliterator());
         }
     },
 
     // Wrap as stream, and spliterate then iterate in pull mode
     STREAM_SPLITERATOR_FOREACH(false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            m.apply(data.stream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     // Wrap as parallel stream + sequential
     PAR_STREAM_SEQUENTIAL_FOR_EACH(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            m.apply(data.parallelStream()).sequential().forEach(b);
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            m.apply(source).sequential().forEach(b);
         }
     },
 
     // Wrap as parallel stream + forEachOrdered
     PAR_STREAM_FOR_EACH_ORDERED(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
             // @@@ Want to explicitly select ordered equalator
-            m.apply(data.parallelStream()).forEachOrdered(b);
+            m.apply(source).forEachOrdered(b);
         }
     },
 
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (Spliterator<U> spl = m.apply(data.parallelStream()).spliterator(); spl.tryAdvance(b); ) { }
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (Spliterator<U> spl = m.apply(source).spliterator(); spl.tryAdvance(b); ) {
+            }
         }
     },
 
     // Wrap as stream, and spliterate then iterate sequentially
     PAR_STREAM_SPLITERATOR_FOREACH(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            m.apply(data.parallelStream()).spliterator().forEachRemaining(b);
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            m.apply(source).spliterator().forEachRemaining(b);
         }
     },
 
     // Wrap as parallel stream + toArray
     PAR_STREAM_TO_ARRAY(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (Object t : m.apply(data.parallelStream()).toArray())
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (Object t : m.apply(source).toArray())
                 b.accept((U) t);
         }
     },
@@ -152,8 +153,8 @@
     // Wrap as parallel stream, get the spliterator, wrap as a stream + toArray
     PAR_STREAM_SPLITERATOR_STREAM_TO_ARRAY(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            Stream<U> s = m.apply(data.parallelStream());
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            Stream<U> s = m.apply(source);
             Spliterator<U> sp = s.spliterator();
             Stream<U> ss = StreamSupport.stream(() -> sp,
                                                 StreamOpFlag.toCharacteristics(OpTestCase.getStreamFlags(s))
@@ -166,8 +167,8 @@
     // Wrap as parallel stream + toArray and clear SIZED flag
     PAR_STREAM_TO_ARRAY_CLEAR_SIZED(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             Stream<U> pipe2 = m.apply(pipe1);
 
@@ -179,17 +180,22 @@
     // Wrap as parallel + collect to list
     PAR_STREAM_COLLECT_TO_LIST(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (U u : m.apply(data.parallelStream()).collect(Collectors.toList()))
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (U u : m.apply(source).collect(Collectors.toList()))
                 b.accept(u);
         }
     },
 
     // Wrap sequential as parallel, + collect to list
     STREAM_TO_PAR_STREAM_COLLECT_TO_LIST(true) {
+        public <T, S_IN extends BaseStream<T, S_IN>>
+        S_IN getStream(TestData<T, S_IN> data) {
+            return data.stream().parallel();
+        }
+
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (U u : m.apply(data.stream().parallel()).collect(Collectors.toList()))
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (U u : m.apply(source).collect(Collectors.toList()))
                 b.accept(u);
         }
     },
@@ -197,8 +203,8 @@
     // Wrap parallel as sequential,, + collect
     PAR_STREAM_TO_STREAM_COLLECT_TO_LIST(true) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            for (U u : m.apply(data.parallelStream().sequential()).collect(Collectors.toList()))
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            for (U u : m.apply(source).collect(Collectors.toList()))
                 b.accept(u);
         }
     },
@@ -206,8 +212,8 @@
     // Wrap as parallel stream + forEach synchronizing
     PAR_STREAM_FOR_EACH(true, false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            m.apply(data.parallelStream()).forEach(e -> {
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            m.apply(source).forEach(e -> {
                 synchronized (data) {
                     b.accept(e);
                 }
@@ -218,8 +224,8 @@
     // Wrap as parallel stream + forEach synchronizing and clear SIZED flag
     PAR_STREAM_FOR_EACH_CLEAR_SIZED(true, false) {
         <T, U, S_IN extends BaseStream<T, S_IN>>
-        void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m) {
-            S_IN pipe1 = (S_IN) OpTestCase.chain(data.parallelStream(),
+        void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m) {
+            S_IN pipe1 = (S_IN) OpTestCase.chain(source,
                                                  new FlagDeclaringOp(StreamOpFlag.NOT_SIZED, data.getShape()));
             m.apply(pipe1).forEach(e -> {
                 synchronized (data) {
@@ -261,10 +267,12 @@
 
     public <T, U, S_IN extends BaseStream<T, S_IN>, S_OUT extends BaseStream<U, S_OUT>>
     void run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, S_OUT> m) {
-        _run(data, b, (Function<S_IN, Stream<U>>) m);
+        try (S_IN source = getStream(data)) {
+            run(data, source, b, (Function<S_IN, Stream<U>>) m);
+        }
     }
 
     abstract <T, U, S_IN extends BaseStream<T, S_IN>>
-    void _run(TestData<T, S_IN> data, Consumer<U> b, Function<S_IN, Stream<U>> m);
+    void run(TestData<T, S_IN> data, S_IN source, Consumer<U> b, Function<S_IN, Stream<U>> m);
 
 }