View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.buffer.Unpooled;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandler;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelProgressivePromise;
28  import io.netty.channel.ChannelPromise;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.nio.channels.ClosedChannelException;
34  import java.util.ArrayDeque;
35  import java.util.Queue;
36  
37  /**
38   * A {@link ChannelHandler} that adds support for writing a large data stream
39   * asynchronously neither spending a lot of memory nor getting
40   * {@link OutOfMemoryError}.  Large data streaming such as file
41   * transfer requires complicated state management in a {@link ChannelHandler}
42   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
43   * so that you can send a large data stream without difficulties.
44   * <p>
45   * To use {@link ChunkedWriteHandler} in your application, you have to insert
46   * a new {@link ChunkedWriteHandler} instance:
47   * <pre>
48   * {@link ChannelPipeline} p = ...;
49   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
50   * p.addLast("handler", new MyHandler());
51   * </pre>
52   * Once inserted, you can write a {@link ChunkedInput} so that the
53   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
54   * stream chunk by chunk and write the fetched chunk downstream:
55   * <pre>
56   * {@link Channel} ch = ...;
57   * ch.write(new {@link ChunkedFile}(new File("video.mkv"));
58   * </pre>
59   *
60   * <h3>Sending a stream which generates a chunk intermittently</h3>
61   *
62   * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
63   * Such {@link ChunkedInput} implementation often returns {@code null} on
64   * {@link ChunkedInput#readChunk(ChannelHandlerContext)}, resulting in the indefinitely suspended
65   * transfer.  To resume the transfer when a new chunk is available, you have to
66   * call {@link #resumeTransfer()}.
67   */
68  public class ChunkedWriteHandler extends ChannelDuplexHandler {
69  
70      private static final InternalLogger logger =
71          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
72  
73      private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
74      private volatile ChannelHandlerContext ctx;
75      private PendingWrite currentWrite;
76  
77      public ChunkedWriteHandler() {
78      }
79  
80      /**
81       * @deprecated use {@link #ChunkedWriteHandler()}
82       */
83      @Deprecated
84      public ChunkedWriteHandler(int maxPendingWrites) {
85          if (maxPendingWrites <= 0) {
86              throw new IllegalArgumentException(
87                      "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
88          }
89      }
90  
91      @Override
92      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
93          this.ctx = ctx;
94      }
95  
96      /**
97       * Continues to fetch the chunks from the input.
98       */
99      public void resumeTransfer() {
100         final ChannelHandlerContext ctx = this.ctx;
101         if (ctx == null) {
102             return;
103         }
104         if (ctx.executor().inEventLoop()) {
105             resumeTransfer0(ctx);
106         } else {
107             // let the transfer resume on the next event loop round
108             ctx.executor().execute(new Runnable() {
109 
110                 @Override
111                 public void run() {
112                     resumeTransfer0(ctx);
113                 }
114             });
115         }
116     }
117 
118     private void resumeTransfer0(ChannelHandlerContext ctx) {
119         try {
120             doFlush(ctx);
121         } catch (Exception e) {
122             if (logger.isWarnEnabled()) {
123                 logger.warn("Unexpected exception while sending chunks.", e);
124             }
125         }
126     }
127 
128     @Override
129     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
130         queue.add(new PendingWrite(msg, promise));
131     }
132 
133     @Override
134     public void flush(ChannelHandlerContext ctx) throws Exception {
135         doFlush(ctx);
136     }
137 
138     @Override
139     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
140         doFlush(ctx);
141         ctx.fireChannelInactive();
142     }
143 
144     @Override
145     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
146         if (ctx.channel().isWritable()) {
147             // channel is writable again try to continue flushing
148             doFlush(ctx);
149         }
150         ctx.fireChannelWritabilityChanged();
151     }
152 
153     private void discard(Throwable cause) {
154         for (;;) {
155             PendingWrite currentWrite = this.currentWrite;
156 
157             if (this.currentWrite == null) {
158                 currentWrite = queue.poll();
159             } else {
160                 this.currentWrite = null;
161             }
162 
163             if (currentWrite == null) {
164                 break;
165             }
166             Object message = currentWrite.msg;
167             if (message instanceof ChunkedInput) {
168                 ChunkedInput<?> in = (ChunkedInput<?>) message;
169                 try {
170                     if (!in.isEndOfInput()) {
171                         if (cause == null) {
172                             cause = new ClosedChannelException();
173                         }
174                         currentWrite.fail(cause);
175                     } else {
176                         currentWrite.success(in.length());
177                     }
178                     closeInput(in);
179                 } catch (Exception e) {
180                     currentWrite.fail(e);
181                     if (logger.isWarnEnabled()) {
182                         logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
183                     }
184                     closeInput(in);
185                 }
186             } else {
187                 if (cause == null) {
188                     cause = new ClosedChannelException();
189                 }
190                 currentWrite.fail(cause);
191             }
192         }
193     }
194 
195     private void doFlush(final ChannelHandlerContext ctx) {
196         final Channel channel = ctx.channel();
197         if (!channel.isActive()) {
198             discard(null);
199             return;
200         }
201 
202         boolean requiresFlush = true;
203         ByteBufAllocator allocator = ctx.alloc();
204         while (channel.isWritable()) {
205             if (currentWrite == null) {
206                 currentWrite = queue.poll();
207             }
208 
209             if (currentWrite == null) {
210                 break;
211             }
212             final PendingWrite currentWrite = this.currentWrite;
213             final Object pendingMessage = currentWrite.msg;
214 
215             if (pendingMessage instanceof ChunkedInput) {
216                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
217                 boolean endOfInput;
218                 boolean suspend;
219                 Object message = null;
220                 try {
221                     message = chunks.readChunk(allocator);
222                     endOfInput = chunks.isEndOfInput();
223 
224                     if (message == null) {
225                         // No need to suspend when reached at the end.
226                         suspend = !endOfInput;
227                     } else {
228                         suspend = false;
229                     }
230                 } catch (final Throwable t) {
231                     this.currentWrite = null;
232 
233                     if (message != null) {
234                         ReferenceCountUtil.release(message);
235                     }
236 
237                     currentWrite.fail(t);
238                     closeInput(chunks);
239                     break;
240                 }
241 
242                 if (suspend) {
243                     // ChunkedInput.nextChunk() returned null and it has
244                     // not reached at the end of input. Let's wait until
245                     // more chunks arrive. Nothing to write or notify.
246                     break;
247                 }
248 
249                 if (message == null) {
250                     // If message is null write an empty ByteBuf.
251                     // See https://github.com/netty/netty/issues/1671
252                     message = Unpooled.EMPTY_BUFFER;
253                 }
254 
255                 ChannelFuture f = ctx.write(message);
256                 if (endOfInput) {
257                     this.currentWrite = null;
258 
259                     // Register a listener which will close the input once the write is complete.
260                     // This is needed because the Chunk may have some resource bound that can not
261                     // be closed before its not written.
262                     //
263                     // See https://github.com/netty/netty/issues/303
264                     f.addListener(new ChannelFutureListener() {
265                         @Override
266                         public void operationComplete(ChannelFuture future) throws Exception {
267                             currentWrite.progress(chunks.progress(), chunks.length());
268                             currentWrite.success(chunks.length());
269                             closeInput(chunks);
270                         }
271                     });
272                 } else if (channel.isWritable()) {
273                     f.addListener(new ChannelFutureListener() {
274                         @Override
275                         public void operationComplete(ChannelFuture future) throws Exception {
276                             if (!future.isSuccess()) {
277                                 closeInput((ChunkedInput<?>) pendingMessage);
278                                 currentWrite.fail(future.cause());
279                             } else {
280                                 currentWrite.progress(chunks.progress(), chunks.length());
281                             }
282                         }
283                     });
284                 } else {
285                     f.addListener(new ChannelFutureListener() {
286                         @Override
287                         public void operationComplete(ChannelFuture future) throws Exception {
288                             if (!future.isSuccess()) {
289                                 closeInput((ChunkedInput<?>) pendingMessage);
290                                 currentWrite.fail(future.cause());
291                             } else {
292                                 currentWrite.progress(chunks.progress(), chunks.length());
293                                 if (channel.isWritable()) {
294                                     resumeTransfer();
295                                 }
296                             }
297                         }
298                     });
299                 }
300                 // Flush each chunk to conserve memory
301                 ctx.flush();
302                 requiresFlush = false;
303             } else {
304                 this.currentWrite = null;
305                 ctx.write(pendingMessage, currentWrite.promise);
306                 requiresFlush = true;
307             }
308 
309             if (!channel.isActive()) {
310                 discard(new ClosedChannelException());
311                 break;
312             }
313         }
314 
315         if (requiresFlush) {
316             ctx.flush();
317         }
318     }
319 
320     private static void closeInput(ChunkedInput<?> chunks) {
321         try {
322             chunks.close();
323         } catch (Throwable t) {
324             if (logger.isWarnEnabled()) {
325                 logger.warn("Failed to close a chunked input.", t);
326             }
327         }
328     }
329 
330     private static final class PendingWrite {
331         final Object msg;
332         final ChannelPromise promise;
333 
334         PendingWrite(Object msg, ChannelPromise promise) {
335             this.msg = msg;
336             this.promise = promise;
337         }
338 
339         void fail(Throwable cause) {
340             ReferenceCountUtil.release(msg);
341             promise.tryFailure(cause);
342         }
343 
344         void success(long total) {
345             if (promise.isDone()) {
346                 // No need to notify the progress or fulfill the promise because it's done already.
347                 return;
348             }
349             progress(total, total);
350             promise.trySuccess();
351         }
352 
353         void progress(long progress, long total) {
354             if (promise instanceof ChannelProgressivePromise) {
355                 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
356             }
357         }
358     }
359 }