31 import jdk.internal.net.http.common.SequentialScheduler; |
31 import jdk.internal.net.http.common.SequentialScheduler; |
32 import jdk.internal.net.http.common.Utils; |
32 import jdk.internal.net.http.common.Utils; |
33 import jdk.internal.net.http.websocket.OpeningHandshake.Result; |
33 import jdk.internal.net.http.websocket.OpeningHandshake.Result; |
34 |
34 |
35 import java.io.IOException; |
35 import java.io.IOException; |
|
36 import java.io.InterruptedIOException; |
36 import java.lang.ref.Reference; |
37 import java.lang.ref.Reference; |
37 import java.net.ProtocolException; |
38 import java.net.ProtocolException; |
38 import java.net.URI; |
39 import java.net.URI; |
39 import java.net.http.WebSocket; |
40 import java.net.http.WebSocket; |
40 import java.nio.ByteBuffer; |
41 import java.nio.ByteBuffer; |
41 import java.util.Objects; |
42 import java.util.Objects; |
42 import java.util.concurrent.CompletableFuture; |
43 import java.util.concurrent.CompletableFuture; |
43 import java.util.concurrent.CompletionStage; |
44 import java.util.concurrent.CompletionStage; |
|
45 import java.util.concurrent.TimeUnit; |
44 import java.util.concurrent.TimeoutException; |
46 import java.util.concurrent.TimeoutException; |
45 import java.util.concurrent.atomic.AtomicBoolean; |
47 import java.util.concurrent.atomic.AtomicBoolean; |
46 import java.util.concurrent.atomic.AtomicLong; |
48 import java.util.concurrent.atomic.AtomicLong; |
47 import java.util.concurrent.atomic.AtomicReference; |
49 import java.util.concurrent.atomic.AtomicReference; |
48 import java.util.function.BiConsumer; |
50 import java.util.function.BiConsumer; |
49 import java.util.function.Function; |
51 import java.util.function.Function; |
50 |
52 |
51 import static java.util.Objects.requireNonNull; |
53 import static java.util.Objects.requireNonNull; |
|
54 import static jdk.internal.net.http.common.MinimalFuture.completedFuture; |
52 import static jdk.internal.net.http.common.MinimalFuture.failedFuture; |
55 import static jdk.internal.net.http.common.MinimalFuture.failedFuture; |
53 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY; |
56 import static jdk.internal.net.http.websocket.StatusCodes.CLOSED_ABNORMALLY; |
54 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE; |
57 import static jdk.internal.net.http.websocket.StatusCodes.NO_STATUS_CODE; |
55 import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient; |
58 import static jdk.internal.net.http.websocket.StatusCodes.isLegalToSendFromClient; |
56 import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY; |
59 import static jdk.internal.net.http.websocket.WebSocketImpl.State.BINARY; |
148 this.uri = requireNonNull(uri); |
153 this.uri = requireNonNull(uri); |
149 this.subprotocol = requireNonNull(subprotocol); |
154 this.subprotocol = requireNonNull(subprotocol); |
150 this.listener = requireNonNull(listener); |
155 this.listener = requireNonNull(listener); |
151 this.transport = transportFactory.createTransport( |
156 this.transport = transportFactory.createTransport( |
152 new SignallingMessageConsumer()); |
157 new SignallingMessageConsumer()); |
|
158 closeTimeout = readCloseTimeout(); |
|
159 } |
|
160 |
|
161 private static int readCloseTimeout() { |
|
162 String property = "jdk.httpclient.websocket.closeTimeout"; |
|
163 int defaultValue = 30; |
|
164 String value = Utils.getNetProperty(property); |
|
165 int v; |
|
166 if (value == null) { |
|
167 v = defaultValue; |
|
168 } else { |
|
169 try { |
|
170 v = Integer.parseUnsignedInt(value); |
|
171 } catch (NumberFormatException ignored) { |
|
172 v = defaultValue; |
|
173 } |
|
174 } |
|
175 if (DEBUG) { |
|
176 System.out.printf("[WebSocket] %s=%s, using value %s%n", |
|
177 property, value, v); |
|
178 } |
|
179 return v; |
153 } |
180 } |
154 |
181 |
155 // FIXME: add to action handling of errors -> signalError() |
182 // FIXME: add to action handling of errors -> signalError() |
156 |
183 |
157 @Override |
184 @Override |
158 public CompletableFuture<WebSocket> sendText(CharSequence message, |
185 public CompletableFuture<WebSocket> sendText(CharSequence message, |
159 boolean isLast) { |
186 boolean isLast) { |
160 Objects.requireNonNull(message); |
187 Objects.requireNonNull(message); |
161 long id; |
188 long id; |
162 if (DEBUG) { |
189 if (DEBUG) { |
163 id = counter.incrementAndGet(); |
190 id = sendCounter.incrementAndGet(); |
164 System.out.printf("[WebSocket] %s send text: payload length=%s last=%s%n", |
191 System.out.printf("[WebSocket] %s send text: payload length=%s last=%s%n", |
165 id, message.length(), isLast); |
192 id, message.length(), isLast); |
166 } |
193 } |
167 CompletableFuture<WebSocket> result; |
194 CompletableFuture<WebSocket> result; |
168 if (!outstandingSend.compareAndSet(false, true)) { |
195 if (!outstandingSend.compareAndSet(false, true)) { |
182 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
209 public CompletableFuture<WebSocket> sendBinary(ByteBuffer message, |
183 boolean isLast) { |
210 boolean isLast) { |
184 Objects.requireNonNull(message); |
211 Objects.requireNonNull(message); |
185 long id; |
212 long id; |
186 if (DEBUG) { |
213 if (DEBUG) { |
187 id = counter.incrementAndGet(); |
214 id = sendCounter.incrementAndGet(); |
188 System.out.printf("[WebSocket] %s send binary: payload=%s last=%s%n", |
215 System.out.printf("[WebSocket] %s send binary: payload=%s last=%s%n", |
189 id, message, isLast); |
216 id, message, isLast); |
190 } |
217 } |
191 CompletableFuture<WebSocket> result; |
218 CompletableFuture<WebSocket> result; |
192 if (!outstandingSend.compareAndSet(false, true)) { |
219 if (!outstandingSend.compareAndSet(false, true)) { |
252 public CompletableFuture<WebSocket> sendClose(int statusCode, |
279 public CompletableFuture<WebSocket> sendClose(int statusCode, |
253 String reason) { |
280 String reason) { |
254 Objects.requireNonNull(reason); |
281 Objects.requireNonNull(reason); |
255 long id; |
282 long id; |
256 if (DEBUG) { |
283 if (DEBUG) { |
257 id = counter.incrementAndGet(); |
284 id = sendCounter.incrementAndGet(); |
258 System.out.printf("[WebSocket] %s send close: statusCode=%s, reason.length=%s%n", |
285 System.out.printf("[WebSocket] %s send close: statusCode=%s, reason.length=%s%n", |
259 id, statusCode, reason); |
286 id, statusCode, reason); |
260 } |
287 } |
261 CompletableFuture<WebSocket> result; |
288 CompletableFuture<WebSocket> result; |
262 if (!isLegalToSendFromClient(statusCode)) { |
289 if (!isLegalToSendFromClient(statusCode)) { |
270 id, result); |
297 id, result); |
271 } |
298 } |
272 return replaceNull(result); |
299 return replaceNull(result); |
273 } |
300 } |
274 |
301 |
275 /* |
|
276 * Sends a Close message, then shuts down the output since no more |
|
277 * messages are expected to be sent at this point. |
|
278 */ |
|
279 private CompletableFuture<WebSocket> sendClose0(int statusCode, |
302 private CompletableFuture<WebSocket> sendClose0(int statusCode, |
280 String reason) { |
303 String reason) { |
281 outputClosed = true; |
304 outputClosed = true; |
282 BiConsumer<WebSocket, Throwable> closer = (r, e) -> { |
|
283 Throwable cause = Utils.getCompletionCause(e); |
|
284 if (cause instanceof IllegalArgumentException) { |
|
285 // or pre=check it (isLegalToSendFromClient(statusCode)) |
|
286 return; |
|
287 } |
|
288 try { |
|
289 transport.closeOutput(); |
|
290 } catch (IOException ex) { |
|
291 Log.logError(ex); |
|
292 } |
|
293 if (cause instanceof TimeoutException) { // FIXME: it is not the case anymore |
|
294 if (DEBUG) { |
|
295 System.out.println("[WebSocket] sendClose0 error: " + e); |
|
296 } |
|
297 try { |
|
298 transport.closeInput(); |
|
299 } catch (IOException ex) { |
|
300 Log.logError(ex); |
|
301 } |
|
302 } |
|
303 }; |
|
304 CompletableFuture<WebSocket> cf |
305 CompletableFuture<WebSocket> cf |
305 = transport.sendClose(statusCode, reason, this, closer); |
306 = transport.sendClose(statusCode, reason, this, (r, e) -> { }); |
306 return cf; |
307 CompletableFuture<WebSocket> closeOrTimeout |
|
308 = replaceNull(cf).orTimeout(closeTimeout, TimeUnit.SECONDS); |
|
309 // The snippet below, whose purpose might not be immediately obvious, |
|
310 // is a trick used to complete a dependant stage with an IOException. |
|
311 // A checked IOException cannot be thrown from inside the BiConsumer |
|
312 // supplied to the handle method. Instead a CompletionStage completed |
|
313 // exceptionally with this IOException is returned. |
|
314 return closeOrTimeout.handle(this::processCloseOutcome) |
|
315 .thenCompose(Function.identity()); |
|
316 } |
|
317 |
|
318 private CompletionStage<WebSocket> processCloseOutcome(WebSocket webSocket, |
|
319 Throwable e) { |
|
320 if (DEBUG) { |
|
321 System.out.printf("[WebSocket] send close completed, error=%s%n", e); |
|
322 if (e != null) { |
|
323 e.printStackTrace(System.out); |
|
324 } |
|
325 } |
|
326 if (e == null) { |
|
327 return completedFuture(webSocket); |
|
328 } |
|
329 Throwable cause = Utils.getCompletionCause(e); |
|
330 if (cause instanceof IllegalArgumentException) { |
|
331 return failedFuture(cause); |
|
332 } |
|
333 try { |
|
334 transport.closeOutput(); |
|
335 } catch (IOException ignored) { } |
|
336 |
|
337 if (cause instanceof TimeoutException) { |
|
338 inputClosed = true; |
|
339 try { |
|
340 transport.closeInput(); |
|
341 } catch (IOException ignored) { } |
|
342 return failedFuture(new InterruptedIOException( |
|
343 "Could not send close within a reasonable timeout")); |
|
344 } |
|
345 return failedFuture(cause); |
307 } |
346 } |
308 |
347 |
309 @Override |
348 @Override |
310 public void request(long n) { |
349 public void request(long n) { |
311 if (DEBUG) { |
350 if (DEBUG) { |
396 processPong(); |
442 processPong(); |
397 tryChangeState(PONG, IDLE); |
443 tryChangeState(PONG, IDLE); |
398 break; |
444 break; |
399 case CLOSE: |
445 case CLOSE: |
400 processClose(); |
446 processClose(); |
401 return; |
447 break loop; |
402 case ERROR: |
448 case ERROR: |
403 processError(); |
449 processError(); |
404 return; |
450 break loop; |
405 case IDLE: |
451 case IDLE: |
406 if (demand.tryDecrement() |
452 if (demand.tryDecrement() |
407 && tryChangeState(IDLE, WAITING)) { |
453 && tryChangeState(IDLE, WAITING)) { |
408 transport.request(1); |
454 transport.request(1); |
409 } |
455 } |
410 return; |
456 break loop; |
411 case WAITING: |
457 case WAITING: |
412 // For debugging spurious signalling: when there was a |
458 // For debugging spurious signalling: when there was a |
413 // signal, but apparently nothing has changed |
459 // signal, but apparently nothing has changed |
414 return; |
460 break loop; |
415 default: |
461 default: |
416 throw new InternalError(String.valueOf(s)); |
462 throw new InternalError(String.valueOf(s)); |
417 } |
463 } |
418 } catch (Throwable t) { |
464 } catch (Throwable t) { |
419 signalError(t); |
465 signalError(t); |
420 } |
466 } |
|
467 } |
|
468 if (DEBUG) { |
|
469 System.out.printf("[WebSocket] exit receive task%n"); |
421 } |
470 } |
422 } |
471 } |
423 |
472 |
424 private void processError() throws IOException { |
473 private void processError() throws IOException { |
425 if (DEBUG) { |
474 if (DEBUG) { |
429 receiveScheduler.stop(); |
478 receiveScheduler.stop(); |
430 Throwable err = error.get(); |
479 Throwable err = error.get(); |
431 if (err instanceof FailWebSocketException) { |
480 if (err instanceof FailWebSocketException) { |
432 int code1 = ((FailWebSocketException) err).getStatusCode(); |
481 int code1 = ((FailWebSocketException) err).getStatusCode(); |
433 err = new ProtocolException().initCause(err); |
482 err = new ProtocolException().initCause(err); |
434 sendClose0(code1, "") |
483 if (DEBUG) { |
|
484 System.out.printf("[WebSocket] failing %s with error=%s statusCode=%s%n", |
|
485 WebSocketImpl.this, err, code1); |
|
486 } |
|
487 sendClose0(code1, "") // TODO handle errors from here |
435 .whenComplete( |
488 .whenComplete( |
436 (r, e) -> { |
489 (r, e) -> { |
437 if (e != null) { |
490 if (e != null) { |
438 Log.logError(e); |
491 Log.logError(e); |
439 } |
492 } |
440 }); |
493 }); |
441 } |
494 } |
442 listener.onError(WebSocketImpl.this, err); |
495 long id; |
|
496 if (DEBUG) { |
|
497 id = receiveCounter.incrementAndGet(); |
|
498 System.out.printf("[WebSocket] enter onError %s error=%s%n", |
|
499 id, err); |
|
500 } |
|
501 try { |
|
502 listener.onError(WebSocketImpl.this, err); |
|
503 } finally { |
|
504 if (DEBUG) { |
|
505 System.out.printf("[WebSocket] exit onError %s%n", id); |
|
506 } |
|
507 } |
443 } |
508 } |
444 |
509 |
445 private void processClose() throws IOException { |
510 private void processClose() throws IOException { |
446 if (DEBUG) { |
511 if (DEBUG) { |
447 System.out.println("[WebSocket] processClose"); |
512 System.out.println("[WebSocket] processClose"); |
448 } |
513 } |
449 transport.closeInput(); |
514 transport.closeInput(); |
450 receiveScheduler.stop(); |
515 receiveScheduler.stop(); |
451 CompletionStage<?> readyToClose; |
516 CompletionStage<?> cs = null; // when the listener is ready to close |
452 readyToClose = listener.onClose(WebSocketImpl.this, statusCode, reason); |
517 long id; |
453 if (readyToClose == null) { |
518 if (DEBUG) { |
454 readyToClose = DONE; |
519 id = receiveCounter.incrementAndGet(); |
|
520 System.out.printf("[WebSocket] enter onClose %s statusCode=%s reason.length=%s%n", |
|
521 id, statusCode, reason.length()); |
|
522 } |
|
523 try { |
|
524 cs = listener.onClose(WebSocketImpl.this, statusCode, reason); |
|
525 } finally { |
|
526 System.out.printf("[WebSocket] exit onClose %s returned %s%n", |
|
527 id, cs); |
|
528 } |
|
529 if (cs == null) { |
|
530 cs = DONE; |
455 } |
531 } |
456 int code; |
532 int code; |
457 if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) { |
533 if (statusCode == NO_STATUS_CODE || statusCode == CLOSED_ABNORMALLY) { |
458 code = NORMAL_CLOSURE; |
534 code = NORMAL_CLOSURE; |
459 if (DEBUG) { |
535 if (DEBUG) { |
474 }); |
550 }); |
475 }); |
551 }); |
476 } |
552 } |
477 |
553 |
478 private void processPong() { |
554 private void processPong() { |
479 listener.onPong(WebSocketImpl.this, binaryData); |
555 long id; |
|
556 if (DEBUG) { |
|
557 id = receiveCounter.incrementAndGet(); |
|
558 System.out.printf("[WebSocket] enter onPong %s payload=%s%n", |
|
559 id, binaryData); |
|
560 } |
|
561 CompletionStage<?> cs = null; |
|
562 try { |
|
563 cs = listener.onPong(WebSocketImpl.this, binaryData); |
|
564 } finally { |
|
565 System.out.printf("[WebSocket] exit onPong %s returned %s%n", |
|
566 id, cs); |
|
567 } |
480 } |
568 } |
481 |
569 |
482 private void processPing() { |
570 private void processPing() { |
483 // Let's make a full copy of this tiny data. What we want here |
571 if (DEBUG) { |
484 // is to rule out a possibility the shared data we send might be |
572 System.out.printf("[WebSocket] processPing%n"); |
485 // corrupted by processing in the listener. |
573 } |
486 ByteBuffer slice = binaryData.slice(); |
574 ByteBuffer slice = binaryData.slice(); |
|
575 // A full copy of this (small) data is made. This way sending a |
|
576 // replying Pong could be done in parallel with the listener |
|
577 // handling this Ping. |
487 ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining()) |
578 ByteBuffer copy = ByteBuffer.allocate(binaryData.remaining()) |
488 .put(binaryData) |
579 .put(binaryData) |
489 .flip(); |
580 .flip(); |
490 // Non-exclusive send; |
581 // Non-exclusive send; |
491 BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> { |
582 BiConsumer<WebSocketImpl, Throwable> reporter = (r, e) -> { |
492 if (e != null) { // Better error handing. What if already closed? |
583 if (e != null) { // TODO: better error handing. What if already closed? |
493 signalError(Utils.getCompletionCause(e)); |
584 signalError(Utils.getCompletionCause(e)); |
494 } |
585 } |
495 }; |
586 }; |
496 transport.sendPong(copy, WebSocketImpl.this, reporter); |
587 transport.sendPong(copy, WebSocketImpl.this, reporter); |
497 listener.onPing(WebSocketImpl.this, slice); |
588 long id; |
|
589 if (DEBUG) { |
|
590 id = receiveCounter.incrementAndGet(); |
|
591 System.out.printf("[WebSocket] enter onPing %s payload=%s%n", |
|
592 id, slice); |
|
593 } |
|
594 CompletionStage<?> cs = null; |
|
595 try { |
|
596 cs = listener.onPing(WebSocketImpl.this, slice); |
|
597 } finally { |
|
598 if (DEBUG) { |
|
599 System.out.printf("[WebSocket] exit onPing %s returned %s%n", |
|
600 id, cs); |
|
601 } |
|
602 } |
498 } |
603 } |
499 |
604 |
500 private void processBinary() { |
605 private void processBinary() { |
501 listener.onBinary(WebSocketImpl.this, binaryData, part); |
606 long id; |
|
607 if (DEBUG) { |
|
608 id = receiveCounter.incrementAndGet(); |
|
609 System.out.printf("[WebSocket] enter onBinary %s payload=%s, part=%s%n", |
|
610 id, binaryData, part); |
|
611 } |
|
612 CompletionStage<?> cs = null; |
|
613 try { |
|
614 cs = listener.onBinary(WebSocketImpl.this, binaryData, part); |
|
615 } finally { |
|
616 if (DEBUG) { |
|
617 System.out.printf("[WebSocket] exit onBinary %s returned %s%n", |
|
618 id, cs); |
|
619 } |
|
620 } |
502 } |
621 } |
503 |
622 |
504 private void processText() { |
623 private void processText() { |
505 listener.onText(WebSocketImpl.this, text, part); |
624 long id; |
|
625 if (DEBUG) { |
|
626 id = receiveCounter.incrementAndGet(); |
|
627 System.out.printf("[WebSocket] enter onText %s payload.length=%s part=%s%n", |
|
628 id, text.length(), part); |
|
629 } |
|
630 CompletionStage<?> cs = null; |
|
631 try { |
|
632 cs = listener.onText(WebSocketImpl.this, text, part); |
|
633 } finally { |
|
634 if (DEBUG) { |
|
635 System.out.printf("[WebSocket] exit onText %s returned %s%n", |
|
636 id, cs); |
|
637 } |
|
638 } |
506 } |
639 } |
507 |
640 |
508 private void processOpen() { |
641 private void processOpen() { |
509 listener.onOpen(WebSocketImpl.this); |
642 long id; |
|
643 if (DEBUG) { |
|
644 id = receiveCounter.incrementAndGet(); |
|
645 System.out.printf("[WebSocket] enter onOpen %s%n", id); |
|
646 } |
|
647 try { |
|
648 listener.onOpen(WebSocketImpl.this); |
|
649 } finally { |
|
650 if (DEBUG) { |
|
651 System.out.printf("[WebSocket] exit onOpen %s%n", id); |
|
652 } |
|
653 } |
510 } |
654 } |
511 } |
655 } |
512 |
656 |
513 private void signalOpen() { |
657 private void signalOpen() { |
514 if (DEBUG) { |
658 if (DEBUG) { |