# HG changeset patch # User dfuchs # Date 1511954119 0 # Node ID 583695a0ed6a489d0c0ce18dcecbb052a1c9b1f8 # Parent a36a236e55d8c2681494795f079981b78e755881 http-client-branch: review comment: SSLTubeTest is ignored (and some associated fix and cleanup) diff -r a36a236e55d8 -r 583695a0ed6a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Tue Nov 28 18:03:56 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Wed Nov 29 11:15:19 2017 +0000 @@ -90,7 +90,7 @@ ConnectionPool.CacheKey cacheKey() { return ConnectionPool.cacheKey(address, null); } - + @Override public void close() { plainConnection.close(); diff -r a36a236e55d8 -r 583695a0ed6a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java Tue Nov 28 18:03:56 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/SocketTube.java Wed Nov 29 11:15:19 2017 +0000 @@ -558,8 +558,8 @@ if (subscribed || cancelled) return; subscribed = true; } - subscriber.onConnection(this); - debug.log(Level.DEBUG, "onConnection called"); + subscriber.onSubscribe(this); + debug.log(Level.DEBUG, "onSubscribe called"); if (errorRef.get() != null) { signalCompletion(); } diff -r a36a236e55d8 -r 583695a0ed6a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java Tue Nov 28 18:03:56 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/FlowTube.java Wed Nov 29 11:15:19 2017 +0000 @@ -40,12 +40,7 @@ * flow. A FlowTube supports handing over the same read subscription to different * sequential read subscribers over time. When {@code connectFlows(writePublisher, * readSubscriber} is called, the FlowTube will call {@code dropSubscription} on - * its former readSubscriber, and {@code onConnection} on its new readSubscriber. - * By default, the implementation of {@code onConnection} is to call - * {@code onSubscribe}, but a subscriber that needs to subscribe sequentially - * several times to the same FlowTube may override the default implementation - * to ensure that {@code onSubscribe} is called only once. - * + * its former readSubscriber, and {@code onSubscribe} on its new readSubscriber. */ public interface FlowTube extends Flow.Publisher>, @@ -59,14 +54,6 @@ * should stop calling any method on its subscription. */ static interface TubeSubscriber extends Flow.Subscriber> { - /** - * Called by {@code FlowTube.connectFlows}. - * @param subscription the subscription. - * @implSpec By default this method call {@code this.onSubscribe()}. - */ - default void onConnection(Flow.Subscription subscription) { - onSubscribe(subscription); - } /** * Called when the flow is connected again, and the subscription @@ -176,10 +163,6 @@ @Override public void dropSubscription() {} @Override - public void onConnection(Flow.Subscription subscription) { - delegate.onSubscribe(subscription); - } - @Override public void onSubscribe(Flow.Subscription subscription) { delegate.onSubscribe(subscription); } diff -r a36a236e55d8 -r 583695a0ed6a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Tue Nov 28 18:03:56 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Wed Nov 29 11:15:19 2017 +0000 @@ -92,6 +92,7 @@ final CompletableFuture cf; final CompletableFuture alpnCF; // completes on initial handshake final static ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; + volatile boolean close_notify_received; /** * Creates an SSLFlowDelegate fed from two Flow.Subscribers. Each @@ -121,6 +122,15 @@ } /** + * Returns true if the SSLFlowDelegate has detected a TLS + * close_notify from the server. + * @return true, if a close_notify was detected. + */ + public boolean closeNotifyReceived() { + return close_notify_received; + } + + /** * Connects the read sink (downReader) to the SSLFlowDelegate Reader, * and the write sink (downWriter) to the SSLFlowDelegate Writer. * Called from within the constructor. Overwritten by SSLTube. @@ -461,6 +471,11 @@ scheduler.stop(); } + @Override + public boolean closing() { + return closeNotifyReceived(); + } + private boolean isCompleting() { synchronized(writeList) { int lastIndex = writeList.size() - 1; @@ -692,8 +707,7 @@ dst = b; break; case CLOSED: - doClosure(); - return new EngineResult(sslResult); + return doClosure(new EngineResult(sslResult)); case BUFFER_UNDERFLOW: // handled implicitly by compaction/reallocation of readBuf return new EngineResult(sslResult); @@ -705,9 +719,22 @@ } // FIXME: acknowledge a received CLOSE request from peer - void doClosure() throws IOException { - //while (!wrapAndSend(emptyArray)) - //; + EngineResult doClosure(EngineResult r) throws IOException { + debug.log(Level.DEBUG, + "doClosure(%s): %s [isOutboundDone: %s, isInboundDone: %s]", + r.result, engine.getHandshakeStatus(), + engine.isOutboundDone(), engine.isInboundDone()); + if (engine.getHandshakeStatus() == HandshakeStatus.NEED_WRAP) { + // we have received TLS close_notify and need to send + // an acknowledgement back. We're calling doHandshake + // to finish the close handshake. + if (engine.isInboundDone() && !engine.isOutboundDone()) { + debug.log(Level.DEBUG, "doClosure: close_notify received"); + close_notify_received = true; + doHandshake(r, READER); + } + } + return r; } /** diff -r a36a236e55d8 -r 583695a0ed6a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Tue Nov 28 18:03:56 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Wed Nov 29 11:15:19 2017 +0000 @@ -200,11 +200,6 @@ onSubscribe(delegate::onSubscribe, subscription); } - @Override - public void onConnection(Flow.Subscription subscription) { - onSubscribe(delegate::onConnection, subscription); - } - private void onSubscribe(Consumer method, Flow.Subscription subscription) { subscribedCalled = true; @@ -292,7 +287,7 @@ // are going to signal the SSLFlowDelegate reader, and make sure // onSubscribed is called within the reader flow // 2. If it's not yet subscribed (readSubscription == null), then - // we're going to wait for onSubscribe/onConnection to be called. + // we're going to wait for onSubscribe to be called. // void setDelegate(Flow.Subscriber> delegate) { debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s", @@ -345,9 +340,7 @@ if (previous != null) { previous.dropSubscription(); } - onNewSubscription(delegateWrapper, - delegateWrapper::onSubscribe, - subscription); + onNewSubscription(delegateWrapper, subscription); } @Override @@ -359,14 +352,6 @@ } @Override - public void onConnection(Flow.Subscription subscription) { - debug.log(Level.DEBUG, - "SSLSubscriberWrapper (reader) onConnection(%s)", - subscription); - onSubscribeImpl(subscription); - } - - @Override public void onSubscribe(Flow.Subscription subscription) { debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) onSubscribe(%s)", @@ -374,7 +359,7 @@ onSubscribeImpl(subscription); } - // called in the reader flow, from either onSubscribe or onConnection. + // called in the reader flow, from onSubscribe. private void onSubscribeImpl(Flow.Subscription subscription) { assert subscription != null; DelegateWrapper subscriberImpl, pending; @@ -395,13 +380,11 @@ // There is no pending delegate, but we have a previously // subscribed delegate. This is obviously a re-subscribe. // We are in the downstream reader flow, so we should call - // onConnection directly. + // onSubscribe directly. debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) onSubscribeImpl: %s", "resubscribing"); - onNewSubscription(subscriberImpl, - subscriberImpl::onConnection, - subscription); + onNewSubscription(subscriberImpl, subscription); } else { // We have some pending subscriber: subscribe it now that we have // a subscription. If we already had a previous delegate then @@ -414,10 +397,8 @@ } private void onNewSubscription(DelegateWrapper subscriberImpl, - Consumer method, Flow.Subscription subscription) { assert subscriberImpl != null; - assert method != null; assert subscription != null; Throwable failed; @@ -426,7 +407,7 @@ // subscriber sslDelegate.resetReaderDemand(); // send the subscription to the subscriber. - method.accept(subscription); + subscriberImpl.onSubscribe(subscription); // The following twisted logic is just here that we don't invoke // onError before onSubscribe. It also prevents race conditions @@ -484,6 +465,16 @@ return !(hs == NOT_HANDSHAKING || hs == FINISHED); } + private boolean handshakeFailed() { + // sslDelegate can be null if we reach here + // during the initial handshake, as that happens + // within the SSLFlowDelegate constructor. + // In that case we will want to raise an exception. + return handshaking() + && (sslDelegate == null + || !sslDelegate.closeNotifyReceived()); + } + @Override public void onComplete() { assert !finished && !onCompleteReceived; @@ -493,7 +484,12 @@ subscriberImpl = subscribed; } - if (handshaking()) { + if (handshakeFailed()) { + debug.log(Level.DEBUG, + "handshake: %s, inbound done: %s outbound done: %s", + engine.getHandshakeStatus(), + engine.isInboundDone(), + engine.isOutboundDone()); onErrorImpl(new SSLHandshakeException( "Remote host terminated the handshake")); } else if (subscriberImpl != null) { diff -r a36a236e55d8 -r 583695a0ed6a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Tue Nov 28 18:03:56 2017 +0300 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Wed Nov 29 11:15:19 2017 +0000 @@ -195,12 +195,23 @@ outgoing(List.of(buffer), complete); } + /** + * Sometime it might be necessary to complete the downstream subscriber + * before the upstream completes. For instance, when an SSL server + * sends a notify_close. In that case we should let the outgoing + * complete before upstream us completed. + * @return true, may be overridden by subclasses. + */ + public boolean closing() { + return false; + } + public void outgoing(List buffers, boolean complete) { Objects.requireNonNull(buffers); if (complete) { assert Utils.remaining(buffers) == 0; logger.log(Level.DEBUG, "completionAcknowledged"); - if (!upstreamCompleted) + if (!upstreamCompleted && !closing()) throw new IllegalStateException("upstream not completed"); completionAcknowledged = true; } else { diff -r a36a236e55d8 -r 583695a0ed6a test/jdk/java/net/httpclient/http2/BasicTest.java --- a/test/jdk/java/net/httpclient/http2/BasicTest.java Tue Nov 28 18:03:56 2017 +0300 +++ b/test/jdk/java/net/httpclient/http2/BasicTest.java Wed Nov 29 11:15:19 2017 +0000 @@ -103,6 +103,7 @@ currentCF.getAndUpdate((cf) -> { if (cf == null || cf.isDone()) { cf = exchange.sendPing(); + assert cf != null; cfs.add(cf); } return cf; @@ -123,8 +124,10 @@ paramsTest(); Thread.sleep(1000 * 4); CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0])).join(); - for (CompletableFuture cf : cfs) { - System.out.printf("Ping ack received in %d millisec\n", cf.get()); + synchronized (cfs) { + for (CompletableFuture cf : cfs) { + System.out.printf("Ping ack received in %d millisec\n", cf.get()); + } } } catch (Throwable tt) { System.err.println("tt caught"); diff -r a36a236e55d8 -r 583695a0ed6a test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java --- a/test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java Tue Nov 28 18:03:56 2017 +0300 +++ b/test/jdk/java/net/httpclient/whitebox/SSLTubeTestDriver.java Wed Nov 29 11:15:19 2017 +0000 @@ -24,7 +24,5 @@ /* * @test * @modules jdk.incubator.httpclient - * @ignore + * @run testng/othervm -Djdk.internal.httpclient.debug=true jdk.incubator.httpclient/jdk.incubator.http.SSLTubeTest */ - - // FIXME * @run testng jdk.incubator.httpclient/jdk.incubator.http.SSLTubeTest diff -r a36a236e55d8 -r 583695a0ed6a test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java --- a/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Tue Nov 28 18:03:56 2017 +0300 +++ b/test/jdk/java/net/httpclient/whitebox/jdk.incubator.httpclient/jdk/incubator/http/SSLTubeTest.java Wed Nov 29 11:15:19 2017 +0000 @@ -25,19 +25,27 @@ import jdk.incubator.http.internal.common.Demand; import jdk.incubator.http.internal.common.FlowTube; +import jdk.incubator.http.internal.common.SSLFlowDelegate; import jdk.incubator.http.internal.common.SSLTube; import jdk.incubator.http.internal.common.SequentialScheduler; +import jdk.incubator.http.internal.common.Utils; import org.testng.annotations.Test; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLServerSocket; +import javax.net.ssl.SSLServerSocketFactory; +import javax.net.ssl.SSLSocket; import javax.net.ssl.TrustManagerFactory; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; import java.nio.ByteBuffer; import java.security.KeyManagementException; import java.security.KeyStore; @@ -48,6 +56,7 @@ import java.util.List; import java.util.Queue; import java.util.StringTokenizer; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -55,8 +64,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @Test @@ -80,9 +91,13 @@ /* Start of wiring */ ExecutorService sslExecutor = Executors.newCachedThreadPool(); /* Emulates an echo server */ - FlowTube server = new SSLTube(createSSLEngine(false), - sslExecutor, - new EchoTube(16)); +// FlowTube server = new SSLTube(createSSLEngine(false), +// sslExecutor, +// new EchoTube(16)); + SSLLoopbackSubscriber server = + new SSLLoopbackSubscriber((new SimpleSSLContext()).get(), sslExecutor); + server.start(); + FlowTube client = new SSLTube(createSSLEngine(true), sslExecutor, server); @@ -113,63 +128,188 @@ } } - private static final class EchoTube implements FlowTube { - - private final static Object EOF = new Object(); - private final Executor executor = Executors.newSingleThreadExecutor(); + static class SSLLoopbackSubscriber implements FlowTube { + private final BlockingQueue buffer; + private final Socket clientSock; + private final SSLSocket serverSock; + private final Thread thread1, thread2, thread3; + private volatile Flow.Subscription clientSubscription; + private final SubmissionPublisher> publisher; - private final Queue queue = new ConcurrentLinkedQueue<>(); - private final int maxQueueSize; - private final SequentialScheduler processingScheduler = - new SequentialScheduler(createProcessingTask()); + SSLLoopbackSubscriber(SSLContext ctx, ExecutorService exec) throws IOException { + SSLServerSocketFactory fac = ctx.getServerSocketFactory(); + SSLServerSocket serv = (SSLServerSocket) fac.createServerSocket(0); + SSLParameters params = serv.getSSLParameters(); + params.setApplicationProtocols(new String[]{"proto2"}); + serv.setSSLParameters(params); + + + int serverPort = serv.getLocalPort(); + clientSock = new Socket("127.0.0.1", serverPort); + serverSock = (SSLSocket) serv.accept(); + this.buffer = new LinkedBlockingQueue<>(); + thread1 = new Thread(this::clientWriter, "clientWriter"); + thread2 = new Thread(this::serverLoopback, "serverLoopback"); + thread3 = new Thread(this::clientReader, "clientReader"); + publisher = new SubmissionPublisher<>(exec, Flow.defaultBufferSize(), + this::handlePublisherException); + SSLFlowDelegate.Monitor.add(this::monitor); + } - /* Writing into this tube */ - private long unfulfilled; - private Flow.Subscription subscription; + public void start() { + thread1.start(); + thread2.start(); + thread3.start(); + } + + private void handlePublisherException(Object o, Throwable t) { + System.out.println("Loopback Publisher exception"); + t.printStackTrace(System.out); + } + + private final AtomicInteger readCount = new AtomicInteger(); - /* Reading from this tube */ - private final Demand demand = new Demand(); - private final AtomicBoolean cancelled = new AtomicBoolean(); - private Flow.Subscriber> subscriber; - - private EchoTube(int maxBufferSize) { - if (maxBufferSize < 1) - throw new IllegalArgumentException(); - this.maxQueueSize = maxBufferSize; + // reads off the SSLSocket the data from the "server" + private void clientReader() { + try { + InputStream is = clientSock.getInputStream(); + final int bufsize = FlowTest.randomRange(512, 16 * 1024); + System.out.println("clientReader: bufsize = " + bufsize); + while (true) { + byte[] buf = new byte[bufsize]; + int n = is.read(buf); + if (n == -1) { + System.out.println("clientReader close: read " + + readCount.get() + " bytes"); + publisher.close(); + sleep(2000); + Utils.close(is, clientSock); + return; + } + ByteBuffer bb = ByteBuffer.wrap(buf, 0, n); + readCount.addAndGet(n); + publisher.submit(List.of(bb)); + } + } catch (Throwable e) { + e.printStackTrace(); + Utils.close(clientSock); + } } - @Override - public void subscribe(Flow.Subscriber> subscriber) { - this.subscriber = subscriber; - this.subscriber.onSubscribe(new InternalSubscription()); + // writes the encrypted data from SSLFLowDelegate to the j.n.Socket + // which is connected to the SSLSocket emulating a server. + private void clientWriter() { + long nbytes = 0; + try { + OutputStream os = + new BufferedOutputStream(clientSock.getOutputStream()); + + while (true) { + ByteBuffer buf = buffer.take(); + if (buf == FlowTest.SENTINEL) { + // finished + //Utils.sleep(2000); + System.out.println("clientWriter close: " + nbytes + " written"); + clientSock.shutdownOutput(); + System.out.println("clientWriter close return"); + return; + } + int len = buf.remaining(); + int written = writeToStream(os, buf); + assert len == written; + nbytes += len; + assert !buf.hasRemaining() + : "buffer has " + buf.remaining() + " bytes left"; + clientSubscription.request(1); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + + private int writeToStream(OutputStream os, ByteBuffer buf) throws IOException { + byte[] b = buf.array(); + int offset = buf.arrayOffset() + buf.position(); + int n = buf.limit() - buf.position(); + os.write(b, offset, n); + buf.position(buf.limit()); + os.flush(); + return n; + } + + private final AtomicInteger loopCount = new AtomicInteger(); + + public String monitor() { + return "serverLoopback: loopcount = " + loopCount.toString() + + " clientRead: count = " + readCount.toString(); + } + + // thread2 + private void serverLoopback() { + try { + InputStream is = serverSock.getInputStream(); + OutputStream os = serverSock.getOutputStream(); + final int bufsize = FlowTest.randomRange(512, 16 * 1024); + System.out.println("serverLoopback: bufsize = " + bufsize); + byte[] bb = new byte[bufsize]; + while (true) { + int n = is.read(bb); + if (n == -1) { + sleep(2000); + is.close(); + os.close(); + serverSock.close(); + return; + } + os.write(bb, 0, n); + os.flush(); + loopCount.addAndGet(n); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + + + /** + * This needs to be called before the chain is subscribed. It can't be + * supplied in the constructor. + */ + public void setReturnSubscriber(Flow.Subscriber> returnSubscriber) { + publisher.subscribe(returnSubscriber); } @Override public void onSubscribe(Flow.Subscription subscription) { - unfulfilled = maxQueueSize; - (this.subscription = subscription).request(maxQueueSize); + clientSubscription = subscription; + clientSubscription.request(5); } @Override public void onNext(List item) { - if (--unfulfilled == (maxQueueSize / 2)) { - subscription.request(maxQueueSize - unfulfilled); - unfulfilled = maxQueueSize; + try { + for (ByteBuffer b : item) + buffer.put(b); + } catch (InterruptedException e) { + e.printStackTrace(); + Utils.close(clientSock); } - queue.add(item); - processingScheduler.deferOrSchedule(executor); } @Override public void onError(Throwable throwable) { - queue.add(throwable); - processingScheduler.deferOrSchedule(executor); + throwable.printStackTrace(); + Utils.close(clientSock); } @Override public void onComplete() { - queue.add(EOF); - processingScheduler.deferOrSchedule(executor); + try { + buffer.put(FlowTest.SENTINEL); + } catch (InterruptedException e) { + e.printStackTrace(); + Utils.close(clientSock); + } } @Override @@ -177,57 +317,163 @@ return false; } - private class InternalSubscription implements Flow.Subscription { - - @Override - public void request(long n) { - if (n <= 0) { - throw new InternalError(); - } - demand.increase(n); - processingScheduler.runOrSchedule(); - } - - @Override - public void cancel() { - cancelled.set(true); - } + @Override + public void subscribe(Flow.Subscriber> subscriber) { + publisher.subscribe(subscriber); } - - private SequentialScheduler.RestartableTask createProcessingTask() { - return new SequentialScheduler.CompleteRestartableTask() { + } - @Override - protected void run() { - while (!cancelled.get()) { - Object item = queue.peek(); - if (item == null) - return; - try { - if (item instanceof List) { - if (!demand.tryDecrement()) - return; - @SuppressWarnings("unchecked") - List bytes = (List) item; - subscriber.onNext(bytes); - } else if (item instanceof Throwable) { - cancelled.set(true); - subscriber.onError((Throwable) item); - } else if (item == EOF) { - cancelled.set(true); - subscriber.onComplete(); - } else { - throw new InternalError(String.valueOf(item)); - } - } finally { - Object removed = queue.remove(); - assert removed == item; - } - } - } - }; + private static void sleep(long millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException e) { + } } +// private static final class EchoTube implements FlowTube { +// +// private final static Object EOF = new Object(); +// private final Executor executor = Executors.newSingleThreadExecutor(); +// +// private final Queue queue = new ConcurrentLinkedQueue<>(); +// private final int maxQueueSize; +// private final SequentialScheduler processingScheduler = +// new SequentialScheduler(createProcessingTask()); +// +// /* Writing into this tube */ +// private long unfulfilled; +// private Flow.Subscription subscription; +// +// /* Reading from this tube */ +// private final Demand demand = new Demand(); +// private final AtomicBoolean cancelled = new AtomicBoolean(); +// private Flow.Subscriber> subscriber; +// +// private EchoTube(int maxBufferSize) { +// if (maxBufferSize < 1) +// throw new IllegalArgumentException(); +// this.maxQueueSize = maxBufferSize; +// } +// +// @Override +// public void subscribe(Flow.Subscriber> subscriber) { +// this.subscriber = subscriber; +// System.out.println("EchoTube got subscriber: " + subscriber); +// this.subscriber.onSubscribe(new InternalSubscription()); +// } +// +// @Override +// public void onSubscribe(Flow.Subscription subscription) { +// unfulfilled = maxQueueSize; +// System.out.println("EchoTube request: " + maxQueueSize); +// (this.subscription = subscription).request(maxQueueSize); +// } +// +// @Override +// public void onNext(List item) { +// if (--unfulfilled == (maxQueueSize / 2)) { +// long req = maxQueueSize - unfulfilled; +// subscription.request(req); +// System.out.println("EchoTube request: " + req); +// unfulfilled = maxQueueSize; +// } +// System.out.println("EchoTube add " + Utils.remaining(item)); +// queue.add(item); +// processingScheduler.deferOrSchedule(executor); +// } +// +// @Override +// public void onError(Throwable throwable) { +// System.out.println("EchoTube add " + throwable); +// queue.add(throwable); +// processingScheduler.deferOrSchedule(executor); +// } +// +// @Override +// public void onComplete() { +// System.out.println("EchoTube add EOF"); +// queue.add(EOF); +// processingScheduler.deferOrSchedule(executor); +// } +// +// @Override +// public boolean isFinished() { +// return false; +// } +// +// private class InternalSubscription implements Flow.Subscription { +// +// @Override +// public void request(long n) { +// System.out.println("EchoTube got request: " + n); +// if (n <= 0) { +// throw new InternalError(); +// } +// demand.increase(n); +// processingScheduler.runOrSchedule(); +// } +// +// @Override +// public void cancel() { +// cancelled.set(true); +// } +// } +// +// @Override +// public String toString() { +// return "EchoTube"; +// } +// +// private SequentialScheduler.RestartableTask createProcessingTask() { +// return new SequentialScheduler.CompleteRestartableTask() { +// +// @Override +// protected void run() { +// try { +// while (!cancelled.get()) { +// Object item = queue.peek(); +// if (item == null) +// return; +// try { +// System.out.println("EchoTube processing item"); +// if (item instanceof List) { +// if (!demand.tryDecrement()) { +// System.out.println("EchoTube no demand"); +// return; +// } +// @SuppressWarnings("unchecked") +// List bytes = (List) item; +// Object removed = queue.remove(); +// assert removed == item; +// System.out.println("EchoTube processing " +// + Utils.remaining(bytes)); +// subscriber.onNext(bytes); +// } else if (item instanceof Throwable) { +// cancelled.set(true); +// Object removed = queue.remove(); +// assert removed == item; +// System.out.println("EchoTube processing " + item); +// subscriber.onError((Throwable) item); +// } else if (item == EOF) { +// cancelled.set(true); +// Object removed = queue.remove(); +// assert removed == item; +// System.out.println("EchoTube processing EOF"); +// subscriber.onComplete(); +// } else { +// throw new InternalError(String.valueOf(item)); +// } +// } finally { +// } +// } +// } catch(Throwable t) { +// t.printStackTrace(); +// throw t; +// } +// } +// }; +// } +// } /** * The final subscriber which receives the decrypted looped-back data. Just @@ -253,6 +499,7 @@ public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; unfulfilled = REQUEST_WINDOW; + System.out.println("EndSubscriber request " + REQUEST_WINDOW); subscription.request(REQUEST_WINDOW); } @@ -269,7 +516,9 @@ @Override public void onNext(List buffers) { if (--unfulfilled == (REQUEST_WINDOW / 2)) { - subscription.request(REQUEST_WINDOW - unfulfilled); + long req = REQUEST_WINDOW - unfulfilled; + System.out.println("EndSubscriber request " + req); + subscription.request(req); unfulfilled = REQUEST_WINDOW; } @@ -277,6 +526,7 @@ if (currval % 500 == 0) { System.out.println("End: " + currval); } + System.out.println("EndSubscriber onNext " + Utils.remaining(buffers)); for (ByteBuffer buf : buffers) { while (buf.hasRemaining()) { @@ -298,6 +548,7 @@ @Override public void onError(Throwable throwable) { + System.out.println("EndSubscriber onError " + throwable); completion.completeExceptionally(throwable); } @@ -312,6 +563,10 @@ completion.complete(null); } } + @Override + public String toString() { + return "EndSubscriber"; + } } private static SSLEngine createSSLEngine(boolean client) throws IOException {