# HG changeset patch # User chegar # Date 1520071045 0 # Node ID fc391230cf7b4427a6073c8bb7d6e241d2406882 # Parent 1753108d07b9a9b03f292f21a314bc2a9b56c805 http-client-branch: some SSLFlowDelegate fixes from Daniel and Chris diff -r 1753108d07b9 -r fc391230cf7b src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Sat Mar 03 09:54:31 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java Sat Mar 03 09:57:25 2018 +0000 @@ -307,10 +307,10 @@ // work function where it all happens void processData() { try { - debugr.log(Level.DEBUG, () -> "processData: " + readBuf.remaining() - + " bytes to unwrap " - + states(handshakeState) - + ", " + engine.getHandshakeStatus()); + debugr.log(Level.DEBUG, () -> "processData:" + + " readBuf remaining:" + readBuf.remaining() + + ", state:" + states(handshakeState) + + ", engine handshake status:" + engine.getHandshakeStatus()); int len; boolean complete = false; while ((len = readBuf.remaining()) > 0) { @@ -344,8 +344,9 @@ } if (result.handshaking() && !complete) { debugr.log(Level.DEBUG, "handshaking"); - doHandshake(result, READER); - resumeActivity(); + if (doHandshake(result, READER)) { + resumeActivity(); + } handshaking = true; } else { if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { @@ -507,8 +508,8 @@ } protected void onSubscribe() { - doHandshake(EngineResult.INIT, INIT); - resumeActivity(); + debugw.log(Level.DEBUG, "onSubscribe initiating handshaking"); + addData(HS_TRIGGER); // initiates handshaking } void schedule() { @@ -550,14 +551,20 @@ boolean completing = isCompleting(); try { - debugw.log(Level.DEBUG, () -> "processData(" + Utils.remaining(writeList) + ")"); - while (Utils.remaining(writeList) > 0 || hsTriggered() - || needWrap()) { + debugw.log(Level.DEBUG, () -> "processData, writeList remaining:" + + Utils.remaining(writeList) + ", hsTriggered:" + + hsTriggered() + ", needWrap:" + needWrap()); + + while (Utils.remaining(writeList) > 0 || hsTriggered() || needWrap()) { ByteBuffer[] outbufs = writeList.toArray(Utils.EMPTY_BB_ARRAY); EngineResult result = wrapBuffers(outbufs); debugw.log(Level.DEBUG, "wrapBuffer returned %s", result.result); if (result.status() == Status.CLOSED) { + if (!upstreamCompleted) { + upstreamCompleted = true; + upstreamSubscription.cancel(); + } if (result.bytesProduced() <= 0) return; @@ -571,7 +578,7 @@ boolean handshaking = false; if (result.handshaking()) { debugw.log(Level.DEBUG, "handshaking"); - doHandshake(result, WRITER); + doHandshake(result, WRITER); // ok to ignore return handshaking = true; } else { if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { @@ -582,20 +589,14 @@ cleanList(writeList); // tidy up the source list sendResultBytes(result); if (handshaking && !completing) { - if (writeList.isEmpty() && !result.needUnwrap()) { - writer.addData(HS_TRIGGER); + if (needWrap()) { + continue; + } else { + return; } - if (needWrap()) continue; - return; } } if (completing && Utils.remaining(writeList) == 0) { - /* - System.out.println("WRITER DOO 3"); - engine.closeOutbound(); - EngineResult result = wrapBuffers(Utils.EMPTY_BB_ARRAY); - sendResultBytes(result); - */ if (!completed) { completed = true; writeList.clear(); @@ -685,18 +686,19 @@ writer.stop(); } - private void normalStop() { - stopOnError(null); - } + boolean stopped; - boolean stopped = false; - - synchronized private Void stopOnError(Throwable t) { + private synchronized void normalStop() { if (stopped) - return null; + return; stopped = true; reader.stop(); writer.stop(); + } + + private Void stopOnError(Throwable currentlyUnused) { + // maybe log, etc + normalStop(); return null; } @@ -719,7 +721,7 @@ */ private static final int NOT_HANDSHAKING = 0; private static final int HANDSHAKING = 1; - private static final int INIT = 2; + private static final int DOING_TASKS = 4; // bit added to above state private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); @@ -737,9 +739,6 @@ case HANDSHAKING: sb.append(" HANDSHAKING "); break; - case INIT: - sb.append(" INIT "); - break; default: throw new InternalError(); } @@ -756,28 +755,37 @@ final AtomicInteger handshakeState; final ConcurrentLinkedQueue stateList = new ConcurrentLinkedQueue<>(); - private void doHandshake(EngineResult r, int caller) { - int s = handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); + private boolean doHandshake(EngineResult r, int caller) { + // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS + handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); stateList.add(r.handshakeStatus().toString()); stateList.add(Integer.toString(caller)); switch (r.handshakeStatus()) { case NEED_TASK: + int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); if ((s & DOING_TASKS) > 0) // someone else was doing tasks - return; + return false; + + debug.log(Level.DEBUG, "obtaining and initiating task execution"); List tasks = obtainTasks(); executeTasks(tasks); - break; + return false; // executeTasks will resume activity case NEED_WRAP: - writer.addData(HS_TRIGGER); + if (caller == READER) { + writer.addData(HS_TRIGGER); + return false; + } break; case NEED_UNWRAP: case NEED_UNWRAP_AGAIN: // do nothing else + // receiving-side data will trigger unwrap break; default: throw new InternalError("Unexpected handshake status:" + r.handshakeStatus()); } + return true; } private List obtainTasks() { @@ -790,9 +798,10 @@ } private void executeTasks(List tasks) { + if (tasks.isEmpty()) + return; exec.execute(() -> { try { - handshakeState.getAndUpdate((current) -> current | DOING_TASKS); List nextTasks = tasks; do { nextTasks.forEach(Runnable::run); @@ -803,7 +812,7 @@ } } while (true); handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); - writer.addData(HS_TRIGGER); + //writer.addData(HS_TRIGGER); resumeActivity(); } catch (Throwable t) { handleError(t); @@ -869,11 +878,6 @@ this.destBuffer = destBuffer; } - // Special result used to trigger handshaking in constructor - static EngineResult INIT = - new EngineResult( - new SSLEngineResult(SSLEngineResult.Status.OK, HandshakeStatus.NEED_WRAP, 0, 0)); - boolean handshaking() { HandshakeStatus s = result.getHandshakeStatus(); return s != HandshakeStatus.FINISHED diff -r 1753108d07b9 -r fc391230cf7b src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java --- a/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Sat Mar 03 09:54:31 2018 +0000 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/common/SubscriberWrapper.java Sat Mar 03 09:57:25 2018 +0000 @@ -204,7 +204,7 @@ * Sometime it might be necessary to complete the downstream subscriber * before the upstream completes. For instance, when an SSL server * sends a notify_close. In that case we should let the outgoing - * complete before upstream us completed. + * complete before upstream is completed. * @return true, may be overridden by subclasses. */ public boolean closing() { @@ -217,18 +217,19 @@ assert Utils.remaining(buffers) == 0; boolean closing = closing(); logger.log(Level.DEBUG, - "completionAcknowledged upstreamCompleted:%s, downstreamCompleted:%s, closing:%s", - upstreamCompleted, downstreamCompleted, closing); - if (!upstreamCompleted && !closing) + "completionAcknowledged upstreamCompleted:%s," + + " downstreamCompleted:%s, closing:%s", + upstreamCompleted, downstreamCompleted, closing); + if (!upstreamCompleted && !closing) { throw new IllegalStateException("upstream not completed"); + } completionAcknowledged = true; } else { logger.log(Level.DEBUG, () -> "Adding " - + Utils.remaining(buffers) - + " to outputQ queue"); + + Utils.remaining(buffers) + " to outputQ queue"); outputQ.add(buffers); } - logger.log(Level.DEBUG, () -> "pushScheduler " + logger.log(Level.DEBUG, () -> "pushScheduler" + (pushScheduler.isStopped() ? " is stopped!" : " is alive")); pushScheduler.runOrSchedule(); } @@ -281,7 +282,8 @@ Throwable error = errorRef.get(); if (error != null) { synchronized(this) { - if (downstreamCompleted) return; + if (downstreamCompleted) + return; downstreamCompleted = true; } logger.log(Level.DEBUG, @@ -319,9 +321,11 @@ void upstreamWindowUpdate() { long downstreamQueueSize = outputQ.size(); - long n = upstreamWindowUpdate(upstreamWindow.get(), downstreamQueueSize); - logger.log(Level.DEBUG, "upstreamWindowUpdate, downstreamQueueSize:%d, upstreamWindow:%d", - downstreamQueueSize, upstreamWindow.get()); + long upstreamWindowSize = upstreamWindow.get(); + long n = upstreamWindowUpdate(upstreamWindowSize, downstreamQueueSize); + logger.log(Level.DEBUG, "upstreamWindowUpdate, " + + "downstreamQueueSize:%d, upstreamWindow:%d", + downstreamQueueSize, upstreamWindowSize); if (n > 0) upstreamRequest(n); }