View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufHolder;
20  import io.netty.buffer.Unpooled;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelDuplexHandler;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelFutureListener;
25  import io.netty.channel.ChannelHandler;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelPipeline;
28  import io.netty.channel.ChannelProgressivePromise;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.nio.channels.ClosedChannelException;
35  import java.util.ArrayDeque;
36  import java.util.Queue;
37  
38  /**
39   * A {@link ChannelHandler} that adds support for writing a large data stream
40   * asynchronously neither spending a lot of memory nor getting
41   * {@link OutOfMemoryError}.  Large data streaming such as file
42   * transfer requires complicated state management in a {@link ChannelHandler}
43   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
44   * so that you can send a large data stream without difficulties.
45   * <p>
46   * To use {@link ChunkedWriteHandler} in your application, you have to insert
47   * a new {@link ChunkedWriteHandler} instance:
48   * <pre>
49   * {@link ChannelPipeline} p = ...;
50   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
51   * p.addLast("handler", new MyHandler());
52   * </pre>
53   * Once inserted, you can write a {@link ChunkedInput} so that the
54   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
55   * stream chunk by chunk and write the fetched chunk downstream:
56   * <pre>
57   * {@link Channel} ch = ...;
58   * ch.write(new {@link ChunkedFile}(new File("video.mkv"));
59   * </pre>
60   *
61   * <h3>Sending a stream which generates a chunk intermittently</h3>
62   *
63   * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
64   * Such {@link ChunkedInput} implementation often returns {@code null} on
65   * {@link ChunkedInput#readChunk(ChannelHandlerContext)}, resulting in the indefinitely suspended
66   * transfer.  To resume the transfer when a new chunk is available, you have to
67   * call {@link #resumeTransfer()}.
68   */
69  public class ChunkedWriteHandler
70          extends ChannelDuplexHandler {
71  
72      private static final InternalLogger logger =
73          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74  
75      private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
76      private volatile ChannelHandlerContext ctx;
77      private PendingWrite currentWrite;
78  
79      public ChunkedWriteHandler() {
80      }
81  
82      /**
83       * @deprecated use {@link #ChunkedWriteHandler()}
84       */
85      @Deprecated
86      public ChunkedWriteHandler(int maxPendingWrites) {
87          if (maxPendingWrites <= 0) {
88              throw new IllegalArgumentException(
89                      "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
90          }
91      }
92  
93      @Override
94      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
95          this.ctx = ctx;
96      }
97  
98      /**
99       * Continues to fetch the chunks from the input.
100      */
101     public void resumeTransfer() {
102         final ChannelHandlerContext ctx = this.ctx;
103         if (ctx == null) {
104             return;
105         }
106         if (ctx.executor().inEventLoop()) {
107             try {
108                 doFlush(ctx);
109             } catch (Exception e) {
110                 if (logger.isWarnEnabled()) {
111                     logger.warn("Unexpected exception while sending chunks.", e);
112                 }
113             }
114         } else {
115             // let the transfer resume on the next event loop round
116             ctx.executor().execute(new Runnable() {
117 
118                 @Override
119                 public void run() {
120                     try {
121                         doFlush(ctx);
122                     } catch (Exception e) {
123                         if (logger.isWarnEnabled()) {
124                             logger.warn("Unexpected exception while sending chunks.", e);
125                         }
126                     }
127                 }
128             });
129         }
130     }
131 
132     @Override
133     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
134         queue.add(new PendingWrite(msg, promise));
135     }
136 
137     @Override
138     public void flush(ChannelHandlerContext ctx) throws Exception {
139         doFlush(ctx);
140     }
141 
142     @Override
143     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
144         doFlush(ctx);
145         ctx.fireChannelInactive();
146     }
147 
148     @Override
149     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
150         if (ctx.channel().isWritable()) {
151             // channel is writable again try to continue flushing
152             doFlush(ctx);
153         }
154         ctx.fireChannelWritabilityChanged();
155     }
156 
157     private void discard(Throwable cause) {
158         for (;;) {
159             PendingWrite currentWrite = this.currentWrite;
160 
161             if (this.currentWrite == null) {
162                 currentWrite = queue.poll();
163             } else {
164                 this.currentWrite = null;
165             }
166 
167             if (currentWrite == null) {
168                 break;
169             }
170             Object message = currentWrite.msg;
171             if (message instanceof ChunkedInput) {
172                 ChunkedInput<?> in = (ChunkedInput<?>) message;
173                 try {
174                     if (!in.isEndOfInput()) {
175                         if (cause == null) {
176                             cause = new ClosedChannelException();
177                         }
178                         currentWrite.fail(cause);
179                     } else {
180                         currentWrite.success();
181                     }
182                     closeInput(in);
183                 } catch (Exception e) {
184                     currentWrite.fail(e);
185                     logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
186                     closeInput(in);
187                 }
188             } else {
189                 if (cause == null) {
190                     cause = new ClosedChannelException();
191                 }
192                 currentWrite.fail(cause);
193             }
194         }
195     }
196 
197     private void doFlush(final ChannelHandlerContext ctx) throws Exception {
198         final Channel channel = ctx.channel();
199         if (!channel.isActive()) {
200             discard(null);
201             return;
202         }
203 
204         boolean requiresFlush = true;
205         while (channel.isWritable()) {
206             if (currentWrite == null) {
207                 currentWrite = queue.poll();
208             }
209 
210             if (currentWrite == null) {
211                 break;
212             }
213             final PendingWrite currentWrite = this.currentWrite;
214             final Object pendingMessage = currentWrite.msg;
215 
216             if (pendingMessage instanceof ChunkedInput) {
217                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
218                 boolean endOfInput;
219                 boolean suspend;
220                 Object message = null;
221                 try {
222                     message = chunks.readChunk(ctx);
223                     endOfInput = chunks.isEndOfInput();
224 
225                     if (message == null) {
226                         // No need to suspend when reached at the end.
227                         suspend = !endOfInput;
228                     } else {
229                         suspend = false;
230                     }
231                 } catch (final Throwable t) {
232                     this.currentWrite = null;
233 
234                     if (message != null) {
235                         ReferenceCountUtil.release(message);
236                     }
237 
238                     currentWrite.fail(t);
239                     closeInput(chunks);
240                     break;
241                 }
242 
243                 if (suspend) {
244                     // ChunkedInput.nextChunk() returned null and it has
245                     // not reached at the end of input. Let's wait until
246                     // more chunks arrive. Nothing to write or notify.
247                     break;
248                 }
249 
250                 if (message == null) {
251                     // If message is null write an empty ByteBuf.
252                     // See https://github.com/netty/netty/issues/1671
253                     message = Unpooled.EMPTY_BUFFER;
254                 }
255 
256                 final int amount = amount(message);
257                 ChannelFuture f = ctx.write(message);
258                 if (endOfInput) {
259                     this.currentWrite = null;
260 
261                     // Register a listener which will close the input once the write is complete.
262                     // This is needed because the Chunk may have some resource bound that can not
263                     // be closed before its not written.
264                     //
265                     // See https://github.com/netty/netty/issues/303
266                     f.addListener(new ChannelFutureListener() {
267                         @Override
268                         public void operationComplete(ChannelFuture future) throws Exception {
269                             currentWrite.progress(amount);
270                             currentWrite.success();
271                             closeInput(chunks);
272                         }
273                     });
274                 } else if (channel.isWritable()) {
275                     f.addListener(new ChannelFutureListener() {
276                         @Override
277                         public void operationComplete(ChannelFuture future) throws Exception {
278                             if (!future.isSuccess()) {
279                                 closeInput((ChunkedInput<?>) pendingMessage);
280                                 currentWrite.fail(future.cause());
281                             } else {
282                                 currentWrite.progress(amount);
283                             }
284                         }
285                     });
286                 } else {
287                     f.addListener(new ChannelFutureListener() {
288                         @Override
289                         public void operationComplete(ChannelFuture future) throws Exception {
290                             if (!future.isSuccess()) {
291                                 closeInput((ChunkedInput<?>) pendingMessage);
292                                 currentWrite.fail(future.cause());
293                             } else {
294                                 currentWrite.progress(amount);
295                                 if (channel.isWritable()) {
296                                     resumeTransfer();
297                                 }
298                             }
299                         }
300                     });
301                 }
302                 // Flush each chunk to conserve memory
303                 ctx.flush();
304                 requiresFlush = false;
305             } else {
306                 ctx.write(pendingMessage, currentWrite.promise);
307                 this.currentWrite = null;
308                 requiresFlush = true;
309             }
310 
311             if (!channel.isActive()) {
312                 discard(new ClosedChannelException());
313                 break;
314             }
315         }
316 
317         if (requiresFlush) {
318             ctx.flush();
319         }
320     }
321 
322     static void closeInput(ChunkedInput<?> chunks) {
323         try {
324             chunks.close();
325         } catch (Throwable t) {
326             if (logger.isWarnEnabled()) {
327                 logger.warn("Failed to close a chunked input.", t);
328             }
329         }
330     }
331 
332     private static final class PendingWrite {
333         final Object msg;
334         final ChannelPromise promise;
335         private long progress;
336 
337         PendingWrite(Object msg, ChannelPromise promise) {
338             this.msg = msg;
339             this.promise = promise;
340         }
341 
342         void fail(Throwable cause) {
343             ReferenceCountUtil.release(msg);
344             promise.tryFailure(cause);
345         }
346 
347         void success() {
348             if (promise.isDone()) {
349                 // No need to notify the progress or fulfill the promise because it's done already.
350                 return;
351             }
352 
353             if (promise instanceof ChannelProgressivePromise) {
354                 // Now we know what the total is.
355                 ((ChannelProgressivePromise) promise).tryProgress(progress, progress);
356             }
357 
358             promise.trySuccess();
359         }
360 
361         void progress(int amount) {
362             progress += amount;
363             if (promise instanceof ChannelProgressivePromise) {
364                 ((ChannelProgressivePromise) promise).tryProgress(progress, -1);
365             }
366         }
367     }
368 
369     private static int amount(Object msg) {
370         if (msg instanceof ByteBuf) {
371             return ((ByteBuf) msg).readableBytes();
372         }
373         if (msg instanceof ByteBufHolder) {
374             return ((ByteBufHolder) msg).content().readableBytes();
375         }
376         return 1;
377     }
378 }