View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ReadableByteChannel;
23  
24  /**
25   * A {@link ChunkedInput} that fetches data from a {@link ReadableByteChannel}
26   * chunk by chunk.  Please note that the {@link ReadableByteChannel} must
27   * operate in blocking mode.  Non-blocking mode channels are not supported.
28   */
29  public class ChunkedNioStream implements ChunkedInput<ByteBuf> {
30  
31      private final ReadableByteChannel in;
32  
33      private final int chunkSize;
34      private long offset;
35  
36      /**
37       * Associated ByteBuffer
38       */
39      private final ByteBuffer byteBuffer;
40  
41      /**
42       * Creates a new instance that fetches data from the specified channel.
43       */
44      public ChunkedNioStream(ReadableByteChannel in) {
45          this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
46      }
47  
48      /**
49       * Creates a new instance that fetches data from the specified channel.
50       *
51       * @param chunkSize the number of bytes to fetch on each
52       *                  {@link #readChunk(ChannelHandlerContext)} call
53       */
54      public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
55          if (in == null) {
56              throw new NullPointerException("in");
57          }
58          if (chunkSize <= 0) {
59              throw new IllegalArgumentException("chunkSize: " + chunkSize +
60                      " (expected: a positive integer)");
61          }
62          this.in = in;
63          offset = 0;
64          this.chunkSize = chunkSize;
65          byteBuffer = ByteBuffer.allocate(chunkSize);
66      }
67  
68      /**
69       * Returns the number of transferred bytes.
70       */
71      public long transferredBytes() {
72          return offset;
73      }
74  
75      @Override
76      public boolean isEndOfInput() throws Exception {
77          if (byteBuffer.position() > 0) {
78              // A previous read was not over, so there is a next chunk in the buffer at least
79              return false;
80          }
81          if (in.isOpen()) {
82              // Try to read a new part, and keep this part (no rewind)
83              int b = in.read(byteBuffer);
84              if (b < 0) {
85                  return true;
86              } else {
87                  offset += b;
88                  return false;
89              }
90          }
91          return true;
92      }
93  
94      @Override
95      public void close() throws Exception {
96          in.close();
97      }
98  
99      @Override
100     public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
101         if (isEndOfInput()) {
102             return null;
103         }
104         // buffer cannot be not be empty from there
105         int readBytes = byteBuffer.position();
106         for (;;) {
107             int localReadBytes = in.read(byteBuffer);
108             if (localReadBytes < 0) {
109                 break;
110             }
111             readBytes += localReadBytes;
112             offset += localReadBytes;
113             if (readBytes == chunkSize) {
114                 break;
115             }
116         }
117         byteBuffer.flip();
118         boolean release = true;
119         ByteBuf buffer = ctx.alloc().buffer(byteBuffer.remaining());
120         try {
121             buffer.writeBytes(byteBuffer);
122             byteBuffer.clear();
123             release = false;
124             return buffer;
125         } finally {
126             if (release) {
127                 buffer.release();
128             }
129         }
130     }
131 }