1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.handler.stream;
17
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.util.Resource;
20 import io.netty5.channel.Channel;
21 import io.netty5.channel.ChannelHandler;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.ChannelPipeline;
24 import io.netty5.util.concurrent.Future;
25 import io.netty5.util.concurrent.Promise;
26 import io.netty5.util.internal.SilentDispose;
27 import io.netty5.util.internal.logging.InternalLogger;
28 import io.netty5.util.internal.logging.InternalLoggerFactory;
29
30 import java.nio.channels.ClosedChannelException;
31 import java.util.ArrayDeque;
32 import java.util.Queue;
33
34 import static io.netty5.util.internal.ObjectUtil.checkPositive;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67 public class ChunkedWriteHandler implements ChannelHandler {
68
69 private static final InternalLogger logger =
70 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
71
72 private final Queue<PendingWrite> queue = new ArrayDeque<>();
73 private volatile ChannelHandlerContext ctx;
74
75 public ChunkedWriteHandler() {
76 }
77
78
79
80
81 @Deprecated
82 public ChunkedWriteHandler(int maxPendingWrites) {
83 checkPositive(maxPendingWrites, "maxPendingWrites");
84 }
85
86 @Override
87 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
88 this.ctx = ctx;
89 }
90
91
92
93
94 public void resumeTransfer() {
95 final ChannelHandlerContext ctx = this.ctx;
96 if (ctx == null) {
97 return;
98 }
99 if (ctx.executor().inEventLoop()) {
100 resumeTransfer0(ctx);
101 } else {
102
103 ctx.executor().execute(() -> resumeTransfer0(ctx));
104 }
105 }
106
107 private void resumeTransfer0(ChannelHandlerContext ctx) {
108 try {
109 doFlush(ctx);
110 } catch (Exception e) {
111 logger.warn("Unexpected exception while sending chunks.", e);
112 }
113 }
114
115 @Override
116 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
117 Promise<Void> promise = ctx.newPromise();
118 queue.add(new PendingWrite(msg, promise));
119 return promise.asFuture();
120 }
121
122 @Override
123 public void flush(ChannelHandlerContext ctx) {
124 doFlush(ctx);
125 }
126
127 @Override
128 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
129 doFlush(ctx);
130 ctx.fireChannelInactive();
131 }
132
133 @Override
134 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
135 if (ctx.channel().isWritable()) {
136
137 doFlush(ctx);
138 }
139 ctx.fireChannelWritabilityChanged();
140 }
141
142 private void discard(Throwable cause) {
143 for (;;) {
144 PendingWrite currentWrite = queue.poll();
145
146 if (currentWrite == null) {
147 break;
148 }
149 Object message = currentWrite.msg;
150 if (message instanceof ChunkedInput) {
151 ChunkedInput<?> in = (ChunkedInput<?>) message;
152 boolean endOfInput;
153 try {
154 endOfInput = in.isEndOfInput();
155 closeInput(in);
156 } catch (Exception e) {
157 closeInput(in);
158 currentWrite.fail(e);
159 if (logger.isWarnEnabled()) {
160 logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
161 }
162 continue;
163 }
164
165 if (!endOfInput) {
166 if (cause == null) {
167 cause = new ClosedChannelException();
168 }
169 currentWrite.fail(cause);
170 } else {
171 currentWrite.success();
172 }
173 } else {
174 if (cause == null) {
175 cause = new ClosedChannelException();
176 }
177 currentWrite.fail(cause);
178 }
179 }
180 }
181
182 private void doFlush(final ChannelHandlerContext ctx) {
183 final Channel channel = ctx.channel();
184 if (!channel.isActive()) {
185 discard(null);
186 return;
187 }
188
189 boolean requiresFlush = true;
190 BufferAllocator allocator = ctx.bufferAllocator();
191 while (channel.isWritable()) {
192 final PendingWrite currentWrite = queue.peek();
193
194 if (currentWrite == null) {
195 break;
196 }
197
198 if (currentWrite.promise.isDone()) {
199
200
201
202
203
204
205
206
207
208 queue.remove();
209 continue;
210 }
211
212 final Object pendingMessage = currentWrite.msg;
213
214 if (pendingMessage instanceof ChunkedInput) {
215 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
216 boolean endOfInput;
217 boolean suspend;
218 Object message = null;
219 try {
220 message = chunks.readChunk(allocator);
221 endOfInput = chunks.isEndOfInput();
222
223 if (message == null) {
224
225 suspend = !endOfInput;
226 } else {
227 suspend = false;
228 }
229 } catch (final Throwable t) {
230 queue.remove();
231
232 if (message != null) {
233 Resource.dispose(message);
234 }
235
236 closeInput(chunks);
237 currentWrite.fail(t);
238 break;
239 }
240
241 if (suspend) {
242
243
244
245 break;
246 }
247
248 if (message == null) {
249
250
251 message = allocator.allocate(0);
252 }
253
254 if (endOfInput) {
255
256
257 queue.remove();
258 }
259
260 Future<Void> f = ctx.writeAndFlush(message);
261 if (endOfInput) {
262 if (f.isDone()) {
263 handleEndOfInputFuture(f, currentWrite);
264 } else {
265
266
267
268
269
270 f.addListener(future -> handleEndOfInputFuture(future, currentWrite));
271 }
272 } else {
273 final boolean resume = !channel.isWritable();
274 if (f.isDone()) {
275 handleFuture(channel, f, currentWrite, resume);
276 } else {
277 f.addListener(future -> handleFuture(channel, future, currentWrite, resume));
278 }
279 }
280 requiresFlush = false;
281 } else {
282 queue.remove();
283 ctx.write(pendingMessage).cascadeTo(currentWrite.promise);
284 requiresFlush = true;
285 }
286
287 if (!channel.isActive()) {
288 discard(new ClosedChannelException());
289 break;
290 }
291 }
292
293 if (requiresFlush) {
294 ctx.flush();
295 }
296 }
297
298 private static void handleEndOfInputFuture(Future<?> future, PendingWrite currentWrite) {
299 ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
300 closeInput(input);
301 if (future.isFailed()) {
302 currentWrite.fail(future.cause());
303 } else {
304 currentWrite.success();
305 }
306 }
307
308 private void handleFuture(Channel channel, Future<?> future, PendingWrite currentWrite, boolean resume) {
309 ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
310 if (future.isFailed()) {
311 closeInput(input);
312 currentWrite.fail(future.cause());
313 } else {
314 if (resume && channel.isWritable()) {
315 resumeTransfer();
316 }
317 }
318 }
319
320 private static void closeInput(ChunkedInput<?> chunks) {
321 try {
322 chunks.close();
323 } catch (Throwable t) {
324 if (logger.isWarnEnabled()) {
325 logger.warn("Failed to close a chunked input.", t);
326 }
327 }
328 }
329
330 private static final class PendingWrite {
331 final Object msg;
332 final Promise<Void> promise;
333
334 PendingWrite(Object msg, Promise<Void> promise) {
335 this.msg = msg;
336 this.promise = promise;
337 }
338
339 void fail(Throwable cause) {
340 promise.tryFailure(cause);
341 if (Resource.isAccessible(msg, false)) {
342 SilentDispose.dispose(msg, logger);
343 }
344 }
345
346 void success() {
347 if (promise.isDone()) {
348
349 return;
350 }
351 promise.trySuccess(null);
352 }
353 }
354 }