1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.stream;
17
18 import org.jboss.netty.buffer.ChannelBuffers;
19 import org.jboss.netty.channel.Channel;
20 import org.jboss.netty.channel.ChannelDownstreamHandler;
21 import org.jboss.netty.channel.ChannelEvent;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandler;
25 import org.jboss.netty.channel.ChannelHandlerContext;
26 import org.jboss.netty.channel.ChannelPipeline;
27 import org.jboss.netty.channel.ChannelStateEvent;
28 import org.jboss.netty.channel.ChannelUpstreamHandler;
29 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
30 import org.jboss.netty.channel.MessageEvent;
31 import org.jboss.netty.logging.InternalLogger;
32 import org.jboss.netty.logging.InternalLoggerFactory;
33
34 import java.io.IOException;
35 import java.nio.channels.ClosedChannelException;
36 import java.util.Queue;
37 import java.util.concurrent.ConcurrentLinkedQueue;
38 import java.util.concurrent.atomic.AtomicBoolean;
39
40 import static org.jboss.netty.channel.Channels.*;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 public class ChunkedWriteHandler
76 implements ChannelUpstreamHandler, ChannelDownstreamHandler, LifeCycleAwareChannelHandler {
77
78 private static final InternalLogger logger =
79 InternalLoggerFactory.getInstance(ChunkedWriteHandler.class);
80
81 private final Queue<MessageEvent> queue = new ConcurrentLinkedQueue<MessageEvent>();
82
83 private volatile ChannelHandlerContext ctx;
84 private final AtomicBoolean flush = new AtomicBoolean(false);
85 private MessageEvent currentEvent;
86 private volatile boolean flushNeeded;
87
88
89
90
91 public void resumeTransfer() {
92 ChannelHandlerContext ctx = this.ctx;
93 if (ctx == null) {
94 return;
95 }
96
97 try {
98 flush(ctx, false);
99 } catch (Exception e) {
100 if (logger.isWarnEnabled()) {
101 logger.warn("Unexpected exception while sending chunks.", e);
102 }
103 }
104 }
105
106 public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
107 throws Exception {
108 if (!(e instanceof MessageEvent)) {
109 ctx.sendDownstream(e);
110 return;
111 }
112
113 boolean offered = queue.offer((MessageEvent) e);
114 assert offered;
115
116 final Channel channel = ctx.getChannel();
117
118
119 if (channel.isWritable() || !channel.isConnected()) {
120 this.ctx = ctx;
121 flush(ctx, false);
122 }
123 }
124
125 public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
126 throws Exception {
127 if (e instanceof ChannelStateEvent) {
128 ChannelStateEvent cse = (ChannelStateEvent) e;
129 switch (cse.getState()) {
130 case INTEREST_OPS:
131
132 flush(ctx, true);
133 break;
134 case OPEN:
135 if (!Boolean.TRUE.equals(cse.getValue())) {
136
137 flush(ctx, true);
138 }
139 break;
140 }
141 }
142 ctx.sendUpstream(e);
143 }
144
145 private void discard(ChannelHandlerContext ctx, boolean fireNow) {
146 ClosedChannelException cause = null;
147
148 for (;;) {
149 MessageEvent currentEvent = this.currentEvent;
150
151 if (this.currentEvent == null) {
152 currentEvent = queue.poll();
153 } else {
154 this.currentEvent = null;
155 }
156
157 if (currentEvent == null) {
158 break;
159 }
160
161 Object m = currentEvent.getMessage();
162 if (m instanceof ChunkedInput) {
163 closeInput((ChunkedInput) m);
164 }
165
166
167 if (cause == null) {
168 cause = new ClosedChannelException();
169 }
170 currentEvent.getFuture().setFailure(cause);
171 }
172
173 if (cause != null) {
174 if (fireNow) {
175 fireExceptionCaught(ctx.getChannel(), cause);
176 } else {
177 fireExceptionCaughtLater(ctx.getChannel(), cause);
178 }
179 }
180 }
181
182 private void flush(ChannelHandlerContext ctx, boolean fireNow) throws Exception {
183 boolean acquired;
184 final Channel channel = ctx.getChannel();
185 boolean suspend = false;
186 flushNeeded = true;
187
188 if (acquired = flush.compareAndSet(false, true)) {
189 flushNeeded = false;
190 try {
191 if (!channel.isConnected()) {
192 discard(ctx, fireNow);
193 return;
194 }
195
196 while (channel.isWritable()) {
197 if (currentEvent == null) {
198 currentEvent = queue.poll();
199 }
200
201 if (currentEvent == null) {
202 break;
203 }
204
205 if (currentEvent.getFuture().isDone()) {
206
207
208 currentEvent = null;
209 } else {
210 final MessageEvent currentEvent = this.currentEvent;
211 Object m = currentEvent.getMessage();
212 if (m instanceof ChunkedInput) {
213 final ChunkedInput chunks = (ChunkedInput) m;
214 Object chunk;
215 boolean endOfInput;
216 try {
217 chunk = chunks.nextChunk();
218 endOfInput = chunks.isEndOfInput();
219 if (chunk == null) {
220 chunk = ChannelBuffers.EMPTY_BUFFER;
221
222 suspend = !endOfInput;
223 } else {
224 suspend = false;
225 }
226 } catch (Throwable t) {
227 this.currentEvent = null;
228
229 currentEvent.getFuture().setFailure(t);
230 if (fireNow) {
231 fireExceptionCaught(ctx, t);
232 } else {
233 fireExceptionCaughtLater(ctx, t);
234 }
235
236 closeInput(chunks);
237 break;
238 }
239
240 if (suspend) {
241
242
243
244 break;
245 } else {
246 ChannelFuture writeFuture;
247 if (endOfInput) {
248 this.currentEvent = null;
249 writeFuture = currentEvent.getFuture();
250
251
252
253
254
255
256 writeFuture.addListener(new ChannelFutureListener() {
257
258 public void operationComplete(ChannelFuture future) throws Exception {
259 closeInput(chunks);
260 }
261 });
262 } else {
263 writeFuture = future(channel);
264 writeFuture.addListener(new ChannelFutureListener() {
265 public void operationComplete(ChannelFuture future) throws Exception {
266 if (!future.isSuccess()) {
267 currentEvent.getFuture().setFailure(future.getCause());
268 closeInput((ChunkedInput) currentEvent.getMessage());
269 }
270 }
271 });
272 }
273
274 write(
275 ctx, writeFuture, chunk,
276 currentEvent.getRemoteAddress());
277 }
278 } else {
279 this.currentEvent = null;
280 ctx.sendDownstream(currentEvent);
281 }
282 }
283
284 if (!channel.isConnected()) {
285 discard(ctx, fireNow);
286 return;
287 }
288 }
289 } finally {
290
291 flush.set(false);
292 }
293 }
294
295 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty() && !suspend
296 || flushNeeded)) {
297 flush(ctx, fireNow);
298 }
299 }
300
301 static void closeInput(ChunkedInput chunks) {
302 try {
303 chunks.close();
304 } catch (Throwable t) {
305 if (logger.isWarnEnabled()) {
306 logger.warn("Failed to close a chunked input.", t);
307 }
308 }
309 }
310
311 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
312
313 }
314
315 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
316
317 }
318
319 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
320
321
322
323 flush(ctx, false);
324 }
325
326
327 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
328
329
330 Throwable cause = null;
331 boolean fireExceptionCaught = false;
332
333 for (;;) {
334 MessageEvent currentEvent = this.currentEvent;
335
336 if (this.currentEvent == null) {
337 currentEvent = queue.poll();
338 } else {
339 this.currentEvent = null;
340 }
341
342 if (currentEvent == null) {
343 break;
344 }
345
346 Object m = currentEvent.getMessage();
347 if (m instanceof ChunkedInput) {
348 closeInput((ChunkedInput) m);
349 }
350
351
352 if (cause == null) {
353 cause = new IOException("Unable to flush event, discarding");
354 }
355 currentEvent.getFuture().setFailure(cause);
356 fireExceptionCaught = true;
357 }
358
359 if (fireExceptionCaught) {
360 fireExceptionCaughtLater(ctx.getChannel(), cause);
361 }
362 }
363 }