http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup)
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Wed Nov 29 11:15:19 2017 +0000
@@ -90,7 +90,7 @@
ConnectionPool.CacheKey cacheKey() {
return ConnectionPool.cacheKey(address, null);
}
-
+
@Override
public void close() {
plainConnection.close();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java Wed Nov 29 11:15:19 2017 +0000
@@ -558,8 +558,8 @@
if (subscribed || cancelled) return;
subscribed = true;
}
- subscriber.onConnection(this);
- debug.log(Level.DEBUG, "onConnection called");
+ subscriber.onSubscribe(this);
+ debug.log(Level.DEBUG, "onSubscribe called");
if (errorRef.get() != null) {
signalCompletion();
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java Wed Nov 29 11:15:19 2017 +0000
@@ -40,12 +40,7 @@
* flow. A FlowTube supports handing over the same read subscription to different
* sequential read subscribers over time. When {@code connectFlows(writePublisher,
* readSubscriber} is called, the FlowTube will call {@code dropSubscription} on
- * its former readSubscriber, and {@code onConnection} on its new readSubscriber.
- * By default, the implementation of {@code onConnection} is to call
- * {@code onSubscribe}, but a subscriber that needs to subscribe sequentially
- * several times to the same FlowTube may override the default implementation
- * to ensure that {@code onSubscribe} is called only once.
- *
+ * its former readSubscriber, and {@code onSubscribe} on its new readSubscriber.
*/
public interface FlowTube extends
Flow.Publisher<List<ByteBuffer>>,
@@ -59,14 +54,6 @@
* should stop calling any method on its subscription.
*/
static interface TubeSubscriber extends Flow.Subscriber<List<ByteBuffer>> {
- /**
- * Called by {@code FlowTube.connectFlows}.
- * @param subscription the subscription.
- * @implSpec By default this method call {@code this.onSubscribe()}.
- */
- default void onConnection(Flow.Subscription subscription) {
- onSubscribe(subscription);
- }
/**
* Called when the flow is connected again, and the subscription
@@ -176,10 +163,6 @@
@Override
public void dropSubscription() {}
@Override
- public void onConnection(Flow.Subscription subscription) {
- delegate.onSubscribe(subscription);
- }
- @Override
public void onSubscribe(Flow.Subscription subscription) {
delegate.onSubscribe(subscription);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Wed Nov 29 11:15:19 2017 +0000
@@ -92,6 +92,7 @@
final CompletableFuture<Void> cf;
final CompletableFuture<String> alpnCF; // completes on initial handshake
final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
+ volatile boolean close_notify_received;
/**
* Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each
@@ -121,6 +122,15 @@
}
/**
+ * Returns true if the SSLFlowDelegate has detected a TLS
+ * close_notify from the server.
+ * @return true, if a close_notify was detected.
+ */
+ public boolean closeNotifyReceived() {
+ return close_notify_received;
+ }
+
+ /**
* Connects the read sink (downReader) to the SSLFlowDelegate Reader,
* and the write sink (downWriter) to the SSLFlowDelegate Writer.
* Called from within the constructor. Overwritten by SSLTube.
@@ -461,6 +471,11 @@
scheduler.stop();
}
+ @Override
+ public boolean closing() {
+ return closeNotifyReceived();
+ }
+
private boolean isCompleting() {
synchronized(writeList) {
int lastIndex = writeList.size() - 1;
@@ -692,8 +707,7 @@
dst = b;
break;
case CLOSED:
- doClosure();
- return new EngineResult(sslResult);
+ return doClosure(new EngineResult(sslResult));
case BUFFER_UNDERFLOW:
// handled implicitly by compaction/reallocation of readBuf
return new EngineResult(sslResult);
@@ -705,9 +719,22 @@
}
// FIXME: acknowledge a received CLOSE request from peer
- void doClosure() throws IOException {
- //while (!wrapAndSend(emptyArray))
- //;
+ EngineResult doClosure(EngineResult r) throws IOException {
+ debug.log(Level.DEBUG,
+ "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]",
+ r.result, engine.getHandshakeStatus(),
+ engine.isOutboundDone(), engine.isInboundDone());
+ if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) {
+ // we have received TLS close_notify and need to send
+ // an acknowledgement back. We're calling doHandshake
+ // to finish the close handshake.
+ if (engine.isInboundDone() && !engine.isOutboundDone()) {
+ debug.log(Level.DEBUG, "doClosure: close_notify received");
+ close_notify_received = true;
+ doHandshake(r, READER);
+ }
+ }
+ return r;
}
/**
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Wed Nov 29 11:15:19 2017 +0000
@@ -200,11 +200,6 @@
onSubscribe(delegate::onSubscribe, subscription);
}
- @Override
- public void onConnection(Flow.Subscription subscription) {
- onSubscribe(delegate::onConnection, subscription);
- }
-
private void onSubscribe(Consumer<Flow.Subscription> method,
Flow.Subscription subscription) {
subscribedCalled = true;
@@ -292,7 +287,7 @@
// are going to signal the SSLFlowDelegate reader, and make sure
// onSubscribed is called within the reader flow
// 2. If it's not yet subscribed (readSubscription == null), then
- // we're going to wait for onSubscribe/onConnection to be called.
+ // we're going to wait for onSubscribe to be called.
//
void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
@@ -345,9 +340,7 @@
if (previous != null) {
previous.dropSubscription();
}
- onNewSubscription(delegateWrapper,
- delegateWrapper::onSubscribe,
- subscription);
+ onNewSubscription(delegateWrapper, subscription);
}
@Override
@@ -359,14 +352,6 @@
}
@Override
- public void onConnection(Flow.Subscription subscription) {
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onConnection(%s)",
- subscription);
- onSubscribeImpl(subscription);
- }
-
- @Override
public void onSubscribe(Flow.Subscription subscription) {
debug.log(Level.DEBUG,
"SSLSubscriberWrapper (reader) onSubscribe(%s)",
@@ -374,7 +359,7 @@
onSubscribeImpl(subscription);
}
- // called in the reader flow, from either onSubscribe or onConnection.
+ // called in the reader flow, from onSubscribe.
private void onSubscribeImpl(Flow.Subscription subscription) {
assert subscription != null;
DelegateWrapper subscriberImpl, pending;
@@ -395,13 +380,11 @@
// There is no pending delegate, but we have a previously
// subscribed delegate. This is obviously a re-subscribe.
// We are in the downstream reader flow, so we should call
- // onConnection directly.
+ // onSubscribe directly.
debug.log(Level.DEBUG,
"SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
"resubscribing");
- onNewSubscription(subscriberImpl,
- subscriberImpl::onConnection,
- subscription);
+ onNewSubscription(subscriberImpl, subscription);
} else {
// We have some pending subscriber: subscribe it now that we have
// a subscription. If we already had a previous delegate then
@@ -414,10 +397,8 @@
}
private void onNewSubscription(DelegateWrapper subscriberImpl,
- Consumer<Flow.Subscription> method,
Flow.Subscription subscription) {
assert subscriberImpl != null;
- assert method != null;
assert subscription != null;
Throwable failed;
@@ -426,7 +407,7 @@
// subscriber
sslDelegate.resetReaderDemand();
// send the subscription to the subscriber.
- method.accept(subscription);
+ subscriberImpl.onSubscribe(subscription);
// The following twisted logic is just here that we don't invoke
// onError before onSubscribe. It also prevents race conditions
@@ -484,6 +465,16 @@
return !(hs == NOT_HANDSHAKING || hs == FINISHED);
}
+ private boolean handshakeFailed() {
+ // sslDelegate can be null if we reach here
+ // during the initial handshake, as that happens
+ // within the SSLFlowDelegate constructor.
+ // In that case we will want to raise an exception.
+ return handshaking()
+ && (sslDelegate == null
+ || !sslDelegate.closeNotifyReceived());
+ }
+
@Override
public void onComplete() {
assert !finished && !onCompleteReceived;
@@ -493,7 +484,12 @@
subscriberImpl = subscribed;
}
- if (handshaking()) {
+ if (handshakeFailed()) {
+ debug.log(Level.DEBUG,
+ "handshake: %s, inbound done: %s outbound done: %s",
+ engine.getHandshakeStatus(),
+ engine.isInboundDone(),
+ engine.isOutboundDone());
onErrorImpl(new SSLHandshakeException(
"Remote host terminated the handshake"));
} else if (subscriberImpl != null) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Tue Nov 28 18:03:56 2017 +0300
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Wed Nov 29 11:15:19 2017 +0000
@@ -195,12 +195,23 @@
outgoing(List.of(buffer), complete);
}
+ /**
+ * Sometime it might be necessary to complete the downstream subscriber
+ * before the upstream completes. For instance, when an SSL server
+ * sends a notify_close. In that case we should let the outgoing
+ * complete before upstream us completed.
+ * @return true, may be overridden by subclasses.
+ */
+ public boolean closing() {
+ return false;
+ }
+
public void outgoing(List<ByteBuffer> buffers, boolean complete) {
Objects.requireNonNull(buffers);
if (complete) {
assert Utils.remaining(buffers) == 0;
logger.log(Level.DEBUG, "completionAcknowledged");
- if (!upstreamCompleted)
+ if (!upstreamCompleted && !closing())
throw new IllegalStateException("upstream not completed");
completionAcknowledged = true;
} else {
--- a/test/jdk/java/net/httpclient/http2/BasicTest.java Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Wed Nov 29 11:15:19 2017 +0000
@@ -103,6 +103,7 @@
currentCF.getAndUpdate((cf) -> {
if (cf == null || cf.isDone()) {
cf = exchange.sendPing();
+ assert cf != null;
cfs.add(cf);
}
return cf;
@@ -123,8 +124,10 @@
paramsTest();
Thread.sleep(1000 * 4);
CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join();
- for (CompletableFuture<Long> cf : cfs) {
- System.out.printf("Ping ack received in %d millisec\n", cf.get());
+ synchronized (cfs) {
+ for (CompletableFuture<Long> cf : cfs) {
+ System.out.printf("Ping ack received in %d millisec\n", cf.get());
+ }
}
} catch (Throwable tt) {
System.err.println("tt caught");
--- a/test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java Wed Nov 29 11:15:19 2017 +0000
@@ -24,7 +24,5 @@
/*
* @test
* @modules jdk.incubator.httpclient
- * @ignore
+ * @run testng/othervm -Djdk.internal.httpclient.debug=true jdk.incubator.httpclient/jdk.incubator.http.SSLTubeTest
*/
-
- // FIXME * @run testng jdk.incubator.httpclient/jdk.incubator.http.SSLTubeTest
--- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Tue Nov 28 18:03:56 2017 +0300
+++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Wed Nov 29 11:15:19 2017 +0000
@@ -25,19 +25,27 @@
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube;
+import jdk.incubator.http.internal.common.SSLFlowDelegate;
import jdk.incubator.http.internal.common.SSLTube;
import jdk.incubator.http.internal.common.SequentialScheduler;
+import jdk.incubator.http.internal.common.Utils;
import org.testng.annotations.Test;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
+import javax.net.ssl.SSLServerSocket;
+import javax.net.ssl.SSLServerSocketFactory;
+import javax.net.ssl.SSLSocket;
import javax.net.ssl.TrustManagerFactory;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
import java.nio.ByteBuffer;
import java.security.KeyManagementException;
import java.security.KeyStore;
@@ -48,6 +56,7 @@
import java.util.List;
import java.util.Queue;
import java.util.StringTokenizer;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
@@ -55,8 +64,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@Test
@@ -80,9 +91,13 @@
/* Start of wiring */
ExecutorService sslExecutor = Executors.newCachedThreadPool();
/* Emulates an echo server */
- FlowTube server = new SSLTube(createSSLEngine(false),
- sslExecutor,
- new EchoTube(16));
+// FlowTube server = new SSLTube(createSSLEngine(false),
+// sslExecutor,
+// new EchoTube(16));
+ SSLLoopbackSubscriber server =
+ new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor);
+ server.start();
+
FlowTube client = new SSLTube(createSSLEngine(true),
sslExecutor,
server);
@@ -113,63 +128,188 @@
}
}
- private static final class EchoTube implements FlowTube {
-
- private final static Object EOF = new Object();
- private final Executor executor = Executors.newSingleThreadExecutor();
+ static class SSLLoopbackSubscriber implements FlowTube {
+ private final BlockingQueue<ByteBuffer> buffer;
+ private final Socket clientSock;
+ private final SSLSocket serverSock;
+ private final Thread thread1, thread2, thread3;
+ private volatile Flow.Subscription clientSubscription;
+ private final SubmissionPublisher<List<ByteBuffer>> publisher;
- private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
- private final int maxQueueSize;
- private final SequentialScheduler processingScheduler =
- new SequentialScheduler(createProcessingTask());
+ SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException {
+ SSLServerSocketFactory fac = ctx.getServerSocketFactory();
+ SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0);
+ SSLParameters params = serv.getSSLParameters();
+ params.setApplicationProtocols(new String[]{"proto2"});
+ serv.setSSLParameters(params);
+
+
+ int serverPort = serv.getLocalPort();
+ clientSock = new Socket("127.0.0.1", serverPort);
+ serverSock = (SSLSocket) serv.accept();
+ this.buffer = new LinkedBlockingQueue<>();
+ thread1 = new Thread(this::clientWriter, "clientWriter");
+ thread2 = new Thread(this::serverLoopback, "serverLoopback");
+ thread3 = new Thread(this::clientReader, "clientReader");
+ publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(),
+ this::handlePublisherException);
+ SSLFlowDelegate.Monitor.add(this::monitor);
+ }
- /* Writing into this tube */
- private long unfulfilled;
- private Flow.Subscription subscription;
+ public void start() {
+ thread1.start();
+ thread2.start();
+ thread3.start();
+ }
+
+ private void handlePublisherException(Object o, Throwable t) {
+ System.out.println("Loopback Publisher exception");
+ t.printStackTrace(System.out);
+ }
+
+ private final AtomicInteger readCount = new AtomicInteger();
- /* Reading from this tube */
- private final Demand demand = new Demand();
- private final AtomicBoolean cancelled = new AtomicBoolean();
- private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
-
- private EchoTube(int maxBufferSize) {
- if (maxBufferSize < 1)
- throw new IllegalArgumentException();
- this.maxQueueSize = maxBufferSize;
+ // reads off the SSLSocket the data from the "server"
+ private void clientReader() {
+ try {
+ InputStream is = clientSock.getInputStream();
+ final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+ System.out.println("clientReader: bufsize = " + bufsize);
+ while (true) {
+ byte[] buf = new byte[bufsize];
+ int n = is.read(buf);
+ if (n == -1) {
+ System.out.println("clientReader close: read "
+ + readCount.get() + " bytes");
+ publisher.close();
+ sleep(2000);
+ Utils.close(is, clientSock);
+ return;
+ }
+ ByteBuffer bb = ByteBuffer.wrap(buf, 0, n);
+ readCount.addAndGet(n);
+ publisher.submit(List.of(bb));
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Utils.close(clientSock);
+ }
}
- @Override
- public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
- this.subscriber = subscriber;
- this.subscriber.onSubscribe(new InternalSubscription());
+ // writes the encrypted data from SSLFLowDelegate to the j.n.Socket
+ // which is connected to the SSLSocket emulating a server.
+ private void clientWriter() {
+ long nbytes = 0;
+ try {
+ OutputStream os =
+ new BufferedOutputStream(clientSock.getOutputStream());
+
+ while (true) {
+ ByteBuffer buf = buffer.take();
+ if (buf == FlowTest.SENTINEL) {
+ // finished
+ //Utils.sleep(2000);
+ System.out.println("clientWriter close: " + nbytes + " written");
+ clientSock.shutdownOutput();
+ System.out.println("clientWriter close return");
+ return;
+ }
+ int len = buf.remaining();
+ int written = writeToStream(os, buf);
+ assert len == written;
+ nbytes += len;
+ assert !buf.hasRemaining()
+ : "buffer has " + buf.remaining() + " bytes left";
+ clientSubscription.request(1);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+ private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException {
+ byte[] b = buf.array();
+ int offset = buf.arrayOffset() + buf.position();
+ int n = buf.limit() - buf.position();
+ os.write(b, offset, n);
+ buf.position(buf.limit());
+ os.flush();
+ return n;
+ }
+
+ private final AtomicInteger loopCount = new AtomicInteger();
+
+ public String monitor() {
+ return "serverLoopback: loopcount = " + loopCount.toString()
+ + " clientRead: count = " + readCount.toString();
+ }
+
+ // thread2
+ private void serverLoopback() {
+ try {
+ InputStream is = serverSock.getInputStream();
+ OutputStream os = serverSock.getOutputStream();
+ final int bufsize = FlowTest.randomRange(512, 16 * 1024);
+ System.out.println("serverLoopback: bufsize = " + bufsize);
+ byte[] bb = new byte[bufsize];
+ while (true) {
+ int n = is.read(bb);
+ if (n == -1) {
+ sleep(2000);
+ is.close();
+ os.close();
+ serverSock.close();
+ return;
+ }
+ os.write(bb, 0, n);
+ os.flush();
+ loopCount.addAndGet(n);
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ /**
+ * This needs to be called before the chain is subscribed. It can't be
+ * supplied in the constructor.
+ */
+ public void setReturnSubscriber(Flow.Subscriber<List<ByteBuffer>> returnSubscriber) {
+ publisher.subscribe(returnSubscriber);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
- unfulfilled = maxQueueSize;
- (this.subscription = subscription).request(maxQueueSize);
+ clientSubscription = subscription;
+ clientSubscription.request(5);
}
@Override
public void onNext(List<ByteBuffer> item) {
- if (--unfulfilled == (maxQueueSize / 2)) {
- subscription.request(maxQueueSize - unfulfilled);
- unfulfilled = maxQueueSize;
+ try {
+ for (ByteBuffer b : item)
+ buffer.put(b);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Utils.close(clientSock);
}
- queue.add(item);
- processingScheduler.deferOrSchedule(executor);
}
@Override
public void onError(Throwable throwable) {
- queue.add(throwable);
- processingScheduler.deferOrSchedule(executor);
+ throwable.printStackTrace();
+ Utils.close(clientSock);
}
@Override
public void onComplete() {
- queue.add(EOF);
- processingScheduler.deferOrSchedule(executor);
+ try {
+ buffer.put(FlowTest.SENTINEL);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Utils.close(clientSock);
+ }
}
@Override
@@ -177,57 +317,163 @@
return false;
}
- private class InternalSubscription implements Flow.Subscription {
-
- @Override
- public void request(long n) {
- if (n <= 0) {
- throw new InternalError();
- }
- demand.increase(n);
- processingScheduler.runOrSchedule();
- }
-
- @Override
- public void cancel() {
- cancelled.set(true);
- }
+ @Override
+ public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+ publisher.subscribe(subscriber);
}
-
- private SequentialScheduler.RestartableTask createProcessingTask() {
- return new SequentialScheduler.CompleteRestartableTask() {
+ }
- @Override
- protected void run() {
- while (!cancelled.get()) {
- Object item = queue.peek();
- if (item == null)
- return;
- try {
- if (item instanceof List) {
- if (!demand.tryDecrement())
- return;
- @SuppressWarnings("unchecked")
- List<ByteBuffer> bytes = (List<ByteBuffer>) item;
- subscriber.onNext(bytes);
- } else if (item instanceof Throwable) {
- cancelled.set(true);
- subscriber.onError((Throwable) item);
- } else if (item == EOF) {
- cancelled.set(true);
- subscriber.onComplete();
- } else {
- throw new InternalError(String.valueOf(item));
- }
- } finally {
- Object removed = queue.remove();
- assert removed == item;
- }
- }
- }
- };
+ private static void sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+
}
}
+// private static final class EchoTube implements FlowTube {
+//
+// private final static Object EOF = new Object();
+// private final Executor executor = Executors.newSingleThreadExecutor();
+//
+// private final Queue<Object> queue = new ConcurrentLinkedQueue<>();
+// private final int maxQueueSize;
+// private final SequentialScheduler processingScheduler =
+// new SequentialScheduler(createProcessingTask());
+//
+// /* Writing into this tube */
+// private long unfulfilled;
+// private Flow.Subscription subscription;
+//
+// /* Reading from this tube */
+// private final Demand demand = new Demand();
+// private final AtomicBoolean cancelled = new AtomicBoolean();
+// private Flow.Subscriber<? super List<ByteBuffer>> subscriber;
+//
+// private EchoTube(int maxBufferSize) {
+// if (maxBufferSize < 1)
+// throw new IllegalArgumentException();
+// this.maxQueueSize = maxBufferSize;
+// }
+//
+// @Override
+// public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
+// this.subscriber = subscriber;
+// System.out.println("EchoTube got subscriber: " + subscriber);
+// this.subscriber.onSubscribe(new InternalSubscription());
+// }
+//
+// @Override
+// public void onSubscribe(Flow.Subscription subscription) {
+// unfulfilled = maxQueueSize;
+// System.out.println("EchoTube request: " + maxQueueSize);
+// (this.subscription = subscription).request(maxQueueSize);
+// }
+//
+// @Override
+// public void onNext(List<ByteBuffer> item) {
+// if (--unfulfilled == (maxQueueSize / 2)) {
+// long req = maxQueueSize - unfulfilled;
+// subscription.request(req);
+// System.out.println("EchoTube request: " + req);
+// unfulfilled = maxQueueSize;
+// }
+// System.out.println("EchoTube add " + Utils.remaining(item));
+// queue.add(item);
+// processingScheduler.deferOrSchedule(executor);
+// }
+//
+// @Override
+// public void onError(Throwable throwable) {
+// System.out.println("EchoTube add " + throwable);
+// queue.add(throwable);
+// processingScheduler.deferOrSchedule(executor);
+// }
+//
+// @Override
+// public void onComplete() {
+// System.out.println("EchoTube add EOF");
+// queue.add(EOF);
+// processingScheduler.deferOrSchedule(executor);
+// }
+//
+// @Override
+// public boolean isFinished() {
+// return false;
+// }
+//
+// private class InternalSubscription implements Flow.Subscription {
+//
+// @Override
+// public void request(long n) {
+// System.out.println("EchoTube got request: " + n);
+// if (n <= 0) {
+// throw new InternalError();
+// }
+// demand.increase(n);
+// processingScheduler.runOrSchedule();
+// }
+//
+// @Override
+// public void cancel() {
+// cancelled.set(true);
+// }
+// }
+//
+// @Override
+// public String toString() {
+// return "EchoTube";
+// }
+//
+// private SequentialScheduler.RestartableTask createProcessingTask() {
+// return new SequentialScheduler.CompleteRestartableTask() {
+//
+// @Override
+// protected void run() {
+// try {
+// while (!cancelled.get()) {
+// Object item = queue.peek();
+// if (item == null)
+// return;
+// try {
+// System.out.println("EchoTube processing item");
+// if (item instanceof List) {
+// if (!demand.tryDecrement()) {
+// System.out.println("EchoTube no demand");
+// return;
+// }
+// @SuppressWarnings("unchecked")
+// List<ByteBuffer> bytes = (List<ByteBuffer>) item;
+// Object removed = queue.remove();
+// assert removed == item;
+// System.out.println("EchoTube processing "
+// + Utils.remaining(bytes));
+// subscriber.onNext(bytes);
+// } else if (item instanceof Throwable) {
+// cancelled.set(true);
+// Object removed = queue.remove();
+// assert removed == item;
+// System.out.println("EchoTube processing " + item);
+// subscriber.onError((Throwable) item);
+// } else if (item == EOF) {
+// cancelled.set(true);
+// Object removed = queue.remove();
+// assert removed == item;
+// System.out.println("EchoTube processing EOF");
+// subscriber.onComplete();
+// } else {
+// throw new InternalError(String.valueOf(item));
+// }
+// } finally {
+// }
+// }
+// } catch(Throwable t) {
+// t.printStackTrace();
+// throw t;
+// }
+// }
+// };
+// }
+// }
/**
* The final subscriber which receives the decrypted looped-back data. Just
@@ -253,6 +499,7 @@
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
unfulfilled = REQUEST_WINDOW;
+ System.out.println("EndSubscriber request " + REQUEST_WINDOW);
subscription.request(REQUEST_WINDOW);
}
@@ -269,7 +516,9 @@
@Override
public void onNext(List<ByteBuffer> buffers) {
if (--unfulfilled == (REQUEST_WINDOW / 2)) {
- subscription.request(REQUEST_WINDOW - unfulfilled);
+ long req = REQUEST_WINDOW - unfulfilled;
+ System.out.println("EndSubscriber request " + req);
+ subscription.request(req);
unfulfilled = REQUEST_WINDOW;
}
@@ -277,6 +526,7 @@
if (currval % 500 == 0) {
System.out.println("End: " + currval);
}
+ System.out.println("EndSubscriber onNext " + Utils.remaining(buffers));
for (ByteBuffer buf : buffers) {
while (buf.hasRemaining()) {
@@ -298,6 +548,7 @@
@Override
public void onError(Throwable throwable) {
+ System.out.println("EndSubscriber onError " + throwable);
completion.completeExceptionally(throwable);
}
@@ -312,6 +563,10 @@
completion.complete(null);
}
}
+ @Override
+ public String toString() {
+ return "EndSubscriber";
+ }
}
private static SSLEngine createSSLEngine(boolean client) throws IOException {