1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.handler.queue;
17
18 import java.io.IOException;
19 import java.nio.channels.ClosedChannelException;
20 import java.util.ArrayList;
21 import java.util.List;
22 import java.util.Queue;
23 import java.util.concurrent.BlockingQueue;
24 import java.util.concurrent.ConcurrentLinkedQueue;
25 import java.util.concurrent.atomic.AtomicBoolean;
26
27 import org.jboss.netty.buffer.ChannelBuffer;
28 import org.jboss.netty.buffer.ChannelBuffers;
29 import org.jboss.netty.channel.Channel;
30 import org.jboss.netty.channel.ChannelConfig;
31 import org.jboss.netty.channel.ChannelFuture;
32 import org.jboss.netty.channel.ChannelFutureListener;
33 import org.jboss.netty.channel.ChannelHandlerContext;
34 import org.jboss.netty.channel.ChannelStateEvent;
35 import org.jboss.netty.channel.Channels;
36 import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
37 import org.jboss.netty.channel.MessageEvent;
38 import org.jboss.netty.channel.SimpleChannelHandler;
39 import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
40 import org.jboss.netty.util.HashedWheelTimer;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 public class BufferedWriteHandler extends SimpleChannelHandler implements LifeCycleAwareChannelHandler {
165
166 private final Queue<MessageEvent> queue;
167 private final boolean consolidateOnFlush;
168 private volatile ChannelHandlerContext ctx;
169 private final AtomicBoolean flush = new AtomicBoolean(false);
170
171
172
173
174
175 public BufferedWriteHandler() {
176 this(false);
177 }
178
179
180
181
182
183
184
185 public BufferedWriteHandler(Queue<MessageEvent> queue) {
186 this(queue, false);
187 }
188
189
190
191
192
193
194
195
196 public BufferedWriteHandler(boolean consolidateOnFlush) {
197 this(new ConcurrentLinkedQueue<MessageEvent>(), consolidateOnFlush);
198 }
199
200
201
202
203
204
205
206
207
208
209 public BufferedWriteHandler(Queue<MessageEvent> queue, boolean consolidateOnFlush) {
210 if (queue == null) {
211 throw new NullPointerException("queue");
212 }
213 this.queue = queue;
214 this.consolidateOnFlush = consolidateOnFlush;
215 }
216
217 public boolean isConsolidateOnFlush() {
218 return consolidateOnFlush;
219 }
220
221
222
223
224
225 protected Queue<MessageEvent> getQueue() {
226 return queue;
227 }
228
229
230
231
232 public void flush() {
233 flush(consolidateOnFlush);
234 }
235
236
237
238
239
240
241
242
243 public void flush(boolean consolidateOnFlush) {
244 final ChannelHandlerContext ctx = this.ctx;
245 if (ctx == null) {
246
247 return;
248 }
249 Channel channel = ctx.getChannel();
250 boolean acquired;
251
252
253 if (acquired = flush.compareAndSet(false, true)) {
254 final Queue<MessageEvent> queue = getQueue();
255 if (consolidateOnFlush) {
256 if (queue.isEmpty()) {
257 flush.set(false);
258 return;
259 }
260
261 List<MessageEvent> pendingWrites = new ArrayList<MessageEvent>();
262 for (;;) {
263 MessageEvent e = queue.poll();
264 if (e == null) {
265 break;
266 }
267 if (!(e.getMessage() instanceof ChannelBuffer)) {
268 if ((pendingWrites = consolidatedWrite(pendingWrites)) == null) {
269 pendingWrites = new ArrayList<MessageEvent>();
270 }
271 ctx.sendDownstream(e);
272 } else {
273 pendingWrites.add(e);
274 }
275 }
276 consolidatedWrite(pendingWrites);
277
278 } else {
279 for (;;) {
280 MessageEvent e = queue.poll();
281 if (e == null) {
282 break;
283 }
284 ctx.sendDownstream(e);
285 }
286 }
287 flush.set(false);
288 }
289
290 if (acquired && (!channel.isConnected() || channel.isWritable() && !queue.isEmpty())) {
291 flush(consolidateOnFlush);
292 }
293 }
294
295 private List<MessageEvent> consolidatedWrite(final List<MessageEvent> pendingWrites) {
296 final int size = pendingWrites.size();
297 if (size == 1) {
298 ctx.sendDownstream(pendingWrites.remove(0));
299 return pendingWrites;
300 } else if (size == 0) {
301 return pendingWrites;
302 }
303
304 ChannelBuffer[] data = new ChannelBuffer[size];
305 for (int i = 0; i < data.length; i ++) {
306 data[i] = (ChannelBuffer) pendingWrites.get(i).getMessage();
307 }
308
309 ChannelBuffer composite = ChannelBuffers.wrappedBuffer(data);
310 ChannelFuture future = Channels.future(ctx.getChannel());
311 future.addListener(new ChannelFutureListener() {
312 public void operationComplete(ChannelFuture future)
313 throws Exception {
314 if (future.isSuccess()) {
315 for (MessageEvent e: pendingWrites) {
316 e.getFuture().setSuccess();
317 }
318 } else {
319 Throwable cause = future.getCause();
320 for (MessageEvent e: pendingWrites) {
321 e.getFuture().setFailure(cause);
322 }
323 }
324 }
325 });
326
327 Channels.write(ctx, future, composite);
328 return null;
329 }
330
331
332
333
334
335 @Override
336 public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
337 throws Exception {
338 if (this.ctx == null) {
339 this.ctx = ctx;
340 } else {
341 assert this.ctx == ctx;
342 }
343
344 getQueue().add(e);
345 }
346
347 @Override
348 public void disconnectRequested(ChannelHandlerContext ctx,
349 ChannelStateEvent e) throws Exception {
350 try {
351 flush(consolidateOnFlush);
352 } finally {
353 ctx.sendDownstream(e);
354 }
355 }
356
357 @Override
358 public void closeRequested(ChannelHandlerContext ctx, ChannelStateEvent e)
359 throws Exception {
360 try {
361 flush(consolidateOnFlush);
362 } finally {
363 ctx.sendDownstream(e);
364 }
365 }
366
367
368
369
370
371 @Override
372 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
373 Throwable cause = null;
374 for (;;) {
375 MessageEvent ev = queue.poll();
376
377 if (ev == null) {
378 break;
379 }
380
381 if (cause == null) {
382 cause = new ClosedChannelException();
383 }
384 ev.getFuture().setFailure(cause);
385
386 }
387 if (cause != null) {
388 Channels.fireExceptionCaught(ctx.getChannel(), cause);
389 }
390
391 super.channelClosed(ctx, e);
392 }
393
394 public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
395
396
397 }
398
399 public void afterAdd(ChannelHandlerContext ctx) throws Exception {
400
401
402 }
403
404 public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
405
406 flush(consolidateOnFlush);
407 }
408
409
410
411
412
413 public void afterRemove(ChannelHandlerContext ctx) throws Exception {
414 Throwable cause = null;
415 for (;;) {
416 MessageEvent ev = queue.poll();
417
418 if (ev == null) {
419 break;
420 }
421
422 if (cause == null) {
423 cause = new IOException("Unable to flush message");
424 }
425 ev.getFuture().setFailure(cause);
426
427 }
428 if (cause != null) {
429 Channels.fireExceptionCaughtLater(ctx.getChannel(), cause);
430 }
431 }
432
433
434 }