test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/WrapperTest.java
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 42208b2f224e
child 56451 9585061fdb04
equal deleted inserted replaced
56091:aedd6133e7a0 56092:fd85b2bf2b0d
       
     1 /*
       
     2  * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.
       
     8  *
       
     9  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    12  * version 2 for more details (a copy is included in the LICENSE file that
       
    13  * accompanied this code).
       
    14  *
       
    15  * You should have received a copy of the GNU General Public License version
       
    16  * 2 along with this work; if not, write to the Free Software Foundation,
       
    17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    18  *
       
    19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    20  * or visit www.oracle.com if you need additional information or have any
       
    21  * questions.
       
    22  */
       
    23 
       
    24 package jdk.internal.net.http;
       
    25 
       
    26 import java.nio.ByteBuffer;
       
    27 import java.util.LinkedList;
       
    28 import java.util.List;
       
    29 import java.util.concurrent.*;
       
    30 import java.util.concurrent.atomic.*;
       
    31 import org.testng.annotations.Test;
       
    32 import jdk.internal.net.http.common.SubscriberWrapper;
       
    33 
       
    34 @Test
       
    35 public class WrapperTest {
       
    36     static final int LO_PRI = 1;
       
    37     static final int HI_PRI = 2;
       
    38     static final int NUM_HI_PRI = 240;
       
    39     static final int BUFSIZE = 1016;
       
    40     static final int BUFSIZE_INT = BUFSIZE/4;
       
    41     static final int HI_PRI_FREQ = 40;
       
    42 
       
    43     static final int TOTAL = 10000;
       
    44     //static final int TOTAL = 500;
       
    45 
       
    46     final SubmissionPublisher<List<ByteBuffer>> publisher;
       
    47     final SubscriberWrapper sub1, sub2, sub3;
       
    48     final ExecutorService executor = Executors.newCachedThreadPool();
       
    49     volatile int hipricount = 0;
       
    50 
       
    51     void errorHandler(Flow.Subscriber<? super List<ByteBuffer>> sub, Throwable t) {
       
    52         System.err.printf("Exception from %s : %s\n", sub.toString(), t.toString());
       
    53     }
       
    54 
       
    55     public WrapperTest() {
       
    56         publisher = new SubmissionPublisher<>(executor, 600,
       
    57                 (a, b) -> {
       
    58                     errorHandler(a, b);
       
    59                 });
       
    60 
       
    61         CompletableFuture<Void> notif = new CompletableFuture<>();
       
    62         LastSubscriber ls = new LastSubscriber(notif);
       
    63         sub1 = new Filter1(ls);
       
    64         sub2 = new Filter2(sub1);
       
    65         sub3 = new Filter2(sub2);
       
    66     }
       
    67 
       
    68     public class Filter2 extends SubscriberWrapper {
       
    69         Filter2(SubscriberWrapper wrapper) {
       
    70             super(wrapper);
       
    71         }
       
    72 
       
    73         // reverse the order of the bytes in each buffer
       
    74         public void incoming(List<ByteBuffer> list, boolean complete) {
       
    75             List<ByteBuffer> out = new LinkedList<>();
       
    76             for (ByteBuffer inbuf : list) {
       
    77                 int size = inbuf.remaining();
       
    78                 ByteBuffer outbuf = ByteBuffer.allocate(size);
       
    79                 for (int i=size; i>0; i--) {
       
    80                     byte b = inbuf.get(i-1);
       
    81                     outbuf.put(b);
       
    82                 }
       
    83                 outbuf.flip();
       
    84                 out.add(outbuf);
       
    85             }
       
    86             if (complete) System.out.println("Filter2.complete");
       
    87             outgoing(out, complete);
       
    88         }
       
    89 
       
    90         protected long windowUpdate(long currval) {
       
    91             return currval == 0 ? 1 : 0;
       
    92         }
       
    93     }
       
    94 
       
    95     volatile int filter1Calls = 0; // every third call we insert hi pri data
       
    96 
       
    97     ByteBuffer getHiPri(int val) {
       
    98         ByteBuffer buf = ByteBuffer.allocate(8);
       
    99         buf.putInt(HI_PRI);
       
   100         buf.putInt(val);
       
   101         buf.flip();
       
   102         return buf;
       
   103     }
       
   104 
       
   105     volatile int hiPriAdded = 0;
       
   106 
       
   107     public class Filter1 extends SubscriberWrapper {
       
   108         Filter1(Flow.Subscriber<List<ByteBuffer>> downstreamSubscriber)
       
   109         {
       
   110             super();
       
   111             subscribe(downstreamSubscriber);
       
   112         }
       
   113 
       
   114         // Inserts up to NUM_HI_PRI hi priority buffers into flow
       
   115         protected void incoming(List<ByteBuffer> in, boolean complete) {
       
   116             if ((++filter1Calls % HI_PRI_FREQ) == 0 && (hiPriAdded++ < NUM_HI_PRI)) {
       
   117                 sub1.outgoing(getHiPri(hipricount++), false);
       
   118             }
       
   119             // pass data thru
       
   120             if (complete) System.out.println("Filter1.complete");
       
   121             outgoing(in, complete);
       
   122         }
       
   123 
       
   124         protected long windowUpdate(long currval) {
       
   125             return currval == 0 ? 1 : 0;
       
   126         }
       
   127     }
       
   128 
       
   129     /**
       
   130      * Final subscriber in the chain. Compares the data sent by the original
       
   131      * publisher.
       
   132      */
       
   133     static public class LastSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
       
   134         volatile Flow.Subscription subscription;
       
   135         volatile int hipriCounter=0;
       
   136         volatile int lopriCounter=0;
       
   137         final CompletableFuture<Void> cf;
       
   138 
       
   139         LastSubscriber(CompletableFuture<Void> cf) {
       
   140             this.cf = cf;
       
   141         }
       
   142 
       
   143         @Override
       
   144         public void onSubscribe(Flow.Subscription subscription) {
       
   145             this.subscription = subscription;
       
   146             subscription.request(50); // say
       
   147         }
       
   148 
       
   149         private void error(String...args) {
       
   150             StringBuilder sb = new StringBuilder();
       
   151             for (String s : args) {
       
   152                 sb.append(s);
       
   153                 sb.append(' ');
       
   154             }
       
   155             String msg = sb.toString();
       
   156             System.out.println("Error: " + msg);
       
   157             RuntimeException e = new RuntimeException(msg);
       
   158             cf.completeExceptionally(e);
       
   159             subscription.cancel(); // This is where we need a variant that include exception
       
   160         }
       
   161 
       
   162         private void check(ByteBuffer buf) {
       
   163             int type = buf.getInt();
       
   164             if (type == HI_PRI) {
       
   165                 // check next int is hi pri counter
       
   166                 int c = buf.getInt();
       
   167                 if (c != hipriCounter)
       
   168                     error("hi pri counter", Integer.toString(c), Integer.toString(hipriCounter));
       
   169                 hipriCounter++;
       
   170             } else {
       
   171                 while (buf.hasRemaining()) {
       
   172                     if (buf.getInt() != lopriCounter)
       
   173                         error("lo pri counter", Integer.toString(lopriCounter));
       
   174                     lopriCounter++;
       
   175                 }
       
   176             }
       
   177         }
       
   178 
       
   179         @Override
       
   180         public void onNext(List<ByteBuffer> items) {
       
   181             for (ByteBuffer item : items)
       
   182                 check(item);
       
   183             subscription.request(1);
       
   184         }
       
   185 
       
   186         @Override
       
   187         public void onError(Throwable throwable) {
       
   188             error(throwable.getMessage());
       
   189         }
       
   190 
       
   191         @Override
       
   192         public void onComplete() {
       
   193             if (hipriCounter != NUM_HI_PRI)
       
   194                 error("hi pri at end wrong", Integer.toString(hipriCounter), Integer.toString(NUM_HI_PRI));
       
   195             else {
       
   196                 System.out.println("LastSubscriber.complete");
       
   197                 cf.complete(null); // success
       
   198             }
       
   199         }
       
   200     }
       
   201 
       
   202     List<ByteBuffer> getBuffer(int c) {
       
   203         ByteBuffer buf = ByteBuffer.allocate(BUFSIZE+4);
       
   204         buf.putInt(LO_PRI);
       
   205         for (int i=0; i<BUFSIZE_INT; i++) {
       
   206             buf.putInt(c++);
       
   207         }
       
   208         buf.flip();
       
   209         return List.of(buf);
       
   210     }
       
   211 
       
   212     boolean errorTest = false;
       
   213 
       
   214     @Test
       
   215     public void run() throws InterruptedException {
       
   216         try {
       
   217             CompletableFuture<Void> completion = sub3.completion();
       
   218             publisher.subscribe(sub3);
       
   219             // now submit a load of data
       
   220             int counter = 0;
       
   221             for (int i = 0; i < TOTAL; i++) {
       
   222                 List<ByteBuffer> bufs = getBuffer(counter);
       
   223                 //if (i==2)
       
   224                     //bufs.get(0).putInt(41, 1234); // error
       
   225                 counter += BUFSIZE_INT;
       
   226                 publisher.submit(bufs);
       
   227                 //if (i % 1000 == 0)
       
   228                     //Thread.sleep(1000);
       
   229                 //if (i == 99) {
       
   230                     //publisher.closeExceptionally(new RuntimeException("Test error"));
       
   231                     //errorTest = true;
       
   232                     //break;
       
   233                 //}
       
   234             }
       
   235             if (!errorTest) {
       
   236                 publisher.close();
       
   237             }
       
   238             System.out.println("Publisher completed");
       
   239             completion.join();
       
   240             System.out.println("Subscribers completed ok");
       
   241         } finally {
       
   242             executor.shutdownNow();
       
   243         }
       
   244     }
       
   245 
       
   246     static void display(CompletableFuture<?> cf) {
       
   247         System.out.print (cf);
       
   248         if (!cf.isDone())
       
   249             return;
       
   250         try {
       
   251             cf.join(); // wont block
       
   252         } catch (Exception e) {
       
   253             System.out.println(" " + e);
       
   254         }
       
   255     }
       
   256 
       
   257 /*
       
   258     public static void main(String[] args) throws InterruptedException {
       
   259         WrapperTest test = new WrapperTest();
       
   260         test.run();
       
   261     }
       
   262 */
       
   263 }