--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Wed Nov 29 11:15:19 2017 +0000
@@ -25,19 +25,27 @@
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.SSLFlowDelegate;
import jdk.incubator.http.internal.common.SSLTube;
import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.Utils;
import org.testng.annotations.Test;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.KeyStore;
@@ -48,6 +56,7 @@
import java.util.List;
import java.util.Queue;
import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -55,8 +64,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Test
@@ -80,9 +91,13 @@
/* Start of wiring */
ExecutorService sslExecutor = Executors.newCachedThreadPool();
/* Emulates an echo server */
- FlowTube server = new SSLTube(createSSLEngine(false),
- sslExecutor,
- new EchoTube(16));
+// FlowTube server = new SSLTube(createSSLEngine(false),
+// sslExecutor,
+// new EchoTube(16));
+ SSLLoopbackSubscriber server =
+ new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
+ server.start();
+
FlowTube client = new SSLTube(createSSLEngine(true),
sslExecutor,
server);
@@ -113,63 +128,188 @@
}
}
- private static final class EchoTube implements FlowTube {
-
- private final static Object EOF = new Object();
- private final Executor executor = Executors.newSingleThreadExecutor();
+ static class SSLLoopbackSubscriber implements FlowTube {
+ private final BlockingQueue<ByteBuffer> buffer;
+ private final Socket clientSock;
+ private final SSLSocket serverSock;
+ private final Thread thread1, thread2, thread3;
+ private volatile Flow.Subscription clientSubscription;
+ private final SubmissionPublisher<List<ByteBuffer>> publisher;
- private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
- private final int maxQueueSize;
- private final SequentialScheduler processingScheduler =
- new SequentialScheduler(createProcessingTask());
+ SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+ SSLServerSocketFactory fac = ctx.getServerSocketFactory();
+ SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
+ SSLParameters params = serv.getSSLParameters();
+ params.setApplicationProtocols(new String[]{"proto2"});
+ serv.setSSLParameters(params);
+
+
+ int serverPort = serv.getLocalPort();
+ clientSock = new Socket("127.0.0.1", serverPort);
+ serverSock = (SSLSocket) serv.accept();
+ this.buffer = new LinkedBlockingQueue<>();
+ thread1 = new Thread(this::clientWriter, "clientWriter");
+ thread2 = new Thread(this::serverLoopback, "serverLoopback");
+ thread3 = new Thread(this::clientReader, "clientReader");
+ publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
+ this::handlePublisherException);
+ SSLFlowDelegate.Monitor.add(this::monitor);
+ }
- /* Writing into this tube */
- private long unfulfilled;
- private Flow.Subscription subscription;
+ public void start() {
+ thread1.start();
+ thread2.start();
+ thread3.start();
+ }
+
+ private void handlePublisherException(Object o, Throwable t) {
+ System.out.println("Loopback Publisher exception");
+ t.printStackTrace(System.out);
+ }
+
+ private final AtomicInteger readCount = new AtomicInteger();
- /* Reading from this tube */
- private final Demand demand = new Demand();
- private final AtomicBoolean cancelled = new AtomicBoolean();
- private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
-
- private EchoTube(int maxBufferSize) {
- if (maxBufferSize < 1)
- throw new IllegalArgumentException();
- this.maxQueueSize = maxBufferSize;
+ // reads off the SSLSocket the data from the "server"
+ private void clientReader() {
+ try {
+ InputStream is = clientSock.getInputStream();
+ final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+ System.out.println("clientReader: bufsize = " + bufsize);
+ while (true) {
+ byte[] buf = new byte[bufsize];
+ int n = is.read(buf);
+ if (n == -1) {
+ System.out.println("clientReader close: read "
+ + readCount.get() + " bytes");
+ publisher.close();
+ sleep(2000);
+ Utils.close(is, clientSock);
+ return;
+ }
+ ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
+ readCount.addAndGet(n);
+ publisher.submit(List.of(bb));
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Utils.close(clientSock);
+ }
}
- @Override
- public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
- this.subscriber = subscriber;
- this.subscriber.onSubscribe(new InternalSubscription());
+ // writes the encrypted data from SSLFLowDelegate to the j.n.Socket
+ // which is connected to the SSLSocket emulating a server.
+ private void clientWriter() {
+ long nbytes = 0;
+ try {
+ OutputStream os =
+ new BufferedOutputStream(clientSock.getOutputStream());
+
+ while (true) {
+ ByteBuffer buf = buffer.take();
+ if (buf == FlowTest.SENTINEL) {
+ // finished
+ //Utils.sleep(2000);
+ System.out.println("clientWriter close: " + nbytes + " written");
+ clientSock.shutdownOutput();
+ System.out.println("clientWriter close return");
+ return;
+ }
+ int len = buf.remaining();
+ int written = writeToStream(os, buf);
+ assert len == written;
+ nbytes += len;
+ assert !buf.hasRemaining()
+ : "buffer has " + buf.remaining() + " bytes left";
+ clientSubscription.request(1);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException {
+ byte[] b = buf.array();
+ int offset = buf.arrayOffset() + buf.position();
+ int n = buf.limit() - buf.position();
+ os.write(b, offset, n);
+ buf.position(buf.limit());
+ os.flush();
+ return n;
+ }
+
+ private final AtomicInteger loopCount = new AtomicInteger();
+
+ public String monitor() {
+ return "serverLoopback: loopcount = " + loopCount.toString()
+ + " clientRead: count = " + readCount.toString();
+ }
+
+ // thread2
+ private void serverLoopback() {
+ try {
+ InputStream is = serverSock.getInputStream();
+ OutputStream os = serverSock.getOutputStream();
+ final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+ System.out.println("serverLoopback: bufsize = " + bufsize);
+ byte[] bb = new byte[bufsize];
+ while (true) {
+ int n = is.read(bb);
+ if (n == -1) {
+ sleep(2000);
+ is.close();
+ os.close();
+ serverSock.close();
+ return;
+ }
+ os.write(bb, 0, n);
+ os.flush();
+ loopCount.addAndGet(n);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * This needs to be called before the chain is subscribed. It can't be
+ * supplied in the constructor.
+ */
+ public void setReturnSubscriber(Flow.Subscriber<List<ByteBuffer>> returnSubscriber) {
+ publisher.subscribe(returnSubscriber);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
- unfulfilled = maxQueueSize;
- (this.subscription = subscription).request(maxQueueSize);
+ clientSubscription = subscription;
+ clientSubscription.request(5);
}
@Override
public void onNext(List<ByteBuffer> item) {
- if (--unfulfilled == (maxQueueSize / 2)) {
- subscription.request(maxQueueSize - unfulfilled);
- unfulfilled = maxQueueSize;
+ try {
+ for (ByteBuffer b : item)
+ buffer.put(b);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Utils.close(clientSock);
}
- queue.add(item);
- processingScheduler.deferOrSchedule(executor);
}
@Override
public void onError(Throwable throwable) {
- queue.add(throwable);
- processingScheduler.deferOrSchedule(executor);
+ throwable.printStackTrace();
+ Utils.close(clientSock);
}
@Override
public void onComplete() {
- queue.add(EOF);
- processingScheduler.deferOrSchedule(executor);
+ try {
+ buffer.put(FlowTest.SENTINEL);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Utils.close(clientSock);
+ }
}
@Override
@@ -177,57 +317,163 @@
return false;
}
- private class InternalSubscription implements Flow.Subscription {
-
- @Override
- public void request(long n) {
- if (n <= 0) {
- throw new InternalError();
- }
- demand.increase(n);
- processingScheduler.runOrSchedule();
- }
-
- @Override
- public void cancel() {
- cancelled.set(true);
- }
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ publisher.subscribe(subscriber);
}
-
- private SequentialScheduler.RestartableTask createProcessingTask() {
- return new SequentialScheduler.CompleteRestartableTask() {
+ }
- @Override
- protected void run() {
- while (!cancelled.get()) {
- Object item = queue.peek();
- if (item == null)
- return;
- try {
- if (item instanceof List) {
- if (!demand.tryDecrement())
- return;
- @SuppressWarnings("unchecked")
- List<ByteBuffer> bytes = (List<ByteBuffer>) item;
- subscriber.onNext(bytes);
- } else if (item instanceof Throwable) {
- cancelled.set(true);
- subscriber.onError((Throwable) item);
- } else if (item == EOF) {
- cancelled.set(true);
- subscriber.onComplete();
- } else {
- throw new InternalError(String.valueOf(item));
- }
- } finally {
- Object removed = queue.remove();
- assert removed == item;
- }
- }
- }
- };
+ private static void sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+
}
}
+// private static final class EchoTube implements FlowTube {
+//
+// private final static Object EOF = new Object();
+// private final Executor executor = Executors.newSingleThreadExecutor();
+//
+// private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
+// private final int maxQueueSize;
+// private final SequentialScheduler processingScheduler =
+// new SequentialScheduler(createProcessingTask());
+//
+// /* Writing into this tube */
+// private long unfulfilled;
+// private Flow.Subscription subscription;
+//
+// /* Reading from this tube */
+// private final Demand demand = new Demand();
+// private final AtomicBoolean cancelled = new AtomicBoolean();
+// private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+//
+// private EchoTube(int maxBufferSize) {
+// if (maxBufferSize < 1)
+// throw new IllegalArgumentException();
+// this.maxQueueSize = maxBufferSize;
+// }
+//
+// @Override
+// public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+// this.subscriber = subscriber;
+// System.out.println("EchoTube got subscriber: " + subscriber);
+// this.subscriber.onSubscribe(new InternalSubscription());
+// }
+//
+// @Override
+// public void onSubscribe(Flow.Subscription subscription) {
+// unfulfilled = maxQueueSize;
+// System.out.println("EchoTube request: " + maxQueueSize);
+// (this.subscription = subscription).request(maxQueueSize);
+// }
+//
+// @Override
+// public void onNext(List<ByteBuffer> item) {
+// if (--unfulfilled == (maxQueueSize / 2)) {
+// long req = maxQueueSize - unfulfilled;
+// subscription.request(req);
+// System.out.println("EchoTube request: " + req);
+// unfulfilled = maxQueueSize;
+// }
+// System.out.println("EchoTube add " + Utils.remaining(item));
+// queue.add(item);
+// processingScheduler.deferOrSchedule(executor);
+// }
+//
+// @Override
+// public void onError(Throwable throwable) {
+// System.out.println("EchoTube add " + throwable);
+// queue.add(throwable);
+// processingScheduler.deferOrSchedule(executor);
+// }
+//
+// @Override
+// public void onComplete() {
+// System.out.println("EchoTube add EOF");
+// queue.add(EOF);
+// processingScheduler.deferOrSchedule(executor);
+// }
+//
+// @Override
+// public boolean isFinished() {
+// return false;
+// }
+//
+// private class InternalSubscription implements Flow.Subscription {
+//
+// @Override
+// public void request(long n) {
+// System.out.println("EchoTube got request: " + n);
+// if (n <= 0) {
+// throw new InternalError();
+// }
+// demand.increase(n);
+// processingScheduler.runOrSchedule();
+// }
+//
+// @Override
+// public void cancel() {
+// cancelled.set(true);
+// }
+// }
+//
+// @Override
+// public String toString() {
+// return "EchoTube";
+// }
+//
+// private SequentialScheduler.RestartableTask createProcessingTask() {
+// return new SequentialScheduler.CompleteRestartableTask() {
+//
+// @Override
+// protected void run() {
+// try {
+// while (!cancelled.get()) {
+// Object item = queue.peek();
+// if (item == null)
+// return;
+// try {
+// System.out.println("EchoTube processing item");
+// if (item instanceof List) {
+// if (!demand.tryDecrement()) {
+// System.out.println("EchoTube no demand");
+// return;
+// }
+// @SuppressWarnings("unchecked")
+// List<ByteBuffer> bytes = (List<ByteBuffer>) item;
+// Object removed = queue.remove();
+// assert removed == item;
+// System.out.println("EchoTube processing "
+// + Utils.remaining(bytes));
+// subscriber.onNext(bytes);
+// } else if (item instanceof Throwable) {
+// cancelled.set(true);
+// Object removed = queue.remove();
+// assert removed == item;
+// System.out.println("EchoTube processing " + item);
+// subscriber.onError((Throwable) item);
+// } else if (item == EOF) {
+// cancelled.set(true);
+// Object removed = queue.remove();
+// assert removed == item;
+// System.out.println("EchoTube processing EOF");
+// subscriber.onComplete();
+// } else {
+// throw new InternalError(String.valueOf(item));
+// }
+// } finally {
+// }
+// }
+// } catch(Throwable t) {
+// t.printStackTrace();
+// throw t;
+// }
+// }
+// };
+// }
+// }
/**
* The final subscriber which receives the decrypted looped-back data. Just
@@ -253,6 +499,7 @@
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
unfulfilled = REQUEST_WINDOW;
+ System.out.println("EndSubscriber request " + REQUEST_WINDOW);
subscription.request(REQUEST_WINDOW);
}
@@ -269,7 +516,9 @@
@Override
public void onNext(List<ByteBuffer> buffers) {
if (--unfulfilled == (REQUEST_WINDOW / 2)) {
- subscription.request(REQUEST_WINDOW - unfulfilled);
+ long req = REQUEST_WINDOW - unfulfilled;
+ System.out.println("EndSubscriber request " + req);
+ subscription.request(req);
unfulfilled = REQUEST_WINDOW;
}
@@ -277,6 +526,7 @@
if (currval % 500 == 0) {
System.out.println("End: " + currval);
}
+ System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
for (ByteBuffer buf : buffers) {
while (buf.hasRemaining()) {
@@ -298,6 +548,7 @@
@Override
public void onError(Throwable throwable) {
+ System.out.println("EndSubscriber onError " + throwable);
completion.completeExceptionally(throwable);
}
@@ -312,6 +563,10 @@
completion.complete(null);
}
}
+ @Override
+ public String toString() {
+ return "EndSubscriber";
+ }
}
private static SSLEngine createSSLEngine(boolean client) throws IOException {