1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.stream;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.nio.channels.ClosedChannelException;
22 import java.util.Queue;
23 import java.util.concurrent.ConcurrentLinkedQueue;
24 import java.util.concurrent.atomic.AtomicBoolean;
25
26 import org.jboss.netty.buffer.ChannelBuffers;
27 import org.jboss.netty.channel.Channel;
28 import org.jboss.netty.channel.ChannelDownstreamHandler;
29 import org.jboss.netty.channel.ChannelEvent;
30 import org.jboss.netty.channel.ChannelFuture;
31 import org.jboss.netty.channel.ChannelFutureListener;
32 import org.jboss.netty.channel.ChannelHandler;
33 import org.jboss.netty.channel.ChannelHandlerContext;
34 import org.jboss.netty.channel.ChannelPipeline;
35 import org.jboss.netty.channel.ChannelStateEvent;
36 import org.jboss.netty.channel.ChannelUpstreamHandler;
37 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
38 import org.jboss.netty.channel.MessageEvent;
39 import org.jboss.netty.logging.InternalLogger;
40 import org.jboss.netty.logging.InternalLoggerFactory;
41
42 /**
43 * A {@link ChannelHandler} that adds support for writing a large data stream
44 * asynchronously neither spending a lot of memory nor getting
45 * {@link java.lang.OutOfMemoryError}. Large data streaming such as file
46 * transfer requires complicated state management in a {@link ChannelHandler}
47 * implementation. {@link ChunkedWriteHandler} manages such complicated states
48 * so that you can send a large data stream without difficulties.
49 * <p>
50 * To use {@link ChunkedWriteHandler} in your application, you have to insert
51 * a new {@link ChunkedWriteHandler} instance:
52 * <pre>
53 * {@link ChannelPipeline} p = ...;
54 * p.addLast("streamer", <b>new {@link ChunkedWriteHandler}()</b>);
55 * p.addLast("handler", new MyHandler());
56 * </pre>
57 * Once inserted, you can write a {@link ChunkedInput} so that the
58 * {@link ChunkedWriteHandler} can pick it up and fetch the content of the
59 * stream chunk by chunk and write the fetched chunk downstream:
60 * <pre>
61 * {@link Channel} ch = ...;
62 * ch.write(new {@link ChunkedFile}(new File("video.mkv"));
63 * </pre>
64 *
65 * <h3>Sending a stream which generates a chunk intermittently</h3>
66 *
67 * Some {@link ChunkedInput} generates a chunk on a certain event or timing.
68 * Such {@link ChunkedInput} implementation often returns {@code null} on
69 * {@link ChunkedInput#nextChunk()}, resulting in the indefinitely suspended
70 * transfer. To resume the transfer when a new chunk is available, you have to
71 * call {@link #resumeTransfer()}.
72 * @apiviz.landmark
73 * @apiviz.has org.jboss.netty.handler.stream.ChunkedInput oneway - - reads from
74 */
75 public class ChunkedWriteHandler
76 implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler {
77
78 private static final InternalLogger logger =
79 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
80
81 private final Queue<MessageEvent> queue = new ConcurrentLinkedQueue<MessageEvent>();
82
83 private volatile ChannelHandlerContext ctx;
84 private final AtomicBoolean flush = new AtomicBoolean(false);
85 private MessageEvent currentEvent;
86 private volatile boolean flushNeeded;
87
88 /**
89 * Continues to fetch the chunks from the input.
90 */
91 public void resumeTransfer() {
92 ChannelHandlerContext ctx = this.ctx;
93 if (ctx == null) {
94 return;
95 }
96
97 try {
98 flush(ctx, false);
99 } catch (Exception e) {
100 if (logger.isWarnEnabled()) {
101 logger.warn("Unexpected exception while sending chunks.", e);
102 }
103 }
104 }
105
106 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
107 throws Exception {
108 if (!(e instanceof MessageEvent)) {
109 ctx.sendDownstream(e);
110 return;
111 }
112
113 boolean offered = queue.offer((MessageEvent) e);
114 assert offered;
115
116 final Channel channel = ctx.getChannel();
117 // call flush if the channel is writable or not connected. flush(..) will take care of the rest
118
119 if (channel.isWritable() || !channel.isConnected()) {
120 this.ctx = ctx;
121 flush(ctx, false);
122 }
123 }
124
125 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
126 throws Exception {
127 if (e instanceof ChannelStateEvent) {
128 ChannelStateEvent cse = (ChannelStateEvent) e;
129 switch (cse.getState()) {
130 case INTEREST_OPS:
131 // Continue writing when the channel becomes writable.
132 flush(ctx, true);
133 break;
134 case OPEN:
135 if (!Boolean.TRUE.equals(cse.getValue())) {
136 // Fail all pending writes
137 flush(ctx, true);
138 }
139 break;
140 }
141 }
142 ctx.sendUpstream(e);
143 }
144
145 private void discard(ChannelHandlerContext ctx, boolean fireNow) {
146 ClosedChannelException cause = null;
147
148 for (;;) {
149 MessageEvent currentEvent = this.currentEvent;
150
151 if (this.currentEvent == null) {
152 currentEvent = queue.poll();
153 } else {
154 this.currentEvent = null;
155 }
156
157 if (currentEvent == null) {
158 break;
159 }
160
161
162 Object m = currentEvent.getMessage();
163 if (m instanceof ChunkedInput) {
164 closeInput((ChunkedInput) m);
165 }
166
167 // Trigger a ClosedChannelException
168 if (cause == null) {
169 cause = new ClosedChannelException();
170 }
171 currentEvent.getFuture().setFailure(cause);
172
173 currentEvent = null;
174 }
175
176
177 if (cause != null) {
178 if (fireNow) {
179 fireExceptionCaught(ctx.getChannel(), cause);
180 } else {
181 fireExceptionCaughtLater(ctx.getChannel(), cause);
182 }
183 }
184 }
185
186 private void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
187 boolean acquired = false;
188 final Channel channel = ctx.getChannel();
189 boolean suspend = false;
190 flushNeeded = true;
191 // use CAS to see if the have flush already running, if so we don't need to take futher actions
192 if (acquired = flush.compareAndSet(false, true)) {
193 flushNeeded = false;
194 try {
195
196 if (!channel.isConnected()) {
197 discard(ctx, fireNow);
198 return;
199 }
200
201 while (channel.isWritable()) {
202 if (currentEvent == null) {
203 currentEvent = queue.poll();
204 }
205
206 if (currentEvent == null) {
207 break;
208 }
209
210 if (currentEvent.getFuture().isDone()) {
211 // Skip the current request because the previous partial write
212 // attempt for the current request has been failed.
213 currentEvent = null;
214 } else {
215 final MessageEvent currentEvent = this.currentEvent;
216 Object m = currentEvent.getMessage();
217 if (m instanceof ChunkedInput) {
218 final ChunkedInput chunks = (ChunkedInput) m;
219 Object chunk;
220 boolean endOfInput;
221 try {
222 chunk = chunks.nextChunk();
223 endOfInput = chunks.isEndOfInput();
224 if (chunk == null) {
225 chunk = ChannelBuffers.EMPTY_BUFFER;
226 // No need to suspend when reached at the end.
227 suspend = !endOfInput;
228 } else {
229 suspend = false;
230 }
231 } catch (Throwable t) {
232 this.currentEvent = null;
233
234 currentEvent.getFuture().setFailure(t);
235 if (fireNow) {
236 fireExceptionCaught(ctx, t);
237 } else {
238 fireExceptionCaughtLater(ctx, t);
239 }
240
241 closeInput(chunks);
242 break;
243 }
244
245 if (suspend) {
246 // ChunkedInput.nextChunk() returned null and it has
247 // not reached at the end of input. Let's wait until
248 // more chunks arrive. Nothing to write or notify.
249 break;
250 } else {
251 ChannelFuture writeFuture;
252 if (endOfInput) {
253 this.currentEvent = null;
254 writeFuture = currentEvent.getFuture();
255
256 // Register a listener which will close the input once the write
257 // is complete. This is needed because the Chunk may have some
258 // resource bound that can not be closed before its not written
259 //
260 // See https://github.com/netty/netty/issues/303
261 writeFuture.addListener(new ChannelFutureListener() {
262
263 public void operationComplete(ChannelFuture future) throws Exception {
264 closeInput(chunks);
265 }
266 });
267 } else {
268 writeFuture = future(channel);
269 writeFuture.addListener(new ChannelFutureListener() {
270 public void operationComplete(ChannelFuture future) throws Exception {
271 if (!future.isSuccess()) {
272 currentEvent.getFuture().setFailure(future.getCause());
273 closeInput((ChunkedInput) currentEvent.getMessage());
274 }
275 }
276 });
277 }
278
279 write(
280 ctx, writeFuture, chunk,
281 currentEvent.getRemoteAddress());
282 }
283 } else {
284 this.currentEvent = null;
285 ctx.sendDownstream(currentEvent);
286 }
287 }
288
289 if (!channel.isConnected()) {
290 discard(ctx, fireNow);
291 return;
292 }
293 }
294 } finally {
295 // mark the flush as done
296 flush.set(false);
297 }
298
299 }
300
301 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty() && !suspend
302 || flushNeeded)) {
303 flush(ctx, fireNow);
304 }
305 }
306
307 static void closeInput(ChunkedInput chunks) {
308 try {
309 chunks.close();
310 } catch (Throwable t) {
311 if (logger.isWarnEnabled()) {
312 logger.warn("Failed to close a chunked input.", t);
313 }
314 }
315 }
316
317 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
318 // nothing to do
319
320 }
321
322 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
323 // nothing to do
324
325 }
326
327 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
328 // try to flush again a last time.
329 //
330 // See #304
331 flush(ctx, false);
332 }
333
334 // This method should not need any synchronization as the ChunkedWriteHandler will not receive any new events
335 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
336 // Fail all MessageEvent's that are left. This is needed because otherwise we would never notify the
337 // ChannelFuture and the registered FutureListener. See #304
338 //
339 Throwable cause = null;
340 boolean fireExceptionCaught = false;
341
342 for (;;) {
343 MessageEvent currentEvent = this.currentEvent;
344
345 if (this.currentEvent == null) {
346 currentEvent = queue.poll();
347 } else {
348 this.currentEvent = null;
349 }
350
351 if (currentEvent == null) {
352 break;
353 }
354
355 Object m = currentEvent.getMessage();
356 if (m instanceof ChunkedInput) {
357 closeInput((ChunkedInput) m);
358 }
359
360 // Create exception
361 if (cause == null) {
362 cause = new IOException("Unable to flush event, discarding");
363 }
364 currentEvent.getFuture().setFailure(cause);
365 fireExceptionCaught = true;
366
367 currentEvent = null;
368 }
369
370 if (fireExceptionCaught) {
371 fireExceptionCaughtLater(ctx.getChannel(), cause);
372 }
373 }
374 }