1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.stream;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelDuplexHandler;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelFutureListener;
25 import io.netty.channel.ChannelHandler;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelPipeline;
28 import io.netty.channel.ChannelProgressivePromise;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.util.ReferenceCountUtil;
31 import io.netty.util.internal.logging.InternalLogger;
32 import io.netty.util.internal.logging.InternalLoggerFactory;
33
34 import java.nio.channels.ClosedChannelException;
35 import java.util.ArrayDeque;
36 import java.util.Queue;
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 public class ChunkedWriteHandler
70 extends ChannelDuplexHandler {
71
72 private static final InternalLogger logger =
73 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74
75 private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
76 private volatile ChannelHandlerContext ctx;
77 private PendingWrite currentWrite;
78
79 public ChunkedWriteHandler() {
80 }
81
82
83
84
85 @Deprecated
86 public ChunkedWriteHandler(int maxPendingWrites) {
87 if (maxPendingWrites <= 0) {
88 throw new IllegalArgumentException(
89 "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
90 }
91 }
92
93 @Override
94 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
95 this.ctx = ctx;
96 }
97
98
99
100
101 public void resumeTransfer() {
102 final ChannelHandlerContext ctx = this.ctx;
103 if (ctx == null) {
104 return;
105 }
106 if (ctx.executor().inEventLoop()) {
107 try {
108 doFlush(ctx);
109 } catch (Exception e) {
110 if (logger.isWarnEnabled()) {
111 logger.warn("Unexpected exception while sending chunks.", e);
112 }
113 }
114 } else {
115
116 ctx.executor().execute(new Runnable() {
117
118 @Override
119 public void run() {
120 try {
121 doFlush(ctx);
122 } catch (Exception e) {
123 if (logger.isWarnEnabled()) {
124 logger.warn("Unexpected exception while sending chunks.", e);
125 }
126 }
127 }
128 });
129 }
130 }
131
132 @Override
133 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
134 queue.add(new PendingWrite(msg, promise));
135 }
136
137 @Override
138 public void flush(ChannelHandlerContext ctx) throws Exception {
139 doFlush(ctx);
140 }
141
142 @Override
143 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
144 doFlush(ctx);
145 ctx.fireChannelInactive();
146 }
147
148 @Override
149 public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
150 if (ctx.channel().isWritable()) {
151
152 doFlush(ctx);
153 }
154 ctx.fireChannelWritabilityChanged();
155 }
156
157 private void discard(Throwable cause) {
158 for (;;) {
159 PendingWrite currentWrite = this.currentWrite;
160
161 if (this.currentWrite == null) {
162 currentWrite = queue.poll();
163 } else {
164 this.currentWrite = null;
165 }
166
167 if (currentWrite == null) {
168 break;
169 }
170 Object message = currentWrite.msg;
171 if (message instanceof ChunkedInput) {
172 ChunkedInput<?> in = (ChunkedInput<?>) message;
173 try {
174 if (!in.isEndOfInput()) {
175 if (cause == null) {
176 cause = new ClosedChannelException();
177 }
178 currentWrite.fail(cause);
179 } else {
180 currentWrite.success();
181 }
182 closeInput(in);
183 } catch (Exception e) {
184 currentWrite.fail(e);
185 logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
186 closeInput(in);
187 }
188 } else {
189 if (cause == null) {
190 cause = new ClosedChannelException();
191 }
192 currentWrite.fail(cause);
193 }
194 }
195 }
196
197 private void doFlush(final ChannelHandlerContext ctx) throws Exception {
198 final Channel channel = ctx.channel();
199 if (!channel.isActive()) {
200 discard(null);
201 return;
202 }
203
204 boolean requiresFlush = true;
205 while (channel.isWritable()) {
206 if (currentWrite == null) {
207 currentWrite = queue.poll();
208 }
209
210 if (currentWrite == null) {
211 break;
212 }
213 final PendingWrite currentWrite = this.currentWrite;
214 final Object pendingMessage = currentWrite.msg;
215
216 if (pendingMessage instanceof ChunkedInput) {
217 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
218 boolean endOfInput;
219 boolean suspend;
220 Object message = null;
221 try {
222 message = chunks.readChunk(ctx);
223 endOfInput = chunks.isEndOfInput();
224
225 if (message == null) {
226
227 suspend = !endOfInput;
228 } else {
229 suspend = false;
230 }
231 } catch (final Throwable t) {
232 this.currentWrite = null;
233
234 if (message != null) {
235 ReferenceCountUtil.release(message);
236 }
237
238 currentWrite.fail(t);
239 closeInput(chunks);
240 break;
241 }
242
243 if (suspend) {
244
245
246
247 break;
248 }
249
250 if (message == null) {
251
252
253 message = Unpooled.EMPTY_BUFFER;
254 }
255
256 final int amount = amount(message);
257 ChannelFuture f = ctx.write(message);
258 if (endOfInput) {
259 this.currentWrite = null;
260
261
262
263
264
265
266 f.addListener(new ChannelFutureListener() {
267 @Override
268 public void operationComplete(ChannelFuture future) throws Exception {
269 currentWrite.progress(amount);
270 currentWrite.success();
271 closeInput(chunks);
272 }
273 });
274 } else if (channel.isWritable()) {
275 f.addListener(new ChannelFutureListener() {
276 @Override
277 public void operationComplete(ChannelFuture future) throws Exception {
278 if (!future.isSuccess()) {
279 closeInput((ChunkedInput<?>) pendingMessage);
280 currentWrite.fail(future.cause());
281 } else {
282 currentWrite.progress(amount);
283 }
284 }
285 });
286 } else {
287 f.addListener(new ChannelFutureListener() {
288 @Override
289 public void operationComplete(ChannelFuture future) throws Exception {
290 if (!future.isSuccess()) {
291 closeInput((ChunkedInput<?>) pendingMessage);
292 currentWrite.fail(future.cause());
293 } else {
294 currentWrite.progress(amount);
295 if (channel.isWritable()) {
296 resumeTransfer();
297 }
298 }
299 }
300 });
301 }
302
303 ctx.flush();
304 requiresFlush = false;
305 } else {
306 ctx.write(pendingMessage, currentWrite.promise);
307 this.currentWrite = null;
308 requiresFlush = true;
309 }
310
311 if (!channel.isActive()) {
312 discard(new ClosedChannelException());
313 break;
314 }
315 }
316
317 if (requiresFlush) {
318 ctx.flush();
319 }
320 }
321
322 static void closeInput(ChunkedInput<?> chunks) {
323 try {
324 chunks.close();
325 } catch (Throwable t) {
326 if (logger.isWarnEnabled()) {
327 logger.warn("Failed to close a chunked input.", t);
328 }
329 }
330 }
331
332 private static final class PendingWrite {
333 final Object msg;
334 final ChannelPromise promise;
335 private long progress;
336
337 PendingWrite(Object msg, ChannelPromise promise) {
338 this.msg = msg;
339 this.promise = promise;
340 }
341
342 void fail(Throwable cause) {
343 ReferenceCountUtil.release(msg);
344 promise.tryFailure(cause);
345 }
346
347 void success() {
348 if (promise.isDone()) {
349
350 return;
351 }
352
353 if (promise instanceof ChannelProgressivePromise) {
354
355 ((ChannelProgressivePromise) promise).tryProgress(progress, progress);
356 }
357
358 promise.trySuccess();
359 }
360
361 void progress(int amount) {
362 progress += amount;
363 if (promise instanceof ChannelProgressivePromise) {
364 ((ChannelProgressivePromise) promise).tryProgress(progress, -1);
365 }
366 }
367 }
368
369 private static int amount(Object msg) {
370 if (msg instanceof ByteBuf) {
371 return ((ByteBuf) msg).readableBytes();
372 }
373 if (msg instanceof ByteBufHolder) {
374 return ((ByteBufHolder) msg).content().readableBytes();
375 }
376 return 1;
377 }
378 }