View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  
21  import java.io.InputStream;
22  import java.io.PushbackInputStream;
23  
24  /**
25   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
26   * chunk.
27   * <p>
28   * Please note that the {@link InputStream} instance that feeds data into
29   * {@link ChunkedStream} must implement {@link InputStream#available()} as
30   * accurately as possible, rather than using the default implementation.
31   * Otherwise, {@link ChunkedStream} will generate many too small chunks or
32   * block unnecessarily often.
33   */
34  public class ChunkedStream implements ChunkedInput<ByteBuf> {
35  
36      static final int DEFAULT_CHUNK_SIZE = 8192;
37  
38      private final PushbackInputStream in;
39      private final int chunkSize;
40      private long offset;
41  
42      /**
43       * Creates a new instance that fetches data from the specified stream.
44       */
45      public ChunkedStream(InputStream in) {
46          this(in, DEFAULT_CHUNK_SIZE);
47      }
48  
49      /**
50       * Creates a new instance that fetches data from the specified stream.
51       *
52       * @param chunkSize the number of bytes to fetch on each
53       *                  {@link #readChunk(ChannelHandlerContext)} call
54       */
55      public ChunkedStream(InputStream in, int chunkSize) {
56          if (in == null) {
57              throw new NullPointerException("in");
58          }
59          if (chunkSize <= 0) {
60              throw new IllegalArgumentException(
61                      "chunkSize: " + chunkSize +
62                      " (expected: a positive integer)");
63          }
64  
65          if (in instanceof PushbackInputStream) {
66              this.in = (PushbackInputStream) in;
67          } else {
68              this.in = new PushbackInputStream(in);
69          }
70          this.chunkSize = chunkSize;
71      }
72  
73      /**
74       * Returns the number of transferred bytes.
75       */
76      public long transferredBytes() {
77          return offset;
78      }
79  
80      @Override
81      public boolean isEndOfInput() throws Exception {
82          int b = in.read();
83          if (b < 0) {
84              return true;
85          } else {
86              in.unread(b);
87              return false;
88          }
89      }
90  
91      @Override
92      public void close() throws Exception {
93          in.close();
94      }
95  
96      @Override
97      public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
98          if (isEndOfInput()) {
99              return null;
100         }
101 
102         final int availableBytes = in.available();
103         final int chunkSize;
104         if (availableBytes <= 0) {
105             chunkSize = this.chunkSize;
106         } else {
107             chunkSize = Math.min(this.chunkSize, in.available());
108         }
109 
110         boolean release = true;
111         ByteBuf buffer = ctx.alloc().buffer(chunkSize);
112         try {
113             // transfer to buffer
114             offset += buffer.writeBytes(in, chunkSize);
115             release = false;
116             return buffer;
117         } finally {
118             if (release) {
119                 buffer.release();
120             }
121         }
122     }
123 
124     @Override
125     public long length() {
126         return -1;
127     }
128 
129     @Override
130     public long progress() {
131         return offset;
132     }
133 }