--- a/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java Tue Jan 29 17:42:58 2019 +0000
+++ b/src/java.base/share/classes/sun/nio/ch/NioSocketImpl.java Wed Jan 30 14:33:52 2019 +0000
@@ -194,7 +194,7 @@
throws IOException
{
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
- if (!nonBlocking && (timeout > 0)) {
+ if (timeout > 0 && !nonBlocking) {
IOUtil.configureBlocking(fd, false);
nonBlocking = true;
}
@@ -229,11 +229,31 @@
}
/**
- * Reads bytes from the socket into the given buffer.
+ * Try to read bytes from the socket into the given byte array.
+ */
+ private int tryRead(FileDescriptor fd, byte[] b, int off, int len)
+ throws IOException
+ {
+ ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
+ assert dst.position() == 0;
+ try {
+ int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
+ if (n > 0) {
+ dst.get(b, off, n);
+ }
+ return n;
+ } finally{
+ Util.offerFirstTemporaryDirectBuffer(dst);
+ }
+ }
+
+ /**
+ * Reads bytes from the socket into the given byte array.
+ * @return the number of bytes read
* @throws IOException if the socket is closed or an I/O occurs
* @throws SocketTimeoutException if the read timeout elapses
*/
- private int read(ByteBuffer dst) throws IOException {
+ private int read(byte[] b, int off, int len) throws IOException {
readLock.lock();
try {
int n = 0;
@@ -244,7 +264,7 @@
}
int timeout = this.timeout;
maybeConfigureNonBlocking(fd, timeout);
- n = IOUtil.read(fd, dst, -1, nd);
+ n = tryRead(fd, b, off, len);
if (IOStatus.okayToRetry(n) && isOpen()) {
if (timeout > 0) {
// read with timeout
@@ -253,7 +273,7 @@
do {
long startTime = System.nanoTime();
park(Net.POLLIN, nanos);
- n = IOUtil.read(fd, dst, -1, nd);
+ n = tryRead(fd, b, off, len);
if (n == IOStatus.UNAVAILABLE) {
nanos -= System.nanoTime() - startTime;
if (nanos <= 0)
@@ -264,7 +284,7 @@
// read, no timeout
do {
park(Net.POLLIN);
- n = IOUtil.read(fd, dst, -1, nd);
+ n = tryRead(fd, b, off, len);
} while (IOStatus.okayToRetry(n) && isOpen());
}
}
@@ -306,19 +326,36 @@
}
/**
- * Writes a sequence of bytes to this socket from the given buffer.
+ * Try to write a sequence of bytes to this socket from the given byte array
+ */
+ private int tryWrite(FileDescriptor fd, byte[] b, int off, int len)
+ throws IOException
+ {
+ ByteBuffer src = Util.getTemporaryDirectBuffer(len);
+ assert src.position() == 0;
+ try {
+ src.put(b, off, len);
+ return nd.write(fd, ((DirectBuffer)src).address(), len);
+ } finally {
+ Util.offerFirstTemporaryDirectBuffer(src);
+ }
+ }
+
+ /**
+ * Writes a sequence of bytes to this socket from the given byte array.
+ * @return the number of bytes written
* @throws IOException if the socket is closed or an I/O occurs
*/
- private int write(ByteBuffer dst) throws IOException {
+ private int write(byte[] b, int off, int len) throws IOException {
writeLock.lock();
try {
int n = 0;
FileDescriptor fd = beginWrite();
try {
- n = IOUtil.write(fd, dst, -1, nd);
+ n = tryWrite(fd, b, off, len);
while (IOStatus.okayToRetry(n) && isOpen()) {
park(Net.POLLOUT);
- n = IOUtil.write(fd, dst, -1, nd);
+ n = tryWrite(fd, b, off, len);
}
return n;
} finally {
@@ -709,18 +746,17 @@
return (n > 0) ? (a[0] & 0xff) : -1;
}
@Override
- public int read(byte b[], int off, int len) throws IOException {
+ public int read(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
if (eof) {
- return -1; // legacy SocketInputStream behavior
+ return -1;
} else if (len == 0) {
return 0;
} else {
try {
// read up to MAX_BUFFER_SIZE bytes
int size = Math.min(len, MAX_BUFFER_SIZE);
- ByteBuffer dst = ByteBuffer.wrap(b, off, size);
- int n = NioSocketImpl.this.read(dst);
+ int n = NioSocketImpl.this.read(b, off, size);
if (n == -1)
eof = true;
return n;
@@ -751,20 +787,18 @@
write(a, 0, 1);
}
@Override
- public void write(byte b[], int off, int len) throws IOException {
+ public void write(byte[] b, int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
if (len > 0) {
try {
- ByteBuffer src = ByteBuffer.wrap(b, off, len);
- int end = src.limit();
- int pos;
- // write up to MAX_BUFFER_SIZE bytes at a time
- while ((pos = src.position()) < end) {
+ int pos = off;
+ int end = off + len;
+ while (pos < end) {
+ // write up to MAX_BUFFER_SIZE bytes
int size = Math.min((end - pos), MAX_BUFFER_SIZE);
- src.limit(pos + size);
- NioSocketImpl.this.write(src);
+ int n = NioSocketImpl.this.write(b, pos, size);
+ pos += n;
}
- assert src.limit() == end && src.remaining() == 0;
} catch (IOException ioe) {
throw new SocketException(ioe.getMessage());
}
--- a/test/micro/org/openjdk/bench/java/net/SocketReadWrite.java Tue Jan 29 17:42:58 2019 +0000
+++ b/test/micro/org/openjdk/bench/java/net/SocketReadWrite.java Wed Jan 30 14:33:52 2019 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2014, 2019 Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -28,80 +28,59 @@
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
-import java.net.SocketException;
import java.util.concurrent.TimeUnit;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.Mode;
-import org.openjdk.jmh.annotations.OutputTimeUnit;
-import org.openjdk.jmh.annotations.Scope;
-import org.openjdk.jmh.annotations.Setup;
-import org.openjdk.jmh.annotations.State;
-import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.*;
/**
- * Tests the overheads of I/O API.
- * This test is known to depend heavily on network conditions and paltform.
+ * Tests socket read/write.
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class SocketReadWrite {
- private OutputStream os;
- private InputStream is;
- private ServerSocket ss;
private Socket s1, s2;
- private ReadThread rt;
+ private InputStream in;
+ private OutputStream out;
@Setup
public void beforeRun() throws IOException {
- InetAddress iaddr = InetAddress.getLocalHost();
-
- ss = new ServerSocket(0);
- s1 = new Socket(iaddr, ss.getLocalPort());
- s2 = ss.accept();
-
- os = s1.getOutputStream();
- is = s2.getInputStream();
-
- rt = new ReadThread(is);
- rt.start();
+ InetAddress lb = InetAddress.getLoopbackAddress();
+ try (ServerSocket ss = new ServerSocket(0)) {
+ s1 = new Socket(lb, ss.getLocalPort());
+ s2 = ss.accept();
+ }
+ s1.setTcpNoDelay(true);
+ s2.setTcpNoDelay(true);
+ in = s1.getInputStream();
+ out = s2.getOutputStream();
}
@TearDown
- public void afterRun() throws IOException, InterruptedException {
- os.write(0);
- os.close();
- is.close();
+ public void afterRun() throws IOException {
s1.close();
s2.close();
- ss.close();
- rt.join();
}
+ @Param({"1", "1024", "8192", "64000", "128000"})
+ public int size;
+
+ private final byte[] array = new byte[512*1024];
+
@Benchmark
public void test() throws IOException {
- os.write((byte) 4711);
- }
-
- static class ReadThread extends Thread {
- private InputStream is;
-
- public ReadThread(InputStream is) {
- this.is = is;
- }
-
- public void run() {
- try {
- while (is.read() > 0);
- } catch (SocketException ex) {
- // ignore - most likely "socket closed", which means shutdown
- } catch (IOException e) {
- e.printStackTrace();
+ if (size == 1) {
+ out.write((byte) 47);
+ int c = in.read();
+ } else {
+ out.write(array, 0, size);
+ int nread = 0;
+ while (nread < size) {
+ int n = in.read(array, 0, size);
+ if (n < 0) throw new RuntimeException();
+ nread += n;
}
}
}
-
}