View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  
21  import java.io.InputStream;
22  import java.io.PushbackInputStream;
23  
24  /**
25   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
26   * chunk.
27   * <p>
28   * Please note that the {@link InputStream} instance that feeds data into
29   * {@link ChunkedStream} must implement {@link InputStream#available()} as
30   * accurately as possible, rather than using the default implementation.
31   * Otherwise, {@link ChunkedStream} will generate many too small chunks or
32   * block unnecessarily often.
33   */
34  public class ChunkedStream implements ChunkedInput<ByteBuf> {
35  
36      static final int DEFAULT_CHUNK_SIZE = 8192;
37  
38      private final PushbackInputStream in;
39      private final int chunkSize;
40      private long offset;
41      private boolean closed;
42  
43      /**
44       * Creates a new instance that fetches data from the specified stream.
45       */
46      public ChunkedStream(InputStream in) {
47          this(in, DEFAULT_CHUNK_SIZE);
48      }
49  
50      /**
51       * Creates a new instance that fetches data from the specified stream.
52       *
53       * @param chunkSize the number of bytes to fetch on each
54       *                  {@link #readChunk(ChannelHandlerContext)} call
55       */
56      public ChunkedStream(InputStream in, int chunkSize) {
57          if (in == null) {
58              throw new NullPointerException("in");
59          }
60          if (chunkSize <= 0) {
61              throw new IllegalArgumentException(
62                      "chunkSize: " + chunkSize +
63                      " (expected: a positive integer)");
64          }
65  
66          if (in instanceof PushbackInputStream) {
67              this.in = (PushbackInputStream) in;
68          } else {
69              this.in = new PushbackInputStream(in);
70          }
71          this.chunkSize = chunkSize;
72      }
73  
74      /**
75       * Returns the number of transferred bytes.
76       */
77      public long transferredBytes() {
78          return offset;
79      }
80  
81      @Override
82      public boolean isEndOfInput() throws Exception {
83          if (closed) {
84              return true;
85          }
86  
87          int b = in.read();
88          if (b < 0) {
89              return true;
90          } else {
91              in.unread(b);
92              return false;
93          }
94      }
95  
96      @Override
97      public void close() throws Exception {
98          closed = true;
99          in.close();
100     }
101 
102     @Override
103     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
104         if (isEndOfInput()) {
105             return null;
106         }
107 
108         final int availableBytes = in.available();
109         final int chunkSize;
110         if (availableBytes <= 0) {
111             chunkSize = this.chunkSize;
112         } else {
113             chunkSize = Math.min(this.chunkSize, in.available());
114         }
115 
116         boolean release = true;
117         ByteBuf buffer = ctx.alloc().buffer(chunkSize);
118         try {
119             // transfer to buffer
120             offset += buffer.writeBytes(in, chunkSize);
121             release = false;
122             return buffer;
123         } finally {
124             if (release) {
125                 buffer.release();
126             }
127         }
128     }
129 }