/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.stream;

import static io.netty.util.internal.ObjectUtil.checkPositive;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * A {@link ChannelHandler} that adds support for writing a large data stream
 * asynchronously, without spending a lot of memory or risking an
 * {@link OutOfMemoryError}.  Large data streaming such as file
 * transfer requires complicated state management in a {@link ChannelHandler}
 * implementation.  {@link ChunkedWriteHandler} manages such complicated states
 * so that you can send a large data stream without difficulties.
 * <p>
 * To use {@link ChunkedWriteHandler} in your application, you have to insert
 * a new {@link ChunkedWriteHandler} instance:
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
 * p.addLast("handler", new MyHandler());
 * </pre>
 * Once inserted, you can write a {@link ChunkedInput} so that the
 * {@link ChunkedWriteHandler} can pick it up, fetch the content of the
 * stream chunk by chunk, and write each fetched chunk downstream:
 * <pre>
 * {@link Channel} ch = ...;
 * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
 * </pre>
 *
 * <h3>Sending a stream which generates a chunk intermittently</h3>
 *
 * Some {@link ChunkedInput} implementations generate a chunk only on a certain event or timing.
 * Such an implementation often returns {@code null} from
 * {@link ChunkedInput#readChunk(ByteBufAllocator)}, resulting in an indefinitely suspended
 * transfer.  To resume the transfer when a new chunk is available, you have to
 * call {@link #resumeTransfer()}.
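 * <p>
 * For example (a sketch only; {@code streamer} and the event-driven {@link ChunkedInput}
 * implementation are placeholders, not part of this class):
 * <pre>
 * {@link ChunkedWriteHandler} streamer = new {@link ChunkedWriteHandler}();
 * p.addLast("streamer", streamer);
 * p.addLast("handler", new MyHandler());
 * ...
 * ch.writeAndFlush(myEventDrivenChunkedInput);
 * ...
 * // Later, once the input has produced a new chunk:
 * streamer.resumeTransfer();
 * </pre>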
 */
public class ChunkedWriteHandler extends ChannelDuplexHandler {

    private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);

    private Queue<PendingWrite> queue;
    private volatile ChannelHandlerContext ctx;

    public ChunkedWriteHandler() {
    }

    /**
     * @deprecated use {@link #ChunkedWriteHandler()}
     */
    @Deprecated
    public ChunkedWriteHandler(int maxPendingWrites) {
        checkPositive(maxPendingWrites, "maxPendingWrites");
    }

    private void allocateQueue() {
        if (queue == null) {
            queue = new ArrayDeque<PendingWrite>();
        }
    }

    private boolean queueIsEmpty() {
        return queue == null || queue.isEmpty();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    /**
     * Continues to fetch the chunks from the input.
     */
    public void resumeTransfer() {
        final ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            return;
        }
        if (ctx.executor().inEventLoop()) {
            resumeTransfer0(ctx);
        } else {
            // let the transfer resume on the next event loop round
            ctx.executor().execute(new Runnable() {

                @Override
                public void run() {
                    resumeTransfer0(ctx);
                }
            });
        }
    }

    private void resumeTransfer0(ChannelHandlerContext ctx) {
        try {
            doFlush(ctx);
        } catch (Exception e) {
            logger.warn("Unexpected exception while sending chunks.", e);
        }
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!queueIsEmpty() || msg instanceof ChunkedInput) {
            allocateQueue();
            queue.add(new PendingWrite(msg, promise));
        } else {
            ctx.write(msg, promise);
        }
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        doFlush(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        doFlush(ctx);
        ctx.fireChannelInactive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            // The channel is writable again; try to continue flushing.
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

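    /**
     * Fails every pending write with {@code cause} (falling back to a {@link ClosedChannelException}) because the
     * channel can no longer make progress. Pending {@link ChunkedInput}s are closed; an input that had already
     * reached its end has its promise completed successfully instead.
     */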
    private void discard(Throwable cause) {
        if (queueIsEmpty()) {
            return;
        }
        for (;;) {
            PendingWrite currentWrite = queue.poll();

            if (currentWrite == null) {
                break;
            }
            Object message = currentWrite.msg;
            if (message instanceof ChunkedInput) {
                ChunkedInput<?> in = (ChunkedInput<?>) message;
                boolean endOfInput;
                long inputLength;
                try {
                    endOfInput = in.isEndOfInput();
                    inputLength = in.length();
                    closeInput(in);
                } catch (Exception e) {
                    closeInput(in);
                    currentWrite.fail(e);
                    logger.warn("ChunkedInput failed", e);
                    continue;
                }

                if (!endOfInput) {
                    if (cause == null) {
                        cause = new ClosedChannelException();
                    }
                    currentWrite.fail(cause);
                } else {
                    currentWrite.success(inputLength);
                }
            } else {
                if (cause == null) {
                    cause = new ClosedChannelException();
                }
                currentWrite.fail(cause);
            }
        }
    }

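    /**
     * Writes as much of the queued data as the channel currently accepts: chunks are read from the
     * {@link ChunkedInput} at the head of the queue and flushed one by one while the channel stays writable.
     * The loop stops when the channel becomes unwritable or inactive, the input temporarily has no chunk
     * available, or the queue has been drained.
     */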
    private void doFlush(final ChannelHandlerContext ctx) {
        final Channel channel = ctx.channel();
        if (!channel.isActive()) {
            discard(null);
            return;
        }

        if (queueIsEmpty()) {
            ctx.flush();
            return;
        }

        boolean requiresFlush = true;
        ByteBufAllocator allocator = ctx.alloc();
        while (channel.isWritable()) {
            final PendingWrite currentWrite = queue.peek();

            if (currentWrite == null) {
                break;
            }

            if (currentWrite.promise.isDone()) {
                // This might happen e.g. in the case when a write operation
                // failed, but there are still unconsumed chunks left.
                // Most chunked input sources would stop generating chunks
                // and report end of input, but this doesn't work with any
                // source wrapped in HttpChunkedInput.
                // Note that we're not trying to release the message/chunks,
                // as this must already have been done by whoever resolved the
                // promise (using the ChunkedInput.close method).
                // See https://github.com/netty/netty/issues/8700.
                queue.remove();
                continue;
            }

            final Object pendingMessage = currentWrite.msg;

            if (pendingMessage instanceof ChunkedInput) {
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try {
                    message = chunks.readChunk(allocator);
                    endOfInput = chunks.isEndOfInput();
                    // No need to suspend when we have reached the end of input.
                    suspend = message == null && !endOfInput;

                } catch (final Throwable t) {
                    queue.remove();

                    if (message != null) {
                        ReferenceCountUtil.release(message);
                    }

                    closeInput(chunks);
                    currentWrite.fail(t);
                    break;
                }

                if (suspend) {
                    // ChunkedInput.readChunk() returned null and we have
                    // not yet reached the end of input. Let's wait until
                    // more chunks arrive. Nothing to write or notify.
                    break;
                }

                if (message == null) {
                    // If message is null write an empty ByteBuf.
                    // See https://github.com/netty/netty/issues/1671
                    message = Unpooled.EMPTY_BUFFER;
                }

                if (endOfInput) {
                    // We need to remove the element from the queue before we call writeAndFlush() as this operation
                    // may cause an action that also touches the queue.
                    queue.remove();
                }
                // Flush each chunk to conserve memory
                ChannelFuture f = ctx.writeAndFlush(message);
                if (endOfInput) {
                    if (f.isDone()) {
                        handleEndOfInputFuture(f, chunks, currentWrite);
                    } else {
                        // Register a listener which will close the input once the write is complete.
                        // This is needed because the chunk may hold resources that cannot be
                        // released until the write has completed.
                        //
                        // See https://github.com/netty/netty/issues/303
                        f.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) {
                                handleEndOfInputFuture(future, chunks, currentWrite);
                            }
                        });
                    }
                } else {
                    final boolean resume = !channel.isWritable();
                    if (f.isDone()) {
                        handleFuture(f, chunks, currentWrite, resume);
                    } else {
                        f.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) {
                                handleFuture(future, chunks, currentWrite, resume);
                            }
                        });
                    }
                }
                requiresFlush = false;
            } else {
                queue.remove();
                ctx.write(pendingMessage, currentWrite.promise);
                requiresFlush = true;
            }

            if (!channel.isActive()) {
                discard(new ClosedChannelException());
                break;
            }
        }

        if (requiresFlush) {
            ctx.flush();
        }
    }

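    /**
     * Invoked once the write of the final chunk has completed: closes the {@link ChunkedInput} and fails or
     * fulfills the pending promise, reporting the final progress.
     */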
    private static void handleEndOfInputFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite) {
        if (!future.isSuccess()) {
            closeInput(input);
            currentWrite.fail(future.cause());
        } else {
            // read state of the input in local variables before closing it
            long inputProgress = input.progress();
            long inputLength = input.length();
            closeInput(input);
            currentWrite.progress(inputProgress, inputLength);
            currentWrite.success(inputLength);
        }
    }

    private void handleFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite, boolean resume) {
        if (!future.isSuccess()) {
            closeInput(input);
            currentWrite.fail(future.cause());
        } else {
            currentWrite.progress(input.progress(), input.length());
            if (resume && future.channel().isWritable()) {
                resumeTransfer();
            }
        }
    }

    private static void closeInput(ChunkedInput<?> chunks) {
        try {
            chunks.close();
        } catch (Throwable t) {
            logger.warn("Failed to close a ChunkedInput.", t);
        }
    }

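    /**
     * A queued write: the original message (either a {@link ChunkedInput} or a regular message) together with the
     * promise to notify once it has been written or failed.
     */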
    private static final class PendingWrite {
        final Object msg;
        final ChannelPromise promise;

        PendingWrite(Object msg, ChannelPromise promise) {
            this.msg = msg;
            this.promise = promise;
        }

        void fail(Throwable cause) {
            ReferenceCountUtil.release(msg);
            promise.tryFailure(cause);
        }

        void success(long total) {
            if (promise.isDone()) {
                // No need to notify the progress or fulfill the promise because it's done already.
                return;
            }
            progress(total, total);
            promise.trySuccess();
        }

        void progress(long progress, long total) {
            if (promise instanceof ChannelProgressivePromise) {
                ((ChannelProgressivePromise) promise).tryProgress(progress, total);
            }
        }
    }
}