http-client-branch: (cleanup) http-client-branch
authorprappo
Fri, 10 Nov 2017 16:24:07 +0300
branchhttp-client-branch
changeset 55799 c71f52f48d97
parent 55798 fa84be3c77e4
child 55800 c4307c12419d
http-client-branch: (cleanup)
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java
src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Queue.java
test/jdk/java/net/httpclient/http2/server/BodyInputStream.java
test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java
test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java
test/jdk/java/net/httpclient/http2/server/Queue.java
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java	Fri Nov 10 16:24:07 2017 +0300
@@ -30,7 +30,6 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.SSLTube;
 import jdk.incubator.http.internal.common.Utils;
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java	Fri Nov 10 16:24:07 2017 +0300
@@ -30,7 +30,6 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.SSLTube;
 import jdk.incubator.http.internal.common.Utils;
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java	Fri Nov 10 16:24:07 2017 +0300
@@ -216,7 +216,6 @@
             return null;   // error gets returned to app
         }
 
-        String realm = parser.findValue("realm");
         AuthInfo au = proxy ? exchange.proxyauth : exchange.serverauth;
         if (au == null) {
             PasswordAuthentication pw = getCredentials(authval, proxy, req);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java	Fri Nov 10 16:24:07 2017 +0300
@@ -29,7 +29,6 @@
 import java.lang.System.Logger.Level;
 import java.net.InetSocketAddress;
 import java.net.ProxySelector;
-import java.net.SocketPermission;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URLPermission;
@@ -70,7 +69,6 @@
     final AccessControlContext acc;
     final MultiExchange<?,T> multi;
     final Executor parentExecutor;
-    final HttpRequest.BodyPublisher requestPublisher;
     boolean upgrading; // to HTTP/2
     final PushGroup<?,T> pushGroup;
     final String dbgTag;
@@ -82,7 +80,6 @@
         this.multi = multi;
         this.acc = multi.acc;
         this.parentExecutor = multi.executor;
-        this.requestPublisher = request.requestPublisher;
         this.pushGroup = multi.pushGroup;
         this.dbgTag = "Exchange";
     }
@@ -98,7 +95,6 @@
         this.client = multi.client();
         this.multi = multi;
         this.parentExecutor = multi.executor;
-        this.requestPublisher = request.requestPublisher;
         this.pushGroup = multi.pushGroup;
         this.dbgTag = "Exchange";
     }
@@ -213,8 +209,6 @@
         request.setH2Upgrade(client.client2());
     }
 
-    static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0];
-
     synchronized IOException getCancelCause() {
         return failed;
     }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java	Fri Nov 10 16:24:07 2017 +0300
@@ -78,7 +78,6 @@
     static <U> CompletableFuture<? extends ExchangeImpl<U>>
     get(Exchange<U> exchange, HttpConnection connection)
     {
-        HttpRequestImpl req = exchange.request();
         if (exchange.version() == HTTP_1_1) {
             DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange");
             return createHttp1Exchange(exchange, connection);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java	Fri Nov 10 16:24:07 2017 +0300
@@ -170,7 +170,7 @@
             InetSocketAddress addr = request.getAddress(client);
             this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
         }
-        this.requestAction = new Http1Request(request, client, this);
+        this.requestAction = new Http1Request(request, this);
         this.asyncReceiver = new Http1AsyncReceiver(executor, this);
         asyncReceiver.subscribe(new InitialErrorReceiver());
     }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java	Fri Nov 10 16:24:07 2017 +0300
@@ -47,7 +47,6 @@
  *  An HTTP/1.1 request.
  */
 class Http1Request {
-    private final HttpClientImpl client;
     private final HttpRequestImpl request;
     private final Http1Exchange<?> http1Exchange;
     private final HttpConnection connection;
@@ -58,11 +57,9 @@
     private volatile long contentLength;
 
     Http1Request(HttpRequestImpl request,
-                 HttpClientImpl client,
                  Http1Exchange<?> http1Exchange)
         throws IOException
     {
-        this.client = client;
         this.request = request;
         this.http1Exchange = http1Exchange;
         this.connection = http1Exchange.connection();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java	Fri Nov 10 16:24:07 2017 +0300
@@ -52,12 +52,10 @@
     private HttpHeaders headers;
     private int responseCode;
     private final Http1Exchange<T> exchange;
-    private final boolean redirecting; // redirecting
     private boolean return2Cache; // return connection to cache when finished
     private final HeadersReader headersReader; // used to read the headers
     private final BodyReader bodyReader; // used to read the body
     private final Http1AsyncReceiver asyncReceiver;
-    private volatile boolean reading;
     private volatile EOFException eof;
 
     // Revisit: can we get rid of this?
@@ -74,7 +72,6 @@
         this.request = exchange.request();
         this.exchange = exchange;
         this.connection = conn;
-        this.redirecting = false;
         this.asyncReceiver = asyncReceiver;
         headersReader = new HeadersReader(this::advance);
         bodyReader = new BodyReader(this::advance);
@@ -240,7 +237,6 @@
         asyncReceiver.clear();
         if (return2Cache) {
             Log.logTrace("Attempting to return connection to the pool: {0}", connection);
-            reading = false;
             // TODO: need to do something here?
             // connection.setAsyncCallbacks(null, null, null);
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java	Fri Nov 10 16:24:07 2017 +0300
@@ -163,7 +163,7 @@
 
     void stop() {
         debug.log(Level.DEBUG, "stopping");
-        connections.values().stream().forEach(this::close);
+        connections.values().forEach(this::close);
         connections.clear();
     }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java	Fri Nov 10 16:24:07 2017 +0300
@@ -338,7 +338,6 @@
         }
 
         final class HttpWriteSubscription implements Flow.Subscription {
-            volatile boolean cancelled;
             final Demand demand = new Demand();
 
             @Override
@@ -355,7 +354,6 @@
             public void cancel() {
                 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
                           + getConnectionFlow());
-                cancelled = true;
             }
 
             void flush() {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java	Fri Nov 10 16:24:07 2017 +0300
@@ -31,7 +31,6 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.FlowTube;
 import jdk.incubator.http.internal.common.MinimalFuture;
 import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java	Fri Nov 10 16:24:07 2017 +0300
@@ -259,7 +259,6 @@
         volatile ByteBuffer nextBuffer;
         volatile boolean need2Read = true;
         volatile boolean haveNext;
-        volatile Throwable error;
 
         StreamIterator(InputStream is) {
             this(is, Utils::getBuffer);
@@ -291,7 +290,6 @@
                 nextBuffer.position(0);
                 return n;
             } catch (IOException ex) {
-                error = ex;
                 return -1;
             }
         }
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java	Fri Nov 10 16:24:07 2017 +0300
@@ -66,10 +66,8 @@
 
     static final int LF = 10;
     static final int CR = 13;
-    static final int SP = 0x20;
-    static final int BUF_SIZE = 1024;
 
-    boolean chunkedContent, chunkedContentInitialized;
+    private boolean chunkedContent, chunkedContentInitialized;
 
     boolean contentChunked() throws IOException {
         if (chunkedContentInitialized) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java	Fri Nov 10 16:24:07 2017 +0300
@@ -449,7 +449,6 @@
 
         @Override
         public void onResponse(HttpResponse<V> response) {
-            HttpRequest request = response.request();
             CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
             cf.complete(response);
         }
@@ -475,7 +474,6 @@
      */
     static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
 
-        volatile Flow.Subscription subscription;
         final CompletableFuture<T> cf = new MinimalFuture<>();
         final Optional<T> result;
 
@@ -485,7 +483,6 @@
 
         @Override
         public void onSubscribe(Flow.Subscription subscription) {
-            this.subscription = subscription;
             subscription.request(Long.MAX_VALUE);
         }
 
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java	Fri Nov 10 16:24:07 2017 +0300
@@ -108,15 +108,12 @@
     protected volatile int streamid;
 
     long responseContentLen = -1;
-    long responseBytesProcessed = 0;
     long requestContentLen;
 
     final Http2Connection connection;
-    HttpClientImpl client;
     final HttpRequestImpl request;
     final DecodingCallback rspHeadersConsumer;
     HttpHeadersImpl responseHeaders;
-    final HttpHeadersImpl requestHeaders;
     final HttpHeadersImpl requestPseudoHeaders;
     volatile HttpResponse.BodySubscriber<T> responseSubscriber;
     final HttpRequest.BodyPublisher requestPublisher;
@@ -133,7 +130,7 @@
     private volatile boolean endStreamSent;
 
     // state flags
-    boolean requestSent, responseReceived, responseHeadersReceived;
+    private boolean requestSent, responseReceived;
 
     /**
      * A reference to this Stream's connection Send Window controller. The
@@ -293,13 +290,11 @@
            WindowController windowController)
     {
         super(e);
-        this.client = client;
         this.connection = connection;
         this.windowController = windowController;
         this.request = e.request();
         this.requestPublisher = request.requestPublisher;  // may be null
         responseHeaders = new HttpHeadersImpl();
-        requestHeaders = new HttpHeadersImpl();
         rspHeadersConsumer = (name, value) -> {
             responseHeaders.addHeader(name.toString(), value.toString());
             if (Log.headers() && Log.trace()) {
@@ -359,10 +354,6 @@
     }
 
     protected void handleResponse() throws IOException {
-        synchronized(this) {
-            responseHeadersReceived = true;
-        }
-        HttpConnection c = connection.connection; // TODO: improve
         responseCode = (int)responseHeaders
                 .firstValueAsLong(":status")
                 .orElseThrow(() -> new IOException("no statuscode in response"));
@@ -856,10 +847,6 @@
         }
     }
 
-    final synchronized boolean isResponseReceived() {
-        return responseReceived;
-    }
-
     synchronized void responseReceived() {
         responseReceived = true;
         if (requestSent) {
@@ -959,7 +946,6 @@
 
     static class PushedStream<U,T> extends Stream<T> {
         final PushGroup<U,T> pushGroup;
-        private final Stream<T> parent;      // used by server push streams
         // push streams need the response CF allocated up front as it is
         // given directly to user via the multi handler callback function.
         final CompletableFuture<Response> pushCF;
@@ -976,7 +962,6 @@
             this.pushReq = pushReq.request();
             this.pushCF = new MinimalFuture<>();
             this.responseCF = new MinimalFuture<>();
-            this.parent = parent;
         }
 
         CompletableFuture<HttpResponse<T>> responseCF() {
@@ -1061,7 +1046,6 @@
         // create and return the PushResponseImpl
         @Override
         protected void handleResponse() {
-            HttpConnection c = connection.connection; // TODO: improve
             responseCode = (int)responseHeaders
                 .firstValueAsLong(":status")
                 .orElse(-1);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Queue.java	Fri Nov 10 12:36:44 2017 +0000
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation.  Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.common;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.stream.Stream;
-
-// Each stream has one of these for input. Each Http2Connection has one
-// for output. Can be used blocking or asynchronously.
-
-public class Queue<T> implements ExceptionallyCloseable  {
-
-    private final LinkedList<T> q = new LinkedList<>();
-    private volatile boolean closed = false;
-    private volatile Throwable exception = null;
-    private Runnable callback;
-    private boolean callbackDisabled = false;
-    private int waiters; // true if someone waiting
-
-    public synchronized int size() {
-        return q.size();
-    }
-
-//    public synchronized boolean tryPut(T obj) throws IOException {
-//        if (closed) return false;
-//        put(obj);
-//        return true;
-//    }
-
-    public synchronized void put(T obj) throws IOException {
-        if (closed) {
-            throw new IOException("stream closed");
-        }
-
-        q.add(obj);
-
-        if (waiters > 0) {
-            notifyAll();
-        }
-
-        if (callbackDisabled) {
-            return;
-        }
-
-        if (q.size() > 0 && callback != null) {
-            // Note: calling callback while holding the lock is
-            // dangerous and may lead to deadlocks.
-            callback.run();
-        }
-    }
-
-//    public synchronized void disableCallback() {
-//        callbackDisabled = true;
-//    }
-
-//    public synchronized void enableCallback() {
-//        callbackDisabled = false;
-//        while (q.size() > 0) {
-//            callback.run();
-//        }
-//    }
-
-//    /**
-//     * callback is invoked any time put is called where
-//     * the Queue was empty.
-//     */
-//    public synchronized void registerPutCallback(Runnable callback) {
-//        Objects.requireNonNull(callback);
-//        this.callback = callback;
-//        if (q.size() > 0) {
-//            // Note: calling callback while holding the lock is
-//            // dangerous and may lead to deadlocks.
-//            callback.run();
-//        }
-//    }
-
-    @Override
-    public synchronized void close() {
-        closed = true;
-        notifyAll();
-    }
-
-    @Override
-    public synchronized void closeExceptionally(Throwable t) {
-        if (exception == null) exception = t;
-        else if (t != null && t != exception) {
-            if (!Stream.of(exception.getSuppressed())
-                .filter(x -> x == t)
-                .findFirst()
-                .isPresent())
-            {
-                exception.addSuppressed(t);
-            }
-        }
-        close();
-    }
-
-    public synchronized T take() throws IOException {
-        if (closed) {
-            throw newIOException("stream closed");
-        }
-        try {
-            while (q.size() == 0) {
-                waiters++;
-                wait();
-                if (closed) {
-                    throw newIOException("Queue closed");
-                }
-                waiters--;
-            }
-            return q.removeFirst();
-        } catch (InterruptedException ex) {
-            throw new IOException(ex);
-        }
-    }
-
-    public synchronized T poll() throws IOException {
-        if (closed) {
-            throw newIOException("stream closed");
-        }
-
-        if (q.isEmpty()) {
-            return null;
-        }
-        T res = q.removeFirst();
-        return res;
-    }
-
-//    public synchronized T[] pollAll(T[] type) throws IOException {
-//        T[] ret = q.toArray(type);
-//        q.clear();
-//        return ret;
-//    }
-
-//    public synchronized void pushback(T v) {
-//        q.addFirst(v);
-//    }
-
-//    public synchronized void pushbackAll(T[] v) {
-//        for (int i=v.length-1; i>=0; i--) {
-//            q.addFirst(v[i]);
-//        }
-//    }
-
-    private IOException newIOException(String msg) {
-        if (exception == null) {
-            return new IOException(msg);
-        } else {
-            return new IOException(msg, exception);
-        }
-    }
-
-}
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java	Fri Nov 10 16:24:07 2017 +0300
@@ -25,7 +25,6 @@
 import java.nio.ByteBuffer;
 
 import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.Queue;
 import jdk.incubator.http.internal.common.Utils;
 import jdk.incubator.http.internal.frame.DataFrame;
 import jdk.incubator.http.internal.frame.Http2Frame;
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java	Fri Nov 10 16:24:07 2017 +0300
@@ -25,7 +25,6 @@
 import java.nio.ByteBuffer;
 
 import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.Queue;
 import jdk.incubator.http.internal.frame.DataFrame;
 
 /**
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Fri Nov 10 12:36:44 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java	Fri Nov 10 16:24:07 2017 +0300
@@ -40,7 +40,6 @@
 import java.util.function.Consumer;
 import jdk.incubator.http.internal.common.ByteBufferReference;
 import jdk.incubator.http.internal.common.HttpHeadersImpl;
-import jdk.incubator.http.internal.common.Queue;
 import jdk.incubator.http.internal.frame.DataFrame;
 import jdk.incubator.http.internal.frame.FramesDecoder;
 import jdk.incubator.http.internal.frame.FramesEncoder;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java	Fri Nov 10 16:24:07 2017 +0300
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation.  Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.internal.common.ExceptionallyCloseable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.stream.Stream;
+
+// Each stream has one of these for input. Each Http2Connection has one
+// for output. Can be used blocking or asynchronously.
+
+public class Queue<T> implements ExceptionallyCloseable {
+
+    private final LinkedList<T> q = new LinkedList<>();
+    private volatile boolean closed = false;
+    private volatile Throwable exception = null;
+    private Runnable callback;
+    private boolean callbackDisabled = false;
+    private int waiters; // true if someone waiting
+
+    public synchronized int size() {
+        return q.size();
+    }
+
+//    public synchronized boolean tryPut(T obj) throws IOException {
+//        if (closed) return false;
+//        put(obj);
+//        return true;
+//    }
+
+    public synchronized void put(T obj) throws IOException {
+        if (closed) {
+            throw new IOException("stream closed");
+        }
+
+        q.add(obj);
+
+        if (waiters > 0) {
+            notifyAll();
+        }
+
+        if (callbackDisabled) {
+            return;
+        }
+
+        if (q.size() > 0 && callback != null) {
+            // Note: calling callback while holding the lock is
+            // dangerous and may lead to deadlocks.
+            callback.run();
+        }
+    }
+
+//    public synchronized void disableCallback() {
+//        callbackDisabled = true;
+//    }
+
+//    public synchronized void enableCallback() {
+//        callbackDisabled = false;
+//        while (q.size() > 0) {
+//            callback.run();
+//        }
+//    }
+
+//    /**
+//     * callback is invoked any time put is called where
+//     * the Queue was empty.
+//     */
+//    public synchronized void registerPutCallback(Runnable callback) {
+//        Objects.requireNonNull(callback);
+//        this.callback = callback;
+//        if (q.size() > 0) {
+//            // Note: calling callback while holding the lock is
+//            // dangerous and may lead to deadlocks.
+//            callback.run();
+//        }
+//    }
+
+    @Override
+    public synchronized void close() {
+        closed = true;
+        notifyAll();
+    }
+
+    @Override
+    public synchronized void closeExceptionally(Throwable t) {
+        if (exception == null) exception = t;
+        else if (t != null && t != exception) {
+            if (!Stream.of(exception.getSuppressed())
+                .filter(x -> x == t)
+                .findFirst()
+                .isPresent())
+            {
+                exception.addSuppressed(t);
+            }
+        }
+        close();
+    }
+
+    public synchronized T take() throws IOException {
+        if (closed) {
+            throw newIOException("stream closed");
+        }
+        try {
+            while (q.size() == 0) {
+                waiters++;
+                wait();
+                if (closed) {
+                    throw newIOException("Queue closed");
+                }
+                waiters--;
+            }
+            return q.removeFirst();
+        } catch (InterruptedException ex) {
+            throw new IOException(ex);
+        }
+    }
+
+    public synchronized T poll() throws IOException {
+        if (closed) {
+            throw newIOException("stream closed");
+        }
+
+        if (q.isEmpty()) {
+            return null;
+        }
+        T res = q.removeFirst();
+        return res;
+    }
+
+//    public synchronized T[] pollAll(T[] type) throws IOException {
+//        T[] ret = q.toArray(type);
+//        q.clear();
+//        return ret;
+//    }
+
+//    public synchronized void pushback(T v) {
+//        q.addFirst(v);
+//    }
+
+//    public synchronized void pushbackAll(T[] v) {
+//        for (int i=v.length-1; i>=0; i--) {
+//            q.addFirst(v[i]);
+//        }
+//    }
+
+    private IOException newIOException(String msg) {
+        if (exception == null) {
+            return new IOException(msg);
+        } else {
+            return new IOException(msg, exception);
+        }
+    }
+
+}