1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.stream;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20
21 import java.io.InputStream;
22 import java.io.PushbackInputStream;
23
24 import static java.util.Objects.requireNonNull;
25
26
27
28
29
30
31
32
33
34
35
36 public class ChunkedStream implements ChunkedInput<Buffer> {
37 static final int DEFAULT_CHUNK_SIZE = 8192;
38
39 private final PushbackInputStream in;
40 private final int chunkSize;
41 private long offset;
42 private boolean closed;
43 private byte[] cachedArray;
44
45
46
47
48 public ChunkedStream(InputStream in) {
49 this(in, DEFAULT_CHUNK_SIZE);
50 }
51
52
53
54
55
56
57 public ChunkedStream(InputStream in, int chunkSize) {
58 requireNonNull(in, "in");
59 if (chunkSize <= 0) {
60 throw new IllegalArgumentException(
61 "chunkSize: " + chunkSize +
62 " (expected: a positive integer)");
63 }
64
65 if (in instanceof PushbackInputStream) {
66 this.in = (PushbackInputStream) in;
67 } else {
68 this.in = new PushbackInputStream(in);
69 }
70 this.chunkSize = chunkSize;
71 }
72
73
74
75
76 public long transferredBytes() {
77 return offset;
78 }
79
80 @Override
81 public boolean isEndOfInput() throws Exception {
82 if (closed) {
83 return true;
84 }
85 if (in.available() > 0) {
86 return false;
87 }
88
89 int b = in.read();
90 if (b < 0) {
91 return true;
92 } else {
93 in.unread(b);
94 return false;
95 }
96 }
97
98 @Override
99 public void close() throws Exception {
100 closed = true;
101 in.close();
102 }
103
104 @Override
105 public Buffer readChunk(BufferAllocator allocator) throws Exception {
106 if (isEndOfInput()) {
107 return null;
108 }
109
110 final int availableBytes = in.available();
111 final int chunkSize;
112 if (availableBytes <= 0) {
113 chunkSize = this.chunkSize;
114 } else {
115 chunkSize = Math.min(this.chunkSize, in.available());
116 }
117
118 boolean release = true;
119 Buffer buffer = allocator.allocate(chunkSize);
120 try {
121
122 int written;
123 try (var iter = buffer.forEachWritable()) {
124 var component = iter.first();
125 if (component.hasWritableArray()) {
126 written = in.read(component.writableArray(),
127 component.writableArrayOffset(),
128 component.writableArrayLength());
129 } else {
130 int size = Math.min(component.writableBytes(), chunkSize);
131 if (cachedArray == null || cachedArray.length < size) {
132 cachedArray = new byte[size];
133 }
134 written = in.read(cachedArray, 0, size);
135 if (written > 0) {
136 buffer.writeBytes(cachedArray, 0, written);
137 }
138 }
139 }
140 if (written < 0) {
141 return null;
142 }
143 offset += written;
144 release = false;
145 return buffer;
146 } finally {
147 if (release) {
148 buffer.close();
149 }
150 }
151 }
152
153 @Override
154 public long length() {
155 return -1;
156 }
157
158 @Override
159 public long progress() {
160 return offset;
161 }
162 }