src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
branch http-client-branch
changeset 56772 38a290eb60eb
parent 56531 15ff86a732ea
child 56795 03ece2518428
equal deleted inserted replaced
56771:73a6534bce94 56772:38a290eb60eb
    44 import java.util.concurrent.Executor;
    44 import java.util.concurrent.Executor;
    45 import java.util.concurrent.Flow;
    45 import java.util.concurrent.Flow;
    46 import java.util.concurrent.Flow.Subscriber;
    46 import java.util.concurrent.Flow.Subscriber;
    47 import java.util.concurrent.atomic.AtomicInteger;
    47 import java.util.concurrent.atomic.AtomicInteger;
    48 import java.util.function.Consumer;
    48 import java.util.function.Consumer;
       
    49 import java.util.function.IntBinaryOperator;
    49 
    50 
    50 /**
    51 /**
    51  * Implements SSL using two SubscriberWrappers.
    52  * Implements SSL using two SubscriberWrappers.
    52  *
    53  *
    53  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    54  * <p> Constructor takes two Flow.Subscribers: one that receives the network
    89 
    90 
    90     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    91     private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER;
    91     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
    92     private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0);
    92     // When handshake is in progress trying to wrap may produce no bytes.
    93     // When handshake is in progress trying to wrap may produce no bytes.
    93     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
    94     private static final ByteBuffer NOTHING = ByteBuffer.allocate(0);
       
    95     private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate");
    94 
    96 
    95     final Executor exec;
    97     final Executor exec;
    96     final Reader reader;
    98     final Reader reader;
    97     final Writer writer;
    99     final Writer writer;
    98     final SSLEngine engine;
   100     final SSLEngine engine;
   148 
   150 
   149         // connect the Reader to the downReader and the
   151         // connect the Reader to the downReader and the
   150         // Writer to the downWriter.
   152         // Writer to the downWriter.
   151         connect(downReader, downWriter);
   153         connect(downReader, downWriter);
   152 
   154 
   153         //Monitor.add(this::monitor);
   155         if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true")))
       
   156             Monitor.add(this::monitor);
   154     }
   157     }
   155 
   158 
   156     /**
   159     /**
   157      * Returns true if the SSLFlowDelegate has detected a TLS
   160      * Returns true if the SSLFlowDelegate has detected a TLS
   158      * close_notify from the server.
   161      * close_notify from the server.
   862      * and write() functions.
   865      * and write() functions.
   863      */
   866      */
   864     private static final int NOT_HANDSHAKING = 0;
   867     private static final int NOT_HANDSHAKING = 0;
   865     private static final int HANDSHAKING = 1;
   868     private static final int HANDSHAKING = 1;
   866 
   869 
   867     private static final int DOING_TASKS = 4; // bit added to above state
   870     // Bit flags
       
   871     // a thread is currently executing tasks
       
   872     private static final int DOING_TASKS = 4;
       
   873     // a thread wants to execute tasks, while another thread is executing
       
   874     private static final int REQUESTING_TASKS = 8;
       
   875     private static final int TASK_BITS = 12; // Both bits
   868 
   876 
   869     private static final int READER = 1;
   877     private static final int READER = 1;
   870     private static final int WRITER = 2;
   878     private static final int WRITER = 2;
   871 
   879 
   872     private static String states(AtomicInteger state) {
   880     private static String states(AtomicInteger state) {
   873         int s = state.get();
   881         int s = state.get();
   874         StringBuilder sb = new StringBuilder();
   882         StringBuilder sb = new StringBuilder();
   875         int x = s & ~DOING_TASKS;
   883         int x = s & ~TASK_BITS;
   876         switch (x) {
   884         switch (x) {
   877             case NOT_HANDSHAKING:
   885             case NOT_HANDSHAKING:
   878                 sb.append(" NOT_HANDSHAKING ");
   886                 sb.append(" NOT_HANDSHAKING ");
   879                 break;
   887                 break;
   880             case HANDSHAKING:
   888             case HANDSHAKING:
   883             default:
   891             default:
   884                 throw new InternalError();
   892                 throw new InternalError();
   885         }
   893         }
   886         if ((s & DOING_TASKS) > 0)
   894         if ((s & DOING_TASKS) > 0)
   887             sb.append("|DOING_TASKS");
   895             sb.append("|DOING_TASKS");
       
   896         if ((s & REQUESTING_TASKS) > 0)
       
   897             sb.append("|REQUESTING_TASKS");
   888         return sb.toString();
   898         return sb.toString();
   889     }
   899     }
   890 
   900 
   891     private void resumeActivity() {
   901     private void resumeActivity() {
   892         reader.schedule();
   902         reader.schedule();
   895 
   905 
   896     final AtomicInteger handshakeState;
   906     final AtomicInteger handshakeState;
   897     final ConcurrentLinkedQueue<String> stateList =
   907     final ConcurrentLinkedQueue<String> stateList =
   898             debug.on() ? new ConcurrentLinkedQueue<>() : null;
   908             debug.on() ? new ConcurrentLinkedQueue<>() : null;
   899 
   909 
       
   910     // Atomically executed to update task bits. Sets DOING_TASKS if it was clear;
       
   911     // otherwise sets REQUESTING_TASKS as well. The HANDSHAKING bit is preserved.
       
   912     private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> {
       
   913         if ((current & DOING_TASKS) == 0)
       
   914             return DOING_TASKS | (current & HANDSHAKING);
       
   915         else
       
   916             return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING);
       
   917     };
       
   918 
       
   919     // Atomically executed to update task bits. If REQUESTING_TASKS was set, clears it
       
   920     // and leaves DOING_TASKS set; otherwise clears both task bits. HANDSHAKING is preserved.
       
   921     private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> {
       
   922         if ((current & REQUESTING_TASKS) != 0)
       
   923             return DOING_TASKS | (current & HANDSHAKING);
       
   924         // no request pending: clear both task bits, keeping only HANDSHAKING
       
   925         return (current & HANDSHAKING);
       
   926     };
       
   927 
   900     private boolean doHandshake(EngineResult r, int caller) {
   928     private boolean doHandshake(EngineResult r, int caller) {
   901         // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS
   929         // unconditionally sets the HANDSHAKING bit, while preserving task bits
   902         handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS));
   930         handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS));
   903         if (stateList != null && debug.on()) {
   931         if (stateList != null && debug.on()) {
   904             stateList.add(r.handshakeStatus().toString());
   932             stateList.add(r.handshakeStatus().toString());
   905             stateList.add(Integer.toString(caller));
   933             stateList.add(Integer.toString(caller));
   906         }
   934         }
   907         switch (r.handshakeStatus()) {
   935         switch (r.handshakeStatus()) {
   908             case NEED_TASK:
   936             case NEED_TASK:
   909                 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS);
   937                 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS);
   910                 if ((s & DOING_TASKS) > 0) // someone else was doing tasks
   938                 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks
   911                     return false;
   939                     return false;
       
   940                 }
   912 
   941 
   913                 if (debug.on()) debug.log("obtaining and initiating task execution");
   942                 if (debug.on()) debug.log("obtaining and initiating task execution");
   914                 List<Runnable> tasks = obtainTasks();
   943                 List<Runnable> tasks = obtainTasks();
   915                 executeTasks(tasks);
   944                 executeTasks(tasks);
   916                 return false;  // executeTasks will resume activity
   945                 return false;  // executeTasks will resume activity
   940         }
   969         }
   941         return l;
   970         return l;
   942     }
   971     }
   943 
   972 
   944     private void executeTasks(List<Runnable> tasks) {
   973     private void executeTasks(List<Runnable> tasks) {
   945         if (tasks.isEmpty())
       
   946             return;
       
   947         exec.execute(() -> {
   974         exec.execute(() -> {
   948             try {
   975             try {
   949                 List<Runnable> nextTasks = tasks;
   976                 List<Runnable> nextTasks = tasks;
       
   977                 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size()));
   950                 do {
   978                 do {
   951                     nextTasks.forEach(Runnable::run);
   979                     nextTasks.forEach(Runnable::run);
   952                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
   980                     if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
   953                         nextTasks = obtainTasks();
   981                         nextTasks = obtainTasks();
   954                     } else {
   982                     } else {
       
   983                         int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS);
       
   984                         if ((s & DOING_TASKS) != 0) {
       
   985                             if (debug.on()) debug.log("re-running tasks (B)");
       
   986                             nextTasks = obtainTasks();
       
   987                             continue;
       
   988                         }
   955                         break;
   989                         break;
   956                     }
   990                     }
   957                 } while (true);
   991                 } while (true);
   958                 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS);
   992                 if (debug.on()) debug.log("finished task execution");
   959                 resumeActivity();
   993                 resumeActivity();
   960             } catch (Throwable t) {
   994             } catch (Throwable t) {
   961                 handleError(t);
   995                 handleError(t);
   962             }
   996             }
   963         });
   997         });