View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.stream;
17  
18  import static org.jboss.netty.buffer.ChannelBuffers.*;
19  
20  import java.io.InputStream;
21  import java.io.PushbackInputStream;
22  
23  /**
24   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
25   * chunk.
26   * <p>
27   * Please note that the {@link InputStream} instance that feeds data into
28   * {@link ChunkedStream} must implement {@link InputStream#available()} as
29   * accurately as possible, rather than using the default implementation.
30   * Otherwise, {@link ChunkedStream} will generate many chunks that are too
31   * small or block unnecessarily often.
32   */
33  public class ChunkedStream implements ChunkedInput {
34  
35      static final int DEFAULT_CHUNK_SIZE = 8192;
36  
37      private final PushbackInputStream in;
38      private final int chunkSize;
39      private long offset;
40  
41      /**
42       * Creates a new instance that fetches data from the specified stream.
43       */
44      public ChunkedStream(InputStream in) {
45          this(in, DEFAULT_CHUNK_SIZE);
46      }
47  
48      /**
49       * Creates a new instance that fetches data from the specified stream.
50       *
51       * @param chunkSize the number of bytes to fetch on each
52       *                  {@link #nextChunk()} call
53       */
54      public ChunkedStream(InputStream in, int chunkSize) {
55          if (in == null) {
56              throw new NullPointerException("in");
57          }
58          if (chunkSize <= 0) {
59              throw new IllegalArgumentException(
60                      "chunkSize: " + chunkSize +
61                      " (expected: a positive integer)");
62          }
63  
64          if (in instanceof PushbackInputStream) {
65              this.in = (PushbackInputStream) in;
66          } else {
67              this.in = new PushbackInputStream(in);
68          }
69          this.chunkSize = chunkSize;
70      }
71  
72      /**
73       * Returns the number of transferred bytes.
74       */
75      public long getTransferredBytes() {
76          return offset;
77      }
78  
79      public boolean hasNextChunk() throws Exception {
80          int b = in.read();
81          if (b < 0) {
82              return false;
83          } else {
84              in.unread(b);
85              return true;
86          }
87      }
88  
89      public boolean isEndOfInput() throws Exception {
90          return !hasNextChunk();
91      }
92  
93      public void close() throws Exception {
94          in.close();
95      }
96  
97      public Object nextChunk() throws Exception {
98          if (!hasNextChunk()) {
99              return null;
100         }
101 
102         final int availableBytes = in.available();
103         final int chunkSize;
104         if (availableBytes <= 0) {
105             chunkSize = this.chunkSize;
106         } else {
107             chunkSize = Math.min(this.chunkSize, in.available());
108         }
109         final byte[] chunk = new byte[chunkSize];
110         int readBytes = 0;
111         for (;;) {
112             int localReadBytes = in.read(chunk, readBytes, chunkSize - readBytes);
113             if (localReadBytes < 0) {
114                 break;
115             }
116             readBytes += localReadBytes;
117             offset += localReadBytes;
118 
119             if (readBytes == chunkSize) {
120                 break;
121             }
122         }
123 
124         return wrappedBuffer(chunk, 0, readBytes);
125     }
126 }