View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.stream;
17  
18  import static io.netty.util.internal.ObjectUtil.checkPositive;
19  
20  import io.netty.buffer.ByteBufAllocator;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelDuplexHandler;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.channel.ChannelHandler;
27  import io.netty.channel.ChannelHandlerContext;
28  import io.netty.channel.ChannelPipeline;
29  import io.netty.channel.ChannelProgressivePromise;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.nio.channels.ClosedChannelException;
36  import java.util.ArrayDeque;
37  import java.util.Queue;
38  
39  /**
40   * A {@link ChannelHandler} that adds support for writing a large data stream
41   * asynchronously neither spending a lot of memory nor getting
42   * {@link OutOfMemoryError}.  Large data streaming such as file
43   * transfer requires complicated state management in a {@link ChannelHandler}
44   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
45   * so that you can send a large data stream without difficulties.
46   * <p>
47   * To use {@link ChunkedWriteHandler} in your application, you have to insert
48   * a new {@link ChunkedWriteHandler} instance:
49   * <pre>
50   * {@link ChannelPipeline} p = ...;
51   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
52   * p.addLast("handler", new MyHandler());
53   * </pre>
54   * Once inserted, you can write a {@link ChunkedInput} so that the
55   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
56   * stream chunk by chunk and write the fetched chunk downstream:
57   * <pre>
58   * {@link Channel} ch = ...;
59   * ch.write(new {@link ChunkedFile}(new File("video.mkv")));
60   * </pre>
61   *
62   * <h3>Sending a stream which generates a chunk intermittently</h3>
63   *
64   * Some {@link ChunkedInput} implementations generate a chunk only on a certain event or timing.
65   * Such an implementation often returns {@code null} from
66   * {@link ChunkedInput#readChunk(ByteBufAllocator)}, which suspends the transfer
67   * indefinitely.  To resume the transfer when a new chunk is available, you have to
68   * call {@link #resumeTransfer()}.
69   */
70  public class ChunkedWriteHandler extends ChannelDuplexHandler {
71  
72      private static final InternalLogger logger =
73          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
74  
75      private Queue<PendingWrite> queue;
76      private volatile ChannelHandlerContext ctx;
77  
78      public ChunkedWriteHandler() {
79      }
80  
81      /**
82       * @deprecated use {@link #ChunkedWriteHandler()}
83       */
84      @Deprecated
85      public ChunkedWriteHandler(int maxPendingWrites) {
86          checkPositive(maxPendingWrites, "maxPendingWrites");
87      }
88  
89      private void allocateQueue() {
90          if (queue == null) {
91              queue = new ArrayDeque<PendingWrite>();
92          }
93      }
94  
95      private boolean queueIsEmpty() {
96          return queue == null || queue.isEmpty();
97      }
98  
99      @Override
100     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
101         this.ctx = ctx;
102     }
103 
104     /**
105      * Continues to fetch the chunks from the input.
106      */
107     public void resumeTransfer() {
108         final ChannelHandlerContext ctx = this.ctx;
109         if (ctx == null) {
110             return;
111         }
112         if (ctx.executor().inEventLoop()) {
113             resumeTransfer0(ctx);
114         } else {
115             // let the transfer resume on the next event loop round
116             ctx.executor().execute(new Runnable() {
117 
118                 @Override
119                 public void run() {
120                     resumeTransfer0(ctx);
121                 }
122             });
123         }
124     }
125 
126     private void resumeTransfer0(ChannelHandlerContext ctx) {
127         try {
128             doFlush(ctx);
129         } catch (Exception e) {
130             logger.warn("Unexpected exception while sending chunks.", e);
131         }
132     }
133 
134     @Override
135     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
136         if (!queueIsEmpty() || msg instanceof ChunkedInput) {
137             allocateQueue();
138             queue.add(new PendingWrite(msg, promise));
139         } else {
140             ctx.write(msg, promise);
141         }
142     }
143 
144     @Override
145     public void flush(ChannelHandlerContext ctx) throws Exception {
146         doFlush(ctx);
147     }
148 
149     @Override
150     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
151         doFlush(ctx);
152         ctx.fireChannelInactive();
153     }
154 
155     @Override
156     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
157         if (ctx.channel().isWritable()) {
158             // channel is writable again try to continue flushing
159             doFlush(ctx);
160         }
161         ctx.fireChannelWritabilityChanged();
162     }
163 
164     private void discard(Throwable cause) {
165         if (queueIsEmpty()) {
166             return;
167         }
168         for (;;) {
169             PendingWrite currentWrite = queue.poll();
170 
171             if (currentWrite == null) {
172                 break;
173             }
174             Object message = currentWrite.msg;
175             if (message instanceof ChunkedInput) {
176                 ChunkedInput<?> in = (ChunkedInput<?>) message;
177                 boolean endOfInput;
178                 long inputLength;
179                 try {
180                     endOfInput = in.isEndOfInput();
181                     inputLength = in.length();
182                     closeInput(in);
183                 } catch (Exception e) {
184                     closeInput(in);
185                     currentWrite.fail(e);
186                     logger.warn("ChunkedInput failed", e);
187                     continue;
188                 }
189 
190                 if (!endOfInput) {
191                     if (cause == null) {
192                         cause = new ClosedChannelException();
193                     }
194                     currentWrite.fail(cause);
195                 } else {
196                     currentWrite.success(inputLength);
197                 }
198             } else {
199                 if (cause == null) {
200                     cause = new ClosedChannelException();
201                 }
202                 currentWrite.fail(cause);
203             }
204         }
205     }
206 
207     private void doFlush(final ChannelHandlerContext ctx) {
208         final Channel channel = ctx.channel();
209         if (!channel.isActive()) {
210             // Even after discarding all previous queued objects we should propagate the flush through
211             // to ensure previous written objects via writeAndFlush(...) that were not queued will be flushed and
212             // so eventually fail the promise.
213             discard(null);
214             ctx.flush();
215             return;
216         }
217 
218         if (queueIsEmpty()) {
219             ctx.flush();
220             return;
221         }
222 
223         boolean requiresFlush = true;
224         ByteBufAllocator allocator = ctx.alloc();
225         while (channel.isWritable()) {
226             final PendingWrite currentWrite = queue.peek();
227 
228             if (currentWrite == null) {
229                 break;
230             }
231 
232             if (currentWrite.promise.isDone()) {
233                 // This might happen e.g. in the case when a write operation
234                 // failed, but there are still unconsumed chunks left.
235                 // Most chunked input sources would stop generating chunks
236                 // and report end of input, but this doesn't work with any
237                 // source wrapped in HttpChunkedInput.
238                 // Note, that we're not trying to release the message/chunks
239                 // as this had to be done already by someone who resolved the
240                 // promise (using ChunkedInput.close method).
241                 // See https://github.com/netty/netty/issues/8700.
242                 queue.remove();
243                 continue;
244             }
245 
246             final Object pendingMessage = currentWrite.msg;
247 
248             if (pendingMessage instanceof ChunkedInput) {
249                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
250                 boolean endOfInput;
251                 boolean suspend;
252                 Object message = null;
253                 try {
254                     message = chunks.readChunk(allocator);
255                     endOfInput = chunks.isEndOfInput();
256                     // No need to suspend when reached at the end.
257                     suspend = message == null && !endOfInput;
258 
259                 } catch (final Throwable t) {
260                     queue.remove();
261 
262                     if (message != null) {
263                         ReferenceCountUtil.release(message);
264                     }
265 
266                     closeInput(chunks);
267                     currentWrite.fail(t);
268                     break;
269                 }
270 
271                 if (suspend) {
272                     // ChunkedInput.nextChunk() returned null and it has
273                     // not reached at the end of input. Let's wait until
274                     // more chunks arrive. Nothing to write or notify.
275                     break;
276                 }
277 
278                 if (message == null) {
279                     // If message is null write an empty ByteBuf.
280                     // See https://github.com/netty/netty/issues/1671
281                     message = Unpooled.EMPTY_BUFFER;
282                 }
283 
284                 if (endOfInput) {
285                     // We need to remove the element from the queue before we call writeAndFlush() as this operation
286                     // may cause an action that also touches the queue.
287                     queue.remove();
288                 }
289                 // Flush each chunk to conserve memory
290                 ChannelFuture f = ctx.writeAndFlush(message);
291                 if (endOfInput) {
292                     if (f.isDone()) {
293                         handleEndOfInputFuture(f, chunks, currentWrite);
294                     } else {
295                         // Register a listener which will close the input once the write is complete.
296                         // This is needed because the Chunk may have some resource bound that can not
297                         // be closed before it's not written.
298                         //
299                         // See https://github.com/netty/netty/issues/303
300                         f.addListener(new ChannelFutureListener() {
301                             @Override
302                             public void operationComplete(ChannelFuture future) {
303                                 handleEndOfInputFuture(future, chunks, currentWrite);
304                             }
305                         });
306                     }
307                 } else {
308                     final boolean resume = !channel.isWritable();
309                     if (f.isDone()) {
310                         handleFuture(f, chunks, currentWrite, resume);
311                     } else {
312                         f.addListener(new ChannelFutureListener() {
313                             @Override
314                             public void operationComplete(ChannelFuture future) {
315                                 handleFuture(future, chunks, currentWrite, resume);
316                             }
317                         });
318                     }
319                 }
320                 requiresFlush = false;
321             } else {
322                 queue.remove();
323                 ctx.write(pendingMessage, currentWrite.promise);
324                 requiresFlush = true;
325             }
326 
327             if (!channel.isActive()) {
328                 discard(new ClosedChannelException());
329                 break;
330             }
331         }
332 
333         if (requiresFlush) {
334             ctx.flush();
335         }
336     }
337 
338     private static void handleEndOfInputFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite) {
339         if (!future.isSuccess()) {
340             closeInput(input);
341             currentWrite.fail(future.cause());
342         } else {
343             // read state of the input in local variables before closing it
344             long inputProgress = input.progress();
345             long inputLength = input.length();
346             closeInput(input);
347             currentWrite.progress(inputProgress, inputLength);
348             currentWrite.success(inputLength);
349         }
350     }
351 
352     private void handleFuture(ChannelFuture future, ChunkedInput<?> input, PendingWrite currentWrite, boolean resume) {
353         if (!future.isSuccess()) {
354             closeInput(input);
355             currentWrite.fail(future.cause());
356         } else {
357             currentWrite.progress(input.progress(), input.length());
358             if (resume && future.channel().isWritable()) {
359                 resumeTransfer();
360             }
361         }
362     }
363 
364     private static void closeInput(ChunkedInput<?> chunks) {
365         try {
366             chunks.close();
367         } catch (Throwable t) {
368             logger.warn("Failed to close a ChunkedInput.", t);
369         }
370     }
371 
372     private static final class PendingWrite {
373         final Object msg;
374         final ChannelPromise promise;
375 
376         PendingWrite(Object msg, ChannelPromise promise) {
377             this.msg = msg;
378             this.promise = promise;
379         }
380 
381         void fail(Throwable cause) {
382             ReferenceCountUtil.release(msg);
383             promise.tryFailure(cause);
384         }
385 
386         void success(long total) {
387             if (promise.isDone()) {
388                 // No need to notify the progress or fulfill the promise because it's done already.
389                 return;
390             }
391             progress(total, total);
392             promise.trySuccess();
393         }
394 
395         void progress(long progress, long total) {
396             if (promise instanceof ChannelProgressivePromise) {
397                 ((ChannelProgressivePromise) promise).tryProgress(progress, total);
398             }
399         }
400     }
401 }