# HG changeset patch # User alanb # Date 1548858832 0 # Node ID 8bb7df86576af3321641b266412f52c3627fb4f6 # Parent 15acbc1030ecb9b85e52d68a70baf889edaad0d8 Improve read/write performance diff -r 15acbc1030ec -r 8bb7df86576a src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java --- a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java Tue Jan 29 17:42:58 2019 +0000 +++ b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java Wed Jan 30 14:33:52 2019 +0000 @@ -194,7 +194,7 @@ throws IOException { assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); - if (!nonBlocking && (timeout > 0)) { + if (timeout > 0 && !nonBlocking) { IOUtil.configureBlocking(fd, false); nonBlocking = true; } @@ -229,11 +229,31 @@ } /** - * Reads bytes from the socket into the given buffer. + * Try to read bytes from the socket into the given byte array. + */ + private int tryRead(FileDescriptor fd, byte[] b, int off, int len) + throws IOException + { + ByteBuffer dst = Util.getTemporaryDirectBuffer(len); + assert dst.position() == 0; + try { + int n = nd.read(fd, ((DirectBuffer)dst).address(), len); + if (n > 0) { + dst.get(b, off, n); + } + return n; + } finally{ + Util.offerFirstTemporaryDirectBuffer(dst); + } + } + + /** + * Reads bytes from the socket into the given byte array. + * @return the number of bytes read * @throws IOException if the socket is closed or an I/O occurs * @throws SocketTimeoutException if the read timeout elapses */ - private int read(ByteBuffer dst) throws IOException { + private int read(byte[] b, int off, int len) throws IOException { readLock.lock(); try { int n = 0; @@ -244,7 +264,7 @@ } int timeout = this.timeout; maybeConfigureNonBlocking(fd, timeout); - n = IOUtil.read(fd, dst, -1, nd); + n = tryRead(fd, b, off, len); if (IOStatus.okayToRetry(n) && isOpen()) { if (timeout > 0) { // read with timeout @@ -253,7 +273,7 @@ do { long startTime = System.nanoTime(); park(Net.POLLIN, nanos); - n = IOUtil.read(fd, dst, -1, nd); + n = tryRead(fd, b, off, len); if (n == IOStatus.UNAVAILABLE) { nanos -= System.nanoTime() - startTime; if (nanos <= 0) @@ -264,7 +284,7 @@ // read, no timeout do { park(Net.POLLIN); - n = IOUtil.read(fd, dst, -1, nd); + n = tryRead(fd, b, off, len); } while (IOStatus.okayToRetry(n) && isOpen()); } } @@ -306,19 +326,36 @@ } /** - * Writes a sequence of bytes to this socket from the given buffer. + * Try to write a sequence of bytes to this socket from the given byte array + */ + private int tryWrite(FileDescriptor fd, byte[] b, int off, int len) + throws IOException + { + ByteBuffer src = Util.getTemporaryDirectBuffer(len); + assert src.position() == 0; + try { + src.put(b, off, len); + return nd.write(fd, ((DirectBuffer)src).address(), len); + } finally { + Util.offerFirstTemporaryDirectBuffer(src); + } + } + + /** + * Writes a sequence of bytes to this socket from the given byte array. + * @return the number of bytes written * @throws IOException if the socket is closed or an I/O occurs */ - private int write(ByteBuffer dst) throws IOException { + private int write(byte[] b, int off, int len) throws IOException { writeLock.lock(); try { int n = 0; FileDescriptor fd = beginWrite(); try { - n = IOUtil.write(fd, dst, -1, nd); + n = tryWrite(fd, b, off, len); while (IOStatus.okayToRetry(n) && isOpen()) { park(Net.POLLOUT); - n = IOUtil.write(fd, dst, -1, nd); + n = tryWrite(fd, b, off, len); } return n; } finally { @@ -709,18 +746,17 @@ return (n > 0) ? (a[0] & 0xff) : -1; } @Override - public int read(byte b[], int off, int len) throws IOException { + public int read(byte[] b, int off, int len) throws IOException { Objects.checkFromIndexSize(off, len, b.length); if (eof) { - return -1; // legacy SocketInputStream behavior + return -1; } else if (len == 0) { return 0; } else { try { // read up to MAX_BUFFER_SIZE bytes int size = Math.min(len, MAX_BUFFER_SIZE); - ByteBuffer dst = ByteBuffer.wrap(b, off, size); - int n = NioSocketImpl.this.read(dst); + int n = NioSocketImpl.this.read(b, off, size); if (n == -1) eof = true; return n; @@ -751,20 +787,18 @@ write(a, 0, 1); } @Override - public void write(byte b[], int off, int len) throws IOException { + public void write(byte[] b, int off, int len) throws IOException { Objects.checkFromIndexSize(off, len, b.length); if (len > 0) { try { - ByteBuffer src = ByteBuffer.wrap(b, off, len); - int end = src.limit(); - int pos; - // write up to MAX_BUFFER_SIZE bytes at a time - while ((pos = src.position()) < end) { + int pos = off; + int end = off + len; + while (pos < end) { + // write up to MAX_BUFFER_SIZE bytes int size = Math.min((end - pos), MAX_BUFFER_SIZE); - src.limit(pos + size); - NioSocketImpl.this.write(src); + int n = NioSocketImpl.this.write(b, pos, size); + pos += n; } - assert src.limit() == end && src.remaining() == 0; } catch (IOException ioe) { throw new SocketException(ioe.getMessage()); } diff -r 15acbc1030ec -r 8bb7df86576a test/micro/org/openjdk/bench/java/net/SocketReadWrite.java --- a/test/micro/org/openjdk/bench/java/net/SocketReadWrite.java Tue Jan 29 17:42:58 2019 +0000 +++ b/test/micro/org/openjdk/bench/java/net/SocketReadWrite.java Wed Jan 30 14:33:52 2019 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2014, 2019 Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -28,80 +28,59 @@ import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; -import java.net.SocketException; import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.*; /** - * Tests the overheads of I/O API. - * This test is known to depend heavily on network conditions and paltform. + * Tests socket read/write. */ @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MILLISECONDS) @State(Scope.Thread) public class SocketReadWrite { - private OutputStream os; - private InputStream is; - private ServerSocket ss; private Socket s1, s2; - private ReadThread rt; + private InputStream in; + private OutputStream out; @Setup public void beforeRun() throws IOException { - InetAddress iaddr = InetAddress.getLocalHost(); - - ss = new ServerSocket(0); - s1 = new Socket(iaddr, ss.getLocalPort()); - s2 = ss.accept(); - - os = s1.getOutputStream(); - is = s2.getInputStream(); - - rt = new ReadThread(is); - rt.start(); + InetAddress lb = InetAddress.getLoopbackAddress(); + try (ServerSocket ss = new ServerSocket(0)) { + s1 = new Socket(lb, ss.getLocalPort()); + s2 = ss.accept(); + } + s1.setTcpNoDelay(true); + s2.setTcpNoDelay(true); + in = s1.getInputStream(); + out = s2.getOutputStream(); } @TearDown - public void afterRun() throws IOException, InterruptedException { - os.write(0); - os.close(); - is.close(); + public void afterRun() throws IOException { s1.close(); s2.close(); - ss.close(); - rt.join(); } + @Param({"1", "1024", "8192", "64000", "128000"}) + public int size; + + private final byte[] array = new byte[512*1024]; + @Benchmark public void test() throws IOException { - os.write((byte) 4711); - } - - static class ReadThread extends Thread { - private InputStream is; - - public ReadThread(InputStream is) { - this.is = is; - } - - public void run() { - try { - while (is.read() > 0); - } catch (SocketException ex) { - // ignore - most likely "socket closed", which means shutdown - } catch (IOException e) { - e.printStackTrace(); + if (size == 1) { + out.write((byte) 47); + int c = in.read(); + } else { + out.write(array, 0, size); + int nread = 0; + while (nread < size) { + int n = in.read(array, 0, size); + if (n < 0) throw new RuntimeException(); + nread += n; } } } - }