1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.RejectedExecutionException;
28
29
30
31
32
33
34 public class DefaultChannelPipeline implements ChannelPipeline {
35
36 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
37 static final ChannelSink discardingSink = new DiscardingChannelSink();
38
39 private volatile Channel channel;
40 private volatile ChannelSink sink;
41 private volatile DefaultChannelHandlerContext head;
42 private volatile DefaultChannelHandlerContext tail;
43 private final Map<String, DefaultChannelHandlerContext> name2ctx =
44 new HashMap<String, DefaultChannelHandlerContext>(4);
45
46 public Channel getChannel() {
47 return channel;
48 }
49
50 public ChannelSink getSink() {
51 ChannelSink sink = this.sink;
52 if (sink == null) {
53 return discardingSink;
54 }
55 return sink;
56 }
57
58 public void attach(Channel channel, ChannelSink sink) {
59 if (channel == null) {
60 throw new NullPointerException("channel");
61 }
62 if (sink == null) {
63 throw new NullPointerException("sink");
64 }
65 if (this.channel != null || this.sink != null) {
66 throw new IllegalStateException("attached already");
67 }
68 this.channel = channel;
69 this.sink = sink;
70 }
71
72 public boolean isAttached() {
73 return sink != null;
74 }
75
76 public synchronized void addFirst(String name, ChannelHandler handler) {
77 if (name2ctx.isEmpty()) {
78 init(name, handler);
79 } else {
80 checkDuplicateName(name);
81 DefaultChannelHandlerContext oldHead = head;
82 DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
83
84 callBeforeAdd(newHead);
85
86 oldHead.prev = newHead;
87 head = newHead;
88 name2ctx.put(name, newHead);
89
90 callAfterAdd(newHead);
91 }
92 }
93
94 public synchronized void addLast(String name, ChannelHandler handler) {
95 if (name2ctx.isEmpty()) {
96 init(name, handler);
97 } else {
98 checkDuplicateName(name);
99 DefaultChannelHandlerContext oldTail = tail;
100 DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
101
102 callBeforeAdd(newTail);
103
104 oldTail.next = newTail;
105 tail = newTail;
106 name2ctx.put(name, newTail);
107
108 callAfterAdd(newTail);
109 }
110 }
111
112 public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
113 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
114 if (ctx == head) {
115 addFirst(name, handler);
116 } else {
117 checkDuplicateName(name);
118 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
119
120 callBeforeAdd(newCtx);
121
122 ctx.prev.next = newCtx;
123 ctx.prev = newCtx;
124 name2ctx.put(name, newCtx);
125
126 callAfterAdd(newCtx);
127 }
128 }
129
130 public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
131 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
132 if (ctx == tail) {
133 addLast(name, handler);
134 } else {
135 checkDuplicateName(name);
136 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
137
138 callBeforeAdd(newCtx);
139
140 ctx.next.prev = newCtx;
141 ctx.next = newCtx;
142 name2ctx.put(name, newCtx);
143
144 callAfterAdd(newCtx);
145 }
146 }
147
148 public synchronized void remove(ChannelHandler handler) {
149 remove(getContextOrDie(handler));
150 }
151
152 public synchronized ChannelHandler remove(String name) {
153 return remove(getContextOrDie(name)).getHandler();
154 }
155
156 @SuppressWarnings("unchecked")
157 public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
158 return (T) remove(getContextOrDie(handlerType)).getHandler();
159 }
160
161 private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
162 if (head == tail) {
163 callBeforeRemove(ctx);
164
165 head = tail = null;
166 name2ctx.clear();
167
168 callAfterRemove(ctx);
169 } else if (ctx == head) {
170 removeFirst();
171 } else if (ctx == tail) {
172 removeLast();
173 } else {
174 callBeforeRemove(ctx);
175
176 DefaultChannelHandlerContext prev = ctx.prev;
177 DefaultChannelHandlerContext next = ctx.next;
178 prev.next = next;
179 next.prev = prev;
180 name2ctx.remove(ctx.getName());
181
182 callAfterRemove(ctx);
183 }
184 return ctx;
185 }
186
187 public synchronized ChannelHandler removeFirst() {
188 if (name2ctx.isEmpty()) {
189 throw new NoSuchElementException();
190 }
191
192 DefaultChannelHandlerContext oldHead = head;
193 if (oldHead == null) {
194 throw new NoSuchElementException();
195 }
196
197 callBeforeRemove(oldHead);
198
199 if (oldHead.next == null) {
200 head = tail = null;
201 name2ctx.clear();
202 } else {
203 oldHead.next.prev = null;
204 head = oldHead.next;
205 name2ctx.remove(oldHead.getName());
206 }
207
208 callAfterRemove(oldHead);
209
210 return oldHead.getHandler();
211 }
212
213 public synchronized ChannelHandler removeLast() {
214 if (name2ctx.isEmpty()) {
215 throw new NoSuchElementException();
216 }
217
218 DefaultChannelHandlerContext oldTail = tail;
219 if (oldTail == null) {
220 throw new NoSuchElementException();
221 }
222
223 callBeforeRemove(oldTail);
224
225 if (oldTail.prev == null) {
226 head = tail = null;
227 name2ctx.clear();
228 } else {
229 oldTail.prev.next = null;
230 tail = oldTail.prev;
231 name2ctx.remove(oldTail.getName());
232 }
233
234 callAfterRemove(oldTail);
235
236 return oldTail.getHandler();
237 }
238
239 public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
240 replace(getContextOrDie(oldHandler), newName, newHandler);
241 }
242
243 public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
244 return replace(getContextOrDie(oldName), newName, newHandler);
245 }
246
247 @SuppressWarnings("unchecked")
248 public synchronized <T extends ChannelHandler> T replace(
249 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
250 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
251 }
252
253 private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
254 if (ctx == head) {
255 removeFirst();
256 addFirst(newName, newHandler);
257 } else if (ctx == tail) {
258 removeLast();
259 addLast(newName, newHandler);
260 } else {
261 boolean sameName = ctx.getName().equals(newName);
262 if (!sameName) {
263 checkDuplicateName(newName);
264 }
265
266 DefaultChannelHandlerContext prev = ctx.prev;
267 DefaultChannelHandlerContext next = ctx.next;
268 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
269
270 callBeforeRemove(ctx);
271 callBeforeAdd(newCtx);
272
273 prev.next = newCtx;
274 next.prev = newCtx;
275
276 if (!sameName) {
277 name2ctx.remove(ctx.getName());
278 }
279 name2ctx.put(newName, newCtx);
280
281 ChannelHandlerLifeCycleException removeException = null;
282 ChannelHandlerLifeCycleException addException = null;
283 boolean removed = false;
284 try {
285 callAfterRemove(ctx);
286 removed = true;
287 } catch (ChannelHandlerLifeCycleException e) {
288 removeException = e;
289 }
290
291 boolean added = false;
292 try {
293 callAfterAdd(newCtx);
294 added = true;
295 } catch (ChannelHandlerLifeCycleException e) {
296 addException = e;
297 }
298
299 if (!removed && !added) {
300 logger.warn(removeException.getMessage(), removeException);
301 logger.warn(addException.getMessage(), addException);
302 throw new ChannelHandlerLifeCycleException(
303 "Both " + ctx.getHandler().getClass().getName() +
304 ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
305 ".afterAdd() failed; see logs.");
306 } else if (!removed) {
307 throw removeException;
308 } else if (!added) {
309 throw addException;
310 }
311 }
312
313 return ctx.getHandler();
314 }
315
316 private static void callBeforeAdd(ChannelHandlerContext ctx) {
317 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
318 return;
319 }
320
321 LifeCycleAwareChannelHandler h =
322 (LifeCycleAwareChannelHandler) ctx.getHandler();
323
324 try {
325 h.beforeAdd(ctx);
326 } catch (Throwable t) {
327 throw new ChannelHandlerLifeCycleException(
328 h.getClass().getName() +
329 ".beforeAdd() has thrown an exception; not adding.", t);
330 }
331 }
332
333 private void callAfterAdd(ChannelHandlerContext ctx) {
334 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
335 return;
336 }
337
338 LifeCycleAwareChannelHandler h =
339 (LifeCycleAwareChannelHandler) ctx.getHandler();
340
341 try {
342 h.afterAdd(ctx);
343 } catch (Throwable t) {
344 boolean removed = false;
345 try {
346 remove((DefaultChannelHandlerContext) ctx);
347 removed = true;
348 } catch (Throwable t2) {
349 if (logger.isWarnEnabled()) {
350 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
351 }
352 }
353
354 if (removed) {
355 throw new ChannelHandlerLifeCycleException(
356 h.getClass().getName() +
357 ".afterAdd() has thrown an exception; removed.", t);
358 } else {
359 throw new ChannelHandlerLifeCycleException(
360 h.getClass().getName() +
361 ".afterAdd() has thrown an exception; also failed to remove.", t);
362 }
363 }
364 }
365
366 private static void callBeforeRemove(ChannelHandlerContext ctx) {
367 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
368 return;
369 }
370
371 LifeCycleAwareChannelHandler h =
372 (LifeCycleAwareChannelHandler) ctx.getHandler();
373
374 try {
375 h.beforeRemove(ctx);
376 } catch (Throwable t) {
377 throw new ChannelHandlerLifeCycleException(
378 h.getClass().getName() +
379 ".beforeRemove() has thrown an exception; not removing.", t);
380 }
381 }
382
383 private static void callAfterRemove(ChannelHandlerContext ctx) {
384 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
385 return;
386 }
387
388 LifeCycleAwareChannelHandler h =
389 (LifeCycleAwareChannelHandler) ctx.getHandler();
390
391 try {
392 h.afterRemove(ctx);
393 } catch (Throwable t) {
394 throw new ChannelHandlerLifeCycleException(
395 h.getClass().getName() +
396 ".afterRemove() has thrown an exception.", t);
397 }
398 }
399
400 public synchronized ChannelHandler getFirst() {
401 DefaultChannelHandlerContext head = this.head;
402 if (head == null) {
403 return null;
404 }
405 return head.getHandler();
406 }
407
408 public synchronized ChannelHandler getLast() {
409 DefaultChannelHandlerContext tail = this.tail;
410 if (tail == null) {
411 return null;
412 }
413 return tail.getHandler();
414 }
415
416 public synchronized ChannelHandler get(String name) {
417 DefaultChannelHandlerContext ctx = name2ctx.get(name);
418 if (ctx == null) {
419 return null;
420 } else {
421 return ctx.getHandler();
422 }
423 }
424
425 public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
426 ChannelHandlerContext ctx = getContext(handlerType);
427 if (ctx == null) {
428 return null;
429 } else {
430 @SuppressWarnings("unchecked")
431 T handler = (T) ctx.getHandler();
432 return handler;
433 }
434 }
435
436 public synchronized ChannelHandlerContext getContext(String name) {
437 if (name == null) {
438 throw new NullPointerException("name");
439 }
440 return name2ctx.get(name);
441 }
442
443 public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
444 if (handler == null) {
445 throw new NullPointerException("handler");
446 }
447 if (name2ctx.isEmpty()) {
448 return null;
449 }
450 DefaultChannelHandlerContext ctx = head;
451 for (;;) {
452 if (ctx.getHandler() == handler) {
453 return ctx;
454 }
455
456 ctx = ctx.next;
457 if (ctx == null) {
458 break;
459 }
460 }
461 return null;
462 }
463
464 public synchronized ChannelHandlerContext getContext(
465 Class<? extends ChannelHandler> handlerType) {
466 if (handlerType == null) {
467 throw new NullPointerException("handlerType");
468 }
469
470 if (name2ctx.isEmpty()) {
471 return null;
472 }
473 DefaultChannelHandlerContext ctx = head;
474 for (;;) {
475 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
476 return ctx;
477 }
478
479 ctx = ctx.next;
480 if (ctx == null) {
481 break;
482 }
483 }
484 return null;
485 }
486
487 public List<String> getNames() {
488 List<String> list = new ArrayList<String>();
489 if (name2ctx.isEmpty()) {
490 return list;
491 }
492
493 DefaultChannelHandlerContext ctx = head;
494 for (;;) {
495 list.add(ctx.getName());
496 ctx = ctx.next;
497 if (ctx == null) {
498 break;
499 }
500 }
501 return list;
502 }
503
504 public Map<String, ChannelHandler> toMap() {
505 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
506 if (name2ctx.isEmpty()) {
507 return map;
508 }
509
510 DefaultChannelHandlerContext ctx = head;
511 for (;;) {
512 map.put(ctx.getName(), ctx.getHandler());
513 ctx = ctx.next;
514 if (ctx == null) {
515 break;
516 }
517 }
518 return map;
519 }
520
521
522
523
524 @Override
525 public String toString() {
526 StringBuilder buf = new StringBuilder();
527 buf.append(getClass().getSimpleName());
528 buf.append('{');
529 DefaultChannelHandlerContext ctx = head;
530 if (ctx != null) {
531 for (;;) {
532 buf.append('(');
533 buf.append(ctx.getName());
534 buf.append(" = ");
535 buf.append(ctx.getHandler().getClass().getName());
536 buf.append(')');
537 ctx = ctx.next;
538 if (ctx == null) {
539 break;
540 }
541 buf.append(", ");
542 }
543 }
544 buf.append('}');
545 return buf.toString();
546 }
547
548 public void sendUpstream(ChannelEvent e) {
549 DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
550 if (head == null) {
551 if (logger.isWarnEnabled()) {
552 logger.warn(
553 "The pipeline contains no upstream handlers; discarding: " + e);
554 }
555
556 return;
557 }
558
559 sendUpstream(head, e);
560 }
561
562 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
563 try {
564 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
565 } catch (Throwable t) {
566 notifyHandlerException(e, t);
567 }
568 }
569
570 public void sendDownstream(ChannelEvent e) {
571 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
572 if (tail == null) {
573 try {
574 getSink().eventSunk(this, e);
575 return;
576 } catch (Throwable t) {
577 notifyHandlerException(e, t);
578 return;
579 }
580 }
581
582 sendDownstream(tail, e);
583 }
584
585 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
586 if (e instanceof UpstreamMessageEvent) {
587 throw new IllegalArgumentException("cannot send an upstream event to downstream");
588 }
589
590 try {
591 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
592 } catch (Throwable t) {
593
594
595
596
597
598 e.getFuture().setFailure(t);
599 notifyHandlerException(e, t);
600 }
601 }
602
603 private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
604 if (ctx == null) {
605 return null;
606 }
607
608 DefaultChannelHandlerContext realCtx = ctx;
609 while (!realCtx.canHandleUpstream()) {
610 realCtx = realCtx.next;
611 if (realCtx == null) {
612 return null;
613 }
614 }
615
616 return realCtx;
617 }
618
619 private DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
620 if (ctx == null) {
621 return null;
622 }
623
624 DefaultChannelHandlerContext realCtx = ctx;
625 while (!realCtx.canHandleDownstream()) {
626 realCtx = realCtx.prev;
627 if (realCtx == null) {
628 return null;
629 }
630 }
631
632 return realCtx;
633 }
634
635 public ChannelFuture execute(Runnable task) {
636 return getSink().execute(this, task);
637 }
638
639 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
640 if (e instanceof ExceptionEvent) {
641 if (logger.isWarnEnabled()) {
642 logger.warn(
643 "An exception was thrown by a user handler " +
644 "while handling an exception event (" + e + ')', t);
645 }
646
647 return;
648 }
649
650 ChannelPipelineException pe;
651 if (t instanceof ChannelPipelineException) {
652 pe = (ChannelPipelineException) t;
653 } else {
654 pe = new ChannelPipelineException(t);
655 }
656
657 try {
658 sink.exceptionCaught(this, e, pe);
659 } catch (Exception e1) {
660 if (logger.isWarnEnabled()) {
661 logger.warn("An exception was thrown by an exception handler.", e1);
662 }
663 }
664 }
665
666 private void init(String name, ChannelHandler handler) {
667 DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
668 callBeforeAdd(ctx);
669 head = tail = ctx;
670 name2ctx.clear();
671 name2ctx.put(name, ctx);
672 callAfterAdd(ctx);
673 }
674
675 private void checkDuplicateName(String name) {
676 if (name2ctx.containsKey(name)) {
677 throw new IllegalArgumentException("Duplicate handler name: " + name);
678 }
679 }
680
681 private DefaultChannelHandlerContext getContextOrDie(String name) {
682 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
683 if (ctx == null) {
684 throw new NoSuchElementException(name);
685 } else {
686 return ctx;
687 }
688 }
689
690 private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
691 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
692 if (ctx == null) {
693 throw new NoSuchElementException(handler.getClass().getName());
694 } else {
695 return ctx;
696 }
697 }
698
699 private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
700 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
701 if (ctx == null) {
702 throw new NoSuchElementException(handlerType.getName());
703 } else {
704 return ctx;
705 }
706 }
707
708 private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
709 volatile DefaultChannelHandlerContext next;
710 volatile DefaultChannelHandlerContext prev;
711 private final String name;
712 private final ChannelHandler handler;
713 private final boolean canHandleUpstream;
714 private final boolean canHandleDownstream;
715 private volatile Object attachment;
716
717 DefaultChannelHandlerContext(
718 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
719 String name, ChannelHandler handler) {
720
721 if (name == null) {
722 throw new NullPointerException("name");
723 }
724 if (handler == null) {
725 throw new NullPointerException("handler");
726 }
727 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
728 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
729
730 if (!canHandleUpstream && !canHandleDownstream) {
731 throw new IllegalArgumentException(
732 "handler must be either " +
733 ChannelUpstreamHandler.class.getName() + " or " +
734 ChannelDownstreamHandler.class.getName() + '.');
735 }
736
737 this.prev = prev;
738 this.next = next;
739 this.name = name;
740 this.handler = handler;
741 }
742
743 public Channel getChannel() {
744 return getPipeline().getChannel();
745 }
746
747 public ChannelPipeline getPipeline() {
748 return DefaultChannelPipeline.this;
749 }
750
751 public boolean canHandleDownstream() {
752 return canHandleDownstream;
753 }
754
755 public boolean canHandleUpstream() {
756 return canHandleUpstream;
757 }
758
759 public ChannelHandler getHandler() {
760 return handler;
761 }
762
763 public String getName() {
764 return name;
765 }
766
767 public Object getAttachment() {
768 return attachment;
769 }
770
771 public void setAttachment(Object attachment) {
772 this.attachment = attachment;
773 }
774
775 public void sendDownstream(ChannelEvent e) {
776 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
777 if (prev == null) {
778 try {
779 getSink().eventSunk(DefaultChannelPipeline.this, e);
780 } catch (Throwable t) {
781 notifyHandlerException(e, t);
782 }
783 } else {
784 DefaultChannelPipeline.this.sendDownstream(prev, e);
785 }
786 }
787
788 public void sendUpstream(ChannelEvent e) {
789 DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
790 if (next != null) {
791 DefaultChannelPipeline.this.sendUpstream(next, e);
792 }
793 }
794 }
795
796 private static final class DiscardingChannelSink implements ChannelSink {
797 DiscardingChannelSink() {
798 }
799
800 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
801 if (logger.isWarnEnabled()) {
802 logger.warn("Not attached yet; discarding: " + e);
803 }
804 }
805
806 public void exceptionCaught(ChannelPipeline pipeline,
807 ChannelEvent e, ChannelPipelineException cause) throws Exception {
808 throw cause;
809 }
810
811 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
812 if (logger.isWarnEnabled()) {
813 logger.warn("Not attached yet; rejecting: " + task);
814 }
815 return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
816 }
817 }
818 }