1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.queue;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBuffers;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelConfig;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.ChannelFutureListener;
24 import org.jboss.netty.channel.ChannelHandlerContext;
25 import org.jboss.netty.channel.ChannelStateEvent;
26 import org.jboss.netty.channel.Channels;
27 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
28 import org.jboss.netty.channel.MessageEvent;
29 import org.jboss.netty.channel.SimpleChannelHandler;
30 import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
31 import org.jboss.netty.util.HashedWheelTimer;
32
33 import java.io.IOException;
34 import java.nio.channels.ClosedChannelException;
35 import java.util.ArrayList;
36 import java.util.List;
37 import java.util.Queue;
38 import java.util.concurrent.BlockingQueue;
39 import java.util.concurrent.ConcurrentLinkedQueue;
40 import java.util.concurrent.atomic.AtomicBoolean;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
165
166 private final Queue<MessageEvent> queue;
167 private final boolean consolidateOnFlush;
168 private volatile ChannelHandlerContext ctx;
169 private final AtomicBoolean flush = new AtomicBoolean(false);
170
171
172
173
174
175 public BufferedWriteHandler() {
176 this(false);
177 }
178
179
180
181
182
183
184
185 public BufferedWriteHandler(Queue<MessageEvent> queue) {
186 this(queue, false);
187 }
188
189
190
191
192
193
194
195
196 public BufferedWriteHandler(boolean consolidateOnFlush) {
197 this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
198 }
199
200
201
202
203
204
205
206
207
208
209 public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210 if (queue == null) {
211 throw new NullPointerException("queue");
212 }
213 this.queue = queue;
214 this.consolidateOnFlush = consolidateOnFlush;
215 }
216
217 public boolean isConsolidateOnFlush() {
218 return consolidateOnFlush;
219 }
220
221
222
223
224
225 protected Queue<MessageEvent> getQueue() {
226 return queue;
227 }
228
229
230
231
232 public void flush() {
233 flush(consolidateOnFlush);
234 }
235
236
237
238
239
240
241
242
243 public void flush(boolean consolidateOnFlush) {
244 final ChannelHandlerContext ctx = this.ctx;
245 if (ctx == null) {
246
247 return;
248 }
249 Channel channel = ctx.getChannel();
250 boolean acquired;
251
252
253 if (acquired = flush.compareAndSet(false, true)) {
254 final Queue<MessageEvent> queue = getQueue();
255 if (consolidateOnFlush) {
256 if (queue.isEmpty()) {
257 flush.set(false);
258 return;
259 }
260
261 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
262 for (;;) {
263 MessageEvent e = queue.poll();
264 if (e == null) {
265 break;
266 }
267 if (!(e.getMessage() instanceof ChannelBuffer)) {
268 if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
269 pendingWrites = new ArrayList<MessageEvent>();
270 }
271 ctx.sendDownstream(e);
272 } else {
273 pendingWrites.add(e);
274 }
275 }
276 consolidatedWrite(pendingWrites);
277 } else {
278 for (;;) {
279 MessageEvent e = queue.poll();
280 if (e == null) {
281 break;
282 }
283 ctx.sendDownstream(e);
284 }
285 }
286 flush.set(false);
287 }
288
289 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
290 flush(consolidateOnFlush);
291 }
292 }
293
294 private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
295 final int size = pendingWrites.size();
296 if (size == 1) {
297 ctx.sendDownstream(pendingWrites.remove(0));
298 return pendingWrites;
299 }
300 if (size == 0) {
301 return pendingWrites;
302 }
303
304 ChannelBuffer[] data = new ChannelBuffer[size];
305 for (int i = 0; i < data.length; i ++) {
306 data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
307 }
308
309 ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
310 ChannelFuture future = Channels.future(ctx.getChannel());
311 future.addListener(new ChannelFutureListener() {
312 public void operationComplete(ChannelFuture future)
313 throws Exception {
314 if (future.isSuccess()) {
315 for (MessageEvent e: pendingWrites) {
316 e.getFuture().setSuccess();
317 }
318 } else {
319 Throwable cause = future.getCause();
320 for (MessageEvent e: pendingWrites) {
321 e.getFuture().setFailure(cause);
322 }
323 }
324 }
325 });
326
327 Channels.write(ctx, future, composite);
328 return null;
329 }
330
331
332
333
334
335 @Override
336 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
337 throws Exception {
338 if (this.ctx == null) {
339 this.ctx = ctx;
340 } else {
341 assert this.ctx == ctx;
342 }
343
344 getQueue().add(e);
345 }
346
347 @Override
348 public void disconnectRequested(ChannelHandlerContext ctx,
349 ChannelStateEvent e) throws Exception {
350 try {
351 flush(consolidateOnFlush);
352 } finally {
353 ctx.sendDownstream(e);
354 }
355 }
356
357 @Override
358 public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
359 throws Exception {
360 try {
361 flush(consolidateOnFlush);
362 } finally {
363 ctx.sendDownstream(e);
364 }
365 }
366
367
368
369
370
371 @Override
372 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
373 Throwable cause = null;
374 for (;;) {
375 MessageEvent ev = queue.poll();
376
377 if (ev == null) {
378 break;
379 }
380
381 if (cause == null) {
382 cause = new ClosedChannelException();
383 }
384 ev.getFuture().setFailure(cause);
385 }
386 if (cause != null) {
387 Channels.fireExceptionCaught(ctx.getChannel(), cause);
388 }
389
390 super.channelClosed(ctx, e);
391 }
392
393 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
394
395 }
396
397 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
398
399 }
400
401 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
402
403 flush(consolidateOnFlush);
404 }
405
406
407
408
409
410 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
411 Throwable cause = null;
412 for (;;) {
413 MessageEvent ev = queue.poll();
414
415 if (ev == null) {
416 break;
417 }
418
419 if (cause == null) {
420 cause = new IOException("Unable to flush message");
421 }
422 ev.getFuture().setFailure(cause);
423 }
424
425 if (cause != null) {
426 Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
427 }
428 }
429 }