179 if (!(x.getCause() instanceof ClosedChannelException)) |
179 if (!(x.getCause() instanceof ClosedChannelException)) |
180 throw new RuntimeException("Cause of ClosedChannelException expected"); |
180 throw new RuntimeException("Cause of ClosedChannelException expected"); |
181 } |
181 } |
182 final AtomicReference<Throwable> connectException = |
182 final AtomicReference<Throwable> connectException = |
183 new AtomicReference<Throwable>(); |
183 new AtomicReference<Throwable>(); |
184 ch.connect(server.address(), null, new CompletionHandler<Void,Void>() { |
184 ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { |
185 public void completed(Void result, Void att) { |
185 public void completed(Void result, Void att) { |
186 } |
186 } |
187 public void failed(Throwable exc, Void att) { |
187 public void failed(Throwable exc, Void att) { |
188 connectException.set(exc); |
188 connectException.set(exc); |
189 } |
189 } |
330 SocketChannel peer = server.accept(); |
330 SocketChannel peer = server.accept(); |
331 |
331 |
332 // start read operation |
332 // start read operation |
333 final CountDownLatch latch = new CountDownLatch(1); |
333 final CountDownLatch latch = new CountDownLatch(1); |
334 ByteBuffer buf = ByteBuffer.allocate(1); |
334 ByteBuffer buf = ByteBuffer.allocate(1); |
335 Future<Integer> res = ch.read(buf, null, |
335 Future<Integer> res = ch.read(buf, (Void)null, |
336 new CompletionHandler<Integer,Void>() { |
336 new CompletionHandler<Integer,Void>() { |
337 public void completed(Integer result, Void att) { |
337 public void completed(Integer result, Void att) { |
338 } |
338 } |
339 public void failed(Throwable exc, Void att) { |
339 public void failed(Throwable exc, Void att) { |
340 } |
340 } |
395 sc.close(); |
395 sc.close(); |
396 |
396 |
397 // reads should complete immediately |
397 // reads should complete immediately |
398 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); |
398 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); |
399 final CountDownLatch latch = new CountDownLatch(1); |
399 final CountDownLatch latch = new CountDownLatch(1); |
400 ch.read(dst, null, new CompletionHandler<Integer,Void>() { |
400 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { |
401 public void completed(Integer result, Void att) { |
401 public void completed(Integer result, Void att) { |
402 int n = result; |
402 int n = result; |
403 if (n > 0) { |
403 if (n > 0) { |
404 ch.read(dst, null, this); |
404 ch.read(dst, (Void)null, this); |
405 } else { |
405 } else { |
406 latch.countDown(); |
406 latch.countDown(); |
407 } |
407 } |
408 } |
408 } |
409 public void failed(Throwable exc, Void att) { |
409 public void failed(Throwable exc, Void att) { |
448 ByteBuffer src = genBuffer(); |
448 ByteBuffer src = genBuffer(); |
449 |
449 |
450 // read until the buffer is full |
450 // read until the buffer is full |
451 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); |
451 final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); |
452 final CountDownLatch latch = new CountDownLatch(1); |
452 final CountDownLatch latch = new CountDownLatch(1); |
453 ch.read(dst, null, new CompletionHandler<Integer,Void>() { |
453 ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { |
454 public void completed(Integer result, Void att) { |
454 public void completed(Integer result, Void att) { |
455 if (dst.hasRemaining()) { |
455 if (dst.hasRemaining()) { |
456 ch.read(dst, null, this); |
456 ch.read(dst, (Void)null, this); |
457 } else { |
457 } else { |
458 latch.countDown(); |
458 latch.countDown(); |
459 } |
459 } |
460 } |
460 } |
461 public void failed(Throwable exc, Void att) { |
461 public void failed(Throwable exc, Void att) { |
506 dsts[i] = ByteBuffer.allocateDirect(100); |
506 dsts[i] = ByteBuffer.allocateDirect(100); |
507 } |
507 } |
508 |
508 |
509 // scattering read that completes ascynhronously |
509 // scattering read that completes ascynhronously |
510 final CountDownLatch latch = new CountDownLatch(1); |
510 final CountDownLatch latch = new CountDownLatch(1); |
511 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, null, |
511 ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, |
512 new CompletionHandler<Long,Void>() { |
512 new CompletionHandler<Long,Void>() { |
513 public void completed(Long result, Void att) { |
513 public void completed(Long result, Void att) { |
514 long n = result; |
514 long n = result; |
515 if (n <= 0) |
515 if (n <= 0) |
516 throw new RuntimeException("No bytes read"); |
516 throw new RuntimeException("No bytes read"); |
534 // read should complete immediately |
534 // read should complete immediately |
535 for (int i=0; i<dsts.length; i++) { |
535 for (int i=0; i<dsts.length; i++) { |
536 dsts[i].rewind(); |
536 dsts[i].rewind(); |
537 } |
537 } |
538 long n = ch |
538 long n = ch |
539 .read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, null, null).get(); |
539 .read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, null).get(); |
540 if (n <= 0) |
540 if (n <= 0) |
541 throw new RuntimeException("No bytes read"); |
541 throw new RuntimeException("No bytes read"); |
542 |
542 |
543 ch.close(); |
543 ch.close(); |
544 sc.close(); |
544 sc.close(); |
560 if (n != 0) |
560 if (n != 0) |
561 throw new RuntimeException("0 expected"); |
561 throw new RuntimeException("0 expected"); |
562 |
562 |
563 // write all bytes and close connection when done |
563 // write all bytes and close connection when done |
564 final ByteBuffer src = genBuffer(); |
564 final ByteBuffer src = genBuffer(); |
565 ch.write(src, null, new CompletionHandler<Integer,Void>() { |
565 ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { |
566 public void completed(Integer result, Void att) { |
566 public void completed(Integer result, Void att) { |
567 if (src.hasRemaining()) { |
567 if (src.hasRemaining()) { |
568 ch.write(src, null, this); |
568 ch.write(src, (Void)null, this); |
569 } else { |
569 } else { |
570 try { |
570 try { |
571 ch.close(); |
571 ch.close(); |
572 } catch (IOException ignore) { } |
572 } catch (IOException ignore) { } |
573 } |
573 } |
614 SocketChannel sc = server.accept(); |
614 SocketChannel sc = server.accept(); |
615 |
615 |
616 // write buffers (should complete immediately) |
616 // write buffers (should complete immediately) |
617 ByteBuffer[] srcs = genBuffers(1); |
617 ByteBuffer[] srcs = genBuffers(1); |
618 long n = ch |
618 long n = ch |
619 .write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, null, null).get(); |
619 .write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, null).get(); |
620 if (n <= 0) |
620 if (n <= 0) |
621 throw new RuntimeException("No bytes written"); |
621 throw new RuntimeException("No bytes written"); |
622 |
622 |
623 // set to true to signal that no more buffers should be written |
623 // set to true to signal that no more buffers should be written |
624 final AtomicBoolean continueWriting = new AtomicBoolean(true); |
624 final AtomicBoolean continueWriting = new AtomicBoolean(true); |
627 final AtomicLong bytesWritten = new AtomicLong(n); |
627 final AtomicLong bytesWritten = new AtomicLong(n); |
628 |
628 |
629 // write until socket buffer is full so as to create the conditions |
629 // write until socket buffer is full so as to create the conditions |
630 // for when a write does not complete immediately |
630 // for when a write does not complete immediately |
631 srcs = genBuffers(1); |
631 srcs = genBuffers(1); |
632 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, null, |
632 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, |
633 new CompletionHandler<Long,Void>() { |
633 new CompletionHandler<Long,Void>() { |
634 public void completed(Long result, Void att) { |
634 public void completed(Long result, Void att) { |
635 long n = result; |
635 long n = result; |
636 if (n <= 0) |
636 if (n <= 0) |
637 throw new RuntimeException("No bytes written"); |
637 throw new RuntimeException("No bytes written"); |
638 bytesWritten.addAndGet(n); |
638 bytesWritten.addAndGet(n); |
639 if (continueWriting.get()) { |
639 if (continueWriting.get()) { |
640 ByteBuffer[] srcs = genBuffers(8); |
640 ByteBuffer[] srcs = genBuffers(8); |
641 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, |
641 ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, |
642 null, this); |
642 (Void)null, this); |
643 } |
643 } |
644 } |
644 } |
645 public void failed(Throwable exc, Void att) { |
645 public void failed(Throwable exc, Void att) { |
646 } |
646 } |
647 public void cancelled(Void att) { |
647 public void cancelled(Void att) { |
715 System.out.println("-- timeout when reading --"); |
715 System.out.println("-- timeout when reading --"); |
716 |
716 |
717 // this read should timeout |
717 // this read should timeout |
718 ByteBuffer dst = ByteBuffer.allocate(512); |
718 ByteBuffer dst = ByteBuffer.allocate(512); |
719 try { |
719 try { |
720 ch.read(dst, 3, TimeUnit.SECONDS, null, null).get(); |
720 ch.read(dst, 3, TimeUnit.SECONDS, (Void)null, null).get(); |
721 throw new RuntimeException("Read did not timeout"); |
721 throw new RuntimeException("Read did not timeout"); |
722 } catch (ExecutionException x) { |
722 } catch (ExecutionException x) { |
723 if (!(x.getCause() instanceof InterruptedByTimeoutException)) |
723 if (!(x.getCause() instanceof InterruptedByTimeoutException)) |
724 throw new RuntimeException("InterruptedByTimeoutException expected"); |
724 throw new RuntimeException("InterruptedByTimeoutException expected"); |
725 } |
725 } |