src/java.net.http/share/classes/jdk/internal/net/http/common/SSLFlowDelegate.java
equal
deleted
inserted
replaced
1 /* |
1 /* |
2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2017, 2019, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
30 import javax.net.ssl.SSLEngine; |
30 import javax.net.ssl.SSLEngine; |
31 import javax.net.ssl.SSLEngineResult; |
31 import javax.net.ssl.SSLEngineResult; |
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; |
32 import javax.net.ssl.SSLEngineResult.HandshakeStatus; |
33 import javax.net.ssl.SSLEngineResult.Status; |
33 import javax.net.ssl.SSLEngineResult.Status; |
34 import javax.net.ssl.SSLException; |
34 import javax.net.ssl.SSLException; |
|
35 import javax.net.ssl.SSLHandshakeException; |
35 import java.io.IOException; |
36 import java.io.IOException; |
36 import java.lang.ref.Reference; |
37 import java.lang.ref.Reference; |
37 import java.lang.ref.ReferenceQueue; |
38 import java.lang.ref.ReferenceQueue; |
38 import java.lang.ref.WeakReference; |
39 import java.lang.ref.WeakReference; |
39 import java.nio.ByteBuffer; |
40 import java.nio.ByteBuffer; |
107 final CompletableFuture<String> alpnCF; // completes on initial handshake |
108 final CompletableFuture<String> alpnCF; // completes on initial handshake |
108 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped |
109 final Monitorable monitor = isMonitored ? this::monitor : null; // prevent GC until SSLFD is stopped |
109 volatile boolean close_notify_received; |
110 volatile boolean close_notify_received; |
110 final CompletableFuture<Void> readerCF; |
111 final CompletableFuture<Void> readerCF; |
111 final CompletableFuture<Void> writerCF; |
112 final CompletableFuture<Void> writerCF; |
|
113 final CompletableFuture<Void> stopCF; |
112 final Consumer<ByteBuffer> recycler; |
114 final Consumer<ByteBuffer> recycler; |
113 static AtomicInteger scount = new AtomicInteger(1); |
115 static AtomicInteger scount = new AtomicInteger(1); |
114 final int id; |
116 final int id; |
115 |
117 |
116 /** |
118 /** |
147 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
149 this.handshakeState = new AtomicInteger(NOT_HANDSHAKING); |
148 this.readerCF = reader.completion(); |
150 this.readerCF = reader.completion(); |
149 this.writerCF = reader.completion(); |
151 this.writerCF = reader.completion(); |
150 readerCF.exceptionally(this::stopOnError); |
152 readerCF.exceptionally(this::stopOnError); |
151 writerCF.exceptionally(this::stopOnError); |
153 writerCF.exceptionally(this::stopOnError); |
152 |
154 this.stopCF = CompletableFuture.allOf(reader.completion(), writer.completion()) |
153 CompletableFuture.allOf(reader.completion(), writer.completion()) |
|
154 .thenRun(this::normalStop); |
155 .thenRun(this::normalStop); |
155 this.alpnCF = new MinimalFuture<>(); |
156 this.alpnCF = new MinimalFuture<>(); |
156 |
157 |
157 // connect the Reader to the downReader and the |
158 // connect the Reader to the downReader and the |
158 // Writer to the downWriter. |
159 // Writer to the downWriter. |
300 @Override |
301 @Override |
301 public String toString() { |
302 public String toString() { |
302 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() |
303 return "READER: " + super.toString() + ", readBuf: " + readBuf.toString() |
303 + ", count: " + count.toString() + ", scheduler: " |
304 + ", count: " + count.toString() + ", scheduler: " |
304 + (scheduler.isStopped() ? "stopped" : "running") |
305 + (scheduler.isStopped() ? "stopped" : "running") |
305 + ", status: " + lastUnwrapStatus; |
306 + ", status: " + lastUnwrapStatus |
|
307 + ", handshakeState: " + handshakeState.get() |
|
308 + ", engine: " + engine.getHandshakeStatus(); |
306 } |
309 } |
307 |
310 |
308 private void reallocReadBuf() { |
311 private void reallocReadBuf() { |
309 int sz = readBuf.capacity(); |
312 int sz = readBuf.capacity(); |
310 ByteBuffer newb = ByteBuffer.allocate(sz * 2); |
313 ByteBuffer newb = ByteBuffer.allocate(sz * 2); |
427 return; |
430 return; |
428 } |
431 } |
429 if (complete && result.status() == Status.CLOSED) { |
432 if (complete && result.status() == Status.CLOSED) { |
430 if (debugr.on()) debugr.log("Closed: completing"); |
433 if (debugr.on()) debugr.log("Closed: completing"); |
431 outgoing(Utils.EMPTY_BB_LIST, true); |
434 outgoing(Utils.EMPTY_BB_LIST, true); |
|
435 // complete ALPN if not yet completed |
|
436 setALPN(); |
432 return; |
437 return; |
433 } |
438 } |
434 if (result.handshaking()) { |
439 if (result.handshaking()) { |
435 handshaking = true; |
440 handshaking = true; |
436 if (debugr.on()) debugr.log("handshaking"); |
441 if (debugr.on()) debugr.log("handshaking"); |
437 if (doHandshake(result, READER)) continue; // need unwrap |
442 if (doHandshake(result, READER)) continue; // need unwrap |
438 else break; // doHandshake will have triggered the write scheduler if necessary |
443 else break; // doHandshake will have triggered the write scheduler if necessary |
439 } else { |
444 } else { |
440 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { |
445 if (trySetALPN()) { |
441 handshaking = false; |
|
442 applicationBufferSize = engine.getSession().getApplicationBufferSize(); |
|
443 packetBufferSize = engine.getSession().getPacketBufferSize(); |
|
444 setALPN(); |
|
445 resumeActivity(); |
446 resumeActivity(); |
446 } |
447 } |
447 } |
448 } |
448 } catch (IOException ex) { |
449 } catch (IOException ex) { |
449 errorCommon(ex); |
450 errorCommon(ex); |
739 |
740 |
740 if (result.status() == Status.CLOSED) { |
741 if (result.status() == Status.CLOSED) { |
741 if (!upstreamCompleted) { |
742 if (!upstreamCompleted) { |
742 upstreamCompleted = true; |
743 upstreamCompleted = true; |
743 upstreamSubscription.cancel(); |
744 upstreamSubscription.cancel(); |
|
745 // complete ALPN if not yet completed |
|
746 setALPN(); |
744 } |
747 } |
745 if (result.bytesProduced() <= 0) |
748 if (result.bytesProduced() <= 0) |
746 return; |
749 return; |
747 |
750 |
748 if (!completing && !completed) { |
751 if (!completing && !completed) { |
756 if (result.handshaking()) { |
759 if (result.handshaking()) { |
757 if (debugw.on()) debugw.log("handshaking"); |
760 if (debugw.on()) debugw.log("handshaking"); |
758 doHandshake(result, WRITER); // ok to ignore return |
761 doHandshake(result, WRITER); // ok to ignore return |
759 handshaking = true; |
762 handshaking = true; |
760 } else { |
763 } else { |
761 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { |
764 if (trySetALPN()) { |
762 applicationBufferSize = engine.getSession().getApplicationBufferSize(); |
|
763 packetBufferSize = engine.getSession().getPacketBufferSize(); |
|
764 setALPN(); |
|
765 resumeActivity(); |
765 resumeActivity(); |
766 } |
766 } |
767 } |
767 } |
768 cleanList(writeList); // tidy up the source list |
768 cleanList(writeList); // tidy up the source list |
769 sendResultBytes(result); |
769 sendResultBytes(result); |
912 if (stopped) |
912 if (stopped) |
913 return; |
913 return; |
914 stopped = true; |
914 stopped = true; |
915 reader.stop(); |
915 reader.stop(); |
916 writer.stop(); |
916 writer.stop(); |
|
917 // make sure the alpnCF is completed. |
|
918 if (!alpnCF.isDone()) { |
|
919 Throwable alpn = new SSLHandshakeException( |
|
920 "Connection closed before successful ALPN negotiation"); |
|
921 alpnCF.completeExceptionally(alpn); |
|
922 } |
917 if (isMonitored) Monitor.remove(monitor); |
923 if (isMonitored) Monitor.remove(monitor); |
918 } |
924 } |
919 |
925 |
920 private Void stopOnError(Throwable currentlyUnused) { |
926 private Void stopOnError(Throwable error) { |
921 // maybe log, etc |
927 // maybe log, etc |
|
928 // ensure the ALPN is completed |
|
929 // We could also do this in SSLTube.SSLSubscriberWrapper |
|
930 // onError/onComplete - with the caveat that the ALP CF |
|
931 // would get completed externally. Doing it here keeps |
|
932 // it all inside SSLFlowDelegate. |
|
933 if (!alpnCF.isDone()) { |
|
934 alpnCF.completeExceptionally(error); |
|
935 } |
922 normalStop(); |
936 normalStop(); |
923 return null; |
937 return null; |
924 } |
938 } |
925 |
939 |
926 private void cleanList(List<ByteBuffer> l) { |
940 private void cleanList(List<ByteBuffer> l) { |
1068 } |
1082 } |
1069 break; |
1083 break; |
1070 } |
1084 } |
1071 } while (true); |
1085 } while (true); |
1072 if (debug.on()) debug.log("finished task execution"); |
1086 if (debug.on()) debug.log("finished task execution"); |
|
1087 HandshakeStatus hs = engine.getHandshakeStatus(); |
|
1088 if (hs == HandshakeStatus.FINISHED || hs == HandshakeStatus.NOT_HANDSHAKING) { |
|
1089 // We're no longer handshaking, try setting ALPN |
|
1090 trySetALPN(); |
|
1091 } |
1073 resumeActivity(); |
1092 resumeActivity(); |
1074 } catch (Throwable t) { |
1093 } catch (Throwable t) { |
1075 handleError(t); |
1094 handleError(t); |
1076 } |
1095 } |
1077 }); |
1096 }); |
|
1097 } |
|
1098 |
|
1099 boolean trySetALPN() { |
|
1100 // complete ALPN CF if needed. |
|
1101 if ((handshakeState.getAndSet(NOT_HANDSHAKING) & ~DOING_TASKS) == HANDSHAKING) { |
|
1102 applicationBufferSize = engine.getSession().getApplicationBufferSize(); |
|
1103 packetBufferSize = engine.getSession().getPacketBufferSize(); |
|
1104 setALPN(); |
|
1105 return true; |
|
1106 } |
|
1107 return false; |
1078 } |
1108 } |
1079 |
1109 |
1080 // FIXME: acknowledge a received CLOSE request from peer |
1110 // FIXME: acknowledge a received CLOSE request from peer |
1081 EngineResult doClosure(EngineResult r) throws IOException { |
1111 EngineResult doClosure(EngineResult r) throws IOException { |
1082 if (debug.on()) |
1112 if (debug.on()) |