src/java.base/share/classes/sun/net/www/http/KeepAliveStream.java
branchJDK-8229867-branch
changeset 57968 8595871a5446
parent 47216 71c04702a3d5
equal deleted inserted replaced
57966:e89c7aaf2906 57968:8595871a5446
     1 /*
     1 /*
     2  * Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved.
     2  * Copyright (c) 1996, 2019, Oracle and/or its affiliates. All rights reserved.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     4  *
     4  *
     5  * This code is free software; you can redistribute it and/or modify it
     5  * This code is free software; you can redistribute it and/or modify it
     6  * under the terms of the GNU General Public License version 2 only, as
     6  * under the terms of the GNU General Public License version 2 only, as
     7  * published by the Free Software Foundation.  Oracle designates this
     7  * published by the Free Software Foundation.  Oracle designates this
    24  */
    24  */
    25 
    25 
    26 package sun.net.www.http;
    26 package sun.net.www.http;
    27 
    27 
    28 import java.io.*;
    28 import java.io.*;
    29 
    29 import java.util.concurrent.locks.ReentrantLock;
    30 import sun.net.ProgressSource;
    30 import sun.net.ProgressSource;
    31 import sun.net.www.MeteredStream;
    31 import sun.net.www.MeteredStream;
    32 import jdk.internal.misc.InnocuousThread;
    32 import jdk.internal.misc.InnocuousThread;
    33 
    33 
    34 /**
    34 /**
    45     HttpClient hc;
    45     HttpClient hc;
    46 
    46 
    47     boolean hurried;
    47     boolean hurried;
    48 
    48 
    49     // has this KeepAliveStream been put on the queue for asynchronous cleanup.
    49     // has this KeepAliveStream been put on the queue for asynchronous cleanup.
    50     protected boolean queuedForCleanup = false;
    50     protected volatile boolean queuedForCleanup = false;
    51 
    51 
    52     private static final KeepAliveStreamCleaner queue = new KeepAliveStreamCleaner();
    52     private static final KeepAliveStreamCleaner queue = new KeepAliveStreamCleaner();
    53     private static Thread cleanerThread; // null
    53     private static Thread cleanerThread; // null
    54 
    54 
    55     /**
    55     /**
    58     public KeepAliveStream(InputStream is, ProgressSource pi, long expected, HttpClient hc)  {
    58     public KeepAliveStream(InputStream is, ProgressSource pi, long expected, HttpClient hc)  {
    59         super(is, pi, expected);
    59         super(is, pi, expected);
    60         this.hc = hc;
    60         this.hc = hc;
    61     }
    61     }
    62 
    62 
       
    63     final ReentrantLock readLock() {
       
    64         return readLock;
       
    65     }
       
    66 
    63     /**
    67     /**
    64      * Attempt to cache this connection
    68      * Attempt to cache this connection
    65      */
    69      */
    66     public void close() throws IOException  {
    70     public void close() throws IOException  {
    67         // If the inputstream is closed already, just return.
    71         // If the inputstream is closed already, or if this stream
    68         if (closed) {
    72         // has already been queued for cleanup, just return.
    69             return;
    73         if (closed || queuedForCleanup) return;
    70         }
       
    71 
       
    72         // If this stream has already been queued for cleanup.
       
    73         if (queuedForCleanup) {
       
    74             return;
       
    75         }
       
    76 
    74 
    77         // Skip past the data that's left in the Inputstream because
    75         // Skip past the data that's left in the Inputstream because
    78         // some sort of error may have occurred.
    76         // some sort of error may have occurred.
    79         // Do this ONLY if the skip won't block. The stream may have
    77         // Do this ONLY if the skip won't block. The stream may have
    80         // been closed at the beginning of a big file and we don't want
    78         // been closed at the beginning of a big file and we don't want
    81         // to hang around for nothing. So if we can't skip without blocking
    79         // to hang around for nothing. So if we can't skip without blocking
    82         // we just close the socket and, therefore, terminate the keepAlive
    80         // we just close the socket and, therefore, terminate the keepAlive
    83         // NOTE: Don't close super class
    81         // NOTE: Don't close super class
       
    82         // For consistency, access to `expected` and `count` should be
       
    83         // protected by readLock
       
    84         readLock.lock();
    84         try {
    85         try {
    85             if (expected > count) {
    86             if (closed || queuedForCleanup) return;
    86                 long nskip = expected - count;
    87             try {
    87                 if (nskip <= available()) {
    88                 if (expected > count) {
    88                     do {} while ((nskip = (expected - count)) > 0L
    89                     long nskip = expected - count;
    89                                  && skip(Math.min(nskip, available())) > 0L);
    90                     if (nskip <= available()) {
    90                 } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
    91                         do {
    91                     //put this KeepAliveStream on the queue so that the data remaining
    92                         } while ((nskip = (expected - count)) > 0L
    92                     //on the socket can be cleaned up asynchronously.
    93                                 && skip(Math.min(nskip, available())) > 0L);
    93                     queueForCleanup(new KeepAliveCleanerEntry(this, hc));
    94                     } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
    94                 } else {
    95                         //put this KeepAliveStream on the queue so that the data remaining
    95                     hc.closeServer();
    96                         //on the socket can be cleaned up asynchronously.
    96                 }
    97                         queueForCleanup(new KeepAliveCleanerEntry(this, hc));
    97             }
    98                     } else {
    98             if (!closed && !hurried && !queuedForCleanup) {
    99                         hc.closeServer();
    99                 hc.finished();
   100                     }
       
   101                 }
       
   102                 if (!closed && !hurried && !queuedForCleanup) {
       
   103                     hc.finished();
       
   104                 }
       
   105             } finally {
       
   106                 if (pi != null)
       
   107                     pi.finishTracking();
       
   108 
       
   109                 if (!queuedForCleanup) {
       
   110                     // nulling out the underlying inputstream as well as
       
   111                     // httpClient to let gc collect the memories faster
       
   112                     in = null;
       
   113                     hc = null;
       
   114                     closed = true;
       
   115                 }
   100             }
   116             }
   101         } finally {
   117         } finally {
   102             if (pi != null)
   118             readLock.unlock();
   103                 pi.finishTracking();
       
   104 
       
   105             if (!queuedForCleanup) {
       
   106                 // nulling out the underlying inputstream as well as
       
   107                 // httpClient to let gc collect the memories faster
       
   108                 in = null;
       
   109                 hc = null;
       
   110                 closed = true;
       
   111             }
       
   112         }
   119         }
   113     }
   120     }
   114 
   121 
   115     /* we explicitly do not support mark/reset */
   122     /* we explicitly do not support mark/reset */
   116 
   123 
   122 
   129 
   123     public void reset() throws IOException {
   130     public void reset() throws IOException {
   124         throw new IOException("mark/reset not supported");
   131         throw new IOException("mark/reset not supported");
   125     }
   132     }
   126 
   133 
   127     public synchronized boolean hurry() {
   134     public boolean hurry() {
       
   135         readLock.lock();
   128         try {
   136         try {
   129             /* CASE 0: we're actually already done */
   137             /* CASE 0: we're actually already done */
   130             if (closed || count >= expected) {
   138             if (closed || count >= expected) {
   131                 return false;
   139                 return false;
   132             } else if (in.available() < (expected - count)) {
   140             } else if (in.available() < (expected - count)) {
   145                 return true;
   153                 return true;
   146             }
   154             }
   147         } catch (IOException e) {
   155         } catch (IOException e) {
   148             // e.printStackTrace();
   156             // e.printStackTrace();
   149             return false;
   157             return false;
       
   158         } finally {
       
   159             readLock.unlock();
   150         }
   160         }
   151     }
   161     }
   152 
   162 
   153     private static void queueForCleanup(KeepAliveCleanerEntry kace) {
   163     private static void queueForCleanup(KeepAliveCleanerEntry kace) {
   154         synchronized(queue) {
   164         queue.queueLock.lock();
       
   165         try {
   155             if(!kace.getQueuedForCleanup()) {
   166             if(!kace.getQueuedForCleanup()) {
   156                 if (!queue.offer(kace)) {
   167                 if (!queue.offer(kace)) {
   157                     kace.getHttpClient().closeServer();
   168                     kace.getHttpClient().closeServer();
   158                     return;
   169                     return;
   159                 }
   170                 }
   160 
   171 
   161                 kace.setQueuedForCleanup();
   172                 kace.setQueuedForCleanup();
   162                 queue.notifyAll();
   173                 queue.waiter.signalAll();
   163             }
   174             }
   164 
   175 
   165             boolean startCleanupThread = (cleanerThread == null);
   176             boolean startCleanupThread = (cleanerThread == null);
   166             if (!startCleanupThread) {
   177             if (!startCleanupThread) {
   167                 if (!cleanerThread.isAlive()) {
   178                 if (!cleanerThread.isAlive()) {
   179                         cleanerThread.start();
   190                         cleanerThread.start();
   180                         return null;
   191                         return null;
   181                     }
   192                     }
   182                 });
   193                 });
   183             }
   194             }
   184         } // queue
   195         } finally {
       
   196             queue.queueLock.unlock();
       
   197         }
   185     }
   198     }
   186 
   199 
   187     protected long remainingToRead() {
   200     protected long remainingToRead() {
       
   201         assert readLock.isHeldByCurrentThread();
   188         return expected - count;
   202         return expected - count;
   189     }
   203     }
   190 
   204 
   191     protected void setClosed() {
   205     protected void setClosed() {
       
   206         assert readLock.isHeldByCurrentThread();
   192         in = null;
   207         in = null;
   193         hc = null;
   208         hc = null;
   194         closed = true;
   209         closed = true;
   195     }
   210     }
   196 }
   211 }