View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.stream;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  
21  import java.nio.ByteBuffer;
22  import java.nio.channels.ReadableByteChannel;
23  
24  import static io.netty5.util.internal.ObjectUtil.checkPositive;
25  import static java.util.Objects.requireNonNull;
26  
27  /**
28   * A {@link ChunkedInput} that fetches data from a {@link ReadableByteChannel}
29   * chunk by chunk.  Please note that the {@link ReadableByteChannel} must
30   * operate in blocking mode.  Non-blocking mode channels are not supported.
31   */
32  public class ChunkedNioStream implements ChunkedInput<Buffer> {
33  
34      private final ReadableByteChannel in;
35  
36      private final int chunkSize;
37      private long offset;
38  
39      /**
40       * Associated ByteBuffer
41       */
42      private final ByteBuffer byteBuffer;
43  
44      /**
45       * Creates a new instance that fetches data from the specified channel.
46       */
47      public ChunkedNioStream(ReadableByteChannel in) {
48          this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
49      }
50  
51      /**
52       * Creates a new instance that fetches data from the specified channel.
53       *
54       * @param chunkSize the number of bytes to fetch on each
55       *                  {@link #readChunk(BufferAllocator)} call
56       */
57      public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
58          this.in = requireNonNull(in, "in");
59          this.chunkSize = checkPositive(chunkSize, "chunkSize");
60          byteBuffer = ByteBuffer.allocate(chunkSize);
61      }
62  
63      /**
64       * Returns the number of transferred bytes.
65       */
66      public long transferredBytes() {
67          return offset;
68      }
69  
70      @Override
71      public boolean isEndOfInput() throws Exception {
72          if (byteBuffer.position() > 0) {
73              // A previous read was not over, so there is a next chunk in the buffer at least
74              return false;
75          }
76          if (in.isOpen()) {
77              // Try to read a new part, and keep this part (no rewind)
78              int b = in.read(byteBuffer);
79              if (b < 0) {
80                  return true;
81              } else {
82                  offset += b;
83                  return false;
84              }
85          }
86          return true;
87      }
88  
89      @Override
90      public void close() throws Exception {
91          in.close();
92      }
93  
94      @Override
95      public Buffer readChunk(BufferAllocator allocator) throws Exception {
96          if (isEndOfInput()) {
97              return null;
98          }
99          // buffer cannot be not be empty from there
100         int readBytes = byteBuffer.position();
101         for (;;) {
102             int localReadBytes = in.read(byteBuffer);
103             if (localReadBytes < 0) {
104                 break;
105             }
106             readBytes += localReadBytes;
107             offset += localReadBytes;
108             if (readBytes == chunkSize) {
109                 break;
110             }
111         }
112         byteBuffer.flip();
113         boolean release = true;
114         Buffer buffer = allocator.allocate(byteBuffer.remaining());
115         try {
116             buffer.writeBytes(byteBuffer);
117             byteBuffer.clear();
118             release = false;
119             return buffer;
120         } finally {
121             if (release) {
122                 buffer.close();
123             }
124         }
125     }
126 
127     @Override
128     public long length() {
129         return -1;
130     }
131 
132     @Override
133     public long progress() {
134         return offset;
135     }
136 }