Improve read/write performance niosocketimpl-branch
authoralanb
Wed, 30 Jan 2019 14:33:52 +0000
branchniosocketimpl-branch
changeset 57124 8bb7df86576a
parent 57122 15acbc1030ec
child 57128 3d6cee596b33
Improve read/write performance
src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java
test/micro/org/openjdk/bench/java/net/SocketReadWrite.java
--- a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java	Tue Jan 29 17:42:58 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java	Wed Jan 30 14:33:52 2019 +0000
@@ -194,7 +194,7 @@
         throws IOException
     {
         assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
-        if (!nonBlocking && (timeout > 0)) {
+        if (timeout > 0 && !nonBlocking) {
             IOUtil.configureBlocking(fd, false);
             nonBlocking = true;
         }
@@ -229,11 +229,31 @@
     }
 
     /**
-     * Reads bytes from the socket into the given buffer.
+     * Try to read bytes from the socket into the given byte array.
+     */
+    private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
+        throws IOException
+    {
+        ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
+        assert dst.position() == 0;
+        try {
+            int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
+            if (n > 0) {
+                dst.get(b, off, n);
+            }
+            return n;
+        } finally{
+            Util.offerFirstTemporaryDirectBuffer(dst);
+        }
+    }
+
+    /**
+     * Reads bytes from the socket into the given byte array.
+     * @return the number of bytes read
      * @throws IOException if the socket is closed or an I/O occurs
      * @throws SocketTimeoutException if the read timeout elapses
      */
-    private int read(ByteBuffer dst) throws IOException {
+    private int read(byte[] b, int off, int len) throws IOException {
         readLock.lock();
         try {
             int n = 0;
@@ -244,7 +264,7 @@
                 }
                 int timeout = this.timeout;
                 maybeConfigureNonBlocking(fd, timeout);
-                n = IOUtil.read(fd, dst, -1, nd);
+                n = tryRead(fd, b, off, len);
                 if (IOStatus.okayToRetry(n) && isOpen()) {
                     if (timeout > 0) {
                         // read with timeout
@@ -253,7 +273,7 @@
                         do {
                             long startTime = System.nanoTime();
                             park(Net.POLLIN, nanos);
-                            n = IOUtil.read(fd, dst, -1, nd);
+                            n = tryRead(fd, b, off, len);
                             if (n == IOStatus.UNAVAILABLE) {
                                 nanos -= System.nanoTime() - startTime;
                                 if (nanos <= 0)
@@ -264,7 +284,7 @@
                         // read, no timeout
                         do {
                             park(Net.POLLIN);
-                            n = IOUtil.read(fd, dst, -1, nd);
+                            n = tryRead(fd, b, off, len);
                         } while (IOStatus.okayToRetry(n) && isOpen());
                     }
                 }
@@ -306,19 +326,36 @@
     }
 
     /**
-     * Writes a sequence of bytes to this socket from the given buffer.
+     * Try to write a sequence of bytes to this socket from the given byte array
+     */
+    private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
+        throws IOException
+    {
+        ByteBuffer src = Util.getTemporaryDirectBuffer(len);
+        assert src.position() == 0;
+        try {
+            src.put(b, off, len);
+            return nd.write(fd, ((DirectBuffer)src).address(), len);
+        } finally {
+            Util.offerFirstTemporaryDirectBuffer(src);
+        }
+    }
+
+    /**
+     * Writes a sequence of bytes to this socket from the given byte array.
+     * @return the number of bytes written
      * @throws IOException if the socket is closed or an I/O occurs
      */
-    private int write(ByteBuffer dst) throws IOException {
+    private int write(byte[] b, int off, int len) throws IOException {
         writeLock.lock();
         try {
             int n = 0;
             FileDescriptor fd = beginWrite();
             try {
-                n = IOUtil.write(fd, dst, -1, nd);
+                n = tryWrite(fd, b, off, len);
                 while (IOStatus.okayToRetry(n) && isOpen()) {
                     park(Net.POLLOUT);
-                    n = IOUtil.write(fd, dst, -1, nd);
+                    n = tryWrite(fd, b, off, len);
                 }
                 return n;
             } finally {
@@ -709,18 +746,17 @@
                 return (n > 0) ? (a[0] & 0xff) : -1;
             }
             @Override
-            public int read(byte b[], int off, int len) throws IOException {
+            public int read(byte[] b, int off, int len) throws IOException {
                 Objects.checkFromIndexSize(off, len, b.length);
                 if (eof) {
-                    return -1; // legacy SocketInputStream behavior
+                    return -1;
                 } else if (len == 0) {
                     return 0;
                 } else {
                     try {
                         // read up to MAX_BUFFER_SIZE bytes
                         int size = Math.min(len, MAX_BUFFER_SIZE);
-                        ByteBuffer dst = ByteBuffer.wrap(b, off, size);
-                        int n = NioSocketImpl.this.read(dst);
+                        int n = NioSocketImpl.this.read(b, off, size);
                         if (n == -1)
                             eof = true;
                         return n;
@@ -751,20 +787,18 @@
                 write(a, 0, 1);
             }
             @Override
-            public void write(byte b[], int off, int len) throws IOException {
+            public void write(byte[] b, int off, int len) throws IOException {
                 Objects.checkFromIndexSize(off, len, b.length);
                 if (len > 0) {
                     try {
-                        ByteBuffer src = ByteBuffer.wrap(b, off, len);
-                        int end = src.limit();
-                        int pos;
-                        // write up to MAX_BUFFER_SIZE bytes at a time
-                        while ((pos = src.position()) < end) {
+                        int pos = off;
+                        int end = off + len;
+                        while (pos < end) {
+                            // write up to MAX_BUFFER_SIZE bytes
                             int size = Math.min((end - pos), MAX_BUFFER_SIZE);
-                            src.limit(pos + size);
-                            NioSocketImpl.this.write(src);
+                            int n = NioSocketImpl.this.write(b, pos, size);
+                            pos += n;
                         }
-                        assert src.limit() == end && src.remaining() == 0;
                     } catch (IOException ioe) {
                         throw new SocketException(ioe.getMessage());
                     }
--- a/test/micro/org/openjdk/bench/java/net/SocketReadWrite.java	Tue Jan 29 17:42:58 2019 +0000
+++ b/test/micro/org/openjdk/bench/java/net/SocketReadWrite.java	Wed Jan 30 14:33:52 2019 +0000
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2019 Oracle and/or its affiliates. All rights reserved.
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
  * This code is free software; you can redistribute it and/or modify it
@@ -28,80 +28,59 @@
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketException;
 import java.util.concurrent.TimeUnit;
 
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.*;
 
 /**
- * Tests the overheads of I/O API.
- * This test is known to depend heavily on network conditions and paltform.
+ * Tests socket read/write.
  */
 @BenchmarkMode(Mode.Throughput)
 @OutputTimeUnit(TimeUnit.MILLISECONDS)
 @State(Scope.Thread)
 public class SocketReadWrite {
 
-    private OutputStream os;
-    private InputStream is;
-    private ServerSocket ss;
     private Socket s1, s2;
-    private ReadThread rt;
+    private InputStream in;
+    private OutputStream out;
 
     @Setup
     public void beforeRun() throws IOException {
-        InetAddress iaddr = InetAddress.getLocalHost();
-
-        ss = new ServerSocket(0);
-        s1 = new Socket(iaddr, ss.getLocalPort());
-        s2 = ss.accept();
-
-        os = s1.getOutputStream();
-        is = s2.getInputStream();
-
-        rt = new ReadThread(is);
-        rt.start();
+        InetAddress lb = InetAddress.getLoopbackAddress();
+        try (ServerSocket ss = new ServerSocket(0)) {
+            s1 = new Socket(lb, ss.getLocalPort());
+            s2 = ss.accept();
+        }
+        s1.setTcpNoDelay(true);
+        s2.setTcpNoDelay(true);
+        in = s1.getInputStream();
+        out = s2.getOutputStream();
     }
 
     @TearDown
-    public void afterRun() throws IOException, InterruptedException {
-        os.write(0);
-        os.close();
-        is.close();
+    public void afterRun() throws IOException {
         s1.close();
         s2.close();
-        ss.close();
-        rt.join();
     }
 
+    @Param({"1", "1024", "8192", "64000", "128000"})
+    public int size;
+
+    private final byte[] array = new byte[512*1024];
+
     @Benchmark
     public void test() throws IOException {
-        os.write((byte) 4711);
-    }
-
-    static class ReadThread extends Thread {
-        private InputStream is;
-
-        public ReadThread(InputStream is) {
-            this.is = is;
-        }
-
-        public void run() {
-            try {
-                while (is.read() > 0);
-            } catch (SocketException ex) {
-                // ignore - most likely "socket closed", which means shutdown
-            } catch (IOException e) {
-                e.printStackTrace();
+        if (size == 1) {
+            out.write((byte) 47);
+            int c = in.read();
+        } else {
+            out.write(array, 0, size);
+            int nread = 0;
+            while (nread < size) {
+                int n = in.read(array, 0, size);
+                if (n < 0) throw new RuntimeException();
+                nread += n;
             }
         }
     }
-
 }