1 /* |
1 /* |
2 * Copyright (c) 1999, 2013, Oracle and/or its affiliates. All rights reserved. |
2 * Copyright (c) 1999, 2019, Oracle and/or its affiliates. All rights reserved. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. |
4 * |
4 * |
5 * This code is free software; you can redistribute it and/or modify it |
5 * This code is free software; you can redistribute it and/or modify it |
6 * under the terms of the GNU General Public License version 2 only, as |
6 * under the terms of the GNU General Public License version 2 only, as |
7 * published by the Free Software Foundation. Oracle designates this |
7 * published by the Free Software Foundation. Oracle designates this |
38 * <p> |
36 * <p> |
39 * A <code>ChunkedInputStream</code> is also <code>Hurryable</code> so it |
37 * A <code>ChunkedInputStream</code> is also <code>Hurryable</code> so it |
40 * can be hurried to the end of the stream if the bytes are available on |
38 * can be hurried to the end of the stream if the bytes are available on |
41 * the underlying stream. |
39 * the underlying stream. |
42 */ |
40 */ |
43 public |
41 public class ChunkedInputStream extends InputStream implements Hurryable { |
44 class ChunkedInputStream extends InputStream implements Hurryable { |
|
45 |
42 |
46 /** |
43 /** |
47 * The underlying stream |
44 * The underlying stream |
48 */ |
45 */ |
49 private InputStream in; |
46 private InputStream in; |
123 * Indicates if the chunked stream has been closed using the |
120 * Indicates if the chunked stream has been closed using the |
124 * <code>close</code> method. |
121 * <code>close</code> method. |
125 */ |
122 */ |
126 private boolean closed; |
123 private boolean closed; |
127 |
124 |
|
125 final ReentrantLock readLock = new ReentrantLock(); |
|
126 |
128 /* |
127 /* |
129 * Maximum chunk header size of 2KB + 2 bytes for CRLF |
128 * Maximum chunk header size of 2KB + 2 bytes for CRLF |
130 */ |
129 */ |
131 private static final int MAX_CHUNK_HEADER_SIZE = 2050; |
130 private static final int MAX_CHUNK_HEADER_SIZE = 2050; |
|
131 |
132 |
132 |
133 /** |
133 /** |
134 * State to indicate that next field should be :- |
134 * State to indicate that next field should be :- |
135 * chunk-size [ chunk-extension ] CRLF |
135 * chunk-size [ chunk-extension ] CRLF |
136 */ |
136 */ |
643 * @return the next byte of data, or <code>-1</code> if the end of the |
643 * @return the next byte of data, or <code>-1</code> if the end of the |
644 * stream is reached. |
644 * stream is reached. |
645 * @exception IOException if an I/O error occurs. |
645 * @exception IOException if an I/O error occurs. |
646 * @see java.io.FilterInputStream#in |
646 * @see java.io.FilterInputStream#in |
647 */ |
647 */ |
648 public synchronized int read() throws IOException { |
648 public int read() throws IOException { |
649 ensureOpen(); |
649 readLock.lock(); |
650 if (chunkPos >= chunkCount) { |
650 try { |
651 if (readAhead(true) <= 0) { |
651 ensureOpen(); |
652 return -1; |
652 if (chunkPos >= chunkCount) { |
653 } |
653 if (readAhead(true) <= 0) { |
654 } |
654 return -1; |
655 return chunkData[chunkPos++] & 0xff; |
655 } |
|
656 } |
|
657 return chunkData[chunkPos++] & 0xff; |
|
658 } finally { |
|
659 readLock.unlock(); |
|
660 } |
656 } |
661 } |
657 |
662 |
658 |
663 |
659 /** |
664 /** |
660 * Reads bytes from this stream into the specified byte array, starting at |
665 * Reads bytes from this stream into the specified byte array, starting at |
665 * @param len maximum number of bytes to read. |
670 * @param len maximum number of bytes to read. |
666 * @return the number of bytes read, or <code>-1</code> if the end of |
671 * @return the number of bytes read, or <code>-1</code> if the end of |
667 * the stream has been reached. |
672 * the stream has been reached. |
668 * @exception IOException if an I/O error occurs. |
673 * @exception IOException if an I/O error occurs. |
669 */ |
674 */ |
670 public synchronized int read(byte b[], int off, int len) |
675 public int read(byte b[], int off, int len) |
671 throws IOException |
676 throws IOException |
672 { |
677 { |
673 ensureOpen(); |
678 readLock.lock(); |
674 if ((off < 0) || (off > b.length) || (len < 0) || |
679 try { |
675 ((off + len) > b.length) || ((off + len) < 0)) { |
680 ensureOpen(); |
676 throw new IndexOutOfBoundsException(); |
681 if ((off < 0) || (off > b.length) || (len < 0) || |
677 } else if (len == 0) { |
682 ((off + len) > b.length) || ((off + len) < 0)) { |
678 return 0; |
683 throw new IndexOutOfBoundsException(); |
679 } |
684 } else if (len == 0) { |
680 |
685 return 0; |
681 int avail = chunkCount - chunkPos; |
686 } |
682 if (avail <= 0) { |
687 |
683 /* |
688 int avail = chunkCount - chunkPos; |
684 * Optimization: if we're in the middle of the chunk read |
689 if (avail <= 0) { |
685 * directly from the underlying stream into the caller's |
690 /* |
686 * buffer |
691 * Optimization: if we're in the middle of the chunk read |
687 */ |
692 * directly from the underlying stream into the caller's |
688 if (state == STATE_READING_CHUNK) { |
693 * buffer |
689 return fastRead( b, off, len ); |
694 */ |
690 } |
695 if (state == STATE_READING_CHUNK) { |
691 |
696 return fastRead(b, off, len); |
692 /* |
697 } |
693 * We're not in the middle of a chunk so we must read ahead |
698 |
694 * until there is some chunk data available. |
699 /* |
695 */ |
700 * We're not in the middle of a chunk so we must read ahead |
696 avail = readAhead(true); |
701 * until there is some chunk data available. |
697 if (avail < 0) { |
702 */ |
698 return -1; /* EOF */ |
703 avail = readAhead(true); |
699 } |
704 if (avail < 0) { |
700 } |
705 return -1; /* EOF */ |
701 int cnt = (avail < len) ? avail : len; |
706 } |
702 System.arraycopy(chunkData, chunkPos, b, off, cnt); |
707 } |
703 chunkPos += cnt; |
708 int cnt = (avail < len) ? avail : len; |
704 |
709 System.arraycopy(chunkData, chunkPos, b, off, cnt); |
705 return cnt; |
710 chunkPos += cnt; |
|
711 |
|
712 return cnt; |
|
713 } finally { |
|
714 readLock.unlock(); |
|
715 } |
706 } |
716 } |
707 |
717 |
708 /** |
718 /** |
709 * Returns the number of bytes that can be read from this input |
719 * Returns the number of bytes that can be read from this input |
710 * stream without blocking. |
720 * stream without blocking. |
712 * @return the number of bytes that can be read from this input |
722 * @return the number of bytes that can be read from this input |
713 * stream without blocking. |
723 * stream without blocking. |
714 * @exception IOException if an I/O error occurs. |
724 * @exception IOException if an I/O error occurs. |
715 * @see java.io.FilterInputStream#in |
725 * @see java.io.FilterInputStream#in |
716 */ |
726 */ |
717 public synchronized int available() throws IOException { |
727 public int available() throws IOException { |
718 ensureOpen(); |
728 readLock.lock(); |
719 |
729 try { |
720 int avail = chunkCount - chunkPos; |
730 ensureOpen(); |
721 if(avail > 0) { |
731 |
722 return avail; |
732 int avail = chunkCount - chunkPos; |
723 } |
733 if (avail > 0) { |
724 |
734 return avail; |
725 avail = readAhead(false); |
735 } |
726 |
736 |
727 if (avail < 0) { |
737 avail = readAhead(false); |
728 return 0; |
738 |
729 } else { |
739 if (avail < 0) { |
730 return avail; |
740 return 0; |
|
741 } else { |
|
742 return avail; |
|
743 } |
|
744 } finally { |
|
745 readLock.unlock(); |
731 } |
746 } |
732 } |
747 } |
733 |
748 |
734 /** |
749 /** |
735 * Close the stream by either returning the connection to the |
750 * Close the stream by either returning the connection to the |
740 * possible (without blocking) then the connection can be |
755 * possible (without blocking) then the connection can be |
741 * returned to the keep alive cache. |
756 * returned to the keep alive cache. |
742 * |
757 * |
743 * @exception IOException if an I/O error occurs. |
758 * @exception IOException if an I/O error occurs. |
744 */ |
759 */ |
745 public synchronized void close() throws IOException { |
760 public void close() throws IOException { |
746 if (closed) { |
761 if (closed) return; |
747 return; |
762 readLock.lock(); |
748 } |
763 try { |
749 closeUnderlying(); |
764 if (closed) { |
750 closed = true; |
765 return; |
|
766 } |
|
767 closeUnderlying(); |
|
768 closed = true; |
|
769 } finally { |
|
770 readLock.unlock(); |
|
771 } |
751 } |
772 } |
752 |
773 |
753 /** |
774 /** |
754 * Hurry the input stream by reading everything from the underlying |
775 * Hurry the input stream by reading everything from the underlying |
755 * stream. If the last chunk (and optional trailers) can be read without |
776 * stream. If the last chunk (and optional trailers) can be read without |
757 * <p> |
778 * <p> |
758 * Note that if an error has occurred or we can't get to last chunk |
779 * Note that if an error has occurred or we can't get to last chunk |
759 * without blocking then this stream can't be hurried and should be |
780 * without blocking then this stream can't be hurried and should be |
760 * closed. |
781 * closed. |
761 */ |
782 */ |
762 public synchronized boolean hurry() { |
783 public boolean hurry() { |
763 if (in == null || error) { |
784 readLock.lock(); |
764 return false; |
|
765 } |
|
766 |
|
767 try { |
785 try { |
768 readAhead(false); |
786 if (in == null || error) { |
769 } catch (Exception e) { |
787 return false; |
770 return false; |
788 } |
771 } |
789 |
772 |
790 try { |
773 if (error) { |
791 readAhead(false); |
774 return false; |
792 } catch (Exception e) { |
775 } |
793 return false; |
776 |
794 } |
777 return (state == STATE_DONE); |
795 |
|
796 if (error) { |
|
797 return false; |
|
798 } |
|
799 |
|
800 return (state == STATE_DONE); |
|
801 } finally { |
|
802 readLock.unlock(); |
|
803 } |
778 } |
804 } |
779 |
805 |
780 } |
806 } |