View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import io.netty.buffer.Unpooled;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelHandlerAdapter;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelPipeline;
26  import io.netty.channel.ChannelProgressivePromise;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.util.ReferenceCountUtil;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.nio.channels.ClosedChannelException;
33  import java.util.ArrayDeque;
34  import java.util.Queue;
35  
36  /**
37   * A {@link ChannelHandler} that adds support for writing a large data stream
38   * asynchronously neither spending a lot of memory nor getting
39   * {@link OutOfMemoryError}.  Large data streaming such as file
40   * transfer requires complicated state management in a {@link ChannelHandler}
41   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
42   * so that you can send a large data stream without difficulties.
43   * <p>
44   * To use {@link ChunkedWriteHandler} in your application, you have to insert
45   * a new {@link ChunkedWriteHandler} instance:
46   * <pre>
47   * {@link ChannelPipeline} p = ...;
48   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
49   * p.addLast("handler", new MyHandler());
50   * </pre>
51   * Once inserted, you can write a {@link ChunkedInput} so that the
52   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
53   * stream chunk by chunk and write the fetched chunk downstream:
54   * <pre>
55   * {@link Channel} ch = ...;
56   * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
57   * </pre>
58   *
59   * <h3>Sending a stream which generates a chunk intermittently</h3>
60   *
61   * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
62   * Such {@link ChunkedInput} implementation often returns {@code null} on
63   * {@link ChunkedInput#readChunk(ChannelHandlerContext)}, resulting in the indefinitely suspended
64   * transfer.  To resume the transfer when a new chunk is available, you have to
65   * call {@link #resumeTransfer()}.
66   */
67  public class ChunkedWriteHandler extends ChannelHandlerAdapter {
68  
69      private static final InternalLogger logger =
70          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
71  
72      private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
73      private volatile ChannelHandlerContext ctx;
74      private PendingWrite currentWrite;
75  
76      public ChunkedWriteHandler() {
77      }
78  
79      /**
80       * @deprecated use {@link #ChunkedWriteHandler()}
81       */
82      @Deprecated
83      public ChunkedWriteHandler(int maxPendingWrites) {
84          if (maxPendingWrites <= 0) {
85              throw new IllegalArgumentException(
86                      "maxPendingWrites: " + maxPendingWrites + " (expected: > 0)");
87          }
88      }
89  
90      @Override
91      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
92          this.ctx = ctx;
93      }
94  
95      /**
96       * Continues to fetch the chunks from the input.
97       */
98      public void resumeTransfer() {
99          final ChannelHandlerContext ctx = this.ctx;
100         if (ctx == null) {
101             return;
102         }
103         if (ctx.executor().inEventLoop()) {
104             try {
105                 doFlush(ctx);
106             } catch (Exception e) {
107                 if (logger.isWarnEnabled()) {
108                     logger.warn("Unexpected exception while sending chunks.", e);
109                 }
110             }
111         } else {
112             // let the transfer resume on the next event loop round
113             ctx.executor().execute(new Runnable() {
114 
115                 @Override
116                 public void run() {
117                     try {
118                         doFlush(ctx);
119                     } catch (Exception e) {
120                         if (logger.isWarnEnabled()) {
121                             logger.warn("Unexpected exception while sending chunks.", e);
122                         }
123                     }
124                 }
125             });
126         }
127     }
128 
129     @Override
130     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
131         queue.add(new PendingWrite(msg, promise));
132     }
133 
134     @Override
135     public void flush(ChannelHandlerContext ctx) throws Exception {
136         if (!doFlush(ctx)) {
137             // Make sure to flush at least once.
138             ctx.flush();
139         }
140     }
141 
142     @Override
143     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
144         doFlush(ctx);
145         ctx.fireChannelInactive();
146     }
147 
148     @Override
149     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
150         if (ctx.channel().isWritable()) {
151             // channel is writable again try to continue flushing
152             doFlush(ctx);
153         }
154         ctx.fireChannelWritabilityChanged();
155     }
156 
157     private void discard(Throwable cause) {
158         for (;;) {
159             PendingWrite currentWrite = this.currentWrite;
160 
161             if (this.currentWrite == null) {
162                 currentWrite = queue.poll();
163             } else {
164                 this.currentWrite = null;
165             }
166 
167             if (currentWrite == null) {
168                 break;
169             }
170             Object message = currentWrite.msg;
171             if (message instanceof ChunkedInput) {
172                 ChunkedInput<?> in = (ChunkedInput<?>) message;
173                 try {
174                     if (!in.isEndOfInput()) {
175                         if (cause == null) {
176                             cause = new ClosedChannelException();
177                         }
178                         currentWrite.fail(cause);
179                     } else {
180                         currentWrite.success(in.length());
181                     }
182                     closeInput(in);
183                 } catch (Exception e) {
184                     currentWrite.fail(e);
185                     logger.warn(ChunkedInput.class.getSimpleName() + ".isEndOfInput() failed", e);
186                     closeInput(in);
187                 }
188             } else {
189                 if (cause == null) {
190                     cause = new ClosedChannelException();
191                 }
192                 currentWrite.fail(cause);
193             }
194         }
195     }
196 
197     private boolean doFlush(final ChannelHandlerContext ctx) throws Exception {
198         final Channel channel = ctx.channel();
199         if (!channel.isActive()) {
200             discard(null);
201             return false;
202         }
203 
204         boolean flushed = false;
205         while (channel.isWritable()) {
206             if (currentWrite == null) {
207                 currentWrite = queue.poll();
208             }
209 
210             if (currentWrite == null) {
211                 break;
212             }
213             final PendingWrite currentWrite = this.currentWrite;
214             final Object pendingMessage = currentWrite.msg;
215 
216             if (pendingMessage instanceof ChunkedInput) {
217                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
218                 boolean endOfInput;
219                 boolean suspend;
220                 Object message = null;
221                 try {
222                     message = chunks.readChunk(ctx);
223                     endOfInput = chunks.isEndOfInput();
224 
225                     if (message == null) {
226                         // No need to suspend when reached at the end.
227                         suspend = !endOfInput;
228                     } else {
229                         suspend = false;
230                     }
231                 } catch (final Throwable t) {
232                     this.currentWrite = null;
233 
234                     if (message != null) {
235                         ReferenceCountUtil.release(message);
236                     }
237 
238                     currentWrite.fail(t);
239                     closeInput(chunks);
240                     break;
241                 }
242 
243                 if (suspend) {
244                     // ChunkedInput.nextChunk() returned null and it has
245                     // not reached at the end of input. Let's wait until
246                     // more chunks arrive. Nothing to write or notify.
247                     break;
248                 }
249 
250                 if (message == null) {
251                     // If message is null write an empty ByteBuf.
252                     // See https://github.com/netty/netty/issues/1671
253                     message = Unpooled.EMPTY_BUFFER;
254                 }
255 
256                 ChannelFuture f = ctx.write(message);
257                 if (endOfInput) {
258                     this.currentWrite = null;
259 
260                     // Register a listener which will close the input once the write is complete.
261                     // This is needed because the Chunk may have some resource bound that can not
262                     // be closed before its not written.
263                     //
264                     // See https://github.com/netty/netty/issues/303
265                     f.addListener(new ChannelFutureListener() {
266                         @Override
267                         public void operationComplete(ChannelFuture future) throws Exception {
268                             currentWrite.progress(chunks.progress(), chunks.length());
269                             currentWrite.success(chunks.length());
270                             closeInput(chunks);
271                         }
272                     });
273                 } else if (channel.isWritable()) {
274                     f.addListener(new ChannelFutureListener() {
275                         @Override
276                         public void operationComplete(ChannelFuture future) throws Exception {
277                             if (!future.isSuccess()) {
278                                 closeInput((ChunkedInput<?>) pendingMessage);
279                                 currentWrite.fail(future.cause());
280                             } else {
281                                 currentWrite.progress(chunks.progress(), chunks.length());
282                             }
283                         }
284                     });
285                 } else {
286                     f.addListener(new ChannelFutureListener() {
287                         @Override
288                         public void operationComplete(ChannelFuture future) throws Exception {
289                             if (!future.isSuccess()) {
290                                 closeInput((ChunkedInput<?>) pendingMessage);
291                                 currentWrite.fail(future.cause());
292                             } else {
293                                 currentWrite.progress(chunks.progress(), chunks.length());
294                                 if (channel.isWritable()) {
295                                     resumeTransfer();
296                                 }
297                             }
298                         }
299                     });
300                 }
301             } else {
302                 ctx.write(pendingMessage, currentWrite.promise);
303                 this.currentWrite = null;
304             }
305 
306             // Always need to flush
307             ctx.flush();
308             flushed = true;
309 
310             if (!channel.isActive()) {
311                 discard(new ClosedChannelException());
312                 break;
313             }
314         }
315 
316         return flushed;
317     }
318 
319     static void closeInput(ChunkedInput<?> chunks) {
320         try {
321             chunks.close();
322         } catch (Throwable t) {
323             if (logger.isWarnEnabled()) {
324                 logger.warn("Failed to close a chunked input.", t);
325             }
326         }
327     }
328 
329     private static final class PendingWrite {
330         final Object msg;
331         final ChannelPromise promise;
332 
333         PendingWrite(Object msg, ChannelPromise promise) {
334             this.msg = msg;
335             this.promise = promise;
336         }
337 
338         void fail(Throwable cause) {
339             ReferenceCountUtil.release(msg);
340             promise.tryFailure(cause);
341         }
342 
343         void success(long total) {
344             if (promise.isDone()) {
345                 // No need to notify the progress or fulfill the promise because it's done already.
346                 return;
347             }
348 
349             if (promise instanceof ChannelProgressivePromise) {
350                 // Now we know what the total is.
351                 ((ChannelProgressivePromise) promise).tryProgress(total, total);
352             }
353 
354             promise.trySuccess();
355         }
356 
357         void progress(long progress, long total) {
358             if (promise instanceof ChannelProgressivePromise) {
359                 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
360             }
361         }
362     }
363 }