View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelProgressivePromise;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.nio.channels.ClosedChannelException;
34  import java.util.ArrayDeque;
35  import java.util.Queue;
36  
37  /**
38   * A {@link ChannelHandler} that adds support for writing a large data stream
39   * asynchronously without spending a lot of memory or running into an
40   * {@link OutOfMemoryError}.  Large data streaming such as file
41   * transfer requires complicated state management in a {@link ChannelHandler}
42   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
43   * so that you can send a large data stream without difficulties.
44   * <p>
45   * To use {@link ChunkedWriteHandler} in your application, you have to insert
46   * a new {@link ChunkedWriteHandler} instance:
47   * <pre>
48   * {@link ChannelPipeline} p = ...;
49   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
50   * p.addLast("handler", new MyHandler());
51   * </pre>
52   * Once inserted, you can write a {@link ChunkedInput} so that the
53   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
54   * stream chunk by chunk and write the fetched chunk downstream:
55   * <pre>
56   * {@link Channel} ch = ...;
57   * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
58   * </pre>
59   *
60   * <h3>Sending a stream which generates a chunk intermittently</h3>
61   *
62   * Some {@link ChunkedInput} implementations produce a chunk only on a certain event or at a certain time.
63   * Such an implementation often returns {@code null} from
64   * {@link ChunkedInput#readChunk(ByteBufAllocator)}, which suspends the transfer
65   * indefinitely.  To resume the transfer when a new chunk is available, you have to
66   * call {@link #resumeTransfer()}.
67   */
68  public class ChunkedWriteHandler extends ChannelDuplexHandler {
69  
70      private static final InternalLogger logger =
71          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
72  
73      private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
74      private volatile ChannelHandlerContext ctx;
75      private PendingWrite currentWrite;
76  
77      public ChunkedWriteHandler() {
78      }
79  
80      /**
81       * @deprecated use {@link #ChunkedWriteHandler()}
82       */
83      @Deprecated
84      public ChunkedWriteHandler(int maxPendingWrites) {
85          if (maxPendingWrites <= 0) {
86              throw new IllegalArgumentException(
87                      "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
88          }
89      }
90  
91      @Override
92      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
93          this.ctx = ctx;
94      }
95  
96      /**
97       * Continues to fetch the chunks from the input.
98       */
99      public void resumeTransfer() {
100         final ChannelHandlerContext ctx = this.ctx;
101         if (ctx == null) {
102             return;
103         }
104         if (ctx.executor().inEventLoop()) {
105             resumeTransfer0(ctx);
106         } else {
107             // let the transfer resume on the next event loop round
108             ctx.executor().execute(new Runnable() {
109 
110                 @Override
111                 public void run() {
112                     resumeTransfer0(ctx);
113                 }
114             });
115         }
116     }
117 
118     private void resumeTransfer0(ChannelHandlerContext ctx) {
119         try {
120             doFlush(ctx);
121         } catch (Exception e) {
122             if (logger.isWarnEnabled()) {
123                 logger.warn("Unexpected exception while sending chunks.", e);
124             }
125         }
126     }
127 
128     @Override
129     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
130         queue.add(new PendingWrite(msg, promise));
131     }
132 
133     @Override
134     public void flush(ChannelHandlerContext ctx) throws Exception {
135         doFlush(ctx);
136     }
137 
138     @Override
139     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
140         doFlush(ctx);
141         ctx.fireChannelInactive();
142     }
143 
144     @Override
145     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
146         if (ctx.channel().isWritable()) {
147             // channel is writable again try to continue flushing
148             doFlush(ctx);
149         }
150         ctx.fireChannelWritabilityChanged();
151     }
152 
153     private void discard(Throwable cause) {
154         for (;;) {
155             PendingWrite currentWrite = this.currentWrite;
156 
157             if (this.currentWrite == null) {
158                 currentWrite = queue.poll();
159             } else {
160                 this.currentWrite = null;
161             }
162 
163             if (currentWrite == null) {
164                 break;
165             }
166             Object message = currentWrite.msg;
167             if (message instanceof ChunkedInput) {
168                 ChunkedInput<?> in = (ChunkedInput<?>) message;
169                 try {
170                     if (!in.isEndOfInput()) {
171                         if (cause == null) {
172                             cause = new ClosedChannelException();
173                         }
174                         currentWrite.fail(cause);
175                     } else {
176                         currentWrite.success(in.length());
177                     }
178                     closeInput(in);
179                 } catch (Exception e) {
180                     currentWrite.fail(e);
181                     logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
182                     closeInput(in);
183                 }
184             } else {
185                 if (cause == null) {
186                     cause = new ClosedChannelException();
187                 }
188                 currentWrite.fail(cause);
189             }
190         }
191     }
192 
193     private void doFlush(final ChannelHandlerContext ctx) {
194         final Channel channel = ctx.channel();
195         if (!channel.isActive()) {
196             discard(null);
197             return;
198         }
199 
200         boolean requiresFlush = true;
201         ByteBufAllocator allocator = ctx.alloc();
202         while (channel.isWritable()) {
203             if (currentWrite == null) {
204                 currentWrite = queue.poll();
205             }
206 
207             if (currentWrite == null) {
208                 break;
209             }
210             final PendingWrite currentWrite = this.currentWrite;
211             final Object pendingMessage = currentWrite.msg;
212 
213             if (pendingMessage instanceof ChunkedInput) {
214                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
215                 boolean endOfInput;
216                 boolean suspend;
217                 Object message = null;
218                 try {
219                     message = chunks.readChunk(allocator);
220                     endOfInput = chunks.isEndOfInput();
221 
222                     if (message == null) {
223                         // No need to suspend when reached at the end.
224                         suspend = !endOfInput;
225                     } else {
226                         suspend = false;
227                     }
228                 } catch (final Throwable t) {
229                     this.currentWrite = null;
230 
231                     if (message != null) {
232                         ReferenceCountUtil.release(message);
233                     }
234 
235                     currentWrite.fail(t);
236                     closeInput(chunks);
237                     break;
238                 }
239 
240                 if (suspend) {
241                     // ChunkedInput.nextChunk() returned null and it has
242                     // not reached at the end of input. Let's wait until
243                     // more chunks arrive. Nothing to write or notify.
244                     break;
245                 }
246 
247                 if (message == null) {
248                     // If message is null write an empty ByteBuf.
249                     // See https://github.com/netty/netty/issues/1671
250                     message = Unpooled.EMPTY_BUFFER;
251                 }
252 
253                 ChannelFuture f = ctx.write(message);
254                 if (endOfInput) {
255                     this.currentWrite = null;
256 
257                     // Register a listener which will close the input once the write is complete.
258                     // This is needed because the Chunk may have some resource bound that can not
259                     // be closed before its not written.
260                     //
261                     // See https://github.com/netty/netty/issues/303
262                     f.addListener(new ChannelFutureListener() {
263                         @Override
264                         public void operationComplete(ChannelFuture future) throws Exception {
265                             currentWrite.progress(chunks.progress(), chunks.length());
266                             currentWrite.success(chunks.length());
267                             closeInput(chunks);
268                         }
269                     });
270                 } else if (channel.isWritable()) {
271                     f.addListener(new ChannelFutureListener() {
272                         @Override
273                         public void operationComplete(ChannelFuture future) throws Exception {
274                             if (!future.isSuccess()) {
275                                 closeInput((ChunkedInput<?>) pendingMessage);
276                                 currentWrite.fail(future.cause());
277                             } else {
278                                 currentWrite.progress(chunks.progress(), chunks.length());
279                             }
280                         }
281                     });
282                 } else {
283                     f.addListener(new ChannelFutureListener() {
284                         @Override
285                         public void operationComplete(ChannelFuture future) throws Exception {
286                             if (!future.isSuccess()) {
287                                 closeInput((ChunkedInput<?>) pendingMessage);
288                                 currentWrite.fail(future.cause());
289                             } else {
290                                 currentWrite.progress(chunks.progress(), chunks.length());
291                                 if (channel.isWritable()) {
292                                     resumeTransfer();
293                                 }
294                             }
295                         }
296                     });
297                 }
298                 // Flush each chunk to conserve memory
299                 ctx.flush();
300                 requiresFlush = false;
301             } else {
302                 this.currentWrite = null;
303                 ctx.write(pendingMessage, currentWrite.promise);
304                 requiresFlush = true;
305             }
306 
307             if (!channel.isActive()) {
308                 discard(new ClosedChannelException());
309                 break;
310             }
311         }
312 
313         if (requiresFlush) {
314             ctx.flush();
315         }
316     }
317 
318     private static void closeInput(ChunkedInput<?> chunks) {
319         try {
320             chunks.close();
321         } catch (Throwable t) {
322             if (logger.isWarnEnabled()) {
323                 logger.warn("Failed to close a chunked input.", t);
324             }
325         }
326     }
327 
328     private static final class PendingWrite {
329         final Object msg;
330         final ChannelPromise promise;
331 
332         PendingWrite(Object msg, ChannelPromise promise) {
333             this.msg = msg;
334             this.promise = promise;
335         }
336 
337         void fail(Throwable cause) {
338             ReferenceCountUtil.release(msg);
339             promise.tryFailure(cause);
340         }
341 
342         void success(long total) {
343             if (promise.isDone()) {
344                 // No need to notify the progress or fulfill the promise because it's done already.
345                 return;
346             }
347             progress(total, total);
348             promise.trySuccess();
349         }
350 
351         void progress(long progress, long total) {
352             if (promise instanceof ChannelProgressivePromise) {
353                 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
354             }
355         }
356     }
357 }