1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import static io.netty.util.internal.ObjectUtil.checkPositive;
19
20 import io.netty.buffer.ByteBufAllocator;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelDuplexHandler;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandler;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelPipeline;
29 import io.netty.channel.ChannelProgressivePromise;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.nio.channels.ClosedChannelException;
36 import java.util.ArrayDeque;
37 import java.util.Queue;
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70 public class ChunkedWriteHandler extends ChannelDuplexHandler {
71
72 private static final InternalLogger logger =
73 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74
75 private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
76 private volatile ChannelHandlerContext ctx;
77
78 public ChunkedWriteHandler() {
79 }
80
81
82
83
84 @Deprecated
85 public ChunkedWriteHandler(int maxPendingWrites) {
86 checkPositive(maxPendingWrites, "maxPendingWrites");
87 }
88
89 @Override
90 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
91 this.ctx = ctx;
92 }
93
94
95
96
97 public void resumeTransfer() {
98 final ChannelHandlerContext ctx = this.ctx;
99 if (ctx == null) {
100 return;
101 }
102 if (ctx.executor().inEventLoop()) {
103 resumeTransfer0(ctx);
104 } else {
105
106 ctx.executor().execute(new Runnable() {
107
108 @Override
109 public void run() {
110 resumeTransfer0(ctx);
111 }
112 });
113 }
114 }
115
116 private void resumeTransfer0(ChannelHandlerContext ctx) {
117 try {
118 doFlush(ctx);
119 } catch (Exception e) {
120 logger.warn("Unexpected exception while sending chunks.", e);
121 }
122 }
123
124 @Override
125 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
126 queue.add(new PendingWrite(msg, promise));
127 }
128
129 @Override
130 public void flush(ChannelHandlerContext ctx) throws Exception {
131 doFlush(ctx);
132 }
133
134 @Override
135 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
136 doFlush(ctx);
137 ctx.fireChannelInactive();
138 }
139
140 @Override
141 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
142 if (ctx.channel().isWritable()) {
143
144 doFlush(ctx);
145 }
146 ctx.fireChannelWritabilityChanged();
147 }
148
149 private void discard(Throwable cause) {
150 for (;;) {
151 PendingWrite currentWrite = queue.poll();
152
153 if (currentWrite == null) {
154 break;
155 }
156 Object message = currentWrite.msg;
157 if (message instanceof ChunkedInput) {
158 ChunkedInput<?> in = (ChunkedInput<?>) message;
159 boolean endOfInput;
160 long inputLength;
161 try {
162 endOfInput = in.isEndOfInput();
163 inputLength = in.length();
164 closeInput(in);
165 } catch (Exception e) {
166 closeInput(in);
167 currentWrite.fail(e);
168 if (logger.isWarnEnabled()) {
169 logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
170 }
171 continue;
172 }
173
174 if (!endOfInput) {
175 if (cause == null) {
176 cause = new ClosedChannelException();
177 }
178 currentWrite.fail(cause);
179 } else {
180 currentWrite.success(inputLength);
181 }
182 } else {
183 if (cause == null) {
184 cause = new ClosedChannelException();
185 }
186 currentWrite.fail(cause);
187 }
188 }
189 }
190
191 private void doFlush(final ChannelHandlerContext ctx) {
192 final Channel channel = ctx.channel();
193 if (!channel.isActive()) {
194 discard(null);
195 return;
196 }
197
198 boolean requiresFlush = true;
199 ByteBufAllocator allocator = ctx.alloc();
200 while (channel.isWritable()) {
201 final PendingWrite currentWrite = queue.peek();
202
203 if (currentWrite == null) {
204 break;
205 }
206
207 if (currentWrite.promise.isDone()) {
208
209
210
211
212
213
214
215
216
217 queue.remove();
218 continue;
219 }
220
221 final Object pendingMessage = currentWrite.msg;
222
223 if (pendingMessage instanceof ChunkedInput) {
224 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
225 boolean endOfInput;
226 boolean suspend;
227 Object message = null;
228 try {
229 message = chunks.readChunk(allocator);
230 endOfInput = chunks.isEndOfInput();
231
232 if (message == null) {
233
234 suspend = !endOfInput;
235 } else {
236 suspend = false;
237 }
238 } catch (final Throwable t) {
239 queue.remove();
240
241 if (message != null) {
242 ReferenceCountUtil.release(message);
243 }
244
245 closeInput(chunks);
246 currentWrite.fail(t);
247 break;
248 }
249
250 if (suspend) {
251
252
253
254 break;
255 }
256
257 if (message == null) {
258
259
260 message = Unpooled.EMPTY_BUFFER;
261 }
262
263 if (endOfInput) {
264
265
266 queue.remove();
267 }
268
269 ChannelFuture f = ctx.writeAndFlush(message);
270 if (endOfInput) {
271 if (f.isDone()) {
272 handleEndOfInputFuture(f, currentWrite);
273 } else {
274
275
276
277
278
279 f.addListener(new ChannelFutureListener() {
280 @Override
281 public void operationComplete(ChannelFuture future) {
282 handleEndOfInputFuture(future, currentWrite);
283 }
284 });
285 }
286 } else {
287 final boolean resume = !channel.isWritable();
288 if (f.isDone()) {
289 handleFuture(f, currentWrite, resume);
290 } else {
291 f.addListener(new ChannelFutureListener() {
292 @Override
293 public void operationComplete(ChannelFuture future) {
294 handleFuture(future, currentWrite, resume);
295 }
296 });
297 }
298 }
299 requiresFlush = false;
300 } else {
301 queue.remove();
302 ctx.write(pendingMessage, currentWrite.promise);
303 requiresFlush = true;
304 }
305
306 if (!channel.isActive()) {
307 discard(new ClosedChannelException());
308 break;
309 }
310 }
311
312 if (requiresFlush) {
313 ctx.flush();
314 }
315 }
316
317 private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) {
318 ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
319 if (!future.isSuccess()) {
320 closeInput(input);
321 currentWrite.fail(future.cause());
322 } else {
323
324 long inputProgress = input.progress();
325 long inputLength = input.length();
326 closeInput(input);
327 currentWrite.progress(inputProgress, inputLength);
328 currentWrite.success(inputLength);
329 }
330 }
331
332 private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) {
333 ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
334 if (!future.isSuccess()) {
335 closeInput(input);
336 currentWrite.fail(future.cause());
337 } else {
338 currentWrite.progress(input.progress(), input.length());
339 if (resume && future.channel().isWritable()) {
340 resumeTransfer();
341 }
342 }
343 }
344
345 private static void closeInput(ChunkedInput<?> chunks) {
346 try {
347 chunks.close();
348 } catch (Throwable t) {
349 if (logger.isWarnEnabled()) {
350 logger.warn("Failed to close a chunked input.", t);
351 }
352 }
353 }
354
355 private static final class PendingWrite {
356 final Object msg;
357 final ChannelPromise promise;
358
359 PendingWrite(Object msg, ChannelPromise promise) {
360 this.msg = msg;
361 this.promise = promise;
362 }
363
364 void fail(Throwable cause) {
365 ReferenceCountUtil.release(msg);
366 promise.tryFailure(cause);
367 }
368
369 void success(long total) {
370 if (promise.isDone()) {
371
372 return;
373 }
374 progress(total, total);
375 promise.trySuccess();
376 }
377
378 void progress(long progress, long total) {
379 if (promise instanceof ChannelProgressivePromise) {
380 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
381 }
382 }
383 }
384 }