8177290: add copy factory methods for unmodifiable List, Set, Map
8184690: add Collectors for collecting into unmodifiable List, Set, and Map
Reviewed-by: alanb, briangoetz, dholmes, jrose, rriggs, scolebourne
/*
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package jdk.incubator.http;
import java.io.IOException;
import java.net.InetSocketAddress;
import jdk.incubator.http.HttpResponse.BodyHandler;
import jdk.incubator.http.HttpResponse.BodyProcessor;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import jdk.incubator.http.internal.common.Log;
import jdk.incubator.http.internal.common.MinimalFuture;
import jdk.incubator.http.internal.common.Utils;
/**
* Encapsulates one HTTP/1.1 request/responseAsync exchange.
*/
class Http1Exchange<T> extends ExchangeImpl<T> {
final HttpRequestImpl request; // main request
private final List<CompletableFuture<?>> operations; // used for cancel
final Http1Request requestAction;
private volatile Http1Response<T> response;
// use to record possible cancellation raised before any operation
// has been initiated.
private IOException failed;
final HttpConnection connection;
final HttpClientImpl client;
final Executor executor;
volatile ByteBuffer buffer; // used for receiving
@Override
public String toString() {
return request.toString();
}
HttpRequestImpl request() {
return request;
}
Http1Exchange(Exchange<T> exchange, HttpConnection connection)
throws IOException
{
super(exchange);
this.request = exchange.request();
this.client = exchange.client();
this.executor = exchange.executor();
this.operations = new LinkedList<>();
this.buffer = Utils.EMPTY_BYTEBUFFER;
if (connection != null) {
this.connection = connection;
} else {
InetSocketAddress addr = request.getAddress(client);
this.connection = HttpConnection.getConnection(addr, client, request);
}
this.requestAction = new Http1Request(request, client, this.connection);
}
HttpConnection connection() {
return connection;
}
@Override
T readBody(BodyHandler<T> handler, boolean returnConnectionToPool)
throws IOException
{
BodyProcessor<T> processor = handler.apply(response.responseCode(),
response.responseHeaders());
CompletableFuture<T> bodyCF = response.readBody(processor,
returnConnectionToPool,
this::executeInline);
try {
return bodyCF.join();
} catch (CompletionException e) {
throw Utils.getIOException(e);
}
}
private void executeInline(Runnable r) {
r.run();
}
synchronized ByteBuffer getBuffer() {
return buffer;
}
@Override
CompletableFuture<T> readBodyAsync(BodyHandler<T> handler,
boolean returnConnectionToPool,
Executor executor)
{
BodyProcessor<T> processor = handler.apply(response.responseCode(),
response.responseHeaders());
CompletableFuture<T> bodyCF = response.readBody(processor,
returnConnectionToPool,
executor);
return bodyCF;
}
@Override
void sendHeadersOnly() throws IOException, InterruptedException {
try {
if (!connection.connected()) {
connection.connect();
}
requestAction.sendHeadersOnly();
} catch (Throwable e) {
connection.close();
throw e;
}
}
@Override
void sendBody() throws IOException {
try {
requestAction.continueRequest();
} catch (Throwable e) {
connection.close();
throw e;
}
}
@Override
Response getResponse() throws IOException {
try {
response = new Http1Response<>(connection, this);
response.readHeaders();
Response r = response.response();
buffer = response.getBuffer();
return r;
} catch (Throwable t) {
connection.close();
throw t;
}
}
private void closeConnection() {
connection.close();
}
/**
* Cancel checks to see if request and responseAsync finished already.
* If not it closes the connection and completes all pending operations
*/
@Override
void cancel() {
cancel(new IOException("Request cancelled"));
}
/**
* Cancel checks to see if request and responseAsync finished already.
* If not it closes the connection and completes all pending operations
*/
@Override
synchronized void cancel(IOException cause) {
if (requestAction != null && requestAction.finished()
&& response != null && response.finished()) {
return;
}
connection.close();
int count = 0;
if (operations.isEmpty()) {
failed = cause;
Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms] no pending operation."
+ "\n\tCan''t cancel yet with {2}",
request.uri(),
request.duration() == null ? -1 :
// calling duration.toMillis() can throw an exception.
// this is just debugging, we don't care if it overflows.
(request.duration().getSeconds() * 1000
+ request.duration().getNano() / 1000000),
cause);
} else {
for (CompletableFuture<?> cf : operations) {
cf.completeExceptionally(cause);
count++;
}
}
Log.logError("Http1Exchange.cancel: count=" + count);
}
CompletableFuture<Response> getResponseAsyncImpl(Executor executor) {
return MinimalFuture.supply( () -> {
response = new Http1Response<>(connection, Http1Exchange.this);
response.readHeaders();
Response r = response.response();
buffer = response.getBuffer();
return r;
}, executor);
}
@Override
CompletableFuture<Response> getResponseAsync(Executor executor) {
CompletableFuture<Response> cf =
connection.whenReceivingResponse()
.thenCompose((v) -> getResponseAsyncImpl(executor));
IOException cause;
synchronized(this) {
operations.add(cf);
cause = failed;
failed = null;
}
if (cause != null) {
Log.logTrace("Http1Exchange: request [{0}/timeout={1}ms]"
+ "\n\tCompleting exceptionally with {2}\n",
request.uri(),
request.duration() == null ? -1 :
// calling duration.toMillis() can throw an exception.
// this is just debugging, we don't care if it overflows.
(request.duration().getSeconds() * 1000
+ request.duration().getNano() / 1000000),
cause);
cf.completeExceptionally(cause);
}
return cf;
}
}