--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLConnection.java Fri Nov 10 16:24:07 2017 +0300
@@ -30,7 +30,6 @@
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.SSLTube;
import jdk.incubator.http.internal.common.Utils;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AsyncSSLTunnelConnection.java Fri Nov 10 16:24:07 2017 +0300
@@ -30,7 +30,6 @@
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.SSLTube;
import jdk.incubator.http.internal.common.Utils;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/AuthenticationFilter.java Fri Nov 10 16:24:07 2017 +0300
@@ -216,7 +216,6 @@
return null; // error gets returned to app
}
- String realm = parser.findValue("realm");
AuthInfo au = proxy ? exchange.proxyauth : exchange.serverauth;
if (au == null) {
PasswordAuthentication pw = getCredentials(authval, proxy, req);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Exchange.java Fri Nov 10 16:24:07 2017 +0300
@@ -29,7 +29,6 @@
import java.lang.System.Logger.Level;
import java.net.InetSocketAddress;
import java.net.ProxySelector;
-import java.net.SocketPermission;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLPermission;
@@ -70,7 +69,6 @@
final AccessControlContext acc;
final MultiExchange<?,T> multi;
final Executor parentExecutor;
- final HttpRequest.BodyPublisher requestPublisher;
boolean upgrading; // to HTTP/2
final PushGroup<?,T> pushGroup;
final String dbgTag;
@@ -82,7 +80,6 @@
this.multi = multi;
this.acc = multi.acc;
this.parentExecutor = multi.executor;
- this.requestPublisher = request.requestPublisher;
this.pushGroup = multi.pushGroup;
this.dbgTag = "Exchange";
}
@@ -98,7 +95,6 @@
this.client = multi.client();
this.multi = multi;
this.parentExecutor = multi.executor;
- this.requestPublisher = request.requestPublisher;
this.pushGroup = multi.pushGroup;
this.dbgTag = "Exchange";
}
@@ -213,8 +209,6 @@
request.setH2Upgrade(client.client2());
}
- static final SocketPermission[] SOCKET_ARRAY = new SocketPermission[0];
-
synchronized IOException getCancelCause() {
return failed;
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ExchangeImpl.java Fri Nov 10 16:24:07 2017 +0300
@@ -78,7 +78,6 @@
static <U> CompletableFuture<? extends ExchangeImpl<U>>
get(Exchange<U> exchange, HttpConnection connection)
{
- HttpRequestImpl req = exchange.request();
if (exchange.version() == HTTP_1_1) {
DEBUG_LOGGER.log(Level.DEBUG, "get: HTTP/1.1: new Http1Exchange");
return createHttp1Exchange(exchange, connection);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Exchange.java Fri Nov 10 16:24:07 2017 +0300
@@ -170,7 +170,7 @@
InetSocketAddress addr = request.getAddress(client);
this.connection = HttpConnection.getConnection(addr, client, request, HTTP_1_1);
}
- this.requestAction = new Http1Request(request, client, this);
+ this.requestAction = new Http1Request(request, this);
this.asyncReceiver = new Http1AsyncReceiver(executor, this);
asyncReceiver.subscribe(new InitialErrorReceiver());
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Request.java Fri Nov 10 16:24:07 2017 +0300
@@ -47,7 +47,6 @@
* An HTTP/1.1 request.
*/
class Http1Request {
- private final HttpClientImpl client;
private final HttpRequestImpl request;
private final Http1Exchange<?> http1Exchange;
private final HttpConnection connection;
@@ -58,11 +57,9 @@
private volatile long contentLength;
Http1Request(HttpRequestImpl request,
- HttpClientImpl client,
Http1Exchange<?> http1Exchange)
throws IOException
{
- this.client = client;
this.request = request;
this.http1Exchange = http1Exchange;
this.connection = http1Exchange.connection();
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http1Response.java Fri Nov 10 16:24:07 2017 +0300
@@ -52,12 +52,10 @@
private HttpHeaders headers;
private int responseCode;
private final Http1Exchange<T> exchange;
- private final boolean redirecting; // redirecting
private boolean return2Cache; // return connection to cache when finished
private final HeadersReader headersReader; // used to read the headers
private final BodyReader bodyReader; // used to read the body
private final Http1AsyncReceiver asyncReceiver;
- private volatile boolean reading;
private volatile EOFException eof;
// Revisit: can we get rid of this?
@@ -74,7 +72,6 @@
this.request = exchange.request();
this.exchange = exchange;
this.connection = conn;
- this.redirecting = false;
this.asyncReceiver = asyncReceiver;
headersReader = new HeadersReader(this::advance);
bodyReader = new BodyReader(this::advance);
@@ -240,7 +237,6 @@
asyncReceiver.clear();
if (return2Cache) {
Log.logTrace("Attempting to return connection to the pool: {0}", connection);
- reading = false;
// TODO: need to do something here?
// connection.setAsyncCallbacks(null, null, null);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Http2ClientImpl.java Fri Nov 10 16:24:07 2017 +0300
@@ -163,7 +163,7 @@
void stop() {
debug.log(Level.DEBUG, "stopping");
- connections.values().stream().forEach(this::close);
+ connections.values().forEach(this::close);
connections.clear();
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpConnection.java Fri Nov 10 16:24:07 2017 +0300
@@ -338,7 +338,6 @@
}
final class HttpWriteSubscription implements Flow.Subscription {
- volatile boolean cancelled;
final Demand demand = new Demand();
@Override
@@ -355,7 +354,6 @@
public void cancel() {
debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
+ getConnectionFlow());
- cancelled = true;
}
void flush() {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/PlainTunnelingConnection.java Fri Nov 10 16:24:07 2017 +0300
@@ -31,7 +31,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CompletableFuture;
-import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.FlowTube;
import jdk.incubator.http.internal.common.MinimalFuture;
import static jdk.incubator.http.HttpResponse.BodyHandler.discard;
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/RequestPublishers.java Fri Nov 10 16:24:07 2017 +0300
@@ -259,7 +259,6 @@
volatile ByteBuffer nextBuffer;
volatile boolean need2Read = true;
volatile boolean haveNext;
- volatile Throwable error;
StreamIterator(InputStream is) {
this(is, Utils::getBuffer);
@@ -291,7 +290,6 @@
nextBuffer.position(0);
return n;
} catch (IOException ex) {
- error = ex;
return -1;
}
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Fri Nov 10 16:24:07 2017 +0300
@@ -66,10 +66,8 @@
static final int LF = 10;
static final int CR = 13;
- static final int SP = 0x20;
- static final int BUF_SIZE = 1024;
- boolean chunkedContent, chunkedContentInitialized;
+ private boolean chunkedContent, chunkedContentInitialized;
boolean contentChunked() throws IOException {
if (chunkedContentInitialized) {
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Fri Nov 10 16:24:07 2017 +0300
@@ -449,7 +449,6 @@
@Override
public void onResponse(HttpResponse<V> response) {
- HttpRequest request = response.request();
CompletableFuture<HttpResponse<V>> cf = results.get(response.request());
cf.complete(response);
}
@@ -475,7 +474,6 @@
*/
static class NullSubscriber<T> implements HttpResponse.BodySubscriber<T> {
- volatile Flow.Subscription subscription;
final CompletableFuture<T> cf = new MinimalFuture<>();
final Optional<T> result;
@@ -485,7 +483,6 @@
@Override
public void onSubscribe(Flow.Subscription subscription) {
- this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Nov 10 12:36:44 2017 +0000
+++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Fri Nov 10 16:24:07 2017 +0300
@@ -108,15 +108,12 @@
protected volatile int streamid;
long responseContentLen = -1;
- long responseBytesProcessed = 0;
long requestContentLen;
final Http2Connection connection;
- HttpClientImpl client;
final HttpRequestImpl request;
final DecodingCallback rspHeadersConsumer;
HttpHeadersImpl responseHeaders;
- final HttpHeadersImpl requestHeaders;
final HttpHeadersImpl requestPseudoHeaders;
volatile HttpResponse.BodySubscriber<T> responseSubscriber;
final HttpRequest.BodyPublisher requestPublisher;
@@ -133,7 +130,7 @@
private volatile boolean endStreamSent;
// state flags
- boolean requestSent, responseReceived, responseHeadersReceived;
+ private boolean requestSent, responseReceived;
/**
* A reference to this Stream's connection Send Window controller. The
@@ -293,13 +290,11 @@
WindowController windowController)
{
super(e);
- this.client = client;
this.connection = connection;
this.windowController = windowController;
this.request = e.request();
this.requestPublisher = request.requestPublisher; // may be null
responseHeaders = new HttpHeadersImpl();
- requestHeaders = new HttpHeadersImpl();
rspHeadersConsumer = (name, value) -> {
responseHeaders.addHeader(name.toString(), value.toString());
if (Log.headers() && Log.trace()) {
@@ -359,10 +354,6 @@
}
protected void handleResponse() throws IOException {
- synchronized(this) {
- responseHeadersReceived = true;
- }
- HttpConnection c = connection.connection; // TODO: improve
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElseThrow(() -> new IOException("no statuscode in response"));
@@ -856,10 +847,6 @@
}
}
- final synchronized boolean isResponseReceived() {
- return responseReceived;
- }
-
synchronized void responseReceived() {
responseReceived = true;
if (requestSent) {
@@ -959,7 +946,6 @@
static class PushedStream<U,T> extends Stream<T> {
final PushGroup<U,T> pushGroup;
- private final Stream<T> parent; // used by server push streams
// push streams need the response CF allocated up front as it is
// given directly to user via the multi handler callback function.
final CompletableFuture<Response> pushCF;
@@ -976,7 +962,6 @@
this.pushReq = pushReq.request();
this.pushCF = new MinimalFuture<>();
this.responseCF = new MinimalFuture<>();
- this.parent = parent;
}
CompletableFuture<HttpResponse<T>> responseCF() {
@@ -1061,7 +1046,6 @@
// create and return the PushResponseImpl
@Override
protected void handleResponse() {
- HttpConnection c = connection.connection; // TODO: improve
responseCode = (int)responseHeaders
.firstValueAsLong(":status")
.orElse(-1);
--- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/common/Queue.java Fri Nov 10 12:36:44 2017 +0000
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,177 +0,0 @@
-/*
- * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
- * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
- *
- * This code is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 only, as
- * published by the Free Software Foundation. Oracle designates this
- * particular file as subject to the "Classpath" exception as provided
- * by Oracle in the LICENSE file that accompanied this code.
- *
- * This code is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
- * version 2 for more details (a copy is included in the LICENSE file that
- * accompanied this code).
- *
- * You should have received a copy of the GNU General Public License version
- * 2 along with this work; if not, write to the Free Software Foundation,
- * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
- *
- * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
- * or visit www.oracle.com if you need additional information or have any
- * questions.
- */
-
-package jdk.incubator.http.internal.common;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.stream.Stream;
-
-// Each stream has one of these for input. Each Http2Connection has one
-// for output. Can be used blocking or asynchronously.
-
-public class Queue<T> implements ExceptionallyCloseable {
-
- private final LinkedList<T> q = new LinkedList<>();
- private volatile boolean closed = false;
- private volatile Throwable exception = null;
- private Runnable callback;
- private boolean callbackDisabled = false;
- private int waiters; // true if someone waiting
-
- public synchronized int size() {
- return q.size();
- }
-
-// public synchronized boolean tryPut(T obj) throws IOException {
-// if (closed) return false;
-// put(obj);
-// return true;
-// }
-
- public synchronized void put(T obj) throws IOException {
- if (closed) {
- throw new IOException("stream closed");
- }
-
- q.add(obj);
-
- if (waiters > 0) {
- notifyAll();
- }
-
- if (callbackDisabled) {
- return;
- }
-
- if (q.size() > 0 && callback != null) {
- // Note: calling callback while holding the lock is
- // dangerous and may lead to deadlocks.
- callback.run();
- }
- }
-
-// public synchronized void disableCallback() {
-// callbackDisabled = true;
-// }
-
-// public synchronized void enableCallback() {
-// callbackDisabled = false;
-// while (q.size() > 0) {
-// callback.run();
-// }
-// }
-
-// /**
-// * callback is invoked any time put is called where
-// * the Queue was empty.
-// */
-// public synchronized void registerPutCallback(Runnable callback) {
-// Objects.requireNonNull(callback);
-// this.callback = callback;
-// if (q.size() > 0) {
-// // Note: calling callback while holding the lock is
-// // dangerous and may lead to deadlocks.
-// callback.run();
-// }
-// }
-
- @Override
- public synchronized void close() {
- closed = true;
- notifyAll();
- }
-
- @Override
- public synchronized void closeExceptionally(Throwable t) {
- if (exception == null) exception = t;
- else if (t != null && t != exception) {
- if (!Stream.of(exception.getSuppressed())
- .filter(x -> x == t)
- .findFirst()
- .isPresent())
- {
- exception.addSuppressed(t);
- }
- }
- close();
- }
-
- public synchronized T take() throws IOException {
- if (closed) {
- throw newIOException("stream closed");
- }
- try {
- while (q.size() == 0) {
- waiters++;
- wait();
- if (closed) {
- throw newIOException("Queue closed");
- }
- waiters--;
- }
- return q.removeFirst();
- } catch (InterruptedException ex) {
- throw new IOException(ex);
- }
- }
-
- public synchronized T poll() throws IOException {
- if (closed) {
- throw newIOException("stream closed");
- }
-
- if (q.isEmpty()) {
- return null;
- }
- T res = q.removeFirst();
- return res;
- }
-
-// public synchronized T[] pollAll(T[] type) throws IOException {
-// T[] ret = q.toArray(type);
-// q.clear();
-// return ret;
-// }
-
-// public synchronized void pushback(T v) {
-// q.addFirst(v);
-// }
-
-// public synchronized void pushbackAll(T[] v) {
-// for (int i=v.length-1; i>=0; i--) {
-// q.addFirst(v[i]);
-// }
-// }
-
- private IOException newIOException(String msg) {
- if (exception == null) {
- return new IOException(msg);
- } else {
- return new IOException(msg, exception);
- }
- }
-
-}
--- a/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Fri Nov 10 12:36:44 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyInputStream.java Fri Nov 10 16:24:07 2017 +0300
@@ -25,7 +25,6 @@
import java.nio.ByteBuffer;
import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.Queue;
import jdk.incubator.http.internal.common.Utils;
import jdk.incubator.http.internal.frame.DataFrame;
import jdk.incubator.http.internal.frame.Http2Frame;
--- a/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Fri Nov 10 12:36:44 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/BodyOutputStream.java Fri Nov 10 16:24:07 2017 +0300
@@ -25,7 +25,6 @@
import java.nio.ByteBuffer;
import jdk.incubator.http.internal.common.ByteBufferReference;
-import jdk.incubator.http.internal.common.Queue;
import jdk.incubator.http.internal.frame.DataFrame;
/**
--- a/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Nov 10 12:36:44 2017 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Http2TestServerConnection.java Fri Nov 10 16:24:07 2017 +0300
@@ -40,7 +40,6 @@
import java.util.function.Consumer;
import jdk.incubator.http.internal.common.ByteBufferReference;
import jdk.incubator.http.internal.common.HttpHeadersImpl;
-import jdk.incubator.http.internal.common.Queue;
import jdk.incubator.http.internal.frame.DataFrame;
import jdk.incubator.http.internal.frame.FramesDecoder;
import jdk.incubator.http.internal.frame.FramesEncoder;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/test/jdk/java/net/httpclient/http2/server/Queue.java Fri Nov 10 16:24:07 2017 +0300
@@ -0,0 +1,177 @@
+/*
+ * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This code is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 only, as
+ * published by the Free Software Foundation. Oracle designates this
+ * particular file as subject to the "Classpath" exception as provided
+ * by Oracle in the LICENSE file that accompanied this code.
+ *
+ * This code is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * version 2 for more details (a copy is included in the LICENSE file that
+ * accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License version
+ * 2 along with this work; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
+ *
+ * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
+ * or visit www.oracle.com if you need additional information or have any
+ * questions.
+ */
+
+import jdk.incubator.http.internal.common.ExceptionallyCloseable;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.stream.Stream;
+
+// Each stream has one of these for input. Each Http2Connection has one
+// for output. Can be used blocking or asynchronously.
+
+public class Queue<T> implements ExceptionallyCloseable {
+
+ private final LinkedList<T> q = new LinkedList<>();
+ private volatile boolean closed = false;
+ private volatile Throwable exception = null;
+ private Runnable callback;
+ private boolean callbackDisabled = false;
+ private int waiters; // true if someone waiting
+
+ public synchronized int size() {
+ return q.size();
+ }
+
+// public synchronized boolean tryPut(T obj) throws IOException {
+// if (closed) return false;
+// put(obj);
+// return true;
+// }
+
+ public synchronized void put(T obj) throws IOException {
+ if (closed) {
+ throw new IOException("stream closed");
+ }
+
+ q.add(obj);
+
+ if (waiters > 0) {
+ notifyAll();
+ }
+
+ if (callbackDisabled) {
+ return;
+ }
+
+ if (q.size() > 0 && callback != null) {
+ // Note: calling callback while holding the lock is
+ // dangerous and may lead to deadlocks.
+ callback.run();
+ }
+ }
+
+// public synchronized void disableCallback() {
+// callbackDisabled = true;
+// }
+
+// public synchronized void enableCallback() {
+// callbackDisabled = false;
+// while (q.size() > 0) {
+// callback.run();
+// }
+// }
+
+// /**
+// * callback is invoked any time put is called where
+// * the Queue was empty.
+// */
+// public synchronized void registerPutCallback(Runnable callback) {
+// Objects.requireNonNull(callback);
+// this.callback = callback;
+// if (q.size() > 0) {
+// // Note: calling callback while holding the lock is
+// // dangerous and may lead to deadlocks.
+// callback.run();
+// }
+// }
+
+ @Override
+ public synchronized void close() {
+ closed = true;
+ notifyAll();
+ }
+
+ @Override
+ public synchronized void closeExceptionally(Throwable t) {
+ if (exception == null) exception = t;
+ else if (t != null && t != exception) {
+ if (!Stream.of(exception.getSuppressed())
+ .filter(x -> x == t)
+ .findFirst()
+ .isPresent())
+ {
+ exception.addSuppressed(t);
+ }
+ }
+ close();
+ }
+
+ public synchronized T take() throws IOException {
+ if (closed) {
+ throw newIOException("stream closed");
+ }
+ try {
+ while (q.size() == 0) {
+ waiters++;
+ wait();
+ if (closed) {
+ throw newIOException("Queue closed");
+ }
+ waiters--;
+ }
+ return q.removeFirst();
+ } catch (InterruptedException ex) {
+ throw new IOException(ex);
+ }
+ }
+
+ public synchronized T poll() throws IOException {
+ if (closed) {
+ throw newIOException("stream closed");
+ }
+
+ if (q.isEmpty()) {
+ return null;
+ }
+ T res = q.removeFirst();
+ return res;
+ }
+
+// public synchronized T[] pollAll(T[] type) throws IOException {
+// T[] ret = q.toArray(type);
+// q.clear();
+// return ret;
+// }
+
+// public synchronized void pushback(T v) {
+// q.addFirst(v);
+// }
+
+// public synchronized void pushbackAll(T[] v) {
+// for (int i=v.length-1; i>=0; i--) {
+// q.addFirst(v[i]);
+// }
+// }
+
+ private IOException newIOException(String msg) {
+ if (exception == null) {
+ return new IOException(msg);
+ } else {
+ return new IOException(msg, exception);
+ }
+ }
+
+}