--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1AsyncReceiver.java Mon Nov 06 18:17:09 2017 +0000
@@ -44,7 +44,6 @@
import jdk.incubator.http.internal.common.Demand;
import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
import jdk.incubator.http.internal.common.ConnectionExpiredException;
import jdk.incubator.http.internal.common.Utils;
@@ -146,7 +145,7 @@
private final ConcurrentLinkedDeque<ByteBuffer> queue
= new ConcurrentLinkedDeque<>();
private final SequentialScheduler scheduler =
- new SequentialScheduler(new SynchronizedRestartableTask(this::flush));
+ SequentialScheduler.synchronizedScheduler(this::flush);
private final Executor executor;
private final Http1TubeSubscriber subscriber = new Http1TubeSubscriber();
private final AtomicReference<Http1AsyncDelegate> pendingDelegateRef;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Mon Nov 06 18:17:09 2017 +0000
@@ -42,7 +42,7 @@
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
+import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
import static jdk.incubator.http.HttpClient.Version.HTTP_1_1;
@@ -83,9 +83,9 @@
private final Http1Publisher writePublisher = new Http1Publisher();
/** Completed when the header have been published, or there is an error */
- private volatile CompletableFuture<ExchangeImpl<T>> headersSentCF = new CompletableFuture<>();
+ private volatile CompletableFuture<ExchangeImpl<T>> headersSentCF = new MinimalFuture<>();
/** Completed when the body has been published, or there is an error */
- private volatile CompletableFuture<ExchangeImpl<T>> bodySentCF = new CompletableFuture<>();
+ private volatile CompletableFuture<ExchangeImpl<T>> bodySentCF = new MinimalFuture<>();
/** The subscriber to the request's body published. Maybe null. */
private volatile Http1BodySubscriber bodySubscriber;
@@ -238,13 +238,13 @@
operations.add(connectCF);
}
} else {
- connectCF = new CompletableFuture<>();
+ connectCF = new MinimalFuture<>();
connectCF.complete(null);
}
return connectCF
.thenCompose(unused -> {
- CompletableFuture<Void> cf = new CompletableFuture<>();
+ CompletableFuture<Void> cf = new MinimalFuture<>();
try {
connectFlows(connection);
@@ -519,8 +519,8 @@
volatile boolean cancelled;
final Http1WriteSubscription subscription = new Http1WriteSubscription();
final Demand demand = new Demand();
- final SequentialScheduler writeScheduler = new SequentialScheduler(
- new SynchronizedRestartableTask(new WriteTask()));
+ final SequentialScheduler writeScheduler =
+ SequentialScheduler.synchronizedScheduler(new WriteTask());
@Override
public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> s) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2Connection.java Mon Nov 06 18:17:09 2017 +0000
@@ -49,7 +49,6 @@
import javax.net.ssl.SSLEngine;
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
import jdk.incubator.http.internal.common.FlowTube.TubeSubscriber;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.Encoder;
@@ -154,8 +153,8 @@
}
// Preface is sent. Checks for pending data and flush it.
- // We rely on this method being called from within the readlock,
- // so we know that no other thread could execute this method
+ // We rely on this method being called from within the Http2TubeSubscriber
+ // scheduler, so we know that no other thread could execute this method
// concurrently while we're here.
// This ensures that later incoming buffers will not
// be processed before we have flushed the pending queue.
@@ -275,7 +274,9 @@
sendConnectionPreface();
}
- // async style but completes immediately
+ // Used when upgrading an HTTP/1.1 connection to HTTP/2 after receiving
+ // agreement from the server. Async style but completes immediately, because
+ // the connection is already connected.
static CompletableFuture<Http2Connection> createAsync(HttpConnection connection,
Http2ClientImpl client2,
Exchange<?> exchange,
@@ -297,7 +298,7 @@
return connection.connectAsync()
.thenCompose(unused -> checkSSLConfig(connection))
.thenCompose(notused-> {
- CompletableFuture<Http2Connection> cf = new CompletableFuture<>();
+ CompletableFuture<Http2Connection> cf = new MinimalFuture<>();
try {
Http2Connection hc = new Http2Connection(request, h2client, connection);
cf.complete(hc);
@@ -490,10 +491,8 @@
return readBufferPool.get(getMaxReceiveFrameSize() + Http2Frame.FRAME_HEADER_SIZE);
}
- private final Object readlock = new Object();
-
long count;
- public final void asyncReceive(ByteBufferReference buffer) {
+ final void asyncReceive(ByteBufferReference buffer) {
// We don't need to read anything and
// we don't want to send anything back to the server
// until the connection preface has been sent.
@@ -505,48 +504,43 @@
// sending a GOAWAY frame with 'invalid_preface'.
//
// Note: asyncReceive is only called from the Http2TubeSubscriber
- // sequential scheduler. Only asyncReceive uses the readLock.
- // Therefore synchronizing on the readlock here should be
- // safe.
- //
- synchronized (readlock) {
- try {
- Supplier<ByteBuffer> bs = initial;
- // ensure that we always handle the initial buffer first,
- // if any.
- if (bs != null) {
- initial = null;
- ByteBuffer b = bs.get();
- if (b.hasRemaining()) {
- long c = ++count;
- debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
- + c +"): " + b.remaining());
- framesController.processReceivedData(framesDecoder,
- ByteBufferReference.of(b));
- }
+ // sequential scheduler.
+ try {
+ Supplier<ByteBuffer> bs = initial;
+ // ensure that we always handle the initial buffer first,
+ // if any.
+ if (bs != null) {
+ initial = null;
+ ByteBuffer b = bs.get();
+ if (b.hasRemaining()) {
+ long c = ++count;
+ debug.log(Level.DEBUG, () -> "H2 Receiving Initial("
+ + c +"): " + b.remaining());
+ framesController.processReceivedData(framesDecoder,
+ ByteBufferReference.of(b));
}
- ByteBuffer b = buffer.get();
- // the readlock ensures that the order of incoming buffers
- // is preserved.
- if (b == EMPTY_TRIGGER) {
- debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
- boolean prefaceSent = framesController.prefaceSent;
- assert prefaceSent;
- // call framesController.processReceivedData to potentially
- // trigger the processing of all the data buffered there.
- framesController.processReceivedData(framesDecoder, buffer);
- debug.log(Level.DEBUG, "H2 processed buffered data");
- } else {
- long c = ++count;
- debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
- framesController.processReceivedData(framesDecoder, buffer);
- debug.log(Level.DEBUG, "H2 processed(%d)", c);
- }
- } catch (Throwable e) {
- String msg = Utils.stackTrace(e);
- Log.logTrace(msg);
- shutdown(e);
}
+ ByteBuffer b = buffer.get();
+ // the Http2TubeSubscriber scheduler ensures that the order of incoming
+ // buffers is preserved.
+ if (b == EMPTY_TRIGGER) {
+ debug.log(Level.DEBUG, "H2 Received EMPTY_TRIGGER");
+ boolean prefaceSent = framesController.prefaceSent;
+ assert prefaceSent;
+ // call framesController.processReceivedData to potentially
+ // trigger the processing of all the data buffered there.
+ framesController.processReceivedData(framesDecoder, buffer);
+ debug.log(Level.DEBUG, "H2 processed buffered data");
+ } else {
+ long c = ++count;
+ debug.log(Level.DEBUG, "H2 Receiving(%d): %d", c, b.remaining());
+ framesController.processReceivedData(framesDecoder, buffer);
+ debug.log(Level.DEBUG, "H2 processed(%d)", c);
+ }
+ } catch (Throwable e) {
+ String msg = Utils.stackTrace(e);
+ Log.logTrace(msg);
+ shutdown(e);
}
}
@@ -555,7 +549,7 @@
}
void shutdown(Throwable t) {
- debug.log(Level.DEBUG, () -> "Shutting down h2c: " + t);
+ debug.log(Level.DEBUG, () -> "Shutting down h2c (closed="+closed+"): " + t);
if (closed == true) return;
synchronized (this) {
if (closed == true) return;
@@ -1060,8 +1054,8 @@
volatile Throwable error;
final ConcurrentLinkedQueue<ByteBuffer> queue
= new ConcurrentLinkedQueue<>();
- final SequentialScheduler scheduler = new SequentialScheduler(
- new SynchronizedRestartableTask(this::processQueue));
+ final SequentialScheduler scheduler =
+ SequentialScheduler.synchronizedScheduler(this::processQueue);
final void processQueue() {
try {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Nov 06 18:17:09 2017 +0000
@@ -44,7 +44,6 @@
import jdk.incubator.http.internal.common.*;
import jdk.incubator.http.internal.common.SequentialScheduler;
-import jdk.incubator.http.internal.common.SequentialScheduler.SynchronizedRestartableTask;
import jdk.incubator.http.internal.frame.*;
import jdk.incubator.http.internal.hpack.DecodingCallback;
import static java.util.stream.Collectors.toList;
@@ -99,7 +98,7 @@
final ConcurrentLinkedQueue<Http2Frame> inputQ = new ConcurrentLinkedQueue<>();
final SequentialScheduler sched =
- new SequentialScheduler(new SynchronizedRestartableTask(this::schedule));
+ SequentialScheduler.synchronizedScheduler(this::schedule);
final SubscriptionBase userSubscription = new SubscriptionBase(sched, this::cancel);
/**
@@ -124,7 +123,6 @@
volatile RequestSubscriber requestSubscriber;
volatile int responseCode;
volatile Response response;
- volatile CompletableFuture<Response> responseCF;
volatile Throwable failed; // The exception with which this stream was canceled.
final CompletableFuture<Void> requestBodyCF = new MinimalFuture<>();
volatile CompletableFuture<T> responseBodyCF;
@@ -563,7 +561,7 @@
}
OutgoingHeaders<Stream<T>> f = headerFrame(requestContentLen);
connection.sendFrame(f);
- CompletableFuture<ExchangeImpl<T>> cf = new CompletableFuture<ExchangeImpl<T>>();
+ CompletableFuture<ExchangeImpl<T>> cf = new MinimalFuture<>();
cf.complete(this); // #### good enough for now
return cf;
}
@@ -616,8 +614,8 @@
RequestSubscriber(long contentLen) {
this.contentLength = contentLen;
this.remainingContentLength = contentLen;
- this.sendScheduler = new SequentialScheduler(
- new SynchronizedRestartableTask(this::trySend));
+ this.sendScheduler =
+ SequentialScheduler.synchronizedScheduler(this::trySend);
}
@Override
@@ -900,6 +898,7 @@
// This method sends a RST_STREAM frame
void cancelImpl(Throwable e) {
+ debug.log(Level.DEBUG, "cancelling stream {0}: {1}", streamid, e);
if (Log.trace()) {
Log.logTrace("cancelling stream {0}: {1}\n", streamid, e);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLFlowDelegate.java Mon Nov 06 18:17:09 2017 +0000
@@ -44,6 +44,7 @@
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
import javax.net.ssl.SSLEngineResult.Status;
import javax.net.ssl.SSLException;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
/**
* Implements SSL using two SubscriberWrappers.
@@ -112,7 +113,7 @@
this.handshakeState = new AtomicInteger(NOT_HANDSHAKING);
this.cf = CompletableFuture.allOf(reader.completion(), writer.completion())
.thenRun(this::normalStop);
- this.alpnCF = new CompletableFuture<>();
+ this.alpnCF = new MinimalFuture<>();
//Monitor.add(this::monitor);
}
@@ -152,6 +153,11 @@
return sb.toString();
}
+ protected SchedulingAction enterReadScheduling() {
+ return SchedulingAction.CONTINUE;
+ }
+
+
/**
* Processing function for incoming data. Pass it thru SSLEngine.unwrap().
* Any decrypted buffers returned to be passed downstream.
@@ -172,21 +178,26 @@
static final int TARGET_BUFSIZE = 16 * 1024;
volatile ByteBuffer readBuf;
volatile boolean completing = false;
- final Object readLock = new Object();
+ final Object readBufferLock = new Object();
final System.Logger debugr =
Utils.getDebugLogger(this::dbgString, DEBUG);
- class ReaderDownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
+ class ReaderDownstreamPusher implements Runnable {
@Override public void run() { processData(); }
}
Reader() {
super();
- scheduler = new SequentialScheduler(new ReaderDownstreamPusher());
+ scheduler = SequentialScheduler.synchronizedScheduler(
+ new ReaderDownstreamPusher());
this.readBuf = ByteBuffer.allocate(1024);
readBuf.limit(0); // keep in read mode
}
+ protected SchedulingAction enterScheduling() {
+ return enterReadScheduling();
+ }
+
public final String dbgString() {
return "SSL Reader(" + tubeName + ")";
}
@@ -230,7 +241,7 @@
// readBuf is kept ready for reading outside of this method
private void addToReadBuf(List<ByteBuffer> buffers) {
- synchronized (readLock) {
+ synchronized (readBufferLock) {
for (ByteBuffer buf : buffers) {
readBuf.compact();
while (readBuf.remaining() < buf.remaining())
@@ -263,7 +274,7 @@
boolean handshaking = false;
try {
EngineResult result;
- synchronized (readLock) {
+ synchronized (readBufferLock) {
result = unwrapBuffer(readBuf);
debugr.log(Level.DEBUG, "Unwrapped: %s", result.result);
}
@@ -314,6 +325,7 @@
}
}
}
+
/**
* Returns a CompletableFuture which completes after all activity
* in the delegate is terminated (whether normally or exceptionally).
@@ -324,16 +336,6 @@
return cf;
}
- private String xxx(List<ByteBuffer> i) {
- StringBuilder sb = new StringBuilder();
- sb.append("xxx size=" + i.size());
- int x = 0;
- for (ByteBuffer b : i)
- x += b.remaining();
- sb.append(" total " + x);
- return sb.toString();
- }
-
public interface Monitorable {
public String getInfo();
}
@@ -706,8 +708,8 @@
return writer;
}
- public void resumeReader() {
- reader.schedule();
+ public boolean resumeReader() {
+ return reader.signalScheduling();
}
public void resetReaderDemand() {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SSLTube.java Mon Nov 06 18:17:09 2017 +0000
@@ -37,6 +37,7 @@
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLEngineResult.HandshakeStatus;
+import jdk.incubator.http.internal.common.SubscriberWrapper.SchedulingAction;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
import static javax.net.ssl.SSLEngineResult.HandshakeStatus.FINISHED;
@@ -60,14 +61,28 @@
writeSubscription = new SSLSubscriptionWrapper();
readSubscriber = new SSLSubscriberWrapper();
this.engine = engine;
- sslDelegate = new SSLFlowDelegate(engine,
+ sslDelegate = new SSLTubeFlowDelegate(engine,
executor,
readSubscriber,
- tube); // FIXME
+ tube);
tube.subscribe(sslDelegate.upstreamReader());
sslDelegate.upstreamWriter().onSubscribe(writeSubscription);
}
+ // the other possibility would be to pass a lambda to the
+ // constructor of SSLFlowDelegate (instead of subclassing it).
+ final class SSLTubeFlowDelegate extends SSLFlowDelegate {
+ SSLTubeFlowDelegate(SSLEngine engine, Executor executor,
+ SSLSubscriberWrapper readSubscriber,
+ FlowTube tube) {
+ super(engine, executor, readSubscriber, tube);
+ }
+ protected SchedulingAction enterReadScheduling() {
+ readSubscriber.processPendingSubscriber();
+ return SchedulingAction.CONTINUE;
+ }
+ }
+
public CompletableFuture<String> getALPN() {
return sslDelegate.alpn();
}
@@ -97,17 +112,20 @@
// onComplete/onError are not called before onSubscribed.
final static class DelegateWrapper implements FlowTube.TubeSubscriber {
private final FlowTube.TubeSubscriber delegate;
+ private final System.Logger debug;
volatile boolean subscribedCalled;
volatile boolean subscribedDone;
volatile boolean completed;
volatile Throwable error;
- DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
+ DelegateWrapper(Flow.Subscriber<? super List<ByteBuffer>> delegate,
+ System.Logger debug) {
this.delegate = FlowTube.asTubeSubscriber(delegate);
+ this.debug = debug;
}
@Override
public void dropSubscription() {
- if (subscribedCalled) {
+ if (subscribedCalled && !completed) {
delegate.dropSubscription();
}
}
@@ -140,15 +158,25 @@
finished = completed;
}
if (x != null) {
+ debug.log(Level.DEBUG,
+ "Subscriber completed before subscribe: forwarding %s",
+ (Object)x);
delegate.onError(x);
} else if (finished) {
+ debug.log(Level.DEBUG,
+ "Subscriber completed before subscribe: calling onComplete()");
delegate.onComplete();
}
}
@Override
public void onError(Throwable t) {
- if (completed) return;
+ if (completed) {
+ debug.log(Level.DEBUG,
+ "Subscriber already completed: ignoring %s",
+ (Object)t);
+ return;
+ }
boolean subscribed;
synchronized (this) {
if (completed) return;
@@ -158,6 +186,10 @@
}
if (subscribed) {
delegate.onError(t);
+ } else {
+ debug.log(Level.DEBUG,
+ "Subscriber not yet subscribed: stored %s",
+ (Object)t);
}
}
@@ -172,6 +204,9 @@
}
if (subscribed) {
delegate.onComplete();
+ } else {
+ debug.log(Level.DEBUG,
+ "Subscriber not yet subscribed: stored completed=true");
}
}
@@ -184,27 +219,73 @@
// Used to read data from the SSLTube.
final class SSLSubscriberWrapper implements FlowTube.TubeSubscriber {
- private volatile DelegateWrapper delegate;
+ private AtomicReference<DelegateWrapper> pendingDelegate =
+ new AtomicReference<>();
private volatile DelegateWrapper subscribed;
private volatile boolean onCompleteReceived;
private final AtomicReference<Throwable> errorRef
= new AtomicReference<>();
+ // setDelegate can be called asynchronously when the SSLTube flow
+ // is connected. At this time the permanent subscriber (this class)
+ // may already be subscribed (readSubscription != null) or not.
+ // 1. If it's already subscribed (readSubscription != null), we
+ // are going to signal the SSLFlowDelegate reader, and make sure
+ // onSubscribed is called within the reader flow
+ // 2. If it's not yet subscribed (readSubscription == null), then
+ // we're going to wait for onSubscribe/onConnection to be called.
+ //
void setDelegate(Flow.Subscriber<? super List<ByteBuffer>> delegate) {
debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) got delegate: %s",
delegate);
assert delegate != null;
- DelegateWrapper delegateWrapper = new DelegateWrapper(delegate);
+ DelegateWrapper delegateWrapper = new DelegateWrapper(delegate, debug);
+ DelegateWrapper previous;
Flow.Subscription subscription;
+ boolean handleNow;
synchronized (this) {
- this.delegate = delegateWrapper;
+ previous = pendingDelegate.getAndSet(delegateWrapper);
subscription = readSubscription;
+ handleNow = this.errorRef.get() != null || finished;
+ }
+ if (previous != null) {
+ previous.dropSubscription();
}
if (subscription == null) {
debug.log(Level.DEBUG, "SSLSubscriberWrapper (reader) no subscription yet");
return;
}
+ if (handleNow || !sslDelegate.resumeReader()) {
+ processPendingSubscriber();
+ }
+ }
+ // Can be called outside of the flow if an error has already been
+ // raise. Otherwise, must be called within the SSLFlowDelegate
+ // downstream reader flow.
+ // If there is a subscription, and if there is a pending delegate,
+ // calls dropSubscription() on the previous delegate (if any),
+ // then subscribe the pending delegate.
+ void processPendingSubscriber() {
+ Flow.Subscription subscription;
+ DelegateWrapper delegateWrapper, previous;
+ synchronized (this) {
+ delegateWrapper = pendingDelegate.get();
+ if (delegateWrapper == null) return;
+ subscription = readSubscription;
+ previous = subscribed;
+ }
+ if (subscription == null) {
+ debug.log(Level.DEBUG,
+ "SSLSubscriberWrapper (reader) %s",
+ "processPendingSubscriber: no subscription yet");
+ return;
+ }
+ delegateWrapper = pendingDelegate.getAndSet(null);
+ if (delegateWrapper == null) return;
+ if (previous != null) {
+ previous.dropSubscription();
+ }
onNewSubscription(delegateWrapper,
delegateWrapper::onSubscribe,
subscription);
@@ -212,7 +293,7 @@
@Override
public void dropSubscription() {
- DelegateWrapper subscriberImpl = delegate;
+ DelegateWrapper subscriberImpl = subscribed;
if (subscriberImpl != null) {
subscriberImpl.dropSubscription();
}
@@ -223,20 +304,7 @@
debug.log(Level.DEBUG,
"SSLSubscriberWrapper (reader) onConnection(%s)",
subscription);
- assert subscription != null;
- DelegateWrapper subscriberImpl;
- synchronized (this) {
- subscriberImpl = delegate;
- readSubscription = subscription;
- }
- if (subscriberImpl == null) {
- debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onConnection: no delegate yet");
- return;
- }
- onNewSubscription(subscriberImpl,
- subscriberImpl::onConnection,
- subscription);
+ onSubscribeImpl(subscription);
}
@Override
@@ -244,21 +312,46 @@
debug.log(Level.DEBUG,
"SSLSubscriberWrapper (reader) onSubscribe(%s)",
subscription);
- readSubscription = subscription;
+ onSubscribeImpl(subscription);
+ }
+
+ // called in the reader flow, from either onSubscribe or onConnection.
+ private void onSubscribeImpl(Flow.Subscription subscription) {
assert subscription != null;
- DelegateWrapper subscriberImpl;
+ DelegateWrapper subscriberImpl, pending;
synchronized (this) {
- subscriberImpl = delegate;
readSubscription = subscription;
+ subscriberImpl = subscribed;
+ pending = pendingDelegate.get();
}
- if (subscriberImpl == null) {
+
+ if (subscriberImpl == null && pending == null) {
debug.log(Level.DEBUG,
- "SSLSubscriberWrapper (reader) onSubscribe: no delegate yet");
+ "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+ "no delegate yet");
return;
}
- onNewSubscription(subscriberImpl,
- subscriberImpl::onSubscribe,
- subscription);
+
+ if (pending == null) {
+ // There is no pending delegate, but we have a previously
+ // subscribed delegate. This is obviously a re-subscribe.
+ // We are in the downstream reader flow, so we should call
+ // onConnection directly.
+ debug.log(Level.DEBUG,
+ "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+ "resusbcribing");
+ onNewSubscription(subscriberImpl,
+ subscriberImpl::onConnection,
+ subscription);
+ } else {
+ // We have some pending subscriber: subscribe it now that we have
+ // a subscription. If we already had a previous delegate then
+ // it will get a dropSubscription().
+ debug.log(Level.DEBUG,
+ "SSLSubscriberWrapper (reader) onSubscribeImpl: %s",
+ "subscribing pending");
+ processPendingSubscriber();
+ }
}
private void onNewSubscription(DelegateWrapper subscriberImpl,
@@ -275,11 +368,6 @@
sslDelegate.resetReaderDemand();
// send the subscription to the subscriber.
method.accept(subscription);
- // reschedule after calling onSubscribe (this should not be
- // strictly needed as the first call to subscription.request()
- // coming after resetting the demand should trigger it).
- // However, it should not do any harm.
- sslDelegate.resumeReader();
// The following twisted logic is just here that we don't invoke
// onError before onSubscribe. It also prevents race conditions
@@ -287,9 +375,7 @@
synchronized (this) {
failed = this.errorRef.get();
completed = finished;
- if (delegate == subscriberImpl) {
- subscribed = subscriberImpl;
- }
+ subscribed = subscriberImpl;
}
if (failed != null) {
subscriberImpl.onError(failed);
@@ -300,7 +386,7 @@
@Override
public void onNext(List<ByteBuffer> item) {
- delegate.onNext(item);
+ subscribed.onNext(item);
}
public void onErrorImpl(Throwable throwable) {
@@ -322,6 +408,10 @@
} else {
debug.log(Level.DEBUG, "%s: delegate null, stored %s", this, failed);
}
+ // now if we have any pending subscriber, we should forward
+ // the error to them immediately as the read scheduler will
+ // already be stopped.
+ processPendingSubscriber();
}
@Override
@@ -351,6 +441,9 @@
finished = true;
subscriberImpl.onComplete();
}
+ // now if we have any pending subscriber, we should complete
+ // them immediately as the read scheduler will already be stopped.
+ processPendingSubscriber();
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SequentialScheduler.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SequentialScheduler.java Mon Nov 06 18:17:09 2017 +0000
@@ -343,4 +343,22 @@
public void stop() {
state.set(STOP);
}
+
+ /**
+ * Returns a new {@code SequentialScheduler} that executes the provided
+ * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
+ *
+ * @apiNote
+ * This is equivalent to calling
+ * {@code new SequentialScheduler(new SynchronizedRestartableTask(mainloop));}
+ * The main loop must not do any blocking operation.
+ *
+ * @param mainloop The main loop of the new sequential scheduler.
+ * @return a new {@code SequentialScheduler} that executes the provided
+ * {@code mainLoop} from within a {@link SynchronizedRestartableTask}.
+ */
+ public static SequentialScheduler synchronizedScheduler(Runnable mainloop) {
+ return new SequentialScheduler(new SynchronizedRestartableTask(mainloop));
+ }
+
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SubscriberWrapper.java Mon Nov 06 18:17:09 2017 +0000
@@ -63,14 +63,16 @@
final System.Logger logger =
Utils.getDebugLogger(this::dbgString, DEBUG);
+ public enum SchedulingAction { CONTINUE, RETURN, RESCHEDULE };
+
volatile Flow.Subscription upstreamSubscription;
final SubscriptionBase downstreamSubscription;
volatile boolean upstreamCompleted;
volatile boolean downstreamCompleted;
volatile boolean completionAcknowledged;
private volatile Subscriber<? super List<ByteBuffer>> downstreamSubscriber;
- // Input Q and lo and hi pri output Qs.
- private final ConcurrentLinkedQueue<List<ByteBuffer>> inputQ;
+ // processed byte to send to the downstream subscriber.
+ private final ConcurrentLinkedQueue<List<ByteBuffer>> outputQ;
private final CompletableFuture<Void> cf;
private final SequentialScheduler pushScheduler;
private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
@@ -88,9 +90,10 @@
*/
public SubscriberWrapper()
{
- this.inputQ = new ConcurrentLinkedQueue<>();
- this.cf = new CompletableFuture<>();
- this.pushScheduler = new SequentialScheduler(new DownstreamPusher());
+ this.outputQ = new ConcurrentLinkedQueue<>();
+ this.cf = new MinimalFuture<>();
+ this.pushScheduler =
+ SequentialScheduler.synchronizedScheduler(new DownstreamPusher());
this.downstreamSubscription = new SubscriptionBase(pushScheduler,
this::downstreamCompletion);
}
@@ -160,6 +163,23 @@
}
/**
+ * Override this if anything needs to be done before checking for error
+ * and processing the input queue.
+ * @return
+ */
+ protected SchedulingAction enterScheduling() {
+ return SchedulingAction.CONTINUE;
+ }
+
+ protected boolean signalScheduling() {
+ if (downstreamCompleted || pushScheduler.isStopped()) {
+ return false;
+ }
+ pushScheduler.runOrSchedule();
+ return true;
+ }
+
+ /**
* Delivers buffers of data downstream. After incoming()
* has been called complete == true signifying completion of the upstream
* subscription, data may continue to be delivered, up to when outgoing() is
@@ -186,8 +206,8 @@
} else {
logger.log(Level.DEBUG, () -> "Adding "
+ Utils.remaining(buffers)
- + " to inputQ queue");
- inputQ.add(buffers);
+ + " to outputQ queue");
+ outputQ.add(buffers);
}
logger.log(Level.DEBUG, () -> "pushScheduler "
+ (pushScheduler.isStopped() ? " is stopped!" : " is alive"));
@@ -214,7 +234,7 @@
/**
* Invoked whenever it 'may' be possible to push buffers downstream.
*/
- class DownstreamPusher extends SequentialScheduler.CompleteRestartableTask {
+ class DownstreamPusher implements Runnable {
@Override
public void run() {
try {
@@ -229,7 +249,15 @@
logger.log(Level.DEBUG, "DownstreamPusher: downstream is already completed");
return;
}
-
+ switch (enterScheduling()) {
+ case CONTINUE: break;
+ case RESCHEDULE: pushScheduler.runOrSchedule(); return;
+ case RETURN: return;
+ default:
+ errorRef.compareAndSet(null,
+ new InternalError("unknown scheduling command"));
+ break;
+ }
// If there was an error, send it downstream.
Throwable error = errorRef.get();
if (error != null) {
@@ -240,13 +268,13 @@
logger.log(Level.DEBUG,
() -> "DownstreamPusher: forwarding error downstream: " + error);
pushScheduler.stop();
- inputQ.clear();
+ outputQ.clear();
downstreamSubscriber.onError(error);
return;
}
// OK - no error, let's proceed
- if (!inputQ.isEmpty()) {
+ if (!outputQ.isEmpty()) {
logger.log(Level.DEBUG,
"DownstreamPusher: queue not empty, downstreamSubscription: %s",
downstreamSubscription);
@@ -257,8 +285,8 @@
}
final boolean dbgOn = logger.isLoggable(Level.DEBUG);
- while (!inputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
- List<ByteBuffer> b = inputQ.poll();
+ while (!outputQ.isEmpty() && downstreamSubscription.tryDecrement()) {
+ List<ByteBuffer> b = outputQ.poll();
if (dbgOn) logger.log(Level.DEBUG,
"DownstreamPusher: Pushing "
+ Utils.remaining(b)
@@ -273,7 +301,7 @@
AtomicLong upstreamWindow = new AtomicLong(0);
void upstreamWindowUpdate() {
- long downstreamQueueSize = inputQ.size();
+ long downstreamQueueSize = outputQ.size();
long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize);
if (n > 0)
upstreamRequest(n);
@@ -365,7 +393,7 @@
if (downstreamCompleted || !upstreamCompleted) {
return;
}
- if (!inputQ.isEmpty()) {
+ if (!outputQ.isEmpty()) {
return;
}
if (errorRef.get() != null) {
@@ -398,8 +426,8 @@
.append(" upstreamWindow: ").append(upstreamWindow.toString())
.append(" downstreamCompleted: ").append(Boolean.toString(downstreamCompleted))
.append(" completionAcknowledged: ").append(Boolean.toString(completionAcknowledged))
- .append(" inputQ size: ").append(Integer.toString(inputQ.size()))
- //.append(" inputQ: ").append(inputQ.toString())
+ .append(" outputQ size: ").append(Integer.toString(outputQ.size()))
+ //.append(" outputQ: ").append(outputQ.toString())
.append(" cf: ").append(cf.toString())
.append(" downstreamSubscription: ").append(downstreamSubscription.toString());
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Mon Nov 06 13:06:34 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/websocket/WebSocketImpl.java Mon Nov 06 18:17:09 2017 +0000
@@ -31,7 +31,6 @@
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
-import java.net.URISyntaxException;
import java.net.URLPermission;
import java.nio.ByteBuffer;
import java.util.Collection;
@@ -48,6 +47,7 @@
import jdk.incubator.http.HttpClient;
import jdk.incubator.http.WebSocket;
import jdk.incubator.http.internal.common.Log;
+import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Pair;
import jdk.incubator.http.internal.common.SequentialScheduler;
import jdk.incubator.http.internal.common.SequentialScheduler.DeferredCompleter;
@@ -113,8 +113,8 @@
*/
private final Object lock = new Object();
- private final CompletableFuture<?> closeReceived = new CompletableFuture<>();
- private final CompletableFuture<?> closeSent = new CompletableFuture<>();
+ private final CompletableFuture<?> closeReceived = new MinimalFuture<>();
+ private final CompletableFuture<?> closeSent = new MinimalFuture<>();
/** Returns the security permission required for the given details. */
static URLPermission permissionForServer(URI uri,
@@ -412,7 +412,7 @@
}
private CompletableFuture<WebSocket> enqueue(OutgoingMessage m) {
- CompletableFuture<WebSocket> cf = new CompletableFuture<>();
+ CompletableFuture<WebSocket> cf = new MinimalFuture<>();
boolean added = queue.add(pair(m, cf));
if (!added) {
// The queue is supposed to be unbounded