src/java.net.http/share/classes/jdk/internal/net/http/Http1Request.java
branch http-client-branch
changeset 56204 e5d0c20217a3
parent 56200 d712a6342387
child 56209 43d5ad612710
equal deleted inserted replaced
56203:be57b0428d84 56204:e5d0c20217a3
   240             requestPublisher.subscribe(subscriber);
   240             requestPublisher.subscribe(subscriber);
   241         }
   241         }
   242         return subscriber;
   242         return subscriber;
   243     }
   243     }
   244 
   244 
   245     class StreamSubscriber extends Http1BodySubscriber {
   245     final class StreamSubscriber extends Http1BodySubscriber {
   246 
   246 
   247         @Override
   247         @Override
   248         public void onSubscribe(Flow.Subscription subscription) {
   248         public void onSubscribe(Flow.Subscription subscription) {
   249             if (this.subscription != null) {
   249             if (isSubscribed()) {
   250                 Throwable t = new IllegalStateException("already subscribed");
   250                 Throwable t = new IllegalStateException("already subscribed");
   251                 http1Exchange.appendToOutgoing(t);
   251                 http1Exchange.appendToOutgoing(t);
   252             } else {
   252             } else {
   253                 this.subscription = subscription;
   253                 setSubscription(subscription);
   254             }
   254             }
   255         }
   255         }
   256 
   256 
   257         @Override
   257         @Override
   258         public void onNext(ByteBuffer item) {
   258         public void onNext(ByteBuffer item) {
   273         @Override
   273         @Override
   274         public void onError(Throwable throwable) {
   274         public void onError(Throwable throwable) {
   275             if (complete)
   275             if (complete)
   276                 return;
   276                 return;
   277 
   277 
   278             subscription.cancel();
   278             cancelSubscription();
   279             http1Exchange.appendToOutgoing(throwable);
   279             http1Exchange.appendToOutgoing(throwable);
   280         }
   280         }
   281 
   281 
   282         @Override
   282         @Override
   283         public void onComplete() {
   283         public void onComplete() {
   296 
   296 
   297             }
   297             }
   298         }
   298         }
   299     }
   299     }
   300 
   300 
   301     class FixedContentSubscriber extends Http1BodySubscriber {
   301     final class FixedContentSubscriber extends Http1BodySubscriber {
   302 
   302 
   303         private volatile long contentWritten;
   303         private volatile long contentWritten;
   304 
   304 
   305         @Override
   305         @Override
   306         public void onSubscribe(Flow.Subscription subscription) {
   306         public void onSubscribe(Flow.Subscription subscription) {
   307             if (this.subscription != null) {
   307             if (isSubscribed()) {
   308                 Throwable t = new IllegalStateException("already subscribed");
   308                 Throwable t = new IllegalStateException("already subscribed");
   309                 http1Exchange.appendToOutgoing(t);
   309                 http1Exchange.appendToOutgoing(t);
   310             } else {
   310             } else {
   311                 this.subscription = subscription;
   311                 setSubscription(subscription);
   312             }
   312             }
   313         }
   313         }
   314 
   314 
   315         @Override
   315         @Override
   316         public void onNext(ByteBuffer item) {
   316         public void onNext(ByteBuffer item) {
   322             } else {
   322             } else {
   323                 long writing = item.remaining();
   323                 long writing = item.remaining();
   324                 long written = (contentWritten += writing);
   324                 long written = (contentWritten += writing);
   325 
   325 
   326                 if (written > contentLength) {
   326                 if (written > contentLength) {
   327                     subscription.cancel();
   327                     cancelSubscription();
   328                     String msg = connection.getConnectionFlow()
   328                     String msg = connection.getConnectionFlow()
   329                                   + " [" + Thread.currentThread().getName() +"] "
   329                                   + " [" + Thread.currentThread().getName() +"] "
   330                                   + "Too many bytes in request body. Expected: "
   330                                   + "Too many bytes in request body. Expected: "
   331                                   + contentLength + ", got: " + written;
   331                                   + contentLength + ", got: " + written;
   332                     http1Exchange.appendToOutgoing(new IOException(msg));
   332                     http1Exchange.appendToOutgoing(new IOException(msg));
   340         public void onError(Throwable throwable) {
   340         public void onError(Throwable throwable) {
   341             debug.log(Level.DEBUG, "onError");
   341             debug.log(Level.DEBUG, "onError");
   342             if (complete)  // TODO: error?
   342             if (complete)  // TODO: error?
   343                 return;
   343                 return;
   344 
   344 
   345             subscription.cancel();
   345             cancelSubscription();
   346             http1Exchange.appendToOutgoing(throwable);
   346             http1Exchange.appendToOutgoing(throwable);
   347         }
   347         }
   348 
   348 
   349         @Override
   349         @Override
   350         public void onComplete() {
   350         public void onComplete() {
   354                 http1Exchange.appendToOutgoing(t);
   354                 http1Exchange.appendToOutgoing(t);
   355             } else {
   355             } else {
   356                 complete = true;
   356                 complete = true;
   357                 long written = contentWritten;
   357                 long written = contentWritten;
   358                 if (contentLength > written) {
   358                 if (contentLength > written) {
   359                     subscription.cancel();
   359                     cancelSubscription();
   360                     Throwable t = new IOException(connection.getConnectionFlow()
   360                     Throwable t = new IOException(connection.getConnectionFlow()
   361                                          + " [" + Thread.currentThread().getName() +"] "
   361                                          + " [" + Thread.currentThread().getName() +"] "
   362                                          + "Too few bytes returned by the publisher ("
   362                                          + "Too few bytes returned by the publisher ("
   363                                                   + written + "/"
   363                                                   + written + "/"
   364                                                   + contentLength + ")");
   364                                                   + contentLength + ")");