src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java
changeset 54620 13b67c1420b8
parent 49493 814bd31f8da0
child 54754 193a8f1a4f3b
--- a/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java	Thu Apr 25 09:12:40 2019 +0200
+++ b/src/java.base/unix/classes/sun/nio/ch/SourceChannelImpl.java	Thu Apr 25 10:41:49 2019 +0100
@@ -35,6 +35,7 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.Objects;
+import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 class SourceChannelImpl
@@ -53,7 +54,8 @@
 
     // Lock held by any thread that modifies the state fields declared below
     // DO NOT invoke a blocking I/O operation while holding this lock!
-    private final Object stateLock = new Object();
+    private final ReentrantLock stateLock = new ReentrantLock();
+    private final Condition stateCondition = stateLock.newCondition();
 
     // -- The following fields are protected by stateLock
 
@@ -95,15 +97,19 @@
         boolean blocking;
 
         // set state to ST_CLOSING
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             assert state < ST_CLOSING;
             state = ST_CLOSING;
             blocking = isBlocking();
+        } finally {
+            stateLock.unlock();
         }
 
         // wait for any outstanding read to complete
         if (blocking) {
-            synchronized (stateLock) {
+            stateLock.lock();
+            try {
                 assert state == ST_CLOSING;
                 long th = thread;
                 if (th != 0) {
@@ -113,12 +119,14 @@
                     // wait for read operation to end
                     while (thread != 0) {
                         try {
-                            stateLock.wait();
+                            stateCondition.await();
                         } catch (InterruptedException e) {
                             interrupted = true;
                         }
                     }
                 }
+            } finally {
+                stateLock.unlock();
             }
         } else {
             // non-blocking mode: wait for read to complete
@@ -127,9 +135,12 @@
         }
 
         // set state to ST_KILLPENDING
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             assert state == ST_CLOSING;
             state = ST_KILLPENDING;
+        } finally {
+            stateLock.unlock();
         }
 
         // close socket if not registered with Selector
@@ -143,12 +154,15 @@
 
     @Override
     public void kill() throws IOException {
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             assert thread == 0;
             if (state == ST_KILLPENDING) {
                 state = ST_KILLED;
                 nd.close(fd);
             }
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -156,8 +170,11 @@
     protected void implConfigureBlocking(boolean block) throws IOException {
         readLock.lock();
         try {
-            synchronized (stateLock) {
+            stateLock.lock();
+            try {
                 IOUtil.configureBlocking(fd, block);
+            } finally {
+                stateLock.unlock();
             }
         } finally {
             readLock.unlock();
@@ -212,11 +229,14 @@
             // set hook for Thread.interrupt
             begin();
         }
-        synchronized (stateLock) {
+        stateLock.lock();
+        try {
             if (!isOpen())
                 throw new ClosedChannelException();
             if (blocking)
                 thread = NativeThread.current();
+        } finally {
+            stateLock.unlock();
         }
     }
 
@@ -230,12 +250,15 @@
         throws AsynchronousCloseException
     {
         if (blocking) {
-            synchronized (stateLock) {
+            stateLock.lock();
+            try {
                 thread = 0;
                 // notify any thread waiting in implCloseSelectableChannel
                 if (state == ST_CLOSING) {
-                    stateLock.notifyAll();
+                    stateCondition.signalAll();
                 }
+            } finally {
+                stateLock.unlock();
             }
             // remove hook for Thread.interrupt
             end(completed);
@@ -252,9 +275,13 @@
             int n = 0;
             try {
                 beginRead(blocking);
-                do {
-                    n = IOUtil.read(fd, dst, -1, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                n = IOUtil.read(fd, dst, -1, nd);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLIN);
+                        n = IOUtil.read(fd, dst, -1, nd);
+                    }
+                }
             } finally {
                 endRead(blocking, n > 0);
                 assert IOStatus.check(n);
@@ -275,9 +302,13 @@
             long n = 0;
             try {
                 beginRead(blocking);
-                do {
-                    n = IOUtil.read(fd, dsts, offset, length, nd);
-                } while ((n == IOStatus.INTERRUPTED) && isOpen());
+                n = IOUtil.read(fd, dsts, offset, length, nd);
+                if (blocking) {
+                    while (IOStatus.okayToRetry(n) && isOpen()) {
+                        park(Net.POLLIN);
+                        n = IOUtil.read(fd, dsts, offset, length, nd);
+                    }
+                }
             } finally {
                 endRead(blocking, n > 0);
                 assert IOStatus.check(n);