View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.stream;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.nio.channels.ClosedChannelException;
22  import java.util.Queue;
23  import java.util.concurrent.ConcurrentLinkedQueue;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  
26  import org.jboss.netty.buffer.ChannelBuffers;
27  import org.jboss.netty.channel.Channel;
28  import org.jboss.netty.channel.ChannelDownstreamHandler;
29  import org.jboss.netty.channel.ChannelEvent;
30  import org.jboss.netty.channel.ChannelFuture;
31  import org.jboss.netty.channel.ChannelFutureListener;
32  import org.jboss.netty.channel.ChannelHandler;
33  import org.jboss.netty.channel.ChannelHandlerContext;
34  import org.jboss.netty.channel.ChannelPipeline;
35  import org.jboss.netty.channel.ChannelStateEvent;
36  import org.jboss.netty.channel.ChannelUpstreamHandler;
37  import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
38  import org.jboss.netty.channel.MessageEvent;
39  import org.jboss.netty.logging.InternalLogger;
40  import org.jboss.netty.logging.InternalLoggerFactory;
41  
42  /**
43   * A {@link ChannelHandler} that adds support for writing a large data stream
44   * asynchronously neither spending a lot of memory nor getting
45   * {@link java.lang.OutOfMemoryError}.  Large data streaming such as file
46   * transfer requires complicated state management in a {@link ChannelHandler}
47   * implementation.  {@link ChunkedWriteHandler} manages such complicated states
48   * so that you can send a large data stream without difficulties.
49   * <p>
50   * To use {@link ChunkedWriteHandler} in your application, you have to insert
51   * a new {@link ChunkedWriteHandler} instance:
52   * <pre>
53   * {@link ChannelPipeline} p = ...;
54   * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
55   * p.addLast("handler", new MyHandler());
56   * </pre>
57   * Once inserted, you can write a {@link ChunkedInput} so that the
58   * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
59   * stream chunk by chunk and write the fetched chunk downstream:
60   * <pre>
61   * {@link Channel} ch = ...;
62   * ch.write(new {@link ChunkedFile}(new File("video.mkv"));
63   * </pre>
64   *
65   * <h3>Sending a stream which generates a chunk intermittently</h3>
66   *
67   * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
68   * Such {@link ChunkedInput} implementation often returns {@code null} on
69   * {@link ChunkedInput#nextChunk()}, resulting in the indefinitely suspended
70   * transfer.  To resume the transfer when a new chunk is available, you have to
71   * call {@link #resumeTransfer()}.
72   * @apiviz.landmark
73   * @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from
74   */
75  public class ChunkedWriteHandler
76          implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler {
77  
78      private static final InternalLogger logger =
79          InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
80  
81      private final Queue<MessageEvent> queue = new ConcurrentLinkedQueue<MessageEvent>();
82  
83      private volatile ChannelHandlerContext ctx;
84      private final AtomicBoolean flush = new AtomicBoolean(false);
85      private MessageEvent currentEvent;
86      private volatile boolean flushNeeded;
87  
88      /**
89       * Continues to fetch the chunks from the input.
90       */
91      public void resumeTransfer() {
92          ChannelHandlerContext ctx = this.ctx;
93          if (ctx == null) {
94              return;
95          }
96  
97          try {
98              flush(ctx, false);
99          } catch (Exception e) {
100             if (logger.isWarnEnabled()) {
101                 logger.warn("Unexpected exception while sending chunks.", e);
102             }
103         }
104     }
105 
106     public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
107             throws Exception {
108         if (!(e instanceof MessageEvent)) {
109             ctx.sendDownstream(e);
110             return;
111         }
112 
113         boolean offered = queue.offer((MessageEvent) e);
114         assert offered;
115 
116         final Channel channel = ctx.getChannel();
117         // call flush if the channel is writable or not connected. flush(..) will take care of the rest
118 
119         if (channel.isWritable() || !channel.isConnected()) {
120             this.ctx = ctx;
121             flush(ctx, false);
122         }
123     }
124 
125     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
126             throws Exception {
127         if (e instanceof ChannelStateEvent) {
128             ChannelStateEvent cse = (ChannelStateEvent) e;
129             switch (cse.getState()) {
130             case INTEREST_OPS:
131                 // Continue writing when the channel becomes writable.
132                 flush(ctx, true);
133                 break;
134             case OPEN:
135                 if (!Boolean.TRUE.equals(cse.getValue())) {
136                     // Fail all pending writes
137                     flush(ctx, true);
138                 }
139                 break;
140             }
141         }
142         ctx.sendUpstream(e);
143     }
144 
145     private void discard(ChannelHandlerContext ctx, boolean fireNow) {
146         ClosedChannelException cause = null;
147 
148         for (;;) {
149             MessageEvent currentEvent = this.currentEvent;
150 
151             if (this.currentEvent == null) {
152                 currentEvent = queue.poll();
153             } else {
154                 this.currentEvent = null;
155             }
156 
157             if (currentEvent == null) {
158                 break;
159             }
160 
161 
162             Object m = currentEvent.getMessage();
163             if (m instanceof ChunkedInput) {
164                 closeInput((ChunkedInput) m);
165             }
166 
167             // Trigger a ClosedChannelException
168             if (cause == null) {
169                 cause = new ClosedChannelException();
170             }
171             currentEvent.getFuture().setFailure(cause);
172 
173             currentEvent = null;
174         }
175 
176 
177         if (cause != null) {
178             if (fireNow) {
179                 fireExceptionCaught(ctx.getChannel(), cause);
180             } else {
181                 fireExceptionCaughtLater(ctx.getChannel(), cause);
182             }
183         }
184     }
185 
186     private void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
187         boolean acquired = false;
188         final Channel channel = ctx.getChannel();
189         boolean suspend = false;
190         flushNeeded = true;
191         // use CAS to see if the have flush already running, if so we don't need to take futher actions
192         if (acquired = flush.compareAndSet(false, true)) {
193             flushNeeded = false;
194             try {
195 
196                 if (!channel.isConnected()) {
197                     discard(ctx, fireNow);
198                     return;
199                 }
200 
201                 while (channel.isWritable()) {
202                     if (currentEvent == null) {
203                         currentEvent = queue.poll();
204                     }
205 
206                     if (currentEvent == null) {
207                         break;
208                     }
209 
210                     if (currentEvent.getFuture().isDone()) {
211                         // Skip the current request because the previous partial write
212                         // attempt for the current request has been failed.
213                         currentEvent = null;
214                     } else {
215                         final MessageEvent currentEvent = this.currentEvent;
216                         Object m = currentEvent.getMessage();
217                         if (m instanceof ChunkedInput) {
218                             final ChunkedInput chunks = (ChunkedInput) m;
219                             Object chunk;
220                             boolean endOfInput;
221                             try {
222                                 chunk = chunks.nextChunk();
223                                 endOfInput = chunks.isEndOfInput();
224                                 if (chunk == null) {
225                                     chunk = ChannelBuffers.EMPTY_BUFFER;
226                                     // No need to suspend when reached at the end.
227                                     suspend = !endOfInput;
228                                 } else {
229                                     suspend = false;
230                                 }
231                             } catch (Throwable t) {
232                                 this.currentEvent = null;
233 
234                                 currentEvent.getFuture().setFailure(t);
235                                 if (fireNow) {
236                                     fireExceptionCaught(ctx, t);
237                                 } else {
238                                     fireExceptionCaughtLater(ctx, t);
239                                 }
240 
241                                 closeInput(chunks);
242                                 break;
243                             }
244 
245                             if (suspend) {
246                                 // ChunkedInput.nextChunk() returned null and it has
247                                 // not reached at the end of input.  Let's wait until
248                                 // more chunks arrive.  Nothing to write or notify.
249                                 break;
250                             } else {
251                                 ChannelFuture writeFuture;
252                                 if (endOfInput) {
253                                     this.currentEvent = null;
254                                     writeFuture = currentEvent.getFuture();
255 
256                                     // Register a listener which will close the input once the write
257                                     // is complete. This is needed because the Chunk may have some
258                                     // resource bound that can not be closed before its not written
259                                     //
260                                     // See https://github.com/netty/netty/issues/303
261                                     writeFuture.addListener(new ChannelFutureListener() {
262 
263                                         public void operationComplete(ChannelFuture future) throws Exception {
264                                             closeInput(chunks);
265                                         }
266                                     });
267                                 } else {
268                                     writeFuture = future(channel);
269                                     writeFuture.addListener(new ChannelFutureListener() {
270                                         public void operationComplete(ChannelFuture future) throws Exception {
271                                             if (!future.isSuccess()) {
272                                                 currentEvent.getFuture().setFailure(future.getCause());
273                                                 closeInput((ChunkedInput) currentEvent.getMessage());
274                                             }
275                                         }
276                                     });
277                                 }
278 
279                                 write(
280                                         ctx, writeFuture, chunk,
281                                         currentEvent.getRemoteAddress());
282                             }
283                         } else {
284                             this.currentEvent = null;
285                             ctx.sendDownstream(currentEvent);
286                         }
287                     }
288 
289                     if (!channel.isConnected()) {
290                         discard(ctx, fireNow);
291                         return;
292                     }
293                 }
294             } finally {
295                 // mark the flush as done
296                 flush.set(false);
297             }
298 
299         }
300 
301         if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty() && !suspend
302                 || flushNeeded)) {
303             flush(ctx, fireNow);
304         }
305     }
306 
307     static void closeInput(ChunkedInput chunks) {
308         try {
309             chunks.close();
310         } catch (Throwable t) {
311             if (logger.isWarnEnabled()) {
312                 logger.warn("Failed to close a chunked input.", t);
313             }
314         }
315     }
316 
317     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
318         // nothing to do
319 
320     }
321 
322     public void afterAdd(ChannelHandlerContext ctx) throws Exception {
323         // nothing to do
324 
325     }
326 
327     public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
328         // try to flush again a last time.
329         //
330         // See #304
331         flush(ctx, false);
332     }
333 
334     // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
335     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
336         // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
337         // ChannelFuture and the registered FutureListener. See #304
338         //
339         Throwable cause = null;
340         boolean fireExceptionCaught = false;
341 
342         for (;;) {
343             MessageEvent currentEvent = this.currentEvent;
344 
345             if (this.currentEvent == null) {
346                 currentEvent = queue.poll();
347             } else {
348                 this.currentEvent = null;
349             }
350 
351             if (currentEvent == null) {
352                 break;
353             }
354 
355             Object m = currentEvent.getMessage();
356             if (m instanceof ChunkedInput) {
357                 closeInput((ChunkedInput) m);
358             }
359 
360             // Create exception
361             if (cause == null) {
362                 cause = new IOException("Unable to flush event, discarding");
363             }
364             currentEvent.getFuture().setFailure(cause);
365             fireExceptionCaught = true;
366 
367             currentEvent = null;
368         }
369 
370         if (fireExceptionCaught) {
371             fireExceptionCaughtLater(ctx.getChannel(), cause);
372         }
373     }
374 }