/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty5.handler.stream;
17  
18  import io.netty5.buffer.api.BufferAllocator;
19  import io.netty5.util.Resource;
20  import io.netty5.channel.Channel;
21  import io.netty5.channel.ChannelHandler;
22  import io.netty5.channel.ChannelHandlerContext;
23  import io.netty5.channel.ChannelPipeline;
24  import io.netty5.util.concurrent.Future;
25  import io.netty5.util.concurrent.Promise;
26  import io.netty5.util.internal.SilentDispose;
27  import io.netty5.util.internal.logging.InternalLogger;
28  import io.netty5.util.internal.logging.InternalLoggerFactory;
29  
30  import java.nio.channels.ClosedChannelException;
31  import java.util.ArrayDeque;
32  import java.util.Queue;
33  
34  import static io.netty5.util.internal.ObjectUtil.checkPositive;
35  
36  /**
37   * A {@link ChannelHandler} that adds support for writing a large data stream
38   * asynchronously neither spending a lot of memory nor getting
39   * {@link OutOfMemoryError}.  Large data streaming such as file
40   * transfer requires complicated state management in a {@link ChannelHandler}
41   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
42   * so that you can send a large data stream without difficulties.
43   * <p>
44   * To use {@link ChunkedWriteHandler} in your application, you have to insert
45   * a new {@link ChunkedWriteHandler} instance:
46   * <pre>
47   * {@link ChannelPipeline} p = ...;
48   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
49   * p.addLast("handler", new MyHandler());
50   * </pre>
51   * Once inserted, you can write a {@link ChunkedInput} so that the
52   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
53   * stream chunk by chunk and write the fetched chunk downstream:
54   * <pre>
55   * {@link Channel} ch = ...;
56   * ch.write(new {@link ChunkedFile}(new File("video.mkv"));
57   * </pre>
58   *
59   * <h3>Sending a stream which generates a chunk intermittently</h3>
60   *
61   * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
62   * Such {@link ChunkedInput} implementation often returns {@code null} on
63   * {@link ChunkedInput#readChunk(BufferAllocator)}, resulting in the indefinitely suspended
64   * transfer.  To resume the transfer when a new chunk is available, you have to
65   * call {@link #resumeTransfer()}.
66   */
67  public class ChunkedWriteHandler implements ChannelHandler {
68  
69      private static final InternalLogger logger =
70          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
71  
72      private final Queue<PendingWrite> queue = new ArrayDeque<>();
73      private volatile ChannelHandlerContext ctx;
74  
75      public ChunkedWriteHandler() {
76      }
77  
78      /**
79       * @deprecated use {@link #ChunkedWriteHandler()}
80       */
81      @Deprecated
82      public ChunkedWriteHandler(int maxPendingWrites) {
83          checkPositive(maxPendingWrites, "maxPendingWrites");
84      }
85  
86      @Override
87      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
88          this.ctx = ctx;
89      }
90  
91      /**
92       * Continues to fetch the chunks from the input.
93       */
94      public void resumeTransfer() {
95          final ChannelHandlerContext ctx = this.ctx;
96          if (ctx == null) {
97              return;
98          }
99          if (ctx.executor().inEventLoop()) {
100             resumeTransfer0(ctx);
101         } else {
102             // let the transfer resume on the next event loop round
103             ctx.executor().execute(() -> resumeTransfer0(ctx));
104         }
105     }
106 
107     private void resumeTransfer0(ChannelHandlerContext ctx) {
108         try {
109             doFlush(ctx);
110         } catch (Exception e) {
111             logger.warn("Unexpected exception while sending chunks.", e);
112         }
113     }
114 
115     @Override
116     public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
117         Promise<Void> promise = ctx.newPromise();
118         queue.add(new PendingWrite(msg, promise));
119         return promise.asFuture();
120     }
121 
122     @Override
123     public void flush(ChannelHandlerContext ctx) {
124         doFlush(ctx);
125     }
126 
127     @Override
128     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
129         doFlush(ctx);
130         ctx.fireChannelInactive();
131     }
132 
133     @Override
134     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
135         if (ctx.channel().isWritable()) {
136             // channel is writable again try to continue flushing
137             doFlush(ctx);
138         }
139         ctx.fireChannelWritabilityChanged();
140     }
141 
142     private void discard(Throwable cause) {
143         for (;;) {
144             PendingWrite currentWrite = queue.poll();
145 
146             if (currentWrite == null) {
147                 break;
148             }
149             Object message = currentWrite.msg;
150             if (message instanceof ChunkedInput) {
151                 ChunkedInput<?> in = (ChunkedInput<?>) message;
152                 boolean endOfInput;
153                 try {
154                     endOfInput = in.isEndOfInput();
155                     closeInput(in);
156                 } catch (Exception e) {
157                     closeInput(in);
158                     currentWrite.fail(e);
159                     if (logger.isWarnEnabled()) {
160                         logger.warn(ChunkedInput.class.getSimpleName() + " failed", e);
161                     }
162                     continue;
163                 }
164 
165                 if (!endOfInput) {
166                     if (cause == null) {
167                         cause = new ClosedChannelException();
168                     }
169                     currentWrite.fail(cause);
170                 } else {
171                     currentWrite.success();
172                 }
173             } else {
174                 if (cause == null) {
175                     cause = new ClosedChannelException();
176                 }
177                 currentWrite.fail(cause);
178             }
179         }
180     }
181 
182     private void doFlush(final ChannelHandlerContext ctx) {
183         final Channel channel = ctx.channel();
184         if (!channel.isActive()) {
185             discard(null);
186             return;
187         }
188 
189         boolean requiresFlush = true;
190         BufferAllocator allocator = ctx.bufferAllocator();
191         while (channel.isWritable()) {
192             final PendingWrite currentWrite = queue.peek();
193 
194             if (currentWrite == null) {
195                 break;
196             }
197 
198             if (currentWrite.promise.isDone()) {
199                 // This might happen e.g. in the case when a write operation
200                 // failed, but there're still unconsumed chunks left.
201                 // Most chunked input sources would stop generating chunks
202                 // and report end of input, but this doesn't work with any
203                 // source wrapped in HttpChunkedInput.
204                 // Note, that we're not trying to release the message/chunks
205                 // as this had to be done already by someone who resolved the
206                 // promise (using ChunkedInput.close method).
207                 // See https://github.com/netty/netty/issues/8700.
208                 queue.remove();
209                 continue;
210             }
211 
212             final Object pendingMessage = currentWrite.msg;
213 
214             if (pendingMessage instanceof ChunkedInput) {
215                 final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
216                 boolean endOfInput;
217                 boolean suspend;
218                 Object message = null;
219                 try {
220                     message = chunks.readChunk(allocator);
221                     endOfInput = chunks.isEndOfInput();
222 
223                     if (message == null) {
224                         // No need to suspend when reached at the end.
225                         suspend = !endOfInput;
226                     } else {
227                         suspend = false;
228                     }
229                 } catch (final Throwable t) {
230                     queue.remove();
231 
232                     if (message != null) {
233                         Resource.dispose(message);
234                     }
235 
236                     closeInput(chunks);
237                     currentWrite.fail(t);
238                     break;
239                 }
240 
241                 if (suspend) {
242                     // ChunkedInput.nextChunk() returned null and it has
243                     // not reached at the end of input. Let's wait until
244                     // more chunks arrive. Nothing to write or notify.
245                     break;
246                 }
247 
248                 if (message == null) {
249                     // If message is null write an empty ByteBuf.
250                     // See https://github.com/netty/netty/issues/1671
251                     message = allocator.allocate(0);
252                 }
253 
254                 if (endOfInput) {
255                     // We need to remove the element from the queue before we call writeAndFlush() as this operation
256                     // may cause an action that also touches the queue.
257                     queue.remove();
258                 }
259                 // Flush each chunk to conserve memory
260                 Future<Void> f = ctx.writeAndFlush(message);
261                 if (endOfInput) {
262                     if (f.isDone()) {
263                         handleEndOfInputFuture(f, currentWrite);
264                     } else {
265                         // Register a listener which will close the input once the write is complete.
266                         // This is needed because the Chunk may have some resource bound that can not
267                         // be closed before its not written.
268                         //
269                         // See https://github.com/netty/netty/issues/303
270                         f.addListener(future -> handleEndOfInputFuture(future, currentWrite));
271                     }
272                 } else {
273                     final boolean resume = !channel.isWritable();
274                     if (f.isDone()) {
275                         handleFuture(channel, f, currentWrite, resume);
276                     } else {
277                         f.addListener(future -> handleFuture(channel, future, currentWrite, resume));
278                     }
279                 }
280                 requiresFlush = false;
281             } else {
282                 queue.remove();
283                 ctx.write(pendingMessage).cascadeTo(currentWrite.promise);
284                 requiresFlush = true;
285             }
286 
287             if (!channel.isActive()) {
288                 discard(new ClosedChannelException());
289                 break;
290             }
291         }
292 
293         if (requiresFlush) {
294             ctx.flush();
295         }
296     }
297 
298     private static void handleEndOfInputFuture(Future<?> future, PendingWrite currentWrite) {
299         ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
300         closeInput(input);
301         if (future.isFailed()) {
302             currentWrite.fail(future.cause());
303         } else {
304             currentWrite.success();
305         }
306     }
307 
308     private void handleFuture(Channel channel, Future<?> future, PendingWrite currentWrite, boolean resume) {
309         ChunkedInput<?> input = (ChunkedInput<?>) currentWrite.msg;
310         if (future.isFailed()) {
311             closeInput(input);
312             currentWrite.fail(future.cause());
313         } else {
314             if (resume && channel.isWritable()) {
315                 resumeTransfer();
316             }
317         }
318     }
319 
320     private static void closeInput(ChunkedInput<?> chunks) {
321         try {
322             chunks.close();
323         } catch (Throwable t) {
324             if (logger.isWarnEnabled()) {
325                 logger.warn("Failed to close a chunked input.", t);
326             }
327         }
328     }
329 
330     private static final class PendingWrite {
331         final Object msg;
332         final Promise<Void> promise;
333 
334         PendingWrite(Object msg, Promise<Void> promise) {
335             this.msg = msg;
336             this.promise = promise;
337         }
338 
339         void fail(Throwable cause) {
340             promise.tryFailure(cause);
341             if (Resource.isAccessible(msg, false)) {
342                 SilentDispose.dispose(msg, logger);
343             }
344         }
345 
346         void success() {
347             if (promise.isDone()) {
348                 // No need to notify the progress or fulfill the promise because it's done already.
349                 return;
350             }
351             promise.trySuccess(null);
352         }
353     }
354 }