44 import java.util.concurrent.Executor; |
44 import java.util.concurrent.Executor; |
45 import java.util.concurrent.Flow; |
45 import java.util.concurrent.Flow; |
46 import java.util.concurrent.Flow.Subscriber; |
46 import java.util.concurrent.Flow.Subscriber; |
47 import java.util.concurrent.atomic.AtomicInteger; |
47 import java.util.concurrent.atomic.AtomicInteger; |
48 import java.util.function.Consumer; |
48 import java.util.function.Consumer; |
|
49 import java.util.function.IntBinaryOperator; |
49 |
50 |
50 /** |
51 /** |
51 * Implements SSL using two SubscriberWrappers. |
52 * Implements SSL using two SubscriberWrappers. |
52 * |
53 * |
53 * <p> Constructor takes two Flow.Subscribers: one that receives the network |
54 * <p> Constructor takes two Flow.Subscribers: one that receives the network |
89 |
90 |
90 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
91 private static final ByteBuffer SENTINEL = Utils.EMPTY_BYTEBUFFER; |
91 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); |
92 private static final ByteBuffer HS_TRIGGER = ByteBuffer.allocate(0); |
92 // When handshake is in progress trying to wrap may produce no bytes. |
93 // When handshake is in progress trying to wrap may produce no bytes. |
93 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); |
94 private static final ByteBuffer NOTHING = ByteBuffer.allocate(0); |
|
95 private static final String monProp = Utils.getProperty("jdk.internal.httpclient.monitorFlowDelegate"); |
94 |
96 |
95 final Executor exec; |
97 final Executor exec; |
96 final Reader reader; |
98 final Reader reader; |
97 final Writer writer; |
99 final Writer writer; |
98 final SSLEngine engine; |
100 final SSLEngine engine; |
148 |
150 |
149 // connect the Reader to the downReader and the |
151 // connect the Reader to the downReader and the |
150 // Writer to the downWriter. |
152 // Writer to the downWriter. |
151 connect(downReader, downWriter); |
153 connect(downReader, downWriter); |
152 |
154 |
153 //Monitor.add(this::monitor); |
155 if (monProp != null && (monProp.equals("") || monProp.equalsIgnoreCase("true"))) |
|
156 Monitor.add(this::monitor); |
154 } |
157 } |
155 |
158 |
156 /** |
159 /** |
157 * Returns true if the SSLFlowDelegate has detected a TLS |
160 * Returns true if the SSLFlowDelegate has detected a TLS |
158 * close_notify from the server. |
161 * close_notify from the server. |
862 * and write() functions. |
865 * and write() functions. |
863 */ |
866 */ |
864 private static final int NOT_HANDSHAKING = 0; |
867 private static final int NOT_HANDSHAKING = 0; |
865 private static final int HANDSHAKING = 1; |
868 private static final int HANDSHAKING = 1; |
866 |
869 |
867 private static final int DOING_TASKS = 4; // bit added to above state |
870 // Bit flags |
|
871 // a thread is currently executing tasks |
|
872 private static final int DOING_TASKS = 4; |
|
873 // a thread wants to execute tasks, while another thread is executing |
|
874 private static final int REQUESTING_TASKS = 8; |
|
875 private static final int TASK_BITS = 12; // Both bits |
868 |
876 |
869 private static final int READER = 1; |
877 private static final int READER = 1; |
870 private static final int WRITER = 2; |
878 private static final int WRITER = 2; |
871 |
879 |
872 private static String states(AtomicInteger state) { |
880 private static String states(AtomicInteger state) { |
873 int s = state.get(); |
881 int s = state.get(); |
874 StringBuilder sb = new StringBuilder(); |
882 StringBuilder sb = new StringBuilder(); |
875 int x = s & ~DOING_TASKS; |
883 int x = s & ~TASK_BITS; |
876 switch (x) { |
884 switch (x) { |
877 case NOT_HANDSHAKING: |
885 case NOT_HANDSHAKING: |
878 sb.append(" NOT_HANDSHAKING "); |
886 sb.append(" NOT_HANDSHAKING "); |
879 break; |
887 break; |
880 case HANDSHAKING: |
888 case HANDSHAKING: |
883 default: |
891 default: |
884 throw new InternalError(); |
892 throw new InternalError(); |
885 } |
893 } |
886 if ((s & DOING_TASKS) > 0) |
894 if ((s & DOING_TASKS) > 0) |
887 sb.append("|DOING_TASKS"); |
895 sb.append("|DOING_TASKS"); |
|
896 if ((s & REQUESTING_TASKS) > 0) |
|
897 sb.append("|REQUESTING_TASKS"); |
888 return sb.toString(); |
898 return sb.toString(); |
889 } |
899 } |
890 |
900 |
891 private void resumeActivity() { |
901 private void resumeActivity() { |
892 reader.schedule(); |
902 reader.schedule(); |
895 |
905 |
896 final AtomicInteger handshakeState; |
906 final AtomicInteger handshakeState; |
897 final ConcurrentLinkedQueue<String> stateList = |
907 final ConcurrentLinkedQueue<String> stateList = |
898 debug.on() ? new ConcurrentLinkedQueue<>() : null; |
908 debug.on() ? new ConcurrentLinkedQueue<>() : null; |
899 |
909 |
|
910 // Atomically executed to update task bits. Sets either DOING_TASKS or REQUESTING_TASKS |
|
911 // depending on previous value |
|
912 private static final IntBinaryOperator REQUEST_OR_DO_TASKS = (current, ignored) -> { |
|
913 if ((current & DOING_TASKS) == 0) |
|
914 return DOING_TASKS | (current & HANDSHAKING); |
|
915 else |
|
916 return DOING_TASKS | REQUESTING_TASKS | (current & HANDSHAKING); |
|
917 }; |
|
918 |
|
919 // Atomically executed to update task bits. Sets DOING_TASKS if REQUESTING was set |
|
920 // clears bits if not. |
|
921 private static final IntBinaryOperator FINISH_OR_DO_TASKS = (current, ignored) -> { |
|
922 if ((current & REQUESTING_TASKS) != 0) |
|
923 return DOING_TASKS | (current & HANDSHAKING); |
|
924 // clear both bits |
|
925 return (current & HANDSHAKING); |
|
926 }; |
|
927 |
900 private boolean doHandshake(EngineResult r, int caller) { |
928 private boolean doHandshake(EngineResult r, int caller) { |
901 // unconditionally sets the HANDSHAKING bit, while preserving DOING_TASKS |
929 // unconditionally sets the HANDSHAKING bit, while preserving task bits |
902 handshakeState.getAndAccumulate(HANDSHAKING, (current, update) -> update | (current & DOING_TASKS)); |
930 handshakeState.getAndAccumulate(0, (current, unused) -> HANDSHAKING | (current & TASK_BITS)); |
903 if (stateList != null && debug.on()) { |
931 if (stateList != null && debug.on()) { |
904 stateList.add(r.handshakeStatus().toString()); |
932 stateList.add(r.handshakeStatus().toString()); |
905 stateList.add(Integer.toString(caller)); |
933 stateList.add(Integer.toString(caller)); |
906 } |
934 } |
907 switch (r.handshakeStatus()) { |
935 switch (r.handshakeStatus()) { |
908 case NEED_TASK: |
936 case NEED_TASK: |
909 int s = handshakeState.getAndUpdate((current) -> current | DOING_TASKS); |
937 int s = handshakeState.accumulateAndGet(0, REQUEST_OR_DO_TASKS); |
910 if ((s & DOING_TASKS) > 0) // someone else was doing tasks |
938 if ((s & REQUESTING_TASKS) > 0) { // someone else is or will do tasks |
911 return false; |
939 return false; |
|
940 } |
912 |
941 |
913 if (debug.on()) debug.log("obtaining and initiating task execution"); |
942 if (debug.on()) debug.log("obtaining and initiating task execution"); |
914 List<Runnable> tasks = obtainTasks(); |
943 List<Runnable> tasks = obtainTasks(); |
915 executeTasks(tasks); |
944 executeTasks(tasks); |
916 return false; // executeTasks will resume activity |
945 return false; // executeTasks will resume activity |
940 } |
969 } |
941 return l; |
970 return l; |
942 } |
971 } |
943 |
972 |
944 private void executeTasks(List<Runnable> tasks) { |
973 private void executeTasks(List<Runnable> tasks) { |
945 if (tasks.isEmpty()) |
|
946 return; |
|
947 exec.execute(() -> { |
974 exec.execute(() -> { |
948 try { |
975 try { |
949 List<Runnable> nextTasks = tasks; |
976 List<Runnable> nextTasks = tasks; |
|
977 if (debug.on()) debug.log("#tasks to execute: " + Integer.toString(nextTasks.size())); |
950 do { |
978 do { |
951 nextTasks.forEach(Runnable::run); |
979 nextTasks.forEach(Runnable::run); |
952 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { |
980 if (engine.getHandshakeStatus() == HandshakeStatus.NEED_TASK) { |
953 nextTasks = obtainTasks(); |
981 nextTasks = obtainTasks(); |
954 } else { |
982 } else { |
|
983 int s = handshakeState.accumulateAndGet(0, FINISH_OR_DO_TASKS); |
|
984 if ((s & DOING_TASKS) != 0) { |
|
985 if (debug.on()) debug.log("re-running tasks (B)"); |
|
986 nextTasks = obtainTasks(); |
|
987 continue; |
|
988 } |
955 break; |
989 break; |
956 } |
990 } |
957 } while (true); |
991 } while (true); |
958 handshakeState.getAndUpdate((current) -> current & ~DOING_TASKS); |
992 if (debug.on()) debug.log("finished task execution"); |
959 resumeActivity(); |
993 resumeActivity(); |
960 } catch (Throwable t) { |
994 } catch (Throwable t) { |
961 handleError(t); |
995 handleError(t); |
962 } |
996 } |
963 }); |
997 }); |