# HG changeset patch # User chegar # Date 1511263997 0 # Node ID a88515bdd90aa50ffe0c338e8b0e343d934c814a # Parent dbcbcda0e4134f9283d4323a49603cc40ac2d8b9# Parent 1e3a22efaefdbc94c4b5199e2c4047496afa0f4a http-client-branch: merge diff -r dbcbcda0e413 -r a88515bdd90a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SynchronousPublisher.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/SynchronousPublisher.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,506 +0,0 @@ -/* - * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http.internal.common; - -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.WeakHashMap; -import java.util.concurrent.Flow.Publisher; -import java.util.concurrent.Flow.Subscriber; -import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; - -/** - * This publisher signals {@code onNext} synchronously and - * {@code onComplete}/{@code onError} asynchronously to its only subscriber. - * - * <p> This publisher supports a single subscriber over this publisher's - * lifetime. {@code signalComplete} and {@code signalError} may be called before - * the subscriber has subscribed. - * - * <p> The subscriber's requests are signalled to the subscription supplied to - * the {@code feedback} method. - * - * <p> {@code subscribe} and {@code feedback} methods can be called in any - * order. - * - * <p> {@code signalNext} may be called recursively, the implementation will - * bound the depth of the recursion. - * - * <p> It is always an error to call {@code signalNext} without a sufficient - * demand. - * - * <p> If subscriber throws an exception from any of its methods, the - * subscription will be cancelled. - */ -public final class SynchronousPublisher<T> implements Publisher<T> { - /* - * PENDING, ACTIVE and CANCELLED are states. TERMINATE and DELIVERING are - * state modifiers, they cannot appear in the state on their own. - * - * PENDING, ACTIVE and CANCELLED are mutually exclusive states. Any two of - * those bits cannot be set at the same time in state. - * - * PENDING -----------------> ACTIVE <------> DELIVERING - * | | - * +------> TERMINATE <------+ - * | | | - * | v | - * +------> CANCELLED <------+ - * - * The following states are allowed: - * - * PENDING - * PENDING | TERMINATE, - * ACTIVE, - * ACTIVE | DELIVERING, - * ACTIVE | TERMINATE, - * ACTIVE | DELIVERING | TERMINATE - * CANCELLED - */ - /** - * A state modifier meaning {@code onSubscribe} has not been called yet. - * - * <p> After {@code onSubscribe} has been called the machine can transition - * into {@code ACTIVE}, {@code PENDING | TERMINATE} or {@code CANCELLED}. - */ - private static final int PENDING = 1; - /** - * A state modifier meaning {@code onSubscribe} has been called, no error - * and no completion has been signalled and {@code onNext} may be called. - */ - private static final int ACTIVE = 2; - /** - * A state modifier meaning no calls to subscriber may be made. - * - * <p> Once this modifier is set, it will not be unset. It's a final state. - */ - private static final int CANCELLED = 4; - /** - * A state modifier meaning {@code onNext} is being called and no other - * signal may be made. - * - * <p> This bit can be set at any time. signalNext uses it to ensure the - * method is called sequentially. - */ - private static final int DELIVERING = 8; - /** - * A state modifier meaning the next call must be either {@code onComplete} - * or {@code onError}. - * - * <p> The concrete method depends on the value of {@code terminationType}). - * {@code TERMINATE} bit cannot appear on its own, it can be set only with - * {@code PENDING} or {@code ACTIVE}. - */ - private static final int TERMINATE = 16; - /** - * Current demand. If fulfilled, no {@code onNext} signals may be made. - */ - private final Demand demand = new Demand(); - /** - * The current state of the subscription. Contains disjunctions of the above - * state modifiers. - */ - private final AtomicInteger state = new AtomicInteger(PENDING); - /** - * A convenient way to represent 3 values: not set, completion and error. - */ - private final AtomicReference<Optional<Throwable>> terminationType - = new AtomicReference<>(); - /** - * {@code signalNext} uses this lock to ensure the method is called in a - * thread-safe manner. - */ - private final ReentrantLock nextLock = new ReentrantLock(); - private T next; - - private final Object lock = new Object(); - /** - * This map stores the subscribers attempted to subscribe to this publisher. - * It is needed so this publisher does not call {@code onSubscribe} on a - * subscriber more than once (Rule 2.12). - * - * <p> It will most likely have a single entry for the only subscriber. - * Because this publisher is one-off, subscribing to it more than once is an - * error. - */ - private final Map<Subscriber<?>, Object> knownSubscribers - = new WeakHashMap<>(1, 1); - /** - * The active subscriber. This reference will be reset to {@code null} once - * the subscription becomes cancelled (Rule 3.13). - */ - private volatile Subscriber<? super T> subscriber; - /** - * A temporary subscription that receives all calls to - * {@code request}/{@code cancel} until two things happen: (1) the feedback - * becomes set and (2) {@code onSubscribe} method is called on the - * subscriber. - * - * <p> The first condition is obvious. The second one is about not - * propagating requests to {@code feedback} until {@code onSubscribe} call - * has been finished. The reason is that Rule 1.3 requires the subscriber - * methods to be called in a thread-safe manner. This, in particular, - * implies that if called from multiple threads, the calls must not be - * concurrent. If, for instance, {@code subscription.request(long)) (and - * this is a usual state of affairs) is called from within - * {@code onSubscribe} call, the publisher will have to resort to some sort - * of queueing (locks, queues, etc.) of possibly arriving {@code onNext} - * signals while in {@code onSubscribe}. This publisher doesn't queue - * signals, instead it "queues" requests. Because requests are just numbers - * and requests are additive, the effective queue is a single number of - * total requests made so far. - */ - private final TemporarySubscription temporarySubscription - = new TemporarySubscription(); - private volatile Subscription feedback; - /** - * Keeping track of whether a subscription may be made. (The {@code - * subscriber} field may later become {@code null}, but this flag is - * permanent. Once {@code true} forever {@code true}. - */ - private boolean subscribed; - - @Override - public void subscribe(Subscriber<? super T> sub) { - Objects.requireNonNull(sub); - boolean success = false; - boolean duplicate = false; - synchronized (lock) { - if (!subscribed) { - subscribed = true; - subscriber = sub; - assert !knownSubscribers.containsKey(subscriber); - knownSubscribers.put(subscriber, null); - success = true; - } else if (sub.equals(subscriber)) { - duplicate = true; - } else if (!knownSubscribers.containsKey(sub)) { - knownSubscribers.put(sub, null); - } else { - return; - } - } - if (success) { - signalSubscribe(); - } else if (duplicate) { - signalError(new IllegalStateException("Duplicate subscribe")); - } else { - // This is a best-effort attempt for an isolated publisher to call - // a foreign subscriber's methods in a sequential order. However it - // cannot be guaranteed unless all publishers share information on - // all subscribers in the system. This publisher does its job right. - sub.onSubscribe(new NopSubscription()); - sub.onError(new IllegalStateException("Already subscribed")); - } - } - - /** - * Accepts a subscription that is signalled with the subscriber's requests. - * - * @throws NullPointerException - * if {@code subscription} is {@code null} - * @throws IllegalStateException - * if there is a feedback subscription already - */ - public void feedback(Subscription subscription) { - Objects.requireNonNull(subscription); - synchronized (lock) { - if (feedback != null) { - throw new IllegalStateException( - "Already has a feedback subscription"); - } - feedback = subscription; - if ((state.get() & PENDING) == 0) { - temporarySubscription.replaceWith(new PermanentSubscription()); - } - } - } - - /** - * Tries to deliver the specified item to the subscriber. - * - * <p> The item may not be delivered even if there is a demand. This can - * happen as a result of subscriber cancelling the subscription by - * signalling {@code cancel} or this publisher cancelling the subscription - * by signaling {@code onError} or {@code onComplete}. - * - * <p> Given no exception is thrown, a call to this method decremented the - * demand. - * - * @param item - * the item to deliver to the subscriber - * - * @return {@code true} iff the subscriber has received {@code item} - * @throws NullPointerException - * if {@code item} is {@code null} - * @throws IllegalStateException - * if there is no demand - * @throws IllegalStateException - * the method is called concurrently - */ - public boolean signalNext(T item) { - Objects.requireNonNull(item); - if (!nextLock.tryLock()) { - throw new IllegalStateException("Concurrent signalling"); - } - boolean recursion = false; - try { - next = item; - while (true) { - int s = state.get(); - if ((s & DELIVERING) == DELIVERING) { - recursion = true; - break; - } else if (state.compareAndSet(s, s | DELIVERING)) { - break; - } - } - if (!demand.tryDecrement()) { - // Hopefully this will help to find bugs in this publisher's - // clients. Because signalNext should never be issues without - // having a sufficient demand. Even if the thing is cancelled! -// next = null; - throw new IllegalStateException("No demand"); - } - if (recursion) { - return true; - } - while (next != null) { - int s = state.get(); - if ((s & (ACTIVE | TERMINATE)) == (ACTIVE | TERMINATE)) { - if (state.compareAndSet( - s, CANCELLED | (s & ~(ACTIVE | TERMINATE)))) { - // terminationType must be read only after the - // termination condition has been observed - // (those have been stored in the opposite order) - Optional<Throwable> t = terminationType.get(); - dispatchTerminationAndUnsubscribe(t); - return false; - } - } else if ((s & ACTIVE) == ACTIVE) { - try { - T t = next; - next = null; - subscriber.onNext(t); - } catch (Throwable t) { - cancelNow(); - throw t; - } - } else if ((s & CANCELLED) == CANCELLED) { - return false; - } else if ((s & PENDING) == PENDING) { - // Actually someone called signalNext even before - // onSubscribe has been called, but from this publisher's - // API point of view it's still "No demand" - throw new IllegalStateException("No demand"); - } else { - throw new InternalError(String.valueOf(s)); - } - } - return true; - } finally { - while (!recursion) { // If the call was not recursive unset the bit - int s = state.get(); - if ((s & DELIVERING) != DELIVERING) { - throw new InternalError(String.valueOf(s)); - } else if (state.compareAndSet(s, s & ~DELIVERING)) { - break; - } - } - nextLock.unlock(); - } - } - - /** - * Cancels the subscription by signalling {@code onError} to the subscriber. - * - * <p> Will not signal {@code onError} if the subscription has been - * cancelled already. - * - * <p> This method may be called at any time. - * - * @param error - * the error to signal - * - * @throws NullPointerException - * if {@code error} is {@code null} - */ - public void signalError(Throwable error) { - terminateNow(Optional.of(error)); - } - - /** - * Cancels the subscription by signalling {@code onComplete} to the - * subscriber. - * - * <p> Will not signal {@code onComplete} if the subscription has been - * cancelled already. - * - * <p> This method may be called at any time. - */ - public void signalComplete() { - terminateNow(Optional.empty()); - } - - /** - * Must be called first and at most once. - */ - private void signalSubscribe() { - assert subscribed; - try { - subscriber.onSubscribe(temporarySubscription); - } catch (Throwable t) { - cancelNow(); - throw t; - } - while (true) { - int s = state.get(); - if ((s & (PENDING | TERMINATE)) == (PENDING | TERMINATE)) { - if (state.compareAndSet( - s, CANCELLED | (s & ~(PENDING | TERMINATE)))) { - Optional<Throwable> t = terminationType.get(); - dispatchTerminationAndUnsubscribe(t); - return; - } - } else if ((s & PENDING) == PENDING) { - if (state.compareAndSet(s, ACTIVE | (s & ~PENDING))) { - synchronized (lock) { - if (feedback != null) { - temporarySubscription - .replaceWith(new PermanentSubscription()); - } - } - return; - } - } else { // It should not be in any other state - throw new InternalError(String.valueOf(s)); - } - } - } - - private void unsubscribe() { - subscriber = null; - } - - private final static class NopSubscription implements Subscription { - - @Override - public void request(long n) { } - @Override - public void cancel() { } - } - - private final class PermanentSubscription implements Subscription { - - @Override - public void request(long n) { - if (n <= 0) { - signalError(new IllegalArgumentException( - "non-positive subscription request")); - } else { - demand.increase(n); - feedback.request(n); - } - } - - @Override - public void cancel() { - if (cancelNow()) { - unsubscribe(); - // feedback.cancel() is called at most once - // (let's not assume idempotency) - feedback.cancel(); - } - } - } - - /** - * Cancels the subscription unless it has been cancelled already. - * - * @return {@code true} iff the subscription has been cancelled as a result - * of this call - */ - private boolean cancelNow() { - while (true) { - int s = state.get(); - if ((s & CANCELLED) == CANCELLED) { - return false; - } else if ((s & (ACTIVE | PENDING)) != 0) { - // ACTIVE or PENDING - if (state.compareAndSet( - s, CANCELLED | (s & ~(ACTIVE | PENDING)))) { - unsubscribe(); - return true; - } - } else { - throw new InternalError(String.valueOf(s)); - } - } - } - - /** - * Terminates this subscription unless is has been cancelled already. - * - * @param t the type of termination - */ - private void terminateNow(Optional<Throwable> t) { - // Termination condition must be set only after the termination - // type has been set (those will be read in the opposite order) - if (!terminationType.compareAndSet(null, t)) { - return; - } - while (true) { - int s = state.get(); - if ((s & CANCELLED) == CANCELLED) { - return; - } else if ((s & (PENDING | DELIVERING)) != 0) { - // PENDING or DELIVERING (which implies ACTIVE) - if (state.compareAndSet(s, s | TERMINATE)) { - return; - } - } else if ((s & ACTIVE) == ACTIVE) { - if (state.compareAndSet(s, CANCELLED | (s & ~ACTIVE))) { - dispatchTerminationAndUnsubscribe(t); - return; - } - } else { - throw new InternalError(String.valueOf(s)); - } - } - } - - private void dispatchTerminationAndUnsubscribe(Optional<Throwable> t) { - try { - t.ifPresentOrElse(subscriber::onError, subscriber::onComplete); - } finally { - unsubscribe(); - } - } -} diff -r dbcbcda0e413 -r a88515bdd90a src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/TemporarySubscription.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/TemporarySubscription.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,112 +0,0 @@ -/* - * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. Oracle designates this - * particular file as subject to the "Classpath" exception as provided - * by Oracle in the LICENSE file that accompanied this code. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http.internal.common; - -import jdk.incubator.http.internal.common.SequentialScheduler.CompleteRestartableTask; - -import java.util.Objects; -import java.util.concurrent.Flow.Subscription; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Acts as a subscription receiving calls to {@code request} and {@code cancel} - * methods until the replacing subscription is set. - * - * <p> After the replacing subscription is set, it gets updated with the result - * of calls happened before that and starts receiving calls to its - * {@code request} and {@code cancel} methods. - * - * <p> This subscription ensures that {@code request} and {@code cancel} methods - * of the replacing subscription are called sequentially. - */ -public final class TemporarySubscription implements Subscription { - - private final AtomicReference<Subscription> subscription = new AtomicReference<>(); - private final Demand demand = new Demand(); - private volatile boolean cancelled; - private volatile long illegalValue = 1; - - private final SequentialScheduler scheduler = new SequentialScheduler(new UpdateTask()); - - @Override - public void request(long n) { - if (n <= 0) { - // Any non-positive request would do, no need to remember them - // all or any one in particular. - // tl;dr racy, but don't care - illegalValue = n; - } else { - demand.increase(n); - } - scheduler.runOrSchedule(); - } - - @Override - public void cancel() { - cancelled = true; - scheduler.runOrSchedule(); - } - - public void replaceWith(Subscription permanentSubscription) { - Objects.requireNonNull(permanentSubscription); - if (permanentSubscription == this) { - // Otherwise it would be an unpleasant bug to chase - throw new IllegalStateException("Self replacement"); - } - if (!subscription.compareAndSet(null, permanentSubscription)) { - throw new IllegalStateException("Already replaced"); - } - scheduler.runOrSchedule(); - } - - private final class UpdateTask extends CompleteRestartableTask { - - @Override - public void run() { - Subscription dst = TemporarySubscription.this.subscription.get(); - if (dst == null) { - return; - } - /* As long as the result is effectively the same, it does not matter - how requests are accumulated and what goes first: request or - cancel. See rules 3.5, 3.6, 3.7 and 3.9 from the reactive-streams - specification. */ - long illegalValue = TemporarySubscription.this.illegalValue; - if (illegalValue <= 0) { - dst.request(illegalValue); - scheduler.stop(); - } else if (cancelled) { - dst.cancel(); - scheduler.stop(); - } else { - long accumulatedValue = demand.decreaseAndGet(Long.MAX_VALUE); - if (accumulatedValue > 0) { - dst.request(accumulatedValue); - } - } - } - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java --- a/test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java Tue Nov 21 10:24:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/websocket/BuildingWebSocketDriver.java Tue Nov 21 11:33:17 2017 +0000 @@ -26,11 +26,6 @@ * @bug 8159053 * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java - * * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.BuildingWebSocketTest */ -public final class BuildingWebSocketDriver { -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockListenerTest -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockChannelTest -// * @run testng/othervm/timeout=1000 -Ddataproviderthreadcount=16 -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.PingTest -} +public final class BuildingWebSocketDriver { } diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java --- a/test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java Tue Nov 21 10:24:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/websocket/HeaderWriterDriver.java Tue Nov 21 11:33:17 2017 +0000 @@ -26,11 +26,6 @@ * @bug 8159053 * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java - * * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.HeaderWriterTest */ -public final class HeaderWriterDriver { -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockListenerTest -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockChannelTest -// * @run testng/othervm/timeout=1000 -Ddataproviderthreadcount=16 -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.PingTest -} +public final class HeaderWriterDriver { } diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/LoggingHelper.java --- a/test/jdk/java/net/httpclient/websocket/LoggingHelper.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -import java.io.File; - -public final class LoggingHelper { - - /* - * I wish we had a support for java.util.logging in jtreg similar to what we - * have for security policy files: - * - * @run main/othervm/jul=logging.properties ClassUnderTest - */ - public static void setupLogging() { - String path = System.getProperty("test.src", ".") + File.separator + "logging.properties"; - System.setProperty("java.util.logging.config.file", path); - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/MaskerDriver.java --- a/test/jdk/java/net/httpclient/websocket/MaskerDriver.java Tue Nov 21 10:24:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/websocket/MaskerDriver.java Tue Nov 21 11:33:17 2017 +0000 @@ -26,11 +26,6 @@ * @bug 8159053 * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java - * * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MaskerTest */ -public final class MaskerDriver { -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockListenerTest -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockChannelTest -// * @run testng/othervm/timeout=1000 -Ddataproviderthreadcount=16 -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.PingTest -} +public final class MaskerDriver { } diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/ReaderDriver.java --- a/test/jdk/java/net/httpclient/websocket/ReaderDriver.java Tue Nov 21 10:24:53 2017 +0000 +++ b/test/jdk/java/net/httpclient/websocket/ReaderDriver.java Tue Nov 21 11:33:17 2017 +0000 @@ -26,11 +26,6 @@ * @bug 8159053 * @modules jdk.incubator.httpclient/jdk.incubator.http.internal.websocket:open * @compile/module=jdk.incubator.httpclient jdk/incubator/http/internal/websocket/TestSupport.java - * * @run testng/othervm --add-reads jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.ReaderTest */ -public final class ReaderDriver { -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockListenerTest -// * @run testng/othervm -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.MockChannelTest -// * @run testng/othervm/timeout=1000 -Ddataproviderthreadcount=16 -XaddReads:jdk.incubator.httpclient=ALL-UNNAMED jdk.incubator.httpclient/jdk.incubator.http.internal.websocket.PingTest -} +public final class ReaderDriver { } diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/CloseTest.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/CloseTest.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,246 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ - -package jdk.incubator.http.internal.websocket; - -import org.testng.annotations.DataProvider; -import org.testng.annotations.Test; - -import jdk.incubator.http.WebSocket; - -import java.net.URI; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.StandardCharsets; -import java.util.Iterator; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static jdk.incubator.http.internal.websocket.TestSupport.Expectation.ifExpect; -import static jdk.incubator.http.internal.websocket.TestSupport.cartesianIterator; -import static java.util.Arrays.asList; -import static java.util.List.of; - -/* - * Tests for Close message handling: examines sendClose/onClose contracts. - */ -public final class CloseTest { - - /* - * Verifies the domain of the arguments of sendClose(code, reason). - */ - @Test(dataProvider = "sendClose") - public void testSendCloseArguments(int code, String reason) { - WebSocket ws = newWebSocket(); - ifExpect( - reason == null, - NullPointerException.class::isInstance) - .orExpect( - !isOutgoingCodeLegal(code), - IllegalArgumentException.class::isInstance) - .orExpect( - !isReasonLegal(reason), - IllegalArgumentException.class::isInstance) - .assertThrows(() -> ws.sendClose(code, reason)); - } - - /* - * After sendClose(code, reason) has returned normally or exceptionally, no - * more messages can be sent. However, if the invocation has thrown IAE/NPE - * (i.e. programming error) messages can still be sent (failure atomicity). - */ - public void testSendClose(int code, String reason) { - newWebSocket().sendClose(10, ""); - } - - /* - * After sendClose() has been invoked, no more messages can be sent. - */ - public void testSendClose() { - WebSocket ws = newWebSocket(); - CompletableFuture<WebSocket> cf = ws.sendClose(); - } - - // TODO: sendClose can be invoked whenever is suitable without ISE - // + idempotency - - /* - * An invocation of sendClose(code, reason) will cause a Close message with - * the same code and the reason to appear on the wire. - */ - public void testSendCloseWysiwyg(int code, String reason) { - - } - - /* - * An invocation of sendClose() will cause an empty Close message to appear - * on the wire. - */ - public void testSendCloseWysiwyg() { - - } - - /* - * Automatic Closing handshake. Listener receives onClose() and returns from - * it. WebSocket closes in accordance to the returned value. - */ - public void testClosingHandshake1() { - // TODO: closed if observed shortly after the returned CS completes - } - - /* - * sendClose is invoked from within onClose. After sendClose has returned, - * isClosed() reports true. - */ - public void testClosingHandshake2() { - // 1. newWebSocket().sendClose(); - // 2. onClose return null - // 3. isClosed() == true - } - - /* - * sendClose has been invoked, then onClose. Shortly after onClose has - * returned, isClosed reports true. - */ - public void testClosingHandshake3() { - } - - /* - * Return from onClose with nevercompleting CS then sendClose(). - */ - public void testClosingHandshake4() { - - } - - /* - * Exceptions thrown from onClose and exceptions a CS returned from onClose - * "completes exceptionally" with are ignored. In other words, they are - * never reported to onError(). - */ - public void testOnCloseExceptions() { - - } - - /* - * An incoming Close message on the wire will cause an invocation of onClose - * with appropriate values. However, if this message violates the WebSocket - * Protocol, onError is invoked instead. - * - * // TODO: automatic close (if error) AND isClose returns true from onError - */ - public void testOnCloseWysiwyg() { - - } - - /* - * Data is read off the wire. An end-of-stream has been reached while - * reading a frame. - * - * onError is invoked with java.net.ProtocolException and the WebSocket this - * listener has been attached to - */ - public void testUnexpectedEOS() { - - } - - /* - * Data is read off the wire. An end-of-stream has been reached in between - * frames, and no Close frame has been received yet. - * - * onClose is invoked with the status code 1006 and the WebSocket this - * listener has been attached to - */ - public void testEOS() { - - } - - // TODO: check buffers for change - - @DataProvider(name = "sendClose") - public Iterator<Object[]> createData() { - List<Integer> codes = asList( - Integer.MIN_VALUE, -1, 0, 1, 500, 998, 999, 1000, 1001, 1002, - 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, - 1013, 1014, 1015, 1016, 2998, 2999, 3000, 3001, 3998, 3999, - 4000, 4001, 4998, 4999, 5000, 5001, 32768, 65535, 65536, - Integer.MAX_VALUE); - String longReason1 = "This is a reason string. Nothing special except " + - "its UTF-8 representation is a bit " + - "longer than one hundred and twenty three bytes."; - assert longReason1.getBytes(StandardCharsets.UTF_8).length > 123; - - // Russian alphabet repeated cyclically until it's enough to pass "123" - // bytes length - StringBuilder b = new StringBuilder(); - char c = '\u0410'; - for (int i = 0; i < 62; i++) { - b.append(c); - if (++c > '\u042F') { - c = '\u0410'; - } - } - String longReason2 = b.toString(); - assert longReason2.length() <= 123 - && longReason2.getBytes(StandardCharsets.UTF_8).length > 123; - - String malformedReason = new String(new char[]{0xDC00, 0xD800}); - - List<String> reasons = asList - (null, "", "abc", longReason1, longReason2, malformedReason); - - return cartesianIterator(of(codes, reasons), args -> args); - } - - private boolean isReasonLegal(String reason) { - if (reason == null) { - return false; - } - ByteBuffer result; - try { - result = StandardCharsets.UTF_8.newEncoder().encode(CharBuffer.wrap(reason)); - } catch (CharacterCodingException e) { - return false; - } - return result.remaining() <= 123; - } - - private static boolean isOutgoingCodeLegal(int code) { - if (code < 1000 || code > 4999) { - return false; - } - if (code < 1016) { - return code == 1000 || code == 1001 || code == 1008 || code == 1011; - } - return code >= 3000; - } - - private WebSocket newWebSocket() { - WebSocket.Listener l = new WebSocket.Listener() { }; - return new WebSocketImpl(URI.create("ws://example.com"), - "", - new MockChannel.Builder().build(), - l); - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/DataProviders.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/DataProviders.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,107 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.websocket; - -import org.testng.annotations.DataProvider; - -import jdk.incubator.http.internal.websocket.TestSupport.F5; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; -import java.util.stream.Stream; - -import static jdk.incubator.http.internal.websocket.TestSupport.cartesianIterator; -import static jdk.incubator.http.internal.websocket.TestSupport.concat; -import static jdk.incubator.http.internal.websocket.TestSupport.iteratorOf; -import static jdk.incubator.http.internal.websocket.TestSupport.iteratorOf1; -import static java.util.List.of; - -/* - * Data providers for WebSocket tests - */ -public final class DataProviders { - - /* - * Various ByteBuffer-s to be passed to sendPing/sendPong. - * - * Actual data is put in the middle of the buffer to make sure the code under - * test relies on position/limit rather than on 0 and capacity. - * - * +-------------------+-------~ ~-------------+--------------+ - * |<---- leading ---->|<------~ ~--- data --->|<- trailing ->| - * +-------------------+-------~ ~-------------+--------------+ - * ^0 ^position ^limit ^capacity - */ - @DataProvider(name = "outgoingData", parallel = true) - public static Iterator<Object[]> outgoingData() { - List<Integer> leading = of(0, 1, 17, 125); - List<Integer> trailing = of(0, 1, 19, 123); - List<Integer> sizes = of(0, 1, 2, 17, 32, 64, 122, 123, 124, 125, 126, 127, 128, 256); - List<Boolean> direct = of(true, false); - List<Boolean> readonly = of(false); // TODO: return readonly (true) - F5<Integer, Integer, Integer, Boolean, Boolean, Object[]> f = - (l, t, s, d, r) -> { - ByteBuffer b; - if (d) { - b = ByteBuffer.allocateDirect(l + t + s); - } else { - b = ByteBuffer.allocate(l + t + s); - } - fill(b); - if (r) { - b = b.asReadOnlyBuffer(); - } - b.position(l).limit(l + s); - return new ByteBuffer[]{b}; - }; - Iterator<Object[]> product = cartesianIterator(leading, trailing, sizes, direct, readonly, f); - Iterator<Object[]> i = iteratorOf1(new Object[]{null}); - return concat(iteratorOf(i, product)); - } - - @DataProvider(name = "incomingData", parallel = true) - public static Iterator<Object[]> incomingData() { - return Stream.of(0, 1, 2, 17, 63, 125) - .map(i -> new Object[]{fill(ByteBuffer.allocate(i))}) - .iterator(); - } - - @DataProvider(name = "incorrectFrame") - public static Iterator<Object[]> incorrectFrame() { - List<Boolean> fin = of(true, false ); - List<Boolean> rsv1 = of(true, false ); - List<Boolean> rsv2 = of(true, false ); - List<Boolean> rsv3 = of(true, false ); - List<Integer> sizes = of(0, 126, 1024); - return cartesianIterator(fin, rsv1, rsv2, rsv3, sizes, - (a, b, c, d, e) -> new Object[]{a, b, c, d, ByteBuffer.allocate(e)}); - } - - private static ByteBuffer fill(ByteBuffer b) { - int i = 0; - while (b.hasRemaining()) { - b.put((byte) (++i & 0xff)); - } - return b; - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockChannel.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockChannel.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,415 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.websocket; - -import jdk.incubator.http.WebSocket.MessagePart; -import jdk.incubator.http.internal.websocket.Frame.Opcode; -import jdk.incubator.http.internal.websocket.TestSupport.F1; -import jdk.incubator.http.internal.websocket.TestSupport.F2; -import jdk.incubator.http.internal.websocket.TestSupport.InvocationChecker; -import jdk.incubator.http.internal.websocket.TestSupport.InvocationExpectation; -import jdk.incubator.http.internal.websocket.TestSupport.Mock; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.OptionalInt; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; - -import static jdk.incubator.http.internal.websocket.Frame.MAX_HEADER_SIZE_BYTES; - -final class MockChannel implements RawChannel, Mock { - - /* Reads and writes must be able to be served concurrently, thus 2 threads */ // TODO: test this - private final Executor executor = Executors.newFixedThreadPool(2); - private final Object stateLock = new Object(); - private final Object readLock = new Object(); - private final Object writeLock = new Object(); - private volatile boolean closed; - private boolean isInputOpen = true; - private boolean isOutputOpen = true; - private final Frame.Reader reader = new Frame.Reader(); - private final MockFrameConsumer delegate; - private final Iterator<ReadRule> readScenario; - private ReadRule currentRule; - private final AtomicBoolean handedOver = new AtomicBoolean(); - - private MockChannel(Iterable<ReadRule> scenario, - Iterable<InvocationExpectation> expectations) { - Iterator<ReadRule> iterator = scenario.iterator(); - if (!iterator.hasNext()) { - throw new RuntimeException(); - } - this.readScenario = iterator; - this.currentRule = iterator.next(); - this.delegate = new MockFrameConsumer(expectations); - } - - @Override - public void registerEvent(RawEvent event) throws IOException { - int ops = event.interestOps(); - if ((ops & SelectionKey.OP_WRITE) != 0) { - synchronized (stateLock) { - checkOpen(); - executor.execute(event::handle); - } - } else if ((ops & SelectionKey.OP_READ) != 0) { - CompletionStage<?> cs; - synchronized (readLock) { - cs = currentRule().whenReady(); - synchronized (stateLock) { - checkOpen(); - cs.thenRun(() -> executor.execute(event::handle)); - } - } - } else { - throw new RuntimeException("Unexpected registration: " + ops); - } - } - - @Override - public ByteBuffer initialByteBuffer() throws IllegalStateException { - if (!handedOver.compareAndSet(false, true)) { - throw new IllegalStateException(); - } - return ByteBuffer.allocate(0); - } - - @Override - public ByteBuffer read() throws IOException { - synchronized (readLock) { - checkOpen(); - synchronized (stateLock) { - if (!isInputOpen) { - return null; - } - } - ByteBuffer r = currentRule().read(); - checkOpen(); - return r; - } - } - - @Override - public long write(ByteBuffer[] src, int offset, int len) throws IOException { - synchronized (writeLock) { - checkOpen(); - synchronized (stateLock) { - if (!isOutputOpen) { - throw new ClosedChannelException(); - } - } - long n = 0; - for (int i = offset; i < offset + len && isOpen(); i++) { - ByteBuffer b = src[i]; - int rem = src[i].remaining(); - while (b.hasRemaining() && isOpen()) { - reader.readFrame(b, delegate); - } - n += rem; - } - checkOpen(); - return n; - } - } - - public boolean isOpen() { - return !closed; - } - - @Override - public void shutdownInput() throws IOException { - synchronized (stateLock) { - if (!isOpen()) { - throw new ClosedChannelException(); - } - isInputOpen = false; - } - } - - @Override - public void shutdownOutput() throws IOException { - synchronized (stateLock) { - if (!isOpen()) { - throw new ClosedChannelException(); - } - isOutputOpen = false; - } - } - - @Override - public void close() { - synchronized (stateLock) { - closed = true; - } - } - - @Override - public String toString() { - return super.toString() + "[" + (closed ? "closed" : "open") + "]"; - } - - private ReadRule currentRule() { - assert Thread.holdsLock(readLock); - while (!currentRule.applies()) { // There should be the terminal rule which always applies - currentRule = readScenario.next(); - } - return currentRule; - } - - private void checkOpen() throws ClosedChannelException { - if (!isOpen()) { - throw new ClosedChannelException(); - } - } - - @Override - public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { - return delegate.expectations(timeout, unit); - } - - private static class MockFrameConsumer extends FrameConsumer implements Mock { - - private final Frame.Masker masker = new Frame.Masker(); - - MockFrameConsumer(Iterable<InvocationExpectation> expectations) { - super(new MockMessageStreamConsumer(expectations)); - } - - @Override - public void mask(boolean value) { - } - - @Override - public void maskingKey(int value) { - masker.mask(value); - } - - @Override - public void payloadData(ByteBuffer data) { - int p = data.position(); - int l = data.limit(); - masker.transferMasking(data, data); -// select(p, l, data); FIXME - super.payloadData(data); - } - - @Override - public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { - return ((Mock) getOutput()).expectations(timeout, unit); - } - } - - private static final class MockMessageStreamConsumer implements MessageStreamConsumer, Mock { - - private final InvocationChecker checker; - - MockMessageStreamConsumer(Iterable<InvocationExpectation> expectations) { - checker = new InvocationChecker(expectations); - } - - @Override - public void onText(MessagePart part, CharSequence data) { - checker.checkInvocation("onText", part, data); - } - - @Override - public void onBinary(MessagePart part, ByteBuffer data) { - checker.checkInvocation("onBinary", part, data); - } - - @Override - public void onPing(ByteBuffer data) { - checker.checkInvocation("onPing", data); - } - - @Override - public void onPong(ByteBuffer data) { - checker.checkInvocation("onPong", data); - } - - @Override - public void onClose(OptionalInt statusCode, CharSequence reason) { - checker.checkInvocation("onClose", statusCode, reason); - } - - @Override - public void onError(Exception e) { - checker.checkInvocation("onError", e); - } - - @Override - public void onComplete() { - checker.checkInvocation("onComplete"); - } - - @Override - public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { - return checker.expectations(timeout, unit); - } - } - - public static final class Builder { - - private final Frame.HeaderWriter b = new Frame.HeaderWriter(); - private final List<InvocationExpectation> expectations = new LinkedList<>(); - private final List<ReadRule> scenario = new LinkedList<>(); - - Builder expectPing(F1<? super ByteBuffer, Boolean> predicate) { - InvocationExpectation e = new InvocationExpectation("onPing", - args -> predicate.apply((ByteBuffer) args[0])); - expectations.add(e); - return this; - } - - Builder expectPong(F1<? super ByteBuffer, Boolean> predicate) { - InvocationExpectation e = new InvocationExpectation("onPong", - args -> predicate.apply((ByteBuffer) args[0])); - expectations.add(e); - return this; - } - - Builder expectClose(F2<? super Integer, ? super String, Boolean> predicate) { - InvocationExpectation e = new InvocationExpectation("onClose", - args -> predicate.apply((Integer) args[0], (String) args[1])); - expectations.add(e); - return this; - } - - Builder provideFrame(boolean fin, boolean rsv1, boolean rsv2, - boolean rsv3, Opcode opcode, ByteBuffer data) { - - ByteBuffer b = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES + data.remaining()); - this.b.fin(fin).rsv1(rsv1).rsv2(rsv2).rsv3(rsv3).opcode(opcode).noMask() - .payloadLen(data.remaining()).write(b); - - int p = data.position(); - int l = data.limit(); - b.put(data); - b.flip(); -// select(p, l, data); FIXME - - ReadRule r = new ReadRule() { - - private volatile boolean provided; - - @Override - public CompletionStage<?> whenReady() { - return NOW; - } - - @Override - public ByteBuffer read() throws IOException { - provided = true; - return data; - } - - @Override - public boolean applies() { - return !provided; - } - }; - scenario.add(r); - return this; - } - - Builder provideEos() { - ReadRule r = new ReadRule() { - - @Override - public CompletionStage<?> whenReady() { - return NOW; - } - - @Override - public ByteBuffer read() throws IOException { - return null; - } - - @Override - public boolean applies() { - return true; - } - }; - scenario.add(r); - return this; - } - - Builder provideException(Supplier<? extends IOException> s) { - return this; - } - - MockChannel build() { - LinkedList<ReadRule> scenario = new LinkedList<>(this.scenario); - scenario.add(new Terminator()); - return new MockChannel(scenario, new LinkedList<>(expectations)); - } - } - - private interface ReadRule { - - /* - * Returns a CS which when completed means `read(ByteBuffer dst)` can be - * invoked - */ - CompletionStage<?> whenReady(); - - ByteBuffer read() throws IOException; - - /* - * Returns true if this rule still applies, otherwise returns false - */ - boolean applies(); - } - - public static final class Terminator implements ReadRule { - - @Override - public CompletionStage<?> whenReady() { - return NEVER; - } - - @Override - public ByteBuffer read() { - return ByteBuffer.allocate(0); - } - - @Override - public boolean applies() { - return true; - } - } - - private static final CompletionStage<?> NOW = CompletableFuture.completedStage(null); - private static final CompletionStage<?> NEVER = new CompletableFuture(); -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockChannelTest.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockChannelTest.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,102 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.websocket; - -import org.testng.annotations.Test; -import jdk.incubator.http.internal.websocket.Frame.Opcode; - -import java.io.IOException; -import jdk.incubator.http.internal.websocket.TestSupport.AssertionFailedException; - -import java.nio.ByteBuffer; -import java.util.concurrent.TimeUnit; - -import static jdk.incubator.http.internal.websocket.TestSupport.assertThrows; -import static jdk.incubator.http.internal.websocket.TestSupport.checkExpectations; -import static jdk.incubator.http.internal.websocket.Frame.MAX_HEADER_SIZE_BYTES; - -public final class MockChannelTest { - - // TODO: tests for read (stubbing) - - @Test - public void testPass01() { - MockChannel ch = new MockChannel.Builder().build(); - checkExpectations(1, TimeUnit.SECONDS, ch); - } - - @Test - public void testPass02() throws IOException { - int len = 8; - ByteBuffer header = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES); - ByteBuffer data = ByteBuffer.allocate(len); - new Frame.HeaderWriter() - .fin(true).opcode(Opcode.PONG).payloadLen(len).mask(0x12345678) - .write(header); - header.flip(); - MockChannel ch = new MockChannel.Builder() - .expectPong(bb -> bb.remaining() == len) - .build(); - ch.write(new ByteBuffer[]{header, data}, 0, 2); - checkExpectations(1, TimeUnit.SECONDS, ch); - } - - @Test - public void testPass03() throws IOException { - int len = 8; - ByteBuffer header = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES); - ByteBuffer data = ByteBuffer.allocate(len - 2); // not all data is written - new Frame.HeaderWriter() - .fin(true).opcode(Opcode.PONG).payloadLen(len).mask(0x12345678) - .write(header); - header.flip(); - MockChannel ch = new MockChannel.Builder().build(); // expected no invocations - ch.write(new ByteBuffer[]{header, data}, 0, 2); - checkExpectations(1, TimeUnit.SECONDS, ch); - } - - @Test - public void testFail01() { - MockChannel ch = new MockChannel.Builder() - .expectClose((code, reason) -> code == 1002 && reason.isEmpty()) - .build(); - assertThrows(AssertionFailedException.class, - () -> checkExpectations(1, TimeUnit.SECONDS, ch)); - } - - @Test - public void testFail02() throws IOException { - ByteBuffer header = ByteBuffer.allocate(MAX_HEADER_SIZE_BYTES); - new Frame.HeaderWriter() - .fin(true).opcode(Opcode.CLOSE).payloadLen(2).mask(0x12345678) - .write(header); - header.flip(); - ByteBuffer data = ByteBuffer.allocate(2).putChar((char) 1004).flip(); - MockChannel ch = new MockChannel.Builder() - .expectClose((code, reason) -> code == 1002 && reason.isEmpty()) - .build(); - ch.write(new ByteBuffer[]{header, data}, 0, 2); - assertThrows(AssertionFailedException.class, - () -> checkExpectations(1, TimeUnit.SECONDS, ch)); - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListener.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,131 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.websocket; - -import jdk.incubator.http.internal.websocket.TestSupport.F1; -import jdk.incubator.http.internal.websocket.TestSupport.F2; -import jdk.incubator.http.internal.websocket.TestSupport.F3; -import jdk.incubator.http.internal.websocket.TestSupport.InvocationChecker; -import jdk.incubator.http.internal.websocket.TestSupport.InvocationExpectation; -import jdk.incubator.http.internal.websocket.TestSupport.Mock; -import jdk.incubator.http.WebSocket; -import jdk.incubator.http.WebSocket.Listener; -import jdk.incubator.http.WebSocket.MessagePart; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.TimeUnit; - -final class MockListener implements Listener, Mock { - - private final InvocationChecker checker; - - @Override - public CompletableFuture<Void> expectations(long timeout, TimeUnit unit) { - return checker.expectations(timeout, unit); - } - - public static final class Builder { - - private final List<InvocationExpectation> expectations = new LinkedList<>(); - - Builder expectOnOpen(F1<? super WebSocket, Boolean> predicate) { - InvocationExpectation e = new InvocationExpectation("onOpen", - args -> predicate.apply((WebSocket) args[0])); - expectations.add(e); - return this; - } - - Builder expectOnPing(F2<? super WebSocket, ? super ByteBuffer, Boolean> predicate) { - InvocationExpectation e = new InvocationExpectation("onPing", - args -> predicate.apply((WebSocket) args[0], (ByteBuffer) args[1])); - expectations.add(e); - return this; - } - - Builder expectOnClose(F3<? super WebSocket, ? super Integer, ? super String, Boolean> predicate) { - expectations.add(new InvocationExpectation("onClose", - args -> predicate.apply((WebSocket) args[0], (Integer) args[1], (String) args[2]))); - return this; - } - - Builder expectOnError(F2<? super WebSocket, ? super Throwable, Boolean> predicate) { - expectations.add(new InvocationExpectation("onError", - args -> predicate.apply((WebSocket) args[0], (Throwable) args[1]))); - return this; - } - - MockListener build() { - return new MockListener(new LinkedList<>(expectations)); - } - } - - private MockListener(List<InvocationExpectation> expectations) { - this.checker = new InvocationChecker(expectations); - } - - @Override - public void onOpen(WebSocket webSocket) { - checker.checkInvocation("onOpen", webSocket); - } - - @Override - public CompletionStage<?> onText(WebSocket webSocket, CharSequence message, - MessagePart part) { - checker.checkInvocation("onText", webSocket, message, part); - return null; - } - - @Override - public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer message, - MessagePart part) { - checker.checkInvocation("onBinary", webSocket, message, part); - return null; - } - - @Override - public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) { - checker.checkInvocation("onPing", webSocket, message); - return null; - } - - @Override - public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) { - checker.checkInvocation("onPong", webSocket, message); - return null; - } - - @Override - public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, - String reason) { - checker.checkInvocation("onClose", webSocket, statusCode, reason); - return null; - } - - @Override - public void onError(WebSocket webSocket, Throwable error) { - checker.checkInvocation("onError", webSocket, error); - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListenerTest.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/MockListenerTest.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,99 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.websocket; - -import org.testng.annotations.Test; - -import jdk.incubator.http.internal.websocket.TestSupport.AssertionFailedException; -import java.util.concurrent.TimeUnit; - -import static jdk.incubator.http.internal.websocket.TestSupport.assertThrows; -import static jdk.incubator.http.internal.websocket.TestSupport.checkExpectations; - -public class MockListenerTest { - - @Test - public void testPass01() { - MockListener l = new MockListener.Builder().build(); - checkExpectations(1, TimeUnit.SECONDS, l); - } - - @Test - public void testPass02() { - MockListener l = new MockListener.Builder() - .expectOnOpen(ws -> ws == null) - .build(); - l.onOpen(null); - checkExpectations(1, TimeUnit.SECONDS, l); - } - - @Test - public void testPass03() { - MockListener l = new MockListener.Builder() - .expectOnOpen(ws -> ws == null) - .expectOnClose((ws, code, reason) -> - ws == null && code == 1002 && "blah".equals(reason)) - .build(); - l.onOpen(null); - l.onClose(null, 1002, "blah"); - checkExpectations(1, TimeUnit.SECONDS, l); - } - - @Test - public void testFail01() { - MockListener l = new MockListener.Builder() - .expectOnOpen(ws -> ws != null) - .build(); - l.onOpen(null); - assertThrows(AssertionFailedException.class, - () -> checkExpectations(1, TimeUnit.SECONDS, l)); - } - - @Test - public void testFail02() { - MockListener l = new MockListener.Builder() - .expectOnOpen(ws -> true) - .build(); - assertThrows(AssertionFailedException.class, - () -> checkExpectations(1, TimeUnit.SECONDS, l)); - } - - @Test - public void testFail03() { - MockListener l = new MockListener.Builder() - .expectOnOpen(ws -> true) - .build(); - l.onOpen(null); - l.onClose(null, 1002, ""); - assertThrows(AssertionFailedException.class, - () -> checkExpectations(1, TimeUnit.SECONDS, l)); - } - - @Test - public void testFail04() { - MockListener l = new MockListener.Builder().build(); - l.onClose(null, 1002, ""); - assertThrows(AssertionFailedException.class, - () -> checkExpectations(1, TimeUnit.SECONDS, l)); - } -} diff -r dbcbcda0e413 -r a88515bdd90a test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/PingTest.java --- a/test/jdk/java/net/httpclient/websocket/jdk.incubator.httpclient/jdk/incubator/http/internal/websocket/PingTest.java Tue Nov 21 10:24:53 2017 +0000 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,195 +0,0 @@ -/* - * Copyright (c) 2016, 2017, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA - * or visit www.oracle.com if you need additional information or have any - * questions. - */ -package jdk.incubator.http.internal.websocket; - -import org.testng.SkipException; -import org.testng.annotations.Test; -import jdk.incubator.http.internal.websocket.Frame.Opcode; - -import java.io.IOException; -import java.net.ProtocolException; -import jdk.incubator.http.WebSocket; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import static jdk.incubator.http.internal.websocket.TestSupport.Expectation.ifExpect; -import static jdk.incubator.http.internal.websocket.TestSupport.assertCompletesExceptionally; -import static jdk.incubator.http.internal.websocket.TestSupport.checkExpectations; -import static org.testng.Assert.assertSame; - -/* - * Examines sendPing/onPing contracts - */ -public final class PingTest { - - /* - * sendPing(message) is invoked. If the `message` argument is illegal, then - * the method must throw an appropriate exception. Otherwise no exception - * must be thrown. - */ -// @Test(dataProvider = "outgoingData", dataProviderClass = DataProviders.class) - public void testSendPingArguments(ByteBuffer message) { - WebSocket ws = newWebSocket(); - ifExpect( - message == null, - NullPointerException.class::isInstance) - .orExpect( - message != null && message.remaining() > 125, - IllegalArgumentException.class::isInstance) - .assertThrows( - () -> ws.sendPing(message) - ); - } - - /* - * sendPing(message) with a legal argument has been invoked, then: - * - * 1. A Ping message with the same payload appears on the wire - * 2. The CF returned from the method completes normally with the same - * WebSocket that sendPing has been called on - */ - @Test(dataProvider = "outgoingData", dataProviderClass = DataProviders.class) - public void testSendPingWysiwyg(ByteBuffer message) throws ExecutionException, InterruptedException { - if (message == null || message.remaining() > 125) { - return; - } - ByteBuffer snapshot = copy(message); - MockChannel channel = new MockChannel.Builder() - .expectPing(snapshot::equals) - .build(); - WebSocket ws = newWebSocket(channel); - CompletableFuture<WebSocket> cf = ws.sendPing(message); - WebSocket ws1 = cf.join(); - assertSame(ws1, ws); // (2) - checkExpectations(channel); // (1) - } - - /* - * If an I/O error occurs while Ping messages is being sent, then: - * - * 1. The CF returned from sendPing completes exceptionally with this I/O - * error as the cause - */ -// @Test - public void testSendPingIOException() { - MockChannel ch = new MockChannel.Builder() -// .provideWriteException(IOException::new) - .build(); - WebSocket ws = newWebSocket(ch); - CompletableFuture<WebSocket> cf = ws.sendPing(ByteBuffer.allocate(16)); - assertCompletesExceptionally(IOException.class, cf); - } - - /* - * If an incorrect Ping frame appears on the wire, then: - * - * 1. onError with the java.net.ProtocolException is invoked - * 1. A Close frame with status code 1002 appears on the wire - */ -// @Test(dataProvider = "incorrectFrame", dataProviderClass = DataProviders.class) - public void testOnPingIncorrect(boolean fin, boolean rsv1, boolean rsv2, - boolean rsv3, ByteBuffer data) { - if (fin && !rsv1 && !rsv2 && !rsv3 && data.remaining() <= 125) { - throw new SkipException("Correct frame"); - } - CompletableFuture<WebSocket> webSocket = new CompletableFuture<>(); - MockChannel channel = new MockChannel.Builder() - .provideFrame(fin, rsv1, rsv2, rsv3, Opcode.PING, data) - .expectClose((code, reason) -> - Integer.valueOf(1002).equals(code) && "".equals(reason)) - .build(); - MockListener listener = new MockListener.Builder() - .expectOnOpen((ws) -> true) - .expectOnError((ws, error) -> error instanceof ProtocolException) - .build(); - webSocket.complete(newWebSocket(channel, listener)); - checkExpectations(500, TimeUnit.MILLISECONDS, channel, listener); - } - - /* - * If a Ping message has been read off the wire, then: - * - * 1. onPing is invoked with the data and the WebSocket the listener has - * been attached to - * 2. A Pong message with the same contents will be sent in reply - */ - @Test(dataProvider = "incomingData", dataProviderClass = DataProviders.class) - public void testOnPingReply(ByteBuffer data) { - CompletableFuture<WebSocket> webSocket = new CompletableFuture<>(); - MockChannel channel = new MockChannel.Builder() - .provideFrame(true, false, false, false, Opcode.PING, data) - .expectPong(data::equals) - .build(); - MockListener listener = new MockListener.Builder() - .expectOnOpen((ws) -> true) // maybe should capture with a CF? - .expectOnPing((ws, bb) -> data.equals(bb)) - .build(); - webSocket.complete(newWebSocket(channel, listener)); - checkExpectations(500, TimeUnit.MILLISECONDS, channel, listener); - } - - /* - * If onPing throws an exception or CS returned from it completes - * exceptionally, then: - * - * 1. onError is invoked with this particular exception as the cause and the - * WebSocket the listener has been attached to - */ - public void testOnPingExceptions() { - } - - /* - * If a Ping message has been read off the wire and an I/O error occurs - * while WebSocket sends a Pong reply to it, then: - * - * 1. onError is invoked with this error as the cause and the WebSocket this - * listener has been attached to - */ - public void testOnPingReplyIOException() { - } - - private WebSocket newWebSocket() { - return newWebSocket(new MockChannel.Builder().build()); - } - - private WebSocket newWebSocket(RawChannel ch) { - return newWebSocket(ch, new WebSocket.Listener() { }); - } - - private WebSocket newWebSocket(RawChannel ch, WebSocket.Listener l) { -// WebSocketImpl ws = new WebSocketImpl("", ch, l, Executors.newCachedThreadPool()); -// ws.(); -// ws.request(Long.MAX_VALUE); - return null; // FIXME - } - - public static ByteBuffer copy(ByteBuffer src) { - int pos = src.position(); - ByteBuffer b = ByteBuffer.allocate(src.remaining()).put(src).flip(); - src.position(pos); - return b; - } -}