1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelDuplexHandler;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandler;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelPipeline;
29 import io.netty.channel.ChannelProgressivePromise;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.nio.channels.ClosedChannelException;
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class ChunkedWriteHandler extends ChannelDuplexHandler {
71
72 private static final InternalLogger logger =
73 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74
75 private Queue<PendingWrite> queue;
76 private volatile ChannelHandlerContext ctx;
77
78 public ChunkedWriteHandler() {
79 }
80
81
82
83
84 @Deprecated
85 public ChunkedWriteHandler(int maxPendingWrites) {
86 checkPositive(maxPendingWrites, "maxPendingWrites");
87 }
88
89 private void allocateQueue() {
90 if (queue == null) {
91 queue = new ArrayDeque<PendingWrite>();
92 }
93 }
94
95 private boolean queueIsEmpty() {
96 return queue == null || queue.isEmpty();
97 }
98
99 @Override
100 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
101 this.ctx = ctx;
102 }
103
104
105
106
107 public void resumeTransfer() {
108 final ChannelHandlerContext ctx = this.ctx;
109 if (ctx == null) {
110 return;
111 }
112 if (ctx.executor().inEventLoop()) {
113 resumeTransfer0(ctx);
114 } else {
115
116 ctx.executor().execute(new Runnable() {
117
118 @Override
119 public void run() {
120 resumeTransfer0(ctx);
121 }
122 });
123 }
124 }
125
126 private void resumeTransfer0(ChannelHandlerContext ctx) {
127 try {
128 doFlush(ctx);
129 } catch (Exception e) {
130 logger.warn("Unexpected exception while sending chunks.", e);
131 }
132 }
133
134 @Override
135 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
136 if (!queueIsEmpty() || msg instanceof ChunkedInput) {
137 allocateQueue();
138 queue.add(new PendingWrite(msg, promise));
139 } else {
140 ctx.write(msg, promise);
141 }
142 }
143
144 @Override
145 public void flush(ChannelHandlerContext ctx) throws Exception {
146 doFlush(ctx);
147 }
148
149 @Override
150 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
151 doFlush(ctx);
152 ctx.fireChannelInactive();
153 }
154
155 @Override
156 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
157 if (ctx.channel().isWritable()) {
158
159 doFlush(ctx);
160 }
161 ctx.fireChannelWritabilityChanged();
162 }
163
164 private void discard(Throwable cause) {
165 if (queueIsEmpty()) {
166 return;
167 }
168 for (;;) {
169 PendingWrite currentWrite = queue.poll();
170
171 if (currentWrite == null) {
172 break;
173 }
174 Object message = currentWrite.msg;
175 if (message instanceof ChunkedInput) {
176 ChunkedInput<?> in = (ChunkedInput<?>) message;
177 boolean endOfInput;
178 long inputLength;
179 try {
180 endOfInput = in.isEndOfInput();
181 inputLength = in.length();
182 closeInput(in);
183 } catch (Exception e) {
184 closeInput(in);
185 currentWrite.fail(e);
186 logger.warn("ChunkedInput failed", e);
187 continue;
188 }
189
190 if (!endOfInput) {
191 if (cause == null) {
192 cause = new ClosedChannelException();
193 }
194 currentWrite.fail(cause);
195 } else {
196 currentWrite.success(inputLength);
197 }
198 } else {
199 if (cause == null) {
200 cause = new ClosedChannelException();
201 }
202 currentWrite.fail(cause);
203 }
204 }
205 }
206
207 private void doFlush(final ChannelHandlerContext ctx) {
208 final Channel channel = ctx.channel();
209 if (!channel.isActive()) {
210
211
212
213 discard(null);
214 ctx.flush();
215 return;
216 }
217
218 if (queueIsEmpty()) {
219 ctx.flush();
220 return;
221 }
222
223 boolean requiresFlush = true;
224 ByteBufAllocator allocator = ctx.alloc();
225 while (channel.isWritable()) {
226 final PendingWrite currentWrite = queue.peek();
227
228 if (currentWrite == null) {
229 break;
230 }
231
232 if (currentWrite.promise.isDone()) {
233
234
235
236
237
238
239
240
241
242 queue.remove();
243 continue;
244 }
245
246 final Object pendingMessage = currentWrite.msg;
247
248 if (pendingMessage instanceof ChunkedInput) {
249 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
250 boolean endOfInput;
251 boolean suspend;
252 Object message = null;
253 try {
254 message = chunks.readChunk(allocator);
255 endOfInput = chunks.isEndOfInput();
256
257 suspend = message == null && !endOfInput;
258
259 } catch (final Throwable t) {
260 queue.remove();
261
262 if (message != null) {
263 ReferenceCountUtil.release(message);
264 }
265
266 closeInput(chunks);
267 currentWrite.fail(t);
268 break;
269 }
270
271 if (suspend) {
272
273
274
275 break;
276 }
277
278 if (message == null) {
279
280
281 message = Unpooled.EMPTY_BUFFER;
282 }
283
284 if (endOfInput) {
285
286
287 queue.remove();
288 }
289
290 ChannelFuture f = ctx.writeAndFlush(message);
291 if (endOfInput) {
292 if (f.isDone()) {
293 handleEndOfInputFuture(f, chunks, currentWrite);
294 } else {
295
296
297
298
299
300 f.addListener(new ChannelFutureListener() {
301 @Override
302 public void operationComplete(ChannelFuture future) {
303 handleEndOfInputFuture(future, chunks, currentWrite);
304 }
305 });
306 }
307 } else {
308 final boolean resume = !channel.isWritable();
309 if (f.isDone()) {
310 handleFuture(f, chunks, currentWrite, resume);
311 } else {
312 f.addListener(new ChannelFutureListener() {
313 @Override
314 public void operationComplete(ChannelFuture future) {
315 handleFuture(future, chunks, currentWrite, resume);
316 }
317 });
318 }
319 }
320 requiresFlush = false;
321 } else {
322 queue.remove();
323 ctx.write(pendingMessage, currentWrite.promise);
324 requiresFlush = true;
325 }
326
327 if (!channel.isActive()) {
328 discard(new ClosedChannelException());
329 break;
330 }
331 }
332
333 if (requiresFlush) {
334 ctx.flush();
335 }
336 }
337
338 private static void handleEndOfInputFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite) {
339 if (!future.isSuccess()) {
340 closeInput(input);
341 currentWrite.fail(future.cause());
342 } else {
343
344 long inputProgress = input.progress();
345 long inputLength = input.length();
346 closeInput(input);
347 currentWrite.progress(inputProgress, inputLength);
348 currentWrite.success(inputLength);
349 }
350 }
351
352 private void handleFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite, boolean resume) {
353 if (!future.isSuccess()) {
354 closeInput(input);
355 currentWrite.fail(future.cause());
356 } else {
357 currentWrite.progress(input.progress(), input.length());
358 if (resume && future.channel().isWritable()) {
359 resumeTransfer();
360 }
361 }
362 }
363
364 private static void closeInput(ChunkedInput<?> chunks) {
365 try {
366 chunks.close();
367 } catch (Throwable t) {
368 logger.warn("Failed to close a ChunkedInput.", t);
369 }
370 }
371
372 private static final class PendingWrite {
373 final Object msg;
374 final ChannelPromise promise;
375
376 PendingWrite(Object msg, ChannelPromise promise) {
377 this.msg = msg;
378 this.promise = promise;
379 }
380
381 void fail(Throwable cause) {
382 ReferenceCountUtil.release(msg);
383 promise.tryFailure(cause);
384 }
385
386 void success(long total) {
387 if (promise.isDone()) {
388
389 return;
390 }
391 progress(total, total);
392 promise.trySuccess();
393 }
394
395 void progress(long progress, long total) {
396 if (promise instanceof ChannelProgressivePromise) {
397 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
398 }
399 }
400 }
401 }