1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelDuplexHandler;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandler;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelPipeline;
29 import io.netty.channel.ChannelProgressivePromise;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.nio.channels.ClosedChannelException;
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class ChunkedWriteHandler extends ChannelDuplexHandler {
71
72 private static final InternalLogger logger =
73 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74
75 private Queue<PendingWrite> queue;
76 private volatile ChannelHandlerContext ctx;
77
78 public ChunkedWriteHandler() {
79 }
80
81
82
83
84 @Deprecated
85 public ChunkedWriteHandler(int maxPendingWrites) {
86 checkPositive(maxPendingWrites, "maxPendingWrites");
87 }
88
89 private void allocateQueue() {
90 if (queue == null) {
91 queue = new ArrayDeque<PendingWrite>();
92 }
93 }
94
95 private boolean queueIsEmpty() {
96 return queue == null || queue.isEmpty();
97 }
98
99 @Override
100 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
101 this.ctx = ctx;
102 }
103
104
105
106
107 public void resumeTransfer() {
108 final ChannelHandlerContext ctx = this.ctx;
109 if (ctx == null) {
110 return;
111 }
112 if (ctx.executor().inEventLoop()) {
113 resumeTransfer0(ctx);
114 } else {
115
116 ctx.executor().execute(new Runnable() {
117
118 @Override
119 public void run() {
120 resumeTransfer0(ctx);
121 }
122 });
123 }
124 }
125
126 private void resumeTransfer0(ChannelHandlerContext ctx) {
127 try {
128 doFlush(ctx);
129 } catch (Exception e) {
130 logger.warn("Unexpected exception while sending chunks.", e);
131 }
132 }
133
134 @Override
135 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
136 if (!queueIsEmpty() || msg instanceof ChunkedInput) {
137 allocateQueue();
138 queue.add(new PendingWrite(msg, promise));
139 } else {
140 ctx.write(msg, promise);
141 }
142 }
143
144 @Override
145 public void flush(ChannelHandlerContext ctx) throws Exception {
146 doFlush(ctx);
147 }
148
149 @Override
150 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
151 doFlush(ctx);
152 ctx.fireChannelInactive();
153 }
154
155 @Override
156 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
157 if (ctx.channel().isWritable()) {
158
159 doFlush(ctx);
160 }
161 ctx.fireChannelWritabilityChanged();
162 }
163
164 private void discard(Throwable cause) {
165 if (queueIsEmpty()) {
166 return;
167 }
168 for (;;) {
169 PendingWrite currentWrite = queue.poll();
170
171 if (currentWrite == null) {
172 break;
173 }
174 Object message = currentWrite.msg;
175 if (message instanceof ChunkedInput) {
176 ChunkedInput<?> in = (ChunkedInput<?>) message;
177 boolean endOfInput;
178 long inputLength;
179 try {
180 endOfInput = in.isEndOfInput();
181 inputLength = in.length();
182 closeInput(in);
183 } catch (Exception e) {
184 closeInput(in);
185 currentWrite.fail(e);
186 logger.warn("ChunkedInput failed", e);
187 continue;
188 }
189
190 if (!endOfInput) {
191 if (cause == null) {
192 cause = new ClosedChannelException();
193 }
194 currentWrite.fail(cause);
195 } else {
196 currentWrite.success(inputLength);
197 }
198 } else {
199 if (cause == null) {
200 cause = new ClosedChannelException();
201 }
202 currentWrite.fail(cause);
203 }
204 }
205 }
206
207 private void doFlush(final ChannelHandlerContext ctx) {
208 final Channel channel = ctx.channel();
209 if (!channel.isActive()) {
210
211
212
213 discard(null);
214 ctx.flush();
215 return;
216 }
217
218 if (queueIsEmpty()) {
219 ctx.flush();
220 return;
221 }
222
223 boolean requiresFlush = true;
224 ByteBufAllocator allocator = ctx.alloc();
225 while (channel.isWritable()) {
226 final PendingWrite currentWrite = queue.peek();
227
228 if (currentWrite == null) {
229 break;
230 }
231
232 if (currentWrite.promise.isDone()) {
233
234
235
236
237
238
239
240
241
242 queue.remove();
243 continue;
244 }
245
246 final Object pendingMessage = currentWrite.msg;
247
248 if (pendingMessage instanceof ChunkedInput) {
249 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
250 boolean endOfInput;
251 boolean suspend;
252 Object message = null;
253 try {
254 message = chunks.readChunk(allocator);
255 endOfInput = chunks.isEndOfInput();
256
257 suspend = message == null && !endOfInput;
258
259 } catch (final Throwable t) {
260 queue.remove();
261
262 if (message != null) {
263 ReferenceCountUtil.release(message);
264 }
265
266 closeInput(chunks);
267 currentWrite.fail(t);
268 break;
269 }
270
271 if (suspend) {
272
273
274
275 break;
276 }
277
278 if (message == null) {
279
280
281 message = Unpooled.EMPTY_BUFFER;
282 }
283
284 if (endOfInput) {
285
286
287 queue.remove();
288 }
289
290 ChannelFuture f = ctx.writeAndFlush(message);
291 if (endOfInput) {
292 if (f.isDone()) {
293 handleEndOfInputFuture(f, chunks, currentWrite);
294 } else {
295
296
297
298
299
300 f.addListener((ChannelFutureListener) future ->
301 handleEndOfInputFuture(future, chunks, currentWrite));
302 }
303 } else {
304 final boolean resume = !channel.isWritable();
305 if (f.isDone()) {
306 handleFuture(f, chunks, currentWrite, resume);
307 } else {
308 f.addListener((ChannelFutureListener) future ->
309 handleFuture(future, chunks, currentWrite, resume));
310 }
311 }
312 requiresFlush = false;
313 } else {
314 queue.remove();
315 ctx.write(pendingMessage, currentWrite.promise);
316 requiresFlush = true;
317 }
318
319 if (!channel.isActive()) {
320 discard(new ClosedChannelException());
321 break;
322 }
323 }
324
325 if (requiresFlush) {
326 ctx.flush();
327 }
328 }
329
330 private static void handleEndOfInputFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite) {
331 if (!future.isSuccess()) {
332 closeInput(input);
333 currentWrite.fail(future.cause());
334 } else {
335
336 long inputProgress = input.progress();
337 long inputLength = input.length();
338 closeInput(input);
339 currentWrite.progress(inputProgress, inputLength);
340 currentWrite.success(inputLength);
341 }
342 }
343
344 private void handleFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite, boolean resume) {
345 if (!future.isSuccess()) {
346 closeInput(input);
347 currentWrite.fail(future.cause());
348 } else {
349 currentWrite.progress(input.progress(), input.length());
350 if (resume && future.channel().isWritable()) {
351 resumeTransfer();
352 }
353 }
354 }
355
356 private static void closeInput(ChunkedInput<?> chunks) {
357 try {
358 chunks.close();
359 } catch (Throwable t) {
360 logger.warn("Failed to close a ChunkedInput.", t);
361 }
362 }
363
364 private static final class PendingWrite {
365 final Object msg;
366 final ChannelPromise promise;
367
368 PendingWrite(Object msg, ChannelPromise promise) {
369 this.msg = msg;
370 this.promise = promise;
371 }
372
373 void fail(Throwable cause) {
374 ReferenceCountUtil.release(msg);
375 promise.tryFailure(cause);
376 }
377
378 void success(long total) {
379 if (promise.isDone()) {
380
381 return;
382 }
383 progress(total, total);
384 promise.trySuccess();
385 }
386
387 void progress(long progress, long total) {
388 if (promise instanceof ChannelProgressivePromise) {
389 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
390 }
391 }
392 }
393 }