View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.handler.stream;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  
21  import java.io.InputStream;
22  import java.io.PushbackInputStream;
23  
24  import static java.util.Objects.requireNonNull;
25  
26  /**
27   * A {@link ChunkedInput} that fetches data from an {@link InputStream} chunk by
28   * chunk.
29   * <p>
30   * Please note that the {@link InputStream} instance that feeds data into
31   * {@link ChunkedStream} must implement {@link InputStream#available()} as
32   * accurately as possible, rather than using the default implementation.
33   * Otherwise, {@link ChunkedStream} will generate many too small chunks or
34   * block unnecessarily often.
35   */
36  public class ChunkedStream implements ChunkedInput<Buffer> {
37      static final int DEFAULT_CHUNK_SIZE = 8192;
38  
39      private final PushbackInputStream in;
40      private final int chunkSize;
41      private long offset;
42      private boolean closed;
43      private byte[] cachedArray;
44  
45      /**
46       * Creates a new instance that fetches data from the specified stream.
47       */
48      public ChunkedStream(InputStream in) {
49          this(in, DEFAULT_CHUNK_SIZE);
50      }
51  
52      /**
53       * Creates a new instance that fetches data from the specified stream.
54       *
55       * @param chunkSize the number of bytes to fetch on each {@link #readChunk(BufferAllocator)} call.
56       */
57      public ChunkedStream(InputStream in, int chunkSize) {
58          requireNonNull(in, "in");
59          if (chunkSize <= 0) {
60              throw new IllegalArgumentException(
61                      "chunkSize: " + chunkSize +
62                      " (expected: a positive integer)");
63          }
64  
65          if (in instanceof PushbackInputStream) {
66              this.in = (PushbackInputStream) in;
67          } else {
68              this.in = new PushbackInputStream(in);
69          }
70          this.chunkSize = chunkSize;
71      }
72  
73      /**
74       * Returns the number of transferred bytes.
75       */
76      public long transferredBytes() {
77          return offset;
78      }
79  
80      @Override
81      public boolean isEndOfInput() throws Exception {
82          if (closed) {
83              return true;
84          }
85          if (in.available() > 0) {
86              return false;
87          }
88  
89          int b = in.read();
90          if (b < 0) {
91              return true;
92          } else {
93              in.unread(b);
94              return false;
95          }
96      }
97  
98      @Override
99      public void close() throws Exception {
100         closed = true;
101         in.close();
102     }
103 
104     @Override
105     public Buffer readChunk(BufferAllocator allocator) throws Exception {
106         if (isEndOfInput()) {
107             return null;
108         }
109 
110         final int availableBytes = in.available();
111         final int chunkSize;
112         if (availableBytes <= 0) {
113             chunkSize = this.chunkSize;
114         } else {
115             chunkSize = Math.min(this.chunkSize, in.available());
116         }
117 
118         boolean release = true;
119         Buffer buffer = allocator.allocate(chunkSize);
120         try {
121             // transfer to buffer
122             int written;
123             try (var iter = buffer.forEachWritable()) {
124                 var component = iter.first();
125                 if (component.hasWritableArray()) {
126                     written = in.read(component.writableArray(),
127                             component.writableArrayOffset(),
128                             component.writableArrayLength());
129                 } else {
130                     int size = Math.min(component.writableBytes(), chunkSize);
131                     if (cachedArray == null || cachedArray.length < size) {
132                         cachedArray = new byte[size];
133                     }
134                     written = in.read(cachedArray, 0, size);
135                     if (written > 0) {
136                         buffer.writeBytes(cachedArray, 0, written);
137                     }
138                 }
139             }
140             if (written < 0) {
141                 return null;
142             }
143             offset += written;
144             release = false;
145             return buffer;
146         } finally {
147             if (release) {
148                 buffer.close();
149             }
150         }
151     }
152 
153     @Override
154     public long length() {
155         return -1;
156     }
157 
158     @Override
159     public long progress() {
160         return offset;
161     }
162 }