1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.stream;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20
21 import java.nio.ByteBuffer;
22 import java.nio.channels.ReadableByteChannel;
23
24 import static io.netty5.util.internal.ObjectUtil.checkPositive;
25 import static java.util.Objects.requireNonNull;
26
27
28
29
30
31
32 public class ChunkedNioStream implements ChunkedInput<Buffer> {
33
34 private final ReadableByteChannel in;
35
36 private final int chunkSize;
37 private long offset;
38
39
40
41
42 private final ByteBuffer byteBuffer;
43
44
45
46
47 public ChunkedNioStream(ReadableByteChannel in) {
48 this(in, ChunkedStream.DEFAULT_CHUNK_SIZE);
49 }
50
51
52
53
54
55
56
57 public ChunkedNioStream(ReadableByteChannel in, int chunkSize) {
58 this.in = requireNonNull(in, "in");
59 this.chunkSize = checkPositive(chunkSize, "chunkSize");
60 byteBuffer = ByteBuffer.allocate(chunkSize);
61 }
62
63
64
65
66 public long transferredBytes() {
67 return offset;
68 }
69
70 @Override
71 public boolean isEndOfInput() throws Exception {
72 if (byteBuffer.position() > 0) {
73
74 return false;
75 }
76 if (in.isOpen()) {
77
78 int b = in.read(byteBuffer);
79 if (b < 0) {
80 return true;
81 } else {
82 offset += b;
83 return false;
84 }
85 }
86 return true;
87 }
88
89 @Override
90 public void close() throws Exception {
91 in.close();
92 }
93
94 @Override
95 public Buffer readChunk(BufferAllocator allocator) throws Exception {
96 if (isEndOfInput()) {
97 return null;
98 }
99
100 int readBytes = byteBuffer.position();
101 for (;;) {
102 int localReadBytes = in.read(byteBuffer);
103 if (localReadBytes < 0) {
104 break;
105 }
106 readBytes += localReadBytes;
107 offset += localReadBytes;
108 if (readBytes == chunkSize) {
109 break;
110 }
111 }
112 byteBuffer.flip();
113 boolean release = true;
114 Buffer buffer = allocator.allocate(byteBuffer.remaining());
115 try {
116 buffer.writeBytes(byteBuffer);
117 byteBuffer.clear();
118 release = false;
119 return buffer;
120 } finally {
121 if (release) {
122 buffer.close();
123 }
124 }
125 }
126
127 @Override
128 public long length() {
129 return -1;
130 }
131
132 @Override
133 public long progress() {
134 return offset;
135 }
136 }