1 /* |
1 /* |
2 * Copyright (c) 1996, 2012, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 1996, 2019, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
58 public KeepAliveStream(InputStream is, ProgressSource pi, long expected, HttpClient hc) { |
58 public KeepAliveStream(InputStream is, ProgressSource pi, long expected, HttpClient hc) { |
59 super(is, pi, expected); |
59 super(is, pi, expected); |
60 this.hc = hc; |
60 this.hc = hc; |
61 } |
61 } |
62 |
62 |
|
63 final ReentrantLock readLock() { |
|
64 return readLock; |
|
65 } |
|
66 |
63 /** |
67 /** |
64 * Attempt to cache this connection |
68 * Attempt to cache this connection |
65 */ |
69 */ |
66 public void close() throws IOException { |
70 public void close() throws IOException { |
67 // If the inputstream is closed already, just return. |
71 // If the inputstream is closed already, or if this stream |
68 if (closed) { |
72 // has already been queued for cleanup, just return.
69 return; |
73 if (closed || queuedForCleanup) return; |
70 } |
|
71 |
|
72 // If this stream has already been queued for cleanup. |
|
73 if (queuedForCleanup) { |
|
74 return; |
|
75 } |
|
76 |
74 |
77 // Skip past the data that's left in the Inputstream because |
75 // Skip past the data that's left in the Inputstream because |
78 // some sort of error may have occurred. |
76 // some sort of error may have occurred. |
79 // Do this ONLY if the skip won't block. The stream may have |
77 // Do this ONLY if the skip won't block. The stream may have |
80 // been closed at the beginning of a big file and we don't want |
78 // been closed at the beginning of a big file and we don't want |
81 // to hang around for nothing. So if we can't skip without blocking |
79 // to hang around for nothing. So if we can't skip without blocking |
82 // we just close the socket and, therefore, terminate the keepAlive |
80 // we just close the socket and, therefore, terminate the keepAlive |
83 // NOTE: Don't close super class |
81 // NOTE: Don't close super class |
|
82 // For consistency, access to `expected` and `count` should be |
|
83 // protected by readLock |
|
84 readLock.lock(); |
84 try { |
85 try { |
85 if (expected > count) { |
86 if (closed || queuedForCleanup) return; |
86 long nskip = expected - count; |
87 try { |
87 if (nskip <= available()) { |
88 if (expected > count) { |
88 do {} while ((nskip = (expected - count)) > 0L |
89 long nskip = expected - count; |
89 && skip(Math.min(nskip, available())) > 0L); |
90 if (nskip <= available()) { |
90 } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) { |
91 do { |
91 //put this KeepAliveStream on the queue so that the data remaining |
92 } while ((nskip = (expected - count)) > 0L |
92 //on the socket can be cleaned up asynchronously.
93 && skip(Math.min(nskip, available())) > 0L);
93 queueForCleanup(new KeepAliveCleanerEntry(this, hc));
94 } else if (expected <= KeepAliveStreamCleaner.MAX_DATA_REMAINING && !hurried) {
94 } else {
95 //put this KeepAliveStream on the queue so that the data remaining
95 hc.closeServer();
96 //on the socket can be cleaned up asynchronously.
96 } |
97 queueForCleanup(new KeepAliveCleanerEntry(this, hc)); |
97 } |
98 } else { |
98 if (!closed && !hurried && !queuedForCleanup) { |
99 hc.closeServer(); |
99 hc.finished(); |
100 } |
|
101 } |
|
102 if (!closed && !hurried && !queuedForCleanup) { |
|
103 hc.finished(); |
|
104 } |
|
105 } finally { |
|
106 if (pi != null) |
|
107 pi.finishTracking(); |
|
108 |
|
109 if (!queuedForCleanup) { |
|
110 // nulling out the underlying inputstream as well as |
|
111 // httpClient to let gc collect the memories faster |
|
112 in = null; |
|
113 hc = null; |
|
114 closed = true; |
|
115 } |
100 } |
116 } |
101 } finally { |
117 } finally { |
102 if (pi != null) |
118 readLock.unlock(); |
103 pi.finishTracking(); |
|
104 |
|
105 if (!queuedForCleanup) { |
|
106 // nulling out the underlying inputstream as well as |
|
107 // httpClient to let gc collect the memories faster |
|
108 in = null; |
|
109 hc = null; |
|
110 closed = true; |
|
111 } |
|
112 } |
119 } |
113 } |
120 } |
114 |
121 |
115 /* we explicitly do not support mark/reset */ |
122 /* we explicitly do not support mark/reset */ |
116 |
123 |
145 return true; |
153 return true; |
146 } |
154 } |
147 } catch (IOException e) { |
155 } catch (IOException e) { |
148 // e.printStackTrace(); |
156 // e.printStackTrace(); |
149 return false; |
157 return false; |
|
158 } finally { |
|
159 readLock.unlock(); |
150 } |
160 } |
151 } |
161 } |
152 |
162 |
153 private static void queueForCleanup(KeepAliveCleanerEntry kace) { |
163 private static void queueForCleanup(KeepAliveCleanerEntry kace) { |
154 synchronized(queue) { |
164 queue.queueLock.lock(); |
|
165 try { |
155 if(!kace.getQueuedForCleanup()) { |
166 if(!kace.getQueuedForCleanup()) { |
156 if (!queue.offer(kace)) { |
167 if (!queue.offer(kace)) { |
157 kace.getHttpClient().closeServer(); |
168 kace.getHttpClient().closeServer(); |
158 return; |
169 return; |
159 } |
170 } |
160 |
171 |
161 kace.setQueuedForCleanup(); |
172 kace.setQueuedForCleanup(); |
162 queue.notifyAll(); |
173 queue.waiter.signalAll(); |
163 } |
174 } |
164 |
175 |
165 boolean startCleanupThread = (cleanerThread == null); |
176 boolean startCleanupThread = (cleanerThread == null); |
166 if (!startCleanupThread) { |
177 if (!startCleanupThread) { |
167 if (!cleanerThread.isAlive()) { |
178 if (!cleanerThread.isAlive()) { |