View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.ChannelHandlerContext;
21  
22  import java.io.InputStream;
23  import java.io.PushbackInputStream;
24  
25  /**
26   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
27   * chunk.
28   * <p>
29   * Please note that the {@link InputStream} instance that feeds data into
30   * {@link ChunkedStream} must implement {@link InputStream#available()} as
31   * accurately as possible, rather than using the default implementation.
32   * Otherwise, {@link ChunkedStream} will generate many too small chunks or
33   * block unnecessarily often.
34   */
35  public class ChunkedStream implements ChunkedInput<ByteBuf> {
36  
37      static final int DEFAULT_CHUNK_SIZE = 8192;
38  
39      private final PushbackInputStream in;
40      private final int chunkSize;
41      private long offset;
42      private boolean closed;
43  
44      /**
45       * Creates a new instance that fetches data from the specified stream.
46       */
47      public ChunkedStream(InputStream in) {
48          this(in, DEFAULT_CHUNK_SIZE);
49      }
50  
51      /**
52       * Creates a new instance that fetches data from the specified stream.
53       *
54       * @param chunkSize the number of bytes to fetch on each
55       *                  {@link #readChunk(ChannelHandlerContext)} call
56       */
57      public ChunkedStream(InputStream in, int chunkSize) {
58          if (in == null) {
59              throw new NullPointerException("in");
60          }
61          if (chunkSize <= 0) {
62              throw new IllegalArgumentException(
63                      "chunkSize: " + chunkSize +
64                      " (expected: a positive integer)");
65          }
66  
67          if (in instanceof PushbackInputStream) {
68              this.in = (PushbackInputStream) in;
69          } else {
70              this.in = new PushbackInputStream(in);
71          }
72          this.chunkSize = chunkSize;
73      }
74  
75      /**
76       * Returns the number of transferred bytes.
77       */
78      public long transferredBytes() {
79          return offset;
80      }
81  
82      @Override
83      public boolean isEndOfInput() throws Exception {
84          if (closed) {
85              return true;
86          }
87  
88          int b = in.read();
89          if (b < 0) {
90              return true;
91          } else {
92              in.unread(b);
93              return false;
94          }
95      }
96  
97      @Override
98      public void close() throws Exception {
99          closed = true;
100         in.close();
101     }
102 
103     @Deprecated
104     @Override
105     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
106         return readChunk(ctx.alloc());
107     }
108 
109     @Override
110     public ByteBuf readChunk(ByteBufAllocator allocator) throws Exception {
111         if (isEndOfInput()) {
112             return null;
113         }
114 
115         final int availableBytes = in.available();
116         final int chunkSize;
117         if (availableBytes <= 0) {
118             chunkSize = this.chunkSize;
119         } else {
120             chunkSize = Math.min(this.chunkSize, in.available());
121         }
122 
123         boolean release = true;
124         ByteBuf buffer = allocator.buffer(chunkSize);
125         try {
126             // transfer to buffer
127             offset += buffer.writeBytes(in, chunkSize);
128             release = false;
129             return buffer;
130         } finally {
131             if (release) {
132                 buffer.release();
133             }
134         }
135     }
136 
137     @Override
138     public long length() {
139         return -1;
140     }
141 
142     @Override
143     public long progress() {
144         return offset;
145     }
146 }