test/jdk/java/net/httpclient/websocket/java.net.http/jdk/internal/net/http/websocket/MockTransport.java
author chegar
Wed, 07 Feb 2018 21:45:37 +0000
branchhttp-client-branch
changeset 56092 fd85b2bf2b0d
parent 56089 test/jdk/java/net/httpclient/websocket/java.net.http/java/net/http/internal/websocket/MockTransport.java@42208b2f224e
permissions -rw-r--r--
http-client-branch: move implementation to jdk.internal.net.http

/*
 * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

package jdk.internal.net.http.websocket;

import java.net.http.WebSocket.MessagePart;
import jdk.internal.net.http.common.Demand;
import jdk.internal.net.http.common.SequentialScheduler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static jdk.internal.net.http.websocket.TestSupport.fullCopy;

public class MockTransport<T> implements Transport<T> {

    private final long startTime = System.currentTimeMillis();
    private final Queue<Invocation> output = new ConcurrentLinkedQueue<>();
    private final Queue<CompletableFuture<Consumer<MessageStreamConsumer>>>
            input = new ConcurrentLinkedQueue<>();
    private final Supplier<T> supplier;
    private final MessageStreamConsumer consumer;
    private final SequentialScheduler scheduler
            = new SequentialScheduler(new ReceiveTask());
    private final Demand demand = new Demand();

    public MockTransport(Supplier<T> sendResultSupplier,
                         MessageStreamConsumer consumer) {
        this.supplier = sendResultSupplier;
        this.consumer = consumer;
        input.addAll(receive());
    }

    @Override
    public final CompletableFuture<T> sendText(CharSequence message,
                                               boolean isLast) {
        output.add(Invocation.sendText(message, isLast));
        return send(String.format("sendText(%s, %s)", message, isLast),
                    () -> sendText0(message, isLast));
    }

    protected CompletableFuture<T> sendText0(CharSequence message,
                                             boolean isLast) {
        return defaultSend();
    }

    protected CompletableFuture<T> defaultSend() {
        return CompletableFuture.completedFuture(result());
    }

    @Override
    public final CompletableFuture<T> sendBinary(ByteBuffer message,
                                                 boolean isLast) {
        output.add(Invocation.sendBinary(message, isLast));
        return send(String.format("sendBinary(%s, %s)", message, isLast),
                    () -> sendBinary0(message, isLast));
    }

    protected CompletableFuture<T> sendBinary0(ByteBuffer message,
                                               boolean isLast) {
        return defaultSend();
    }

    @Override
    public final CompletableFuture<T> sendPing(ByteBuffer message) {
        output.add(Invocation.sendPing(message));
        return send(String.format("sendPing(%s)", message),
                    () -> sendPing0(message));
    }

    protected CompletableFuture<T> sendPing0(ByteBuffer message) {
        return defaultSend();
    }

    @Override
    public final CompletableFuture<T> sendPong(ByteBuffer message) {
        output.add(Invocation.sendPong(message));
        return send(String.format("sendPong(%s)", message),
                    () -> sendPong0(message));
    }

    protected CompletableFuture<T> sendPong0(ByteBuffer message) {
        return defaultSend();
    }

    @Override
    public final CompletableFuture<T> sendClose(int statusCode, String reason) {
        output.add(Invocation.sendClose(statusCode, reason));
        return send(String.format("sendClose(%s, %s)", statusCode, reason),
                    () -> sendClose0(statusCode, reason));
    }

    protected CompletableFuture<T> sendClose0(int statusCode, String reason) {
        return defaultSend();
    }

    protected Collection<CompletableFuture<Consumer<MessageStreamConsumer>>> receive() {
        return List.of();
    }

    public static Consumer<MessageStreamConsumer> onText(CharSequence data,
                                                         MessagePart part) {
        return c -> c.onText(data.toString(), part);
    }

    public static Consumer<MessageStreamConsumer> onBinary(ByteBuffer data,
                                                           MessagePart part) {
        return c -> c.onBinary(fullCopy(data), part);
    }

    public static Consumer<MessageStreamConsumer> onPing(ByteBuffer data) {
        return c -> c.onPing(fullCopy(data));
    }

    public static Consumer<MessageStreamConsumer> onPong(ByteBuffer data) {
        return c -> c.onPong(fullCopy(data));
    }

    public static Consumer<MessageStreamConsumer> onClose(int statusCode,
                                                          String reason) {
        return c -> c.onClose(statusCode, reason);
    }

    public static Consumer<MessageStreamConsumer> onError(Throwable error) {
        return c -> c.onError(error);
    }

    public static Consumer<MessageStreamConsumer> onComplete() {
        return c -> c.onComplete();
    }

    @Override
    public void request(long n) {
        demand.increase(n);
        scheduler.runOrSchedule();
    }

    @Override
    public void acknowledgeReception() {
        demand.tryDecrement();
    }

    @Override
    public final void closeOutput() throws IOException {
        output.add(Invocation.closeOutput());
        begin("closeOutput()");
        closeOutput0();
        end("closeOutput()");
    }

    protected void closeOutput0() throws IOException {
        defaultClose();
    }

    protected void defaultClose() throws IOException {
    }

    @Override
    public final void closeInput() throws IOException {
        output.add(Invocation.closeInput());
        begin("closeInput()");
        closeInput0();
        end("closeInput()");
    }

    protected void closeInput0() throws IOException {
        defaultClose();
    }

    public abstract static class Invocation {

        static Invocation.SendText sendText(CharSequence message,
                                            boolean isLast) {
            return new SendText(message, isLast);
        }

        static Invocation.SendBinary sendBinary(ByteBuffer message,
                                                boolean isLast) {
            return new SendBinary(message, isLast);
        }

        static Invocation.SendPing sendPing(ByteBuffer message) {
            return new SendPing(message);
        }

        static Invocation.SendPong sendPong(ByteBuffer message) {
            return new SendPong(message);
        }

        static Invocation.SendClose sendClose(int statusCode, String reason) {
            return new SendClose(statusCode, reason);
        }

        public static CloseOutput closeOutput() {
            return new CloseOutput();
        }

        public static CloseInput closeInput() {
            return new CloseInput();
        }

        public static final class SendText extends Invocation {

            final CharSequence message;
            final boolean isLast;

            SendText(CharSequence message, boolean isLast) {
                this.message = message.toString();
                this.isLast = isLast;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) return true;
                if (obj == null || getClass() != obj.getClass()) return false;
                SendText sendText = (SendText) obj;
                return isLast == sendText.isLast &&
                        Objects.equals(message, sendText.message);
            }

            @Override
            public int hashCode() {
                return Objects.hash(isLast, message);
            }
        }

        public static final class SendBinary extends Invocation {

            final ByteBuffer message;
            final boolean isLast;

            SendBinary(ByteBuffer message, boolean isLast) {
                this.message = fullCopy(message);
                this.isLast = isLast;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) return true;
                if (obj == null || getClass() != obj.getClass()) return false;
                SendBinary that = (SendBinary) obj;
                return isLast == that.isLast &&
                        Objects.equals(message, that.message);
            }

            @Override
            public int hashCode() {
                return Objects.hash(message, isLast);
            }
        }

        private static final class SendPing extends Invocation {

            final ByteBuffer message;

            SendPing(ByteBuffer message) {
                this.message = fullCopy(message);
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) return true;
                if (obj == null || getClass() != obj.getClass()) return false;
                SendPing sendPing = (SendPing) obj;
                return Objects.equals(message, sendPing.message);
            }

            @Override
            public int hashCode() {
                return Objects.hash(message);
            }
        }

        private static final class SendPong extends Invocation {

            final ByteBuffer message;

            SendPong(ByteBuffer message) {
                this.message = fullCopy(message);
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) return true;
                if (obj == null || getClass() != obj.getClass()) return false;
                SendPing sendPing = (SendPing) obj;
                return Objects.equals(message, sendPing.message);
            }

            @Override
            public int hashCode() {
                return Objects.hash(message);
            }
        }

        private static final class SendClose extends Invocation {

            final int statusCode;
            final String reason;

            SendClose(int statusCode, String reason) {
                this.statusCode = statusCode;
                this.reason = reason;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj) return true;
                if (obj == null || getClass() != obj.getClass()) return false;
                SendClose sendClose = (SendClose) obj;
                return statusCode == sendClose.statusCode &&
                        Objects.equals(reason, sendClose.reason);
            }

            @Override
            public int hashCode() {
                return Objects.hash(statusCode, reason);
            }
        }

        private static final class CloseOutput extends Invocation {

            CloseOutput() { }

            @Override
            public int hashCode() {
                return 0;
            }

            @Override
            public boolean equals(Object obj) {
                return obj instanceof CloseOutput;
            }
        }

        private static final class CloseInput extends Invocation {

            CloseInput() { }

            @Override
            public int hashCode() {
                return 0;
            }

            @Override
            public boolean equals(Object obj) {
                return obj instanceof CloseInput;
            }
        }
    }

    public Queue<Invocation> invocations() {
        return new LinkedList<>(output);
    }

    protected final T result() {
        return supplier.get();
    }

    private CompletableFuture<T> send(String name,
                                      Supplier<CompletableFuture<T>> supplier) {
        begin(name);
        CompletableFuture<T> cf = supplier.get().whenComplete((r, e) -> {
            System.out.printf("[%6s ms.] complete %s%n", elapsedTime(), name);
        });
        end(name);
        return cf;
    }

    private void begin(String name) {
        System.out.printf("[%6s ms.] begin %s%n", elapsedTime(), name);
    }

    private void end(String name) {
        System.out.printf("[%6s ms.] end %s%n", elapsedTime(), name);
    }

    private long elapsedTime() {
        return System.currentTimeMillis() - startTime;
    }

    private final class ReceiveTask implements SequentialScheduler.RestartableTask {

        @Override
        public void run(SequentialScheduler.DeferredCompleter taskCompleter) {
            if (!scheduler.isStopped() && !demand.isFulfilled() && !input.isEmpty()) {
                CompletableFuture<Consumer<MessageStreamConsumer>> cf = input.remove();
                if (cf.isDone()) { // Forcing synchronous execution
                    cf.join().accept(consumer);
                    repeat(taskCompleter);
                } else {
                    cf.whenCompleteAsync((r, e) -> {
                        r.accept(consumer);
                        repeat(taskCompleter);
                    });
                }
            } else {
                taskCompleter.complete();
            }
        }

        private void repeat(SequentialScheduler.DeferredCompleter taskCompleter) {
            taskCompleter.complete();
            scheduler.runOrSchedule();
        }
    }
}