8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel
authorredestad
Wed, 24 Apr 2019 15:37:55 +0200
changeset 54608 c604234be658
parent 54607 b6db97903b69
child 54609 04857e2cd509
8222532: (zipfs) Performance regression when writing ZipFileSystem entries in parallel Reviewed-by: lancea, clanger, alanb
src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java
src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java
src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java
--- a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java	Wed Apr 24 08:27:00 2019 -0400
+++ b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystem.java	Wed Apr 24 15:37:55 2019 +0200
@@ -599,11 +599,11 @@
             throw new IllegalArgumentException("APPEND + TRUNCATE_EXISTING not allowed");
     }
 
-
     // Returns an output SeekableByteChannel for either
     // (1) writing the contents of a new entry, if the entry doesn't exit, or
     // (2) updating/replacing the contents of an existing entry.
-    // Note: The content is not compressed.
+    // Note: The content of the channel is not compressed until the
+    // channel is closed
     private class EntryOutputChannel extends ByteArrayChannel {
         Entry e;
 
@@ -622,19 +622,19 @@
 
         @Override
         public void close() throws IOException {
-            e.bytes = toByteArray();
-            e.size = e.bytes.length;
-            e.crc = -1;
+            // will update the entry
+            try (OutputStream os = getOutputStream(e)) {
+                os.write(toByteArray());
+            }
             super.close();
-            update(e);
         }
     }
 
-    private int getCompressMethod(FileAttribute<?>... attrs) {
+    private int getCompressMethod() {
          return defaultMethod;
     }
 
-    // Returns a Writable/ReadByteChannel for now. Might consdier to use
+    // Returns a Writable/ReadByteChannel for now. Might consider to use
     // newFileChannel() instead, which dump the entry data into a regular
     // file on the default file system and create a FileChannel on top of
     // it.
@@ -647,10 +647,9 @@
         if (options.contains(StandardOpenOption.WRITE) ||
             options.contains(StandardOpenOption.APPEND)) {
             checkWritable();
-            beginRead();    // only need a readlock, the "update()" will obtain
-                            // thewritelock when the channel is closed
+            beginRead();    // only need a read lock, the "update()" will obtain
+                            // the write lock when the channel is closed
             try {
-                ensureOpen();
                 Entry e = getEntry(path);
                 if (e != null) {
                     if (e.isDir() || options.contains(CREATE_NEW))
@@ -675,8 +674,7 @@
                     throw new NoSuchFileException(getString(path));
                 checkParents(path);
                 return new EntryOutputChannel(
-                    new Entry(path, Entry.NEW, false, getCompressMethod(attrs)));
-
+                    new Entry(path, Entry.NEW, false, getCompressMethod()));
             } finally {
                 endRead();
             }
@@ -743,7 +741,7 @@
             final Entry u = isFCH ? e : new Entry(path, tmpfile, Entry.FILECH);
             if (forWrite) {
                 u.flag = FLAG_DATADESCR;
-                u.method = getCompressMethod(attrs);
+                u.method = getCompressMethod();
             }
             // is there a better way to hook into the FileChannel's close method?
             return new FileChannel() {
@@ -844,7 +842,11 @@
 
     // the outstanding input streams that need to be closed
     private Set<InputStream> streams =
-        Collections.synchronizedSet(new HashSet<InputStream>());
+        Collections.synchronizedSet(new HashSet<>());
+
+    // the ex-channel and ex-path that need to close when their outstanding
+    // input streams are all closed by the obtainers.
+    private Set<ExistingChannelCloser> exChClosers = new HashSet<>();
 
     private Set<Path> tmppaths = Collections.synchronizedSet(new HashSet<Path>());
     private Path getTempPathForEntry(byte[] path) throws IOException {
@@ -1202,25 +1204,20 @@
         return written;
     }
 
-    private long writeEntry(Entry e, OutputStream os, byte[] buf)
+    private long writeEntry(Entry e, OutputStream os)
         throws IOException {
 
         if (e.bytes == null && e.file == null)    // dir, 0-length data
             return 0;
 
         long written = 0;
-        try (OutputStream os2 = e.method == METHOD_STORED ?
-            new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
-            if (e.bytes != null) {                 // in-memory
-                os2.write(e.bytes, 0, e.bytes.length);
-            } else if (e.file != null) {           // tmp file
-                if (e.type == Entry.NEW || e.type == Entry.FILECH) {
-                    try (InputStream is = Files.newInputStream(e.file)) {
-                        is.transferTo(os2);
-                    }
-                }
-                Files.delete(e.file);
-                tmppaths.remove(e.file);
+        if (e.crc != 0 && e.csize > 0) {
+            // pre-compressed entry, write directly to output stream
+            writeTo(e, os);
+        } else {
+            try (OutputStream os2 = (e.method == METHOD_STORED) ?
+                    new EntryOutputStreamCRC32(e, os) : new EntryOutputStreamDef(e, os)) {
+                writeTo(e, os2);
             }
         }
         written += e.csize;
@@ -1230,18 +1227,38 @@
         return written;
     }
 
+    private void writeTo(Entry e, OutputStream os) throws IOException {
+        if (e.bytes != null) {
+            os.write(e.bytes, 0, e.bytes.length);
+        } else if (e.file != null) {
+            if (e.type == Entry.NEW || e.type == Entry.FILECH) {
+                try (InputStream is = Files.newInputStream(e.file)) {
+                    is.transferTo(os);
+                }
+            }
+            Files.delete(e.file);
+            tmppaths.remove(e.file);
+        }
+    }
+
     // sync the zip file system, if there is any udpate
     private void sync() throws IOException {
-
+        // check ex-closer
+        if (!exChClosers.isEmpty()) {
+            for (ExistingChannelCloser ecc : exChClosers) {
+                if (ecc.closeAndDeleteIfDone()) {
+                    exChClosers.remove(ecc);
+                }
+            }
+        }
         if (!hasUpdate)
             return;
         Path tmpFile = createTempFileInSameDirectoryAs(zfpath);
-        try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE)))
-        {
+        try (OutputStream os = new BufferedOutputStream(Files.newOutputStream(tmpFile, WRITE))) {
             ArrayList<Entry> elist = new ArrayList<>(inodes.size());
             long written = 0;
-            byte[] buf = new byte[8192];
-            Entry e = null;
+            byte[] buf = null;
+            Entry e;
 
             // write loc
             for (IndexNode inode : inodes.values()) {
@@ -1254,11 +1271,13 @@
                             // LOC in new file and simply copy the rest (data and
                             // ext) without enflating/deflating from the old zip
                             // file LOC entry.
+                            if (buf == null)
+                                buf = new byte[8192];
                             written += copyLOCEntry(e, true, os, written, buf);
                         } else {                          // NEW, FILECH or CEN
                             e.locoff = written;
                             written += e.writeLOC(os);    // write loc header
-                            written += writeEntry(e, os, buf);
+                            written += writeEntry(e, os);
                         }
                         elist.add(e);
                     } catch (IOException x) {
@@ -1274,6 +1293,8 @@
                     }
                     e = Entry.readCEN(this, inode);
                     try {
+                        if (buf == null)
+                            buf = new byte[8192];
                         written += copyLOCEntry(e, false, os, written, buf);
                         elist.add(e);
                     } catch (IOException x) {
@@ -1291,9 +1312,23 @@
             end.cenlen = written - end.cenoff;
             end.write(os, written, forceEnd64);
         }
+        if (!streams.isEmpty()) {
+            //
+            // There are outstanding input streams open on existing "ch",
+            // so, don't close the "cha" and delete the "file for now, let
+            // the "ex-channel-closer" to handle them
+            Path path = createTempFileInSameDirectoryAs(zfpath);
+            ExistingChannelCloser ecc = new ExistingChannelCloser(path,
+                                                                  ch,
+                                                                  streams);
+            Files.move(zfpath, path, REPLACE_EXISTING);
+            exChClosers.add(ecc);
+            streams = Collections.synchronizedSet(new HashSet<>());
+        } else {
+            ch.close();
+            Files.delete(zfpath);
+        }
 
-        ch.close();
-        Files.delete(zfpath);
         Files.move(tmpFile, zfpath, REPLACE_EXISTING);
         hasUpdate = false;    // clear
     }
@@ -1351,11 +1386,15 @@
         } else {
             os = new ByteArrayOutputStream((e.size > 0)? (int)e.size : 8192);
         }
-        return new EntryOutputStream(e, os);
+        if (e.method == METHOD_DEFLATED) {
+            return new DeflatingEntryOutputStream(e, os);
+        } else {
+            return new EntryOutputStream(e, os);
+        }
     }
 
     private class EntryOutputStream extends FilterOutputStream {
-        private Entry e;
+        private final Entry e;
         private long written;
         private boolean isClosed;
 
@@ -1392,13 +1431,56 @@
         }
     }
 
+    // Output stream returned when writing "deflated" entries into memory,
+    // to enable eager (possibly parallel) deflation and reduce memory required.
+    private class DeflatingEntryOutputStream extends DeflaterOutputStream {
+        private final CRC32 crc;
+        private final Entry e;
+        private boolean isClosed;
+
+        DeflatingEntryOutputStream(Entry e, OutputStream os) throws IOException {
+            super(os, getDeflater());
+            this.e = Objects.requireNonNull(e, "Zip entry is null");
+            this.crc = new CRC32();
+        }
+
+        @Override
+        public synchronized void write(int b) throws IOException {
+            super.write(b);
+            crc.update(b);
+        }
+
+        @Override
+        public synchronized void write(byte b[], int off, int len)
+                throws IOException {
+            super.write(b, off, len);
+            crc.update(b, off, len);
+        }
+
+        @Override
+        public synchronized void close() throws IOException {
+            if (isClosed)
+                return;
+            isClosed = true;
+            finish();
+            e.size  = def.getBytesRead();
+            e.csize = def.getBytesWritten();
+            e.crc = crc.getValue();
+            if (out instanceof ByteArrayOutputStream)
+                e.bytes = ((ByteArrayOutputStream)out).toByteArray();
+            super.close();
+            update(e);
+            releaseDeflater(def);
+        }
+    }
+
     // Wrapper output stream class to write out a "stored" entry.
     // (1) this class does not close the underlying out stream when
     //     being closed.
     // (2) no need to be "synchronized", only used by sync()
     private class EntryOutputStreamCRC32 extends FilterOutputStream {
-        private Entry e;
-        private CRC32 crc;
+        private final CRC32 crc;
+        private final Entry e;
         private long written;
         private boolean isClosed;
 
@@ -1438,8 +1520,8 @@
     //     being closed.
     // (2) no need to be "synchronized", only used by sync()
     private class EntryOutputStreamDef extends DeflaterOutputStream {
-        private CRC32 crc;
-        private Entry e;
+        private final CRC32 crc;
+        private final Entry e;
         private boolean isClosed;
 
         EntryOutputStreamDef(Entry e, OutputStream os) throws IOException {
@@ -1471,14 +1553,12 @@
     private InputStream getInputStream(Entry e)
         throws IOException
     {
-        InputStream eis = null;
-
+        InputStream eis;
         if (e.type == Entry.NEW) {
-            // now bytes & file is uncompressed.
             if (e.bytes != null)
-                return new ByteArrayInputStream(e.bytes);
+                eis = new ByteArrayInputStream(e.bytes);
             else if (e.file != null)
-                return Files.newInputStream(e.file);
+                eis = Files.newInputStream(e.file);
             else
                 throw new ZipException("update entry data is missing");
         } else if (e.type == Entry.FILECH) {
@@ -1579,7 +1659,7 @@
                 len = (int) rem;
             }
             // readFullyAt()
-            long n = 0;
+            long n;
             ByteBuffer bb = ByteBuffer.wrap(b);
             bb.position(off);
             bb.limit(off + len);
@@ -1905,7 +1985,7 @@
             this.type = type;
         }
 
-        Entry (Entry e, int type) {
+        Entry(Entry e, int type) {
             name(e.name);
             this.isdir     = e.isdir;
             this.version   = e.version;
@@ -1928,7 +2008,7 @@
             this.type      = type;
         }
 
-        Entry (byte[] name, Path file, int type) {
+        Entry(byte[] name, Path file, int type) {
             this(name, type, false, METHOD_STORED);
             this.file = file;
         }
@@ -2424,6 +2504,36 @@
         }
     }
 
+    private static class ExistingChannelCloser {
+        private final Path path;
+        private final SeekableByteChannel ch;
+        private final Set<InputStream> streams;
+        ExistingChannelCloser(Path path,
+                              SeekableByteChannel ch,
+                              Set<InputStream> streams) {
+            this.path = path;
+            this.ch = ch;
+            this.streams = streams;
+        }
+
+        /**
+         * If there are no more outstanding streams, close the channel and
+         * delete the backing file
+         *
+         * @return true if we're done and closed the backing file,
+         *         otherwise false
+         * @throws IOException
+         */
+        public boolean closeAndDeleteIfDone() throws IOException {
+            if (streams.isEmpty()) {
+                ch.close();
+                Files.delete(path);
+                return true;
+            }
+            return false;
+        }
+    }
+
     // ZIP directory has two issues:
     // (1) ZIP spec does not require the ZIP file to include
     //     directory entry
--- a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java	Wed Apr 24 08:27:00 2019 -0400
+++ b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipFileSystemProvider.java	Wed Apr 24 15:37:55 2019 +0200
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2018, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2009, 2019, Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -104,7 +104,7 @@
                 if (filesystems.containsKey(realPath))
                     throw new FileSystemAlreadyExistsException();
             }
-            ZipFileSystem zipfs = null;
+            ZipFileSystem zipfs;
             try {
                 if (env.containsKey("multi-release")) {
                     zipfs = new JarFileSystem(this, path, env);
@@ -131,13 +131,13 @@
         throws IOException
     {
         ensureFile(path);
-         try {
-             ZipFileSystem zipfs;
-             if (env.containsKey("multi-release")) {
-                 zipfs = new JarFileSystem(this, path, env);
-             } else {
-                 zipfs = new ZipFileSystem(this, path, env);
-             }
+        try {
+            ZipFileSystem zipfs;
+            if (env.containsKey("multi-release")) {
+                zipfs = new JarFileSystem(this, path, env);
+            } else {
+                zipfs = new ZipFileSystem(this, path, env);
+            }
             return zipfs;
         } catch (ZipException ze) {
             String pname = path.toString();
--- a/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java	Wed Apr 24 08:27:00 2019 -0400
+++ b/src/jdk.zipfs/share/classes/jdk/nio/zipfs/ZipPath.java	Wed Apr 24 15:37:55 2019 +0200
@@ -676,7 +676,7 @@
 
     @Override
     public Iterator<Path> iterator() {
-        return new Iterator<Path>() {
+        return new Iterator<>() {
             private int i = 0;
 
             @Override
@@ -746,8 +746,8 @@
     void setAttribute(String attribute, Object value, LinkOption... options)
         throws IOException
     {
-        String type = null;
-        String attr = null;
+        String type;
+        String attr;
         int colonPos = attribute.indexOf(':');
         if (colonPos == -1) {
             type = "basic";
@@ -772,8 +772,8 @@
         throws IOException
 
     {
-        String view = null;
-        String attrs = null;
+        String view;
+        String attrs;
         int colonPos = attributes.indexOf(':');
         if (colonPos == -1) {
             view = "basic";