# HG changeset patch # User chegar # Date 1517851106 0 # Node ID 3353cb42b1b4c6e270a22658fdb6edd27d910a0c # Parent 66a9c3185028974cf368e87c905f59678529685b http-client-branch: immutable data Flow diff -r 66a9c3185028 -r 3353cb42b1b4 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java Mon Feb 05 15:51:09 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/BufferingSubscriber.java Mon Feb 05 17:18:26 2018 +0000 @@ -280,7 +280,8 @@ throw new InternalError("onNext on inactive subscriber"); synchronized (buffersLock) { - accumulatedBytes += Utils.accumulateBuffers(internalBuffers, item); + internalBuffers.addAll(item); + accumulatedBytes += remaining(item); } downstreamSubscription.pushDemanded(); diff -r 66a9c3185028 -r 3353cb42b1b4 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Mon Feb 05 15:51:09 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/HttpResponse.java Mon Feb 05 17:18:26 2018 +0000 @@ -863,12 +863,12 @@ * *

The object acts as a {@link Flow.Subscriber}<{@link List}<{@link * ByteBuffer}>> to the HTTP client implementation, which publishes - * unmodifiable lists of ByteBuffers containing the response body. The Flow - * of data, as well as the order of ByteBuffers in the Flow lists, is a - * strictly ordered representation of the response body. Both the Lists and - * the ByteBuffers, once passed to the subscriber, are no longer used by the - * HTTP client. The subscriber converts the incoming buffers of data to some - * user-defined object type {@code T}. + * unmodifiable lists of read-only ByteBuffers containing the response body. + * The Flow of data, as well as the order of ByteBuffers in the Flow lists, + * is a strictly ordered representation of the response body. Both the Lists + * and the ByteBuffers, once passed to the subscriber, are no longer used by + * the HTTP client. The subscriber converts the incoming buffers of data to + * some user-defined object type {@code T}. * *

The {@link #getBody()} method returns a {@link CompletionStage}{@code * } that provides the response body object. The {@code CompletionStage} diff -r 66a9c3185028 -r 3353cb42b1b4 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Mon Feb 05 15:51:09 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseContent.java Mon Feb 05 17:18:26 2018 +0000 @@ -29,6 +29,7 @@ import java.lang.System.Logger.Level; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Consumer; import jdk.incubator.http.internal.common.Utils; @@ -161,7 +162,7 @@ // demand. boolean hasDemand = sub.demand().tryDecrement(); assert hasDemand; - pusher.onNext(out); + pusher.onNext(Collections.unmodifiableList(out)); } debug.log(Level.DEBUG, () -> "done!"); assert closedExceptionally == null; @@ -183,7 +184,7 @@ // demand. boolean hasDemand = sub.demand().tryDecrement(); assert hasDemand; - pusher.onNext(out); + pusher.onNext(Collections.unmodifiableList(out)); } assert state == ChunkState.DONE || !b.hasRemaining(); } catch(Throwable t) { @@ -307,7 +308,7 @@ int bytes2return = Math.min(bytesread, unfulfilled); debug.log(Level.DEBUG, "Returning chunk bytes: %d", bytes2return); - returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return); + returnBuffer = Utils.sliceWithLimitedCapacity(chunk, bytes2return).asReadOnlyBuffer(); unfulfilled = bytesremaining -= bytes2return; if (unfulfilled == 0) bytesToConsume = 2; } @@ -440,7 +441,7 @@ int amount = Math.min(b.remaining(), unfulfilled); unfulfilled = remaining -= amount; ByteBuffer buffer = Utils.sliceWithLimitedCapacity(b, amount); - pusher.onNext(List.of(buffer)); + pusher.onNext(List.of(buffer.asReadOnlyBuffer())); } if (unfulfilled == 0) { // We're done! All data has been received. diff -r 66a9c3185028 -r 3353cb42b1b4 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Mon Feb 05 15:51:09 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/ResponseSubscribers.java Mon Feb 05 17:18:26 2018 +0000 @@ -207,7 +207,7 @@ // and won't be used anywhere except this place. // So it's free simply to store them for further processing. assert Utils.hasRemaining(items); - Utils.accumulateBuffers(received, items); + received.addAll(items); } @Override diff -r 66a9c3185028 -r 3353cb42b1b4 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 05 15:51:09 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/Stream.java Mon Feb 05 17:18:26 2018 +0000 @@ -150,44 +150,49 @@ // can't process anything yet return; - while (!inputQ.isEmpty()) { - Http2Frame frame = inputQ.peek(); - if (frame instanceof ResetFrame) { - inputQ.remove(); - handleReset((ResetFrame)frame); - return; - } - DataFrame df = (DataFrame)frame; - boolean finished = df.getFlag(DataFrame.END_STREAM); + try { + while (!inputQ.isEmpty()) { + Http2Frame frame = inputQ.peek(); + if (frame instanceof ResetFrame) { + inputQ.remove(); + handleReset((ResetFrame)frame); + return; + } + DataFrame df = (DataFrame)frame; + boolean finished = df.getFlag(DataFrame.END_STREAM); - List buffers = df.getData(); - List dsts = Collections.unmodifiableList(buffers); - int size = Utils.remaining(dsts, Integer.MAX_VALUE); - if (size == 0 && finished) { - inputQ.remove(); - Log.logTrace("responseSubscriber.onComplete"); - debug.log(Level.DEBUG, "incoming: onComplete"); - sched.stop(); - responseSubscriber.onComplete(); - setEndStreamReceived(); - return; - } else if (userSubscription.tryDecrement()) { - inputQ.remove(); - Log.logTrace("responseSubscriber.onNext {0}", size); - debug.log(Level.DEBUG, "incoming: onNext(%d)", size); - responseSubscriber.onNext(dsts); - if (consumed(df)) { + List buffers = df.getData(); + List dsts = Collections.unmodifiableList(buffers); + int size = Utils.remaining(dsts, Integer.MAX_VALUE); + if (size == 0 && finished) { + inputQ.remove(); Log.logTrace("responseSubscriber.onComplete"); debug.log(Level.DEBUG, "incoming: onComplete"); sched.stop(); responseSubscriber.onComplete(); setEndStreamReceived(); return; + } else if (userSubscription.tryDecrement()) { + inputQ.remove(); + Log.logTrace("responseSubscriber.onNext {0}", size); + debug.log(Level.DEBUG, "incoming: onNext(%d)", size); + responseSubscriber.onNext(dsts); + if (consumed(df)) { + Log.logTrace("responseSubscriber.onComplete"); + debug.log(Level.DEBUG, "incoming: onComplete"); + sched.stop(); + responseSubscriber.onComplete(); + setEndStreamReceived(); + return; + } + } else { + return; } - } else { - return; } + } catch (Throwable throwable) { + failed = throwable; } + Throwable t = failed; if (t != null) { sched.stop(); diff -r 66a9c3185028 -r 3353cb42b1b4 src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java --- a/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java Mon Feb 05 15:51:09 2018 +0000 +++ b/src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/frame/FramesDecoder.java Mon Feb 05 17:18:26 2018 +0000 @@ -281,7 +281,8 @@ int extract = Math.min(remaining, bytecount); ByteBuffer extractedBuf; if (isDataFrame) { - extractedBuf = Utils.sliceWithLimitedCapacity(currentBuffer, extract); + extractedBuf = Utils.sliceWithLimitedCapacity(currentBuffer, extract) + .asReadOnlyBuffer(); slicedToDataFrame = true; } else { // Header frames here diff -r 66a9c3185028 -r 3353cb42b1b4 test/jdk/java/net/httpclient/ConcurrentResponses.java --- a/test/jdk/java/net/httpclient/ConcurrentResponses.java Mon Feb 05 15:51:09 2018 +0000 +++ b/test/jdk/java/net/httpclient/ConcurrentResponses.java Mon Feb 05 17:18:26 2018 +0000 @@ -217,6 +217,9 @@ // Muck any data beyond the give limit, since there shouldn't // be any of interest to the HTTP Client. for (ByteBuffer buffer : buffers) { + if (buffer.isReadOnly()) + continue; + if (buffer.limit() != buffer.capacity()) { final int limit = buffer.limit(); final int position = buffer.position(); diff -r 66a9c3185028 -r 3353cb42b1b4 test/jdk/java/net/httpclient/ImmutableFlowItems.java --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/jdk/java/net/httpclient/ImmutableFlowItems.java Mon Feb 05 17:18:26 2018 +0000 @@ -0,0 +1,281 @@ +/* + * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + * questions. + */ + +/* + * @test + * @summary Tests response body subscribers's onNext's Lists are unmodifiable, + * and that the buffers are read-only + * @library /lib/testlibrary http2/server + * @build jdk.testlibrary.SimpleSSLContext + * @modules java.base/sun.net.www.http + * jdk.incubator.httpclient/jdk.incubator.http.internal.common + * jdk.incubator.httpclient/jdk.incubator.http.internal.frame + * jdk.incubator.httpclient/jdk.incubator.http.internal.hpack + * @run testng/othervm ImmutableFlowItems + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Flow; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.HttpsConfigurator; +import com.sun.net.httpserver.HttpsServer; +import jdk.incubator.http.HttpClient; +import jdk.incubator.http.HttpHeaders; +import jdk.incubator.http.HttpRequest; +import jdk.incubator.http.HttpResponse; +import jdk.incubator.http.HttpResponse.BodyHandler; +import jdk.incubator.http.HttpResponse.BodySubscriber; +import javax.net.ssl.SSLContext; +import jdk.testlibrary.SimpleSSLContext; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import static java.lang.System.out; +import static java.nio.charset.StandardCharsets.UTF_8; +import static jdk.incubator.http.HttpResponse.BodySubscriber.asString; +import static org.testng.Assert.*; + +public class ImmutableFlowItems { + + SSLContext sslContext; + HttpServer httpTestServer; // HTTP/1.1 [ 4 servers ] + HttpsServer httpsTestServer; // HTTPS/1.1 + Http2TestServer http2TestServer; // HTTP/2 ( h2c ) + Http2TestServer https2TestServer; // HTTP/2 ( h2 ) + String httpURI_fixed; + String httpURI_chunk; + String httpsURI_fixed; + String httpsURI_chunk; + String http2URI_fixed; + String http2URI_chunk; + String https2URI_fixed; + String https2URI_chunk; + + @DataProvider(name = "variants") + public Object[][] variants() { + return new Object[][]{ + { httpURI_fixed }, + { httpURI_chunk }, + { httpsURI_fixed }, + { httpsURI_chunk }, + { http2URI_fixed }, + { http2URI_chunk }, + { https2URI_fixed }, + { https2URI_chunk }, + }; + } + + static final String BODY = "You'll never plough a field by turning it over in your mind."; + + HttpClient newHttpClient() { + return HttpClient.newBuilder() + .sslContext(sslContext) + .build(); + } + + @Test(dataProvider = "variants") + public void testAsString(String uri) throws Exception { + HttpClient client = newHttpClient(); + + HttpRequest req = HttpRequest.newBuilder(URI.create(uri)) + .build(); + + BodyHandler handler = new CRSBodyHandler(); + client.sendAsync(req, handler) + .thenApply(HttpResponse::body) + .thenAccept(body -> assertEquals(body, BODY)) + .join(); + } + + static class CRSBodyHandler implements BodyHandler { + @Override + public BodySubscriber apply(int statusCode, HttpHeaders responseHeaders) { + assertEquals(statusCode, 200); + return new CRSBodySubscriber(); + } + } + + static class CRSBodySubscriber implements BodySubscriber { + private final BodySubscriber asString = asString(UTF_8); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + asString.onSubscribe(subscription); + } + + @Override + public void onNext(List item) { + assertUnmodifiableList(item); + long c = item.stream().filter(ByteBuffer::isReadOnly).count(); + assertEquals(c, item.size(), "Unexpected writable buffer in: " +item); + asString.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + asString.onError(throwable); + } + + @Override + public void onComplete() { + asString.onComplete(); + } + + @Override + public CompletionStage getBody() { + return asString.getBody(); + } + } + + static final Class UOE = UnsupportedOperationException.class; + + static void assertUnmodifiableList(List list) { + assertNotNull(list); + ByteBuffer b = list.get(0); + assertNotNull(b); + assertThrows(UOE, () -> list.add(ByteBuffer.wrap(new byte[0]))); + assertThrows(UOE, () -> list.remove(b)); + } + + + @BeforeTest + public void setup() throws Exception { + sslContext = new SimpleSSLContext().get(); + if (sslContext == null) + throw new AssertionError("Unexpected null sslContext"); + + // HTTP/1.1 + HttpHandler h1_fixedLengthHandler = new HTTP1_FixedLengthHandler(); + HttpHandler h1_chunkHandler = new HTTP1_ChunkedHandler(); + InetSocketAddress sa = new InetSocketAddress("localhost", 0); + httpTestServer = HttpServer.create(sa, 0); + httpTestServer.createContext("/http1/fixed", h1_fixedLengthHandler); + httpTestServer.createContext("/http1/chunk", h1_chunkHandler); + httpURI_fixed = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/fixed"; + httpURI_chunk = "http://127.0.0.1:" + httpTestServer.getAddress().getPort() + "/http1/chunk"; + + httpsTestServer = HttpsServer.create(sa, 0); + httpsTestServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); + httpsTestServer.createContext("/https1/fixed", h1_fixedLengthHandler); + httpsTestServer.createContext("/https1/chunk", h1_chunkHandler); + httpsURI_fixed = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/fixed"; + httpsURI_chunk = "https://127.0.0.1:" + httpsTestServer.getAddress().getPort() + "/https1/chunk"; + + // HTTP/2 + Http2Handler h2_fixedLengthHandler = new HTTP2_FixedLengthHandler(); + Http2Handler h2_chunkedHandler = new HTTP2_VariableHandler(); + + http2TestServer = new Http2TestServer("127.0.0.1", false, 0); + http2TestServer.addHandler(h2_fixedLengthHandler, "/http2/fixed"); + http2TestServer.addHandler(h2_chunkedHandler, "/http2/chunk"); + int port = http2TestServer.getAddress().getPort(); + http2URI_fixed = "http://127.0.0.1:" + port + "/http2/fixed"; + http2URI_chunk = "http://127.0.0.1:" + port + "/http2/chunk"; + + https2TestServer = new Http2TestServer("127.0.0.1", true, 0); + https2TestServer.addHandler(h2_fixedLengthHandler, "/https2/fixed"); + https2TestServer.addHandler(h2_chunkedHandler, "/https2/chunk"); + port = https2TestServer.getAddress().getPort(); + https2URI_fixed = "https://127.0.0.1:" + port + "/https2/fixed"; + https2URI_chunk = "https://127.0.0.1:" + port + "/https2/chunk"; + + httpTestServer.start(); + httpsTestServer.start(); + http2TestServer.start(); + https2TestServer.start(); + } + + @AfterTest + public void teardown() throws Exception { + httpTestServer.stop(0); + httpsTestServer.stop(0); + http2TestServer.stop(); + https2TestServer.stop(); + } + + static class HTTP1_FixedLengthHandler implements HttpHandler { + @Override + public void handle(HttpExchange t) throws IOException { + out.println("HTTP1_FixedLengthHandler received request to " + t.getRequestURI()); + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + is.readAllBytes(); + byte[] bytes = BODY.getBytes(UTF_8); + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } + + static class HTTP1_ChunkedHandler implements HttpHandler { + @Override + public void handle(HttpExchange t) throws IOException { + out.println("HTTP1_ChunkedHandler received request to " + t.getRequestURI()); + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + is.readAllBytes(); + byte[] bytes = BODY.getBytes(UTF_8); + t.sendResponseHeaders(200, 0); + os.write(bytes); + } + } + } + + static class HTTP2_FixedLengthHandler implements Http2Handler { + @Override + public void handle(Http2TestExchange t) throws IOException { + out.println("HTTP2_FixedLengthHandler received request to " + t.getRequestURI()); + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + is.readAllBytes(); + byte[] bytes = BODY.getBytes(UTF_8); + t.sendResponseHeaders(200, bytes.length); + os.write(bytes); + } + } + } + + static class HTTP2_VariableHandler implements Http2Handler { + @Override + public void handle(Http2TestExchange t) throws IOException { + out.println("HTTP2_VariableHandler received request to " + t.getRequestURI()); + try (InputStream is = t.getRequestBody(); + OutputStream os = t.getResponseBody()) { + is.readAllBytes(); + byte[] bytes = BODY.getBytes(UTF_8); + t.sendResponseHeaders(200, 0); // variable + os.write(bytes); + } + } + } +}