src/jdk.incubator.httpclient/share/classes/jdk/incubator/http/internal/LineSubscriberAdapter.java
branch http-client-branch
changeset 56078 6c11b48a0695
parent 56009 cf8792f51dee
equal deleted inserted replaced
56077:3f6b75adcdc0 56078:6c11b48a0695
       
     1 /*
       
     2  * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved.
       
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
       
     4  *
       
     5  * This code is free software; you can redistribute it and/or modify it
       
     6  * under the terms of the GNU General Public License version 2 only, as
       
     7  * published by the Free Software Foundation.  Oracle designates this
       
     8  * particular file as subject to the "Classpath" exception as provided
       
     9  * by Oracle in the LICENSE file that accompanied this code.
       
    10  *
       
    11  * This code is distributed in the hope that it will be useful, but WITHOUT
       
    12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
       
    13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
       
    14  * version 2 for more details (a copy is included in the LICENSE file that
       
    15  * accompanied this code).
       
    16  *
       
    17  * You should have received a copy of the GNU General Public License version
       
    18  * 2 along with this work; if not, write to the Free Software Foundation,
       
    19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
       
    20  *
       
    21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
       
    22  * or visit www.oracle.com if you need additional information or have any
       
    23  * questions.
       
    24  */
       
    25 
       
    26 package jdk.incubator.http.internal;
       
    27 
       
    28 import java.nio.ByteBuffer;
       
    29 import java.nio.CharBuffer;
       
    30 import java.nio.charset.CharacterCodingException;
       
    31 import java.nio.charset.Charset;
       
    32 import java.nio.charset.CharsetDecoder;
       
    33 import java.nio.charset.CoderResult;
       
    34 import java.nio.charset.CodingErrorAction;
       
    35 import java.util.List;
       
    36 import java.util.Objects;
       
    37 import java.util.concurrent.CompletableFuture;
       
    38 import java.util.concurrent.CompletionStage;
       
    39 import java.util.concurrent.ConcurrentLinkedDeque;
       
    40 import java.util.concurrent.Flow;
       
    41 import java.util.concurrent.Flow.Subscriber;
       
    42 import java.util.concurrent.Flow.Subscription;
       
    43 import java.util.concurrent.atomic.AtomicLong;
       
    44 import java.util.concurrent.atomic.AtomicReference;
       
    45 import java.util.function.Function;
       
    46 import jdk.incubator.http.internal.common.Demand;
       
    47 import jdk.incubator.http.HttpResponse.BodySubscriber;
       
    48 import jdk.incubator.http.internal.common.MinimalFuture;
       
    49 import jdk.incubator.http.internal.common.SequentialScheduler;
       
    50 
       
    51 /** An adapter between {@code BodySubscriber} and {@code Flow.Subscriber<String>}. */
       
    52 public final class LineSubscriberAdapter<S extends Subscriber<? super String>,R>
       
    53         implements BodySubscriber<R> {
       
    54     private final CompletableFuture<R> cf = new MinimalFuture<>();
       
    55     private final S subscriber;
       
    56     private final Function<S, R> finisher;
       
    57     private final Charset charset;
       
    58     private final String eol;
       
    59     private volatile LineSubscription downstream;
       
    60 
       
    61     private LineSubscriberAdapter(S subscriber,
       
    62                                   Function<S, R> finisher,
       
    63                                   Charset charset,
       
    64                                   String eol) {
       
    65         if (eol != null && eol.isEmpty())
       
    66             throw new IllegalArgumentException("empty line separator");
       
    67         this.subscriber = Objects.requireNonNull(subscriber);
       
    68         this.finisher = Objects.requireNonNull(finisher);
       
    69         this.charset = Objects.requireNonNull(charset);
       
    70         this.eol = eol;
       
    71     }
       
    72 
       
    73     @Override
       
    74     public void onSubscribe(Subscription subscription) {
       
    75         downstream = LineSubscription.create(subscription,
       
    76                                              charset,
       
    77                                              eol,
       
    78                                              subscriber,
       
    79                                              cf);
       
    80         subscriber.onSubscribe(downstream);
       
    81     }
       
    82 
       
    83     @Override
       
    84     public void onNext(List<ByteBuffer> item) {
       
    85         try {
       
    86             downstream.submit(item);
       
    87         } catch (Throwable t) {
       
    88             onError(t);
       
    89         }
       
    90     }
       
    91 
       
    92     @Override
       
    93     public void onError(Throwable throwable) {
       
    94         try {
       
    95             downstream.signalError(throwable);
       
    96         } finally {
       
    97             cf.completeExceptionally(throwable);
       
    98         }
       
    99     }
       
   100 
       
   101     @Override
       
   102     public void onComplete() {
       
   103         try {
       
   104             downstream.signalComplete();
       
   105         } finally {
       
   106             cf.complete(finisher.apply(subscriber));
       
   107         }
       
   108     }
       
   109 
       
   110     @Override
       
   111     public CompletionStage<R> getBody() {
       
   112         return cf;
       
   113     }
       
   114 
       
   115     public static <S extends Subscriber<? super String>, R> LineSubscriberAdapter<S, R>
       
   116     create(S subscriber, Function<S, R> finisher, Charset charset, String eol)
       
   117     {
       
   118         if (eol != null && eol.isEmpty())
       
   119             throw new IllegalArgumentException("empty line separator");
       
   120         return new LineSubscriberAdapter<>(Objects.requireNonNull(subscriber),
       
   121                 Objects.requireNonNull(finisher),
       
   122                 Objects.requireNonNull(charset),
       
   123                 eol);
       
   124     }
       
   125 
       
   126     static final class LineSubscription implements Flow.Subscription {
       
   127         final Flow.Subscription upstreamSubscription;
       
   128         final CharsetDecoder decoder;
       
   129         final String newline;
       
   130         final Demand downstreamDemand;
       
   131         final ConcurrentLinkedDeque<ByteBuffer> queue;
       
   132         final SequentialScheduler scheduler;
       
   133         final Flow.Subscriber<? super String> upstream;
       
   134         final CompletableFuture<?> cf;
       
   135         private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
       
   136         private final AtomicLong demanded = new AtomicLong();
       
   137         private volatile boolean completed;
       
   138         private volatile boolean cancelled;
       
   139 
       
   140         private final char[] chars = new char[1024];
       
   141         private final ByteBuffer leftover = ByteBuffer.wrap(new byte[64]);
       
   142         private final CharBuffer buffer = CharBuffer.wrap(chars);
       
   143         private final StringBuilder builder = new StringBuilder();
       
   144         private int lineCount;
       
   145         private String nextLine;
       
   146 
       
   147         private LineSubscription(Flow.Subscription s,
       
   148                                  CharsetDecoder dec,
       
   149                                  String separator,
       
   150                                  Flow.Subscriber<? super String> subscriber,
       
   151                                  CompletableFuture<?> completion) {
       
   152             downstreamDemand = new Demand();
       
   153             queue = new ConcurrentLinkedDeque<>();
       
   154             upstreamSubscription = Objects.requireNonNull(s);
       
   155             decoder = Objects.requireNonNull(dec);
       
   156             newline = separator;
       
   157             upstream = Objects.requireNonNull(subscriber);
       
   158             cf = Objects.requireNonNull(completion);
       
   159             scheduler = SequentialScheduler.synchronizedScheduler(this::loop);
       
   160         }
       
   161 
       
   162         @Override
       
   163         public void request(long n) {
       
   164             if (cancelled) return;
       
   165             if (downstreamDemand.increase(n)) {
       
   166                 scheduler.runOrSchedule();
       
   167             }
       
   168         }
       
   169 
       
   170         @Override
       
   171         public void cancel() {
       
   172             cancelled = true;
       
   173             upstreamSubscription.cancel();
       
   174         }
       
   175 
       
   176         public void submit(List<ByteBuffer> list) {
       
   177             queue.addAll(list);
       
   178             demanded.decrementAndGet();
       
   179             scheduler.runOrSchedule();
       
   180         }
       
   181 
       
   182         public void signalComplete() {
       
   183             completed = true;
       
   184             scheduler.runOrSchedule();
       
   185         }
       
   186 
       
   187         public void signalError(Throwable error) {
       
   188             if (errorRef.compareAndSet(null,
       
   189                     Objects.requireNonNull(error))) {
       
   190                 scheduler.runOrSchedule();
       
   191             }
       
   192         }
       
   193 
       
   194         // This method looks at whether some bytes where left over (in leftover)
       
   195         // from decoding the previous buffer when the previous buffer was in
       
   196         // underflow. If so, it takes bytes one by one from the new buffer 'in'
       
   197         // and combines them with the leftover bytes until 'in' is exhausted or a
       
   198         // character was produced in 'out', resolving the previous underflow.
       
   199         // Returns true if the buffer is still in underflow, false otherwise.
       
   200         // However, in both situation some chars might have been produced in 'out'.
       
   201         private boolean isUnderFlow(ByteBuffer in, CharBuffer out, boolean endOfInput)
       
   202                 throws CharacterCodingException {
       
   203             int limit = leftover.position();
       
   204             if (limit == 0) {
       
   205                 // no leftover
       
   206                 return false;
       
   207             } else {
       
   208                 CoderResult res = null;
       
   209                 while (in.hasRemaining()) {
       
   210                     leftover.position(limit);
       
   211                     leftover.limit(++limit);
       
   212                     leftover.put(in.get());
       
   213                     leftover.position(0);
       
   214                     res = decoder.decode(leftover, out,
       
   215                             endOfInput && !in.hasRemaining());
       
   216                     int remaining = leftover.remaining();
       
   217                     if (remaining > 0) {
       
   218                         assert leftover.position() == 0;
       
   219                         leftover.position(remaining);
       
   220                     } else {
       
   221                         leftover.position(0);
       
   222                     }
       
   223                     leftover.limit(leftover.capacity());
       
   224                     if (res.isUnderflow() && remaining > 0 && in.hasRemaining()) {
       
   225                         continue;
       
   226                     }
       
   227                     if (res.isError()) {
       
   228                         res.throwException();
       
   229                     }
       
   230                     assert !res.isOverflow();
       
   231                     return false;
       
   232                 }
       
   233                 return !endOfInput;
       
   234             }
       
   235         }
       
   236 
       
   237         // extract characters from start to end and remove them from
       
   238         // the StringBuilder
       
   239         private static String take(StringBuilder b, int start, int end) {
       
   240             assert start == 0;
       
   241             String line;
       
   242             if (end == start) return "";
       
   243             line = b.substring(start, end);
       
   244             b.delete(start, end);
       
   245             return line;
       
   246         }
       
   247 
       
   248         // finds end of line, returns -1 if not found, or the position after
       
   249         // the line delimiter if found, removing the delimiter in the process.
       
   250         private static int endOfLine(StringBuilder b, String eol, boolean endOfInput) {
       
   251             int len = b.length();
       
   252             if (eol != null) { // delimiter explicitly specified
       
   253                 int i = b.indexOf(eol);
       
   254                 if (i >= 0) {
       
   255                     // remove the delimiter and returns the position
       
   256                     // of the char after it.
       
   257                     b.delete(i, i + eol.length());
       
   258                     return i;
       
   259                 }
       
   260             } else { // no delimiter specified, behaves as BufferedReader::readLine
       
   261                 boolean crfound = false;
       
   262                 for (int i = 0; i < len; i++) {
       
   263                     char c = b.charAt(i);
       
   264                     if (c == '\n') {
       
   265                         // '\n' or '\r\n' found.
       
   266                         // remove the delimiter and returns the position
       
   267                         // of the char after it.
       
   268                         b.delete(crfound ? i - 1 : i, i + 1);
       
   269                         return crfound ? i - 1 : i;
       
   270                     } else if (crfound) {
       
   271                         // previous char was '\r', c != '\n'
       
   272                         assert i != 0;
       
   273                         // remove the delimiter and returns the position
       
   274                         // of the char after it.
       
   275                         b.delete(i - 1, i);
       
   276                         return i - 1;
       
   277                     }
       
   278                     crfound = c == '\r';
       
   279                 }
       
   280                 if (crfound && endOfInput) {
       
   281                     // remove the delimiter and returns the position
       
   282                     // of the char after it.
       
   283                     b.delete(len - 1, len);
       
   284                     return len - 1;
       
   285                 }
       
   286             }
       
   287             return endOfInput && len > 0 ? len : -1;
       
   288         }
       
   289 
       
   290         // Looks at whether the StringBuilder contains a line.
       
   291         // Returns null if more character are needed.
       
   292         private static String nextLine(StringBuilder b, String eol, boolean endOfInput) {
       
   293             int next = endOfLine(b, eol, endOfInput);
       
   294             return (next > -1) ? take(b, 0, next) : null;
       
   295         }
       
   296 
       
   297         // Attempts to read the next line. Returns the next line if
       
   298         // the delimiter was found, null otherwise. The delimiters are
       
   299         // consumed.
       
   300         private String nextLine()
       
   301                 throws CharacterCodingException {
       
   302             assert nextLine == null;
       
   303             LINES:
       
   304             while (nextLine == null) {
       
   305                 boolean endOfInput = completed && queue.isEmpty();
       
   306                 nextLine = nextLine(builder, newline,
       
   307                         endOfInput && leftover.position() == 0);
       
   308                 if (nextLine != null) return nextLine;
       
   309                 ByteBuffer b;
       
   310                 BUFFERS:
       
   311                 while ((b = queue.peek()) != null) {
       
   312                     if (!b.hasRemaining()) {
       
   313                         queue.poll();
       
   314                         continue BUFFERS;
       
   315                     }
       
   316                     BYTES:
       
   317                     while (b.hasRemaining()) {
       
   318                         buffer.position(0);
       
   319                         buffer.limit(buffer.capacity());
       
   320                         boolean endofInput = completed && queue.size() <= 1;
       
   321                         if (isUnderFlow(b, buffer, endofInput)) {
       
   322                             assert !b.hasRemaining();
       
   323                             if (buffer.position() > 0) {
       
   324                                 buffer.flip();
       
   325                                 builder.append(buffer);
       
   326                             }
       
   327                             continue BUFFERS;
       
   328                         }
       
   329                         CoderResult res = decoder.decode(b, buffer, endofInput);
       
   330                         if (res.isError()) res.throwException();
       
   331                         if (buffer.position() > 0) {
       
   332                             buffer.flip();
       
   333                             builder.append(buffer);
       
   334                             continue LINES;
       
   335                         }
       
   336                         if (res.isUnderflow() && b.hasRemaining()) {
       
   337                             //System.out.println("underflow: adding " + b.remaining() + " bytes");
       
   338                             leftover.put(b);
       
   339                             assert !b.hasRemaining();
       
   340                             continue BUFFERS;
       
   341                         }
       
   342                     }
       
   343                 }
       
   344 
       
   345                 assert queue.isEmpty();
       
   346                 if (endOfInput) {
       
   347                     // Time to cleanup: there may be some undecoded leftover bytes
       
   348                     // We need to flush them out.
       
   349                     // The decoder has been configured to replace malformed/unmappable
       
   350                     // chars with some replacement, in order to behave like
       
   351                     // InputStreamReader.
       
   352                     leftover.flip();
       
   353                     buffer.position(0);
       
   354                     buffer.limit(buffer.capacity());
       
   355 
       
   356                     // decode() must be called just before flush, even if there
       
   357                     // is nothing to decode. We must do this even if leftover
       
   358                     // has no remaining bytes.
       
   359                     CoderResult res = decoder.decode(leftover, buffer, endOfInput);
       
   360                     if (buffer.position() > 0) {
       
   361                         buffer.flip();
       
   362                         builder.append(buffer);
       
   363                     }
       
   364                     if (res.isError()) res.throwException();
       
   365 
       
   366                     // Now call decoder.flush()
       
   367                     buffer.position(0);
       
   368                     buffer.limit(buffer.capacity());
       
   369                     res = decoder.flush(buffer);
       
   370                     if (buffer.position() > 0) {
       
   371                         buffer.flip();
       
   372                         builder.append(buffer);
       
   373                     }
       
   374                     if (res.isError()) res.throwException();
       
   375 
       
   376                     // It's possible that we reach here twice - just for the
       
   377                     // purpose of checking that no bytes were left over, so
       
   378                     // we reset leftover/decoder to make the function reentrant.
       
   379                     leftover.position(0);
       
   380                     leftover.limit(leftover.capacity());
       
   381                     decoder.reset();
       
   382 
       
   383                     // if some chars were produced then this call will
       
   384                     // return them.
       
   385                     return nextLine = nextLine(builder, newline, endOfInput);
       
   386                 }
       
   387                 return null;
       
   388             }
       
   389             return null;
       
   390         }
       
   391 
       
   392         // The main sequential scheduler loop.
       
   393         private void loop() {
       
   394             try {
       
   395                 while (!cancelled) {
       
   396                     Throwable error = errorRef.get();
       
   397                     if (error != null) {
       
   398                         cancelled = true;
       
   399                         scheduler.stop();
       
   400                         upstream.onError(error);
       
   401                         cf.completeExceptionally(error);
       
   402                         return;
       
   403                     }
       
   404                     if (nextLine == null) nextLine = nextLine();
       
   405                     if (nextLine == null) {
       
   406                         if (completed) {
       
   407                             scheduler.stop();
       
   408                             if (leftover.position() != 0) {
       
   409                                 // Underflow: not all bytes could be
       
   410                                 // decoded, but no more bytes will be coming.
       
   411                                 // This should not happen as we should already
       
   412                                 // have got a MalformedInputException, or
       
   413                                 // replaced the unmappable chars.
       
   414                                 errorRef.compareAndSet(null,
       
   415                                         new IllegalStateException(
       
   416                                                 "premature end of input ("
       
   417                                                         + leftover.position()
       
   418                                                         + " undecoded bytes)"));
       
   419                                 continue;
       
   420                             } else {
       
   421                                 upstream.onComplete();
       
   422                             }
       
   423                             return;
       
   424                         } else if (demanded.get() == 0
       
   425                                 && !downstreamDemand.isFulfilled()) {
       
   426                             long incr = Math.max(1, downstreamDemand.get());
       
   427                             demanded.addAndGet(incr);
       
   428                             upstreamSubscription.request(incr);
       
   429                             continue;
       
   430                         } else return;
       
   431                     }
       
   432                     assert nextLine != null;
       
   433                     assert newline != null && !nextLine.endsWith(newline)
       
   434                             || !nextLine.endsWith("\n") || !nextLine.endsWith("\r");
       
   435                     if (downstreamDemand.tryDecrement()) {
       
   436                         String forward = nextLine;
       
   437                         nextLine = null;
       
   438                         upstream.onNext(forward);
       
   439                     } else return; // no demand: come back later
       
   440                 }
       
   441             } catch (Throwable t) {
       
   442                 try {
       
   443                     upstreamSubscription.cancel();
       
   444                 } finally {
       
   445                     signalError(t);
       
   446                 }
       
   447             }
       
   448         }
       
   449 
       
   450         static LineSubscription create(Flow.Subscription s,
       
   451                                        Charset charset,
       
   452                                        String lineSeparator,
       
   453                                        Flow.Subscriber<? super String> upstream,
       
   454                                        CompletableFuture<?> cf) {
       
   455             return new LineSubscription(Objects.requireNonNull(s),
       
   456                     Objects.requireNonNull(charset).newDecoder()
       
   457                             // use the same decoder configuration than
       
   458                             // java.io.InputStreamReader
       
   459                             .onMalformedInput(CodingErrorAction.REPLACE)
       
   460                             .onUnmappableCharacter(CodingErrorAction.REPLACE),
       
   461                     lineSeparator,
       
   462                     Objects.requireNonNull(upstream),
       
   463                     Objects.requireNonNull(cf));
       
   464         }
       
   465     }
       
   466 }
       
   467