1 /* |
1 /* |
2 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 2017, 2018, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. |
7 * published by the Free Software Foundation. |
311 @Override |
311 @Override |
312 public void onNext(List<ByteBuffer> item) { |
312 public void onNext(List<ByteBuffer> item) { |
313 System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", |
313 System.out.printf("EchoTube add %s [requested:%s, queue:%s]%n", |
314 Utils.remaining(item), requested, queue.size()); |
314 Utils.remaining(item), requested, queue.size()); |
315 queue.add(item); |
315 queue.add(item); |
316 processingScheduler.deferOrSchedule(executor); |
316 processingScheduler.runOrSchedule(executor); |
317 } |
317 } |
318 |
318 |
319 @Override |
319 @Override |
320 public void onError(Throwable throwable) { |
320 public void onError(Throwable throwable) { |
321 System.out.println("EchoTube add " + throwable); |
321 System.out.println("EchoTube add " + throwable); |
322 queue.add(throwable); |
322 queue.add(throwable); |
323 processingScheduler.deferOrSchedule(executor); |
323 processingScheduler.runOrSchedule(executor); |
324 } |
324 } |
325 |
325 |
326 @Override |
326 @Override |
327 public void onComplete() { |
327 public void onComplete() { |
328 System.out.println("EchoTube add EOF"); |
328 System.out.println("EchoTube add EOF"); |
329 queue.add(EOF); |
329 queue.add(EOF); |
330 processingScheduler.deferOrSchedule(executor); |
330 processingScheduler.runOrSchedule(executor); |
331 } |
331 } |
332 |
332 |
333 @Override |
333 @Override |
334 public boolean isFinished() { |
334 public boolean isFinished() { |
335 return cancelled.get(); |
335 return cancelled.get(); |
342 System.out.println("EchoTube got request: " + n); |
342 System.out.println("EchoTube got request: " + n); |
343 if (n <= 0) { |
343 if (n <= 0) { |
344 throw new InternalError(); |
344 throw new InternalError(); |
345 } |
345 } |
346 if (demand.increase(n)) { |
346 if (demand.increase(n)) { |
347 processingScheduler.deferOrSchedule(executor); |
347 processingScheduler.runOrSchedule(executor); |
348 } |
348 } |
349 } |
349 } |
350 |
350 |
351 @Override |
351 @Override |
352 public void cancel() { |
352 public void cancel() { |