/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.stream;

import static io.netty.util.internal.ObjectUtil.checkPositive;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelProgressivePromise;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * A {@link ChannelHandler} that adds support for writing a large data stream
 * asynchronously without consuming a lot of memory or triggering an
 * {@link OutOfMemoryError}.  Streaming large data, such as a file
 * transfer, requires complicated state management in a {@link ChannelHandler}
 * implementation.  {@link ChunkedWriteHandler} manages that state for you,
 * so that you can send a large data stream without difficulty.
 * <p>
 * To use {@link ChunkedWriteHandler} in your application, insert a new
 * {@link ChunkedWriteHandler} instance into your {@link ChannelPipeline}:
 * <pre>
 * {@link ChannelPipeline} p = ...;
 * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
 * p.addLast("handler", new MyHandler());
 * </pre>
 * Once inserted, you can write a {@link ChunkedInput} so that the
 * {@link ChunkedWriteHandler} picks it up, fetches the content of the
 * stream chunk by chunk, and writes each fetched chunk downstream:
 * <pre>
 * {@link Channel} ch = ...;
 * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
 * </pre>
 *
 * <h3>Sending a stream which generates a chunk intermittently</h3>
 *
 * Some {@link ChunkedInput} implementations generate a chunk only when a certain
 * event occurs or a certain amount of time passes.  Such an implementation often
 * returns {@code null} from {@link ChunkedInput#readChunk(ChannelHandlerContext)},
 * which suspends the transfer indefinitely.  To resume the transfer when a new
 * chunk becomes available, call {@link #resumeTransfer()}.
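 * <p>
 * For example, a simplified sketch ({@code MyIntermittentInput} is a hypothetical
 * {@link ChunkedInput} implementation whose {@code readChunk()} returns {@code null}
 * until new data becomes available):
 * <pre>
 * {@link Channel} ch = ...;
 * MyIntermittentInput input = ...;
 * ch.writeAndFlush(input);
 *
 * // Later, once the producer has made a new chunk available:
 * {@link ChunkedWriteHandler} streamer = ch.pipeline().get({@link ChunkedWriteHandler}.class);
 * streamer.resumeTransfer();
 * </pre>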
 */
public class ChunkedWriteHandler extends ChannelDuplexHandler {

    private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);

    private final Queue<PendingWrite> queue = new ArrayDeque<PendingWrite>();
    private volatile ChannelHandlerContext ctx;

    public ChunkedWriteHandler() {
    }

    /**
     * @deprecated use {@link #ChunkedWriteHandler()}
     */
    @Deprecated
    public ChunkedWriteHandler(int maxPendingWrites) {
        checkPositive(maxPendingWrites, "maxPendingWrites");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }

    /**
     * Continues fetching chunks from the input and writing them out.
     * <p>
     * This method may be called from any thread.  If it is not called from the
     * event loop, the resumption is scheduled on the handler's executor.
     */
    public void resumeTransfer() {
        final ChannelHandlerContext ctx = this.ctx;
        if (ctx == null) {
            return;
        }
        if (ctx.executor().inEventLoop()) {
            resumeTransfer0(ctx);
        } else {
            // let the transfer resume on the next event loop round
            ctx.executor().execute(new Runnable() {

                @Override
                public void run() {
                    resumeTransfer0(ctx);
                }
            });
        }
    }

    private void resumeTransfer0(ChannelHandlerContext ctx) {
        try {
            doFlush(ctx);
        } catch (Exception e) {
            logger.warn("Unexpected exception while sending chunks.", e);
        }
    }

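    // Writes are not passed downstream immediately: each message is queued together with
    // its promise and is only written out by doFlush(), i.e. on flush(), when the channel
    // becomes writable again, or when the transfer is resumed.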
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        queue.add(new PendingWrite(msg, promise));
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        doFlush(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        doFlush(ctx);
        ctx.fireChannelInactive();
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isWritable()) {
            // channel is writable again; try to continue flushing
            doFlush(ctx);
        }
        ctx.fireChannelWritabilityChanged();
    }

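    // Fails (or, for fully consumed inputs, completes) every write that is still queued.
    // Called when the channel is no longer active; a null cause is replaced by a
    // ClosedChannelException before the pending promises are failed.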
    private void discard(Throwable cause) {
        for (;;) {
            PendingWrite currentWrite = queue.poll();

            if (currentWrite == null) {
                break;
            }
            Object message = currentWrite.msg;
            if (message instanceof ChunkedInput) {
                ChunkedInput<?> in = (ChunkedInput<?>) message;
                boolean endOfInput;
                long inputLength;
                try {
                    endOfInput = in.isEndOfInput();
                    inputLength = in.length();
                    closeInput(in);
                } catch (Exception e) {
                    closeInput(in);
                    currentWrite.fail(e);
                    if (logger.isWarnEnabled()) {
                        logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
                    }
                    continue;
                }

                if (!endOfInput) {
                    if (cause == null) {
                        cause = new ClosedChannelException();
                    }
                    currentWrite.fail(cause);
                } else {
                    currentWrite.success(inputLength);
                }
            } else {
                if (cause == null) {
                    cause = new ClosedChannelException();
                }
                currentWrite.fail(cause);
            }
        }
    }

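    // Core transfer loop: while the channel stays writable, take the next queued write.
    // Plain messages are written through as-is. For a ChunkedInput, one chunk is read and
    // written per iteration and flushed immediately to keep memory usage low; when
    // readChunk() returns null before the end of input, the loop suspends until
    // resumeTransfer() is called. Inputs are closed and their promises completed once the
    // last chunk has been written, or failed if reading or writing a chunk fails.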
    private void doFlush(final ChannelHandlerContext ctx) {
        final Channel channel = ctx.channel();
        if (!channel.isActive()) {
            discard(null);
            return;
        }

        boolean requiresFlush = true;
        ByteBufAllocator allocator = ctx.alloc();
        while (channel.isWritable()) {
            final PendingWrite currentWrite = queue.peek();

            if (currentWrite == null) {
                break;
            }

            if (currentWrite.promise.isDone()) {
                // This might happen e.g. in the case when a write operation
                // failed, but there are still unconsumed chunks left.
                // Most chunked input sources would stop generating chunks
                // and report end of input, but this doesn't work with any
                // source wrapped in HttpChunkedInput.
                // Note that we're not trying to release the message/chunks
                // as this had to be done already by whoever resolved the
                // promise (using the ChunkedInput.close method).
                // See https://github.com/netty/netty/issues/8700.
                queue.remove();
                continue;
            }

            final Object pendingMessage = currentWrite.msg;

            if (pendingMessage instanceof ChunkedInput) {
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try {
                    message = chunks.readChunk(allocator);
                    endOfInput = chunks.isEndOfInput();

                    if (message == null) {
                        // No need to suspend if we have already reached the end of input.
                        suspend = !endOfInput;
                    } else {
                        suspend = false;
                    }
                } catch (final Throwable t) {
                    queue.remove();

                    if (message != null) {
                        ReferenceCountUtil.release(message);
                    }

                    closeInput(chunks);
                    currentWrite.fail(t);
                    break;
                }

                if (suspend) {
                    // ChunkedInput.readChunk() returned null and the input has not
                    // reached the end yet. Wait until more chunks arrive.
                    // Nothing to write or notify.
                    break;
                }

                if (message == null) {
                    // If message is null write an empty ByteBuf.
                    // See https://github.com/netty/netty/issues/1671
                    message = Unpooled.EMPTY_BUFFER;
                }

                if (endOfInput) {
                    // We need to remove the element from the queue before we call writeAndFlush() as this operation
                    // may cause an action that also touches the queue.
                    queue.remove();
                }
                // Flush each chunk to conserve memory
                ChannelFuture f = ctx.writeAndFlush(message);
                if (endOfInput) {
                    if (f.isDone()) {
                        handleEndOfInputFuture(f, currentWrite);
                    } else {
                        // Register a listener which will close the input once the write is complete.
                        // This is needed because the chunk may have resources bound to it that
                        // cannot be released until the chunk has been written.
                        //
                        // See https://github.com/netty/netty/issues/303
                        f.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) {
                                handleEndOfInputFuture(future, currentWrite);
                            }
                        });
                    }
                } else {
                    final boolean resume = !channel.isWritable();
                    if (f.isDone()) {
                        handleFuture(f, currentWrite, resume);
                    } else {
                        f.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) {
                                handleFuture(future, currentWrite, resume);
                            }
                        });
                    }
                }
                requiresFlush = false;
            } else {
                queue.remove();
                ctx.write(pendingMessage, currentWrite.promise);
                requiresFlush = true;
            }

            if (!channel.isActive()) {
                discard(new ClosedChannelException());
                break;
            }
        }

        if (requiresFlush) {
            ctx.flush();
        }
    }

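    // Invoked once the write of the final chunk has completed: closes the input and either
    // fails the pending promise with the write's cause or reports the final progress and
    // completes the promise successfully.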
    private static void handleEndOfInputFuture(ChannelFuture future, PendingWrite currentWrite) {
        ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
        if (!future.isSuccess()) {
            closeInput(input);
            currentWrite.fail(future.cause());
        } else {
            // read state of the input in local variables before closing it
            long inputProgress = input.progress();
            long inputLength = input.length();
            closeInput(input);
            currentWrite.progress(inputProgress, inputLength);
            currentWrite.success(inputLength);
        }
    }

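    // Invoked after each intermediate chunk has been written: fails the pending promise and
    // closes the input on a failed write, otherwise reports progress and, if the write loop
    // had stopped because the channel was not writable, resumes the transfer once the
    // channel is writable again.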
    private void handleFuture(ChannelFuture future, PendingWrite currentWrite, boolean resume) {
        ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
        if (!future.isSuccess()) {
            closeInput(input);
            currentWrite.fail(future.cause());
        } else {
            currentWrite.progress(input.progress(), input.length());
            if (resume && future.channel().isWritable()) {
                resumeTransfer();
            }
        }
    }

    private static void closeInput(ChunkedInput<?> chunks) {
        try {
            chunks.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close a chunked input.", t);
            }
        }
    }

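    // A queued message together with the promise that must be completed once the message
    // (or, for a ChunkedInput, its last chunk) has been written, failed or discarded.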
    private static final class PendingWrite {
        final Object msg;
        final ChannelPromise promise;

        PendingWrite(Object msg, ChannelPromise promise) {
            this.msg = msg;
            this.promise = promise;
        }

        void fail(Throwable cause) {
            ReferenceCountUtil.release(msg);
            promise.tryFailure(cause);
        }

        void success(long total) {
            if (promise.isDone()) {
                // No need to notify the progress or fulfill the promise because it's done already.
                return;
            }
            progress(total, total);
            promise.trySuccess();
        }

        void progress(long progress, long total) {
            if (promise instanceof ChannelProgressivePromise) {
                ((ChannelProgressivePromise) promise).tryProgress(progress, total);
            }
        }
    }
}