1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelDuplexHandler;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelFutureListener;
25  import io.netty.channel.ChannelHandler;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelPipeline;
28  import io.netty.channel.ChannelProgressivePromise;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.nio.channels.ClosedChannelException;
35  import java.util.ArrayDeque;
36  import java.util.Queue;
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  public class ChunkedWriteHandler
70          extends ChannelDuplexHandler {
71  
72      private static final InternalLogger logger =
73          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74  
75      private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
76      private volatile ChannelHandlerContext ctx;
77      private PendingWrite currentWrite;
78  
79      public ChunkedWriteHandler() {
80      }
81  
82      
83  
84  
85      @Deprecated
86      public ChunkedWriteHandler(int maxPendingWrites) {
87          if (maxPendingWrites <= 0) {
88              throw new IllegalArgumentException(
89                      "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
90          }
91      }
92  
93      @Override
94      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
95          this.ctx = ctx;
96      }
97  
98      
99  
100 
101     public void resumeTransfer() {
102         final ChannelHandlerContext ctx = this.ctx;
103         if (ctx == null) {
104             return;
105         }
106         if (ctx.executor().inEventLoop()) {
107             try {
108                 doFlush(ctx);
109             } catch (Exception e) {
110                 if (logger.isWarnEnabled()) {
111                     logger.warn("Unexpected exception while sending chunks.", e);
112                 }
113             }
114         } else {
115             
116             ctx.executor().execute(new Runnable() {
117 
118                 @Override
119                 public void run() {
120                     try {
121                         doFlush(ctx);
122                     } catch (Exception e) {
123                         if (logger.isWarnEnabled()) {
124                             logger.warn("Unexpected exception while sending chunks.", e);
125                         }
126                     }
127                 }
128             });
129         }
130     }
131 
132     @Override
133     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
134         queue.add(new PendingWrite(msg, promise));
135     }
136 
137     @Override
138     public void flush(ChannelHandlerContext ctx) throws Exception {
139         doFlush(ctx);
140     }
141 
142     @Override
143     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
144         doFlush(ctx);
145         ctx.fireChannelInactive();
146     }
147 
148     @Override
149     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
150         if (ctx.channel().isWritable()) {
151             
152             doFlush(ctx);
153         }
154         ctx.fireChannelWritabilityChanged();
155     }
156 
157     private void discard(Throwable cause) {
158         for (;;) {
159             PendingWrite currentWrite = this.currentWrite;
160 
161             if (this.currentWrite == null) {
162                 currentWrite = queue.poll();
163             } else {
164                 this.currentWrite = null;
165             }
166 
167             if (currentWrite == null) {
168                 break;
169             }
170             Object message = currentWrite.msg;
171             if (message instanceof ChunkedInput) {
172                 ChunkedInput<?> in = (ChunkedInput<?>) message;
173                 try {
174                     if (!in.isEndOfInput()) {
175                         if (cause == null) {
176                             cause = new ClosedChannelException();
177                         }
178                         currentWrite.fail(cause);
179                     } else {
180                         currentWrite.success();
181                     }
182                     closeInput(in);
183                 } catch (Exception e) {
184                     currentWrite.fail(e);
185                     logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
186                     closeInput(in);
187                 }
188             } else {
189                 if (cause == null) {
190                     cause = new ClosedChannelException();
191                 }
192                 currentWrite.fail(cause);
193             }
194         }
195     }
196 
197     private void doFlush(final ChannelHandlerContext ctx) throws Exception {
198         final Channel channel = ctx.channel();
199         if (!channel.isActive()) {
200             discard(null);
201             return;
202         }
203 
204         boolean requiresFlush = true;
205         while (channel.isWritable()) {
206             if (currentWrite == null) {
207                 currentWrite = queue.poll();
208             }
209 
210             if (currentWrite == null) {
211                 break;
212             }
213             final PendingWrite currentWrite = this.currentWrite;
214             final Object pendingMessage = currentWrite.msg;
215 
216             if (pendingMessage instanceof ChunkedInput) {
217                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
218                 boolean endOfInput;
219                 boolean suspend;
220                 Object message = null;
221                 try {
222                     message = chunks.readChunk(ctx);
223                     endOfInput = chunks.isEndOfInput();
224 
225                     if (message == null) {
226                         
227                         suspend = !endOfInput;
228                     } else {
229                         suspend = false;
230                     }
231                 } catch (final Throwable t) {
232                     this.currentWrite = null;
233 
234                     if (message != null) {
235                         ReferenceCountUtil.release(message);
236                     }
237 
238                     currentWrite.fail(t);
239                     closeInput(chunks);
240                     break;
241                 }
242 
243                 if (suspend) {
244                     
245                     
246                     
247                     break;
248                 }
249 
250                 if (message == null) {
251                     
252                     
253                     message = Unpooled.EMPTY_BUFFER;
254                 }
255 
256                 final int amount = amount(message);
257                 ChannelFuture f = ctx.write(message);
258                 if (endOfInput) {
259                     this.currentWrite = null;
260 
261                     
262                     
263                     
264                     
265                     
266                     f.addListener(new ChannelFutureListener() {
267                         @Override
268                         public void operationComplete(ChannelFuture future) throws Exception {
269                             currentWrite.progress(amount);
270                             currentWrite.success();
271                             closeInput(chunks);
272                         }
273                     });
274                 } else if (channel.isWritable()) {
275                     f.addListener(new ChannelFutureListener() {
276                         @Override
277                         public void operationComplete(ChannelFuture future) throws Exception {
278                             if (!future.isSuccess()) {
279                                 closeInput((ChunkedInput<?>) pendingMessage);
280                                 currentWrite.fail(future.cause());
281                             } else {
282                                 currentWrite.progress(amount);
283                             }
284                         }
285                     });
286                 } else {
287                     f.addListener(new ChannelFutureListener() {
288                         @Override
289                         public void operationComplete(ChannelFuture future) throws Exception {
290                             if (!future.isSuccess()) {
291                                 closeInput((ChunkedInput<?>) pendingMessage);
292                                 currentWrite.fail(future.cause());
293                             } else {
294                                 currentWrite.progress(amount);
295                                 if (channel.isWritable()) {
296                                     resumeTransfer();
297                                 }
298                             }
299                         }
300                     });
301                 }
302                 
303                 ctx.flush();
304                 requiresFlush = false;
305             } else {
306                 ctx.write(pendingMessage, currentWrite.promise);
307                 this.currentWrite = null;
308                 requiresFlush = true;
309             }
310 
311             if (!channel.isActive()) {
312                 discard(new ClosedChannelException());
313                 break;
314             }
315         }
316 
317         if (requiresFlush) {
318             ctx.flush();
319         }
320     }
321 
322     static void closeInput(ChunkedInput<?> chunks) {
323         try {
324             chunks.close();
325         } catch (Throwable t) {
326             if (logger.isWarnEnabled()) {
327                 logger.warn("Failed to close a chunked input.", t);
328             }
329         }
330     }
331 
332     private static final class PendingWrite {
333         final Object msg;
334         final ChannelPromise promise;
335         private long progress;
336 
337         PendingWrite(Object msg, ChannelPromise promise) {
338             this.msg = msg;
339             this.promise = promise;
340         }
341 
342         void fail(Throwable cause) {
343             ReferenceCountUtil.release(msg);
344             promise.tryFailure(cause);
345         }
346 
347         void success() {
348             if (promise.isDone()) {
349                 
350                 return;
351             }
352 
353             if (promise instanceof ChannelProgressivePromise) {
354                 
355                 ((ChannelProgressivePromise) promise).tryProgress(progress, progress);
356             }
357 
358             promise.trySuccess();
359         }
360 
361         void progress(int amount) {
362             progress += amount;
363             if (promise instanceof ChannelProgressivePromise) {
364                 ((ChannelProgressivePromise) promise).tryProgress(progress, -1);
365             }
366         }
367     }
368 
369     private static int amount(Object msg) {
370         if (msg instanceof ByteBuf) {
371             return ((ByteBuf) msg).readableBytes();
372         }
373         if (msg instanceof ByteBufHolder) {
374             return ((ByteBufHolder) msg).content().readableBytes();
375         }
376         return 1;
377     }
378 }