test/jdk/java/net/httpclient/whitebox/java.net.http/jdk/internal/net/http/WrapperTest.java
author chegar
Wed, 07 Feb 2018 21:45:37 +0000
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 test/jdk/java/net/httpclient/whitebox/java.net.http/java/net/http/WrapperTest.java@42208b2f224e
child 56451 9585061fdb04
permissions -rw-r--r--
http-client-branch: move implementation to jdk.internal.net.http

/*
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http;

import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.testng.annotations.Test;
import jdk.internal.net.http.common.SubscriberWrapper;

@Test
public class WrapperTest {
    static final int LO_PRI = 1;
    static final int HI_PRI = 2;
    static final int NUM_HI_PRI = 240;
    static final int BUFSIZE = 1016;
    static final int BUFSIZE_INT = BUFSIZE/4;
    static final int HI_PRI_FREQ = 40;

    static final int TOTAL = 10000;
    //static final int TOTAL = 500;

    final SubmissionPublisher<List<ByteBuffer>> publisher;
    final SubscriberWrapper sub1, sub2, sub3;
    final ExecutorService executor = Executors.newCachedThreadPool();
    volatile int hipricount = 0;

    void errorHandler(Flow.Subscriber<? super List<ByteBuffer>> sub, Throwable t) {
        System.err.printf("Exception from %s : %s\n", sub.toString(), t.toString());
    }

    public WrapperTest() {
        publisher = new SubmissionPublisher<>(executor, 600,
                (a, b) -> {
                    errorHandler(a, b);
                });

        CompletableFuture<Void> notif = new CompletableFuture<>();
        LastSubscriber ls = new LastSubscriber(notif);
        sub1 = new Filter1(ls);
        sub2 = new Filter2(sub1);
        sub3 = new Filter2(sub2);
    }

    public class Filter2 extends SubscriberWrapper {
        Filter2(SubscriberWrapper wrapper) {
            super(wrapper);
        }

        // reverse the order of the bytes in each buffer
        public void incoming(List<ByteBuffer> list, boolean complete) {
            List<ByteBuffer> out = new LinkedList<>();
            for (ByteBuffer inbuf : list) {
                int size = inbuf.remaining();
                ByteBuffer outbuf = ByteBuffer.allocate(size);
                for (int i=size; i>0; i--) {
                    byte b = inbuf.get(i-1);
                    outbuf.put(b);
                }
                outbuf.flip();
                out.add(outbuf);
            }
            if (complete) System.out.println("Filter2.complete");
            outgoing(out, complete);
        }

        protected long windowUpdate(long currval) {
            return currval == 0 ? 1 : 0;
        }
    }

    volatile int filter1Calls = 0; // every third call we insert hi pri data

    ByteBuffer getHiPri(int val) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(HI_PRI);
        buf.putInt(val);
        buf.flip();
        return buf;
    }

    volatile int hiPriAdded = 0;

    public class Filter1 extends SubscriberWrapper {
        Filter1(Flow.Subscriber<List<ByteBuffer>> downstreamSubscriber)
        {
            super();
            subscribe(downstreamSubscriber);
        }

        // Inserts up to NUM_HI_PRI hi priority buffers into flow
        protected void incoming(List<ByteBuffer> in, boolean complete) {
            if ((++filter1Calls % HI_PRI_FREQ) == 0 && (hiPriAdded++ < NUM_HI_PRI)) {
                sub1.outgoing(getHiPri(hipricount++), false);
            }
            // pass data thru
            if (complete) System.out.println("Filter1.complete");
            outgoing(in, complete);
        }

        protected long windowUpdate(long currval) {
            return currval == 0 ? 1 : 0;
        }
    }

    /**
     * Final subscriber in the chain. Compares the data sent by the original
     * publisher.
     */
    static public class LastSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
        volatile Flow.Subscription subscription;
        volatile int hipriCounter=0;
        volatile int lopriCounter=0;
        final CompletableFuture<Void> cf;

        LastSubscriber(CompletableFuture<Void> cf) {
            this.cf = cf;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(50); // say
        }

        private void error(String...args) {
            StringBuilder sb = new StringBuilder();
            for (String s : args) {
                sb.append(s);
                sb.append(' ');
            }
            String msg = sb.toString();
            System.out.println("Error: " + msg);
            RuntimeException e = new RuntimeException(msg);
            cf.completeExceptionally(e);
            subscription.cancel(); // This is where we need a variant that include exception
        }

        private void check(ByteBuffer buf) {
            int type = buf.getInt();
            if (type == HI_PRI) {
                // check next int is hi pri counter
                int c = buf.getInt();
                if (c != hipriCounter)
                    error("hi pri counter", Integer.toString(c), Integer.toString(hipriCounter));
                hipriCounter++;
            } else {
                while (buf.hasRemaining()) {
                    if (buf.getInt() != lopriCounter)
                        error("lo pri counter", Integer.toString(lopriCounter));
                    lopriCounter++;
                }
            }
        }

        @Override
        public void onNext(List<ByteBuffer> items) {
            for (ByteBuffer item : items)
                check(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            error(throwable.getMessage());
        }

        @Override
        public void onComplete() {
            if (hipriCounter != NUM_HI_PRI)
                error("hi pri at end wrong", Integer.toString(hipriCounter), Integer.toString(NUM_HI_PRI));
            else {
                System.out.println("LastSubscriber.complete");
                cf.complete(null); // success
            }
        }
    }

    List<ByteBuffer> getBuffer(int c) {
        ByteBuffer buf = ByteBuffer.allocate(BUFSIZE+4);
        buf.putInt(LO_PRI);
        for (int i=0; i<BUFSIZE_INT; i++) {
            buf.putInt(c++);
        }
        buf.flip();
        return List.of(buf);
    }

    boolean errorTest = false;

    @Test
    public void run() throws InterruptedException {
        try {
            CompletableFuture<Void> completion = sub3.completion();
            publisher.subscribe(sub3);
            // now submit a load of data
            int counter = 0;
            for (int i = 0; i < TOTAL; i++) {
                List<ByteBuffer> bufs = getBuffer(counter);
                //if (i==2)
                    //bufs.get(0).putInt(41, 1234); // error
                counter += BUFSIZE_INT;
                publisher.submit(bufs);
                //if (i % 1000 == 0)
                    //Thread.sleep(1000);
                //if (i == 99) {
                    //publisher.closeExceptionally(new RuntimeException("Test error"));
                    //errorTest = true;
                    //break;
                //}
            }
            if (!errorTest) {
                publisher.close();
            }
            System.out.println("Publisher completed");
            completion.join();
            System.out.println("Subscribers completed ok");
        } finally {
            executor.shutdownNow();
        }
    }

    static void display(CompletableFuture<?> cf) {
        System.out.print (cf);
        if (!cf.isDone())
            return;
        try {
            cf.join(); // wont block
        } catch (Exception e) {
            System.out.println(" " + e);
        }
    }

/*
    public static void main(String[] args) throws InterruptedException {
        WrapperTest test = new WrapperTest();
        test.run();
    }
*/
}