1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel;
17
18 import org.jboss.netty.logging.InternalLogger;
19 import org.jboss.netty.logging.InternalLoggerFactory;
20
21 import java.util.ArrayList;
22 import java.util.HashMap;
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.NoSuchElementException;
27 import java.util.concurrent.RejectedExecutionException;
28
29
30
31
32
33
34 public class DefaultChannelPipeline implements ChannelPipeline {
35
36 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
37 static final ChannelSink discardingSink = new DiscardingChannelSink();
38
39 private volatile Channel channel;
40 private volatile ChannelSink sink;
41 private volatile DefaultChannelHandlerContext head;
42 private volatile DefaultChannelHandlerContext tail;
43 private final Map<String, DefaultChannelHandlerContext> name2ctx =
44 new HashMap<String, DefaultChannelHandlerContext>(4);
45
46 public Channel getChannel() {
47 return channel;
48 }
49
50 public ChannelSink getSink() {
51 ChannelSink sink = this.sink;
52 if (sink == null) {
53 return discardingSink;
54 }
55 return sink;
56 }
57
58 public void attach(Channel channel, ChannelSink sink) {
59 if (channel == null) {
60 throw new NullPointerException("channel");
61 }
62 if (sink == null) {
63 throw new NullPointerException("sink");
64 }
65 if (this.channel != null || this.sink != null) {
66 throw new IllegalStateException("attached already");
67 }
68 this.channel = channel;
69 this.sink = sink;
70 }
71
72 public boolean isAttached() {
73 return sink != null;
74 }
75
76 public synchronized void addFirst(String name, ChannelHandler handler) {
77 if (name2ctx.isEmpty()) {
78 init(name, handler);
79 } else {
80 checkDuplicateName(name);
81 DefaultChannelHandlerContext oldHead = head;
82 DefaultChannelHandlerContext newHead = new DefaultChannelHandlerContext(null, oldHead, name, handler);
83
84 callBeforeAdd(newHead);
85
86 oldHead.prev = newHead;
87 head = newHead;
88 name2ctx.put(name, newHead);
89
90 callAfterAdd(newHead);
91 }
92 }
93
94 public synchronized void addLast(String name, ChannelHandler handler) {
95 if (name2ctx.isEmpty()) {
96 init(name, handler);
97 } else {
98 checkDuplicateName(name);
99 DefaultChannelHandlerContext oldTail = tail;
100 DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);
101
102 callBeforeAdd(newTail);
103
104 oldTail.next = newTail;
105 tail = newTail;
106 name2ctx.put(name, newTail);
107
108 callAfterAdd(newTail);
109 }
110 }
111
112 public synchronized void addBefore(String baseName, String name, ChannelHandler handler) {
113 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
114 if (ctx == head) {
115 addFirst(name, handler);
116 } else {
117 checkDuplicateName(name);
118 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx.prev, ctx, name, handler);
119
120 callBeforeAdd(newCtx);
121
122 ctx.prev.next = newCtx;
123 ctx.prev = newCtx;
124 name2ctx.put(name, newCtx);
125
126 callAfterAdd(newCtx);
127 }
128 }
129
130 public synchronized void addAfter(String baseName, String name, ChannelHandler handler) {
131 DefaultChannelHandlerContext ctx = getContextOrDie(baseName);
132 if (ctx == tail) {
133 addLast(name, handler);
134 } else {
135 checkDuplicateName(name);
136 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(ctx, ctx.next, name, handler);
137
138 callBeforeAdd(newCtx);
139
140 ctx.next.prev = newCtx;
141 ctx.next = newCtx;
142 name2ctx.put(name, newCtx);
143
144 callAfterAdd(newCtx);
145 }
146 }
147
148 public synchronized void remove(ChannelHandler handler) {
149 remove(getContextOrDie(handler));
150 }
151
152 public synchronized ChannelHandler remove(String name) {
153 return remove(getContextOrDie(name)).getHandler();
154 }
155
156 @SuppressWarnings("unchecked")
157 public synchronized <T extends ChannelHandler> T remove(Class<T> handlerType) {
158 return (T) remove(getContextOrDie(handlerType)).getHandler();
159 }
160
161 private DefaultChannelHandlerContext remove(DefaultChannelHandlerContext ctx) {
162 if (head == tail) {
163 head = tail = null;
164 name2ctx.clear();
165 } else if (ctx == head) {
166 removeFirst();
167 } else if (ctx == tail) {
168 removeLast();
169 } else {
170 callBeforeRemove(ctx);
171
172 DefaultChannelHandlerContext prev = ctx.prev;
173 DefaultChannelHandlerContext next = ctx.next;
174 prev.next = next;
175 next.prev = prev;
176 name2ctx.remove(ctx.getName());
177
178 callAfterRemove(ctx);
179 }
180 return ctx;
181 }
182
183 public synchronized ChannelHandler removeFirst() {
184 if (name2ctx.isEmpty()) {
185 throw new NoSuchElementException();
186 }
187
188 DefaultChannelHandlerContext oldHead = head;
189 if (oldHead == null) {
190 throw new NoSuchElementException();
191 }
192
193 callBeforeRemove(oldHead);
194
195 if (oldHead.next == null) {
196 head = tail = null;
197 name2ctx.clear();
198 } else {
199 oldHead.next.prev = null;
200 head = oldHead.next;
201 name2ctx.remove(oldHead.getName());
202 }
203
204 callAfterRemove(oldHead);
205
206 return oldHead.getHandler();
207 }
208
209 public synchronized ChannelHandler removeLast() {
210 if (name2ctx.isEmpty()) {
211 throw new NoSuchElementException();
212 }
213
214 DefaultChannelHandlerContext oldTail = tail;
215 if (oldTail == null) {
216 throw new NoSuchElementException();
217 }
218
219 callBeforeRemove(oldTail);
220
221 if (oldTail.prev == null) {
222 head = tail = null;
223 name2ctx.clear();
224 } else {
225 oldTail.prev.next = null;
226 tail = oldTail.prev;
227 name2ctx.remove(oldTail.getName());
228 }
229
230 callBeforeRemove(oldTail);
231
232 return oldTail.getHandler();
233 }
234
235 public synchronized void replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
236 replace(getContextOrDie(oldHandler), newName, newHandler);
237 }
238
239 public synchronized ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
240 return replace(getContextOrDie(oldName), newName, newHandler);
241 }
242
243 @SuppressWarnings("unchecked")
244 public synchronized <T extends ChannelHandler> T replace(
245 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
246 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
247 }
248
249 private ChannelHandler replace(DefaultChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
250 if (ctx == head) {
251 removeFirst();
252 addFirst(newName, newHandler);
253 } else if (ctx == tail) {
254 removeLast();
255 addLast(newName, newHandler);
256 } else {
257 boolean sameName = ctx.getName().equals(newName);
258 if (!sameName) {
259 checkDuplicateName(newName);
260 }
261
262 DefaultChannelHandlerContext prev = ctx.prev;
263 DefaultChannelHandlerContext next = ctx.next;
264 DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(prev, next, newName, newHandler);
265
266 callBeforeRemove(ctx);
267 callBeforeAdd(newCtx);
268
269 prev.next = newCtx;
270 next.prev = newCtx;
271
272 if (!sameName) {
273 name2ctx.remove(ctx.getName());
274 }
275 name2ctx.put(newName, newCtx);
276
277 ChannelHandlerLifeCycleException removeException = null;
278 ChannelHandlerLifeCycleException addException = null;
279 boolean removed = false;
280 try {
281 callAfterRemove(ctx);
282 removed = true;
283 } catch (ChannelHandlerLifeCycleException e) {
284 removeException = e;
285 }
286
287 boolean added = false;
288 try {
289 callAfterAdd(newCtx);
290 added = true;
291 } catch (ChannelHandlerLifeCycleException e) {
292 addException = e;
293 }
294
295 if (!removed && !added) {
296 logger.warn(removeException.getMessage(), removeException);
297 logger.warn(addException.getMessage(), addException);
298 throw new ChannelHandlerLifeCycleException(
299 "Both " + ctx.getHandler().getClass().getName() +
300 ".afterRemove() and " + newCtx.getHandler().getClass().getName() +
301 ".afterAdd() failed; see logs.");
302 } else if (!removed) {
303 throw removeException;
304 } else if (!added) {
305 throw addException;
306 }
307 }
308
309 return ctx.getHandler();
310 }
311
312 private static void callBeforeAdd(ChannelHandlerContext ctx) {
313 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
314 return;
315 }
316
317 LifeCycleAwareChannelHandler h =
318 (LifeCycleAwareChannelHandler) ctx.getHandler();
319
320 try {
321 h.beforeAdd(ctx);
322 } catch (Throwable t) {
323 throw new ChannelHandlerLifeCycleException(
324 h.getClass().getName() +
325 ".beforeAdd() has thrown an exception; not adding.", t);
326 }
327 }
328
329 private void callAfterAdd(ChannelHandlerContext ctx) {
330 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
331 return;
332 }
333
334 LifeCycleAwareChannelHandler h =
335 (LifeCycleAwareChannelHandler) ctx.getHandler();
336
337 try {
338 h.afterAdd(ctx);
339 } catch (Throwable t) {
340 boolean removed = false;
341 try {
342 remove((DefaultChannelHandlerContext) ctx);
343 removed = true;
344 } catch (Throwable t2) {
345 if (logger.isWarnEnabled()) {
346 logger.warn("Failed to remove a handler: " + ctx.getName(), t2);
347 }
348 }
349
350 if (removed) {
351 throw new ChannelHandlerLifeCycleException(
352 h.getClass().getName() +
353 ".afterAdd() has thrown an exception; removed.", t);
354 } else {
355 throw new ChannelHandlerLifeCycleException(
356 h.getClass().getName() +
357 ".afterAdd() has thrown an exception; also failed to remove.", t);
358 }
359 }
360 }
361
362 private static void callBeforeRemove(ChannelHandlerContext ctx) {
363 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
364 return;
365 }
366
367 LifeCycleAwareChannelHandler h =
368 (LifeCycleAwareChannelHandler) ctx.getHandler();
369
370 try {
371 h.beforeRemove(ctx);
372 } catch (Throwable t) {
373 throw new ChannelHandlerLifeCycleException(
374 h.getClass().getName() +
375 ".beforeRemove() has thrown an exception; not removing.", t);
376 }
377 }
378
379 private static void callAfterRemove(ChannelHandlerContext ctx) {
380 if (!(ctx.getHandler() instanceof LifeCycleAwareChannelHandler)) {
381 return;
382 }
383
384 LifeCycleAwareChannelHandler h =
385 (LifeCycleAwareChannelHandler) ctx.getHandler();
386
387 try {
388 h.afterRemove(ctx);
389 } catch (Throwable t) {
390 throw new ChannelHandlerLifeCycleException(
391 h.getClass().getName() +
392 ".afterRemove() has thrown an exception.", t);
393 }
394 }
395
396 public synchronized ChannelHandler getFirst() {
397 DefaultChannelHandlerContext head = this.head;
398 if (head == null) {
399 return null;
400 }
401 return head.getHandler();
402 }
403
404 public synchronized ChannelHandler getLast() {
405 DefaultChannelHandlerContext tail = this.tail;
406 if (tail == null) {
407 return null;
408 }
409 return tail.getHandler();
410 }
411
412 public synchronized ChannelHandler get(String name) {
413 DefaultChannelHandlerContext ctx = name2ctx.get(name);
414 if (ctx == null) {
415 return null;
416 } else {
417 return ctx.getHandler();
418 }
419 }
420
421 public synchronized <T extends ChannelHandler> T get(Class<T> handlerType) {
422 ChannelHandlerContext ctx = getContext(handlerType);
423 if (ctx == null) {
424 return null;
425 } else {
426 return (T) ctx.getHandler();
427 }
428 }
429
430 public synchronized ChannelHandlerContext getContext(String name) {
431 if (name == null) {
432 throw new NullPointerException("name");
433 }
434 return name2ctx.get(name);
435 }
436
437 public synchronized ChannelHandlerContext getContext(ChannelHandler handler) {
438 if (handler == null) {
439 throw new NullPointerException("handler");
440 }
441 if (name2ctx.isEmpty()) {
442 return null;
443 }
444 DefaultChannelHandlerContext ctx = head;
445 for (;;) {
446 if (ctx.getHandler() == handler) {
447 return ctx;
448 }
449
450 ctx = ctx.next;
451 if (ctx == null) {
452 break;
453 }
454 }
455 return null;
456 }
457
458 public synchronized ChannelHandlerContext getContext(
459 Class<? extends ChannelHandler> handlerType) {
460 if (handlerType == null) {
461 throw new NullPointerException("handlerType");
462 }
463
464 if (name2ctx.isEmpty()) {
465 return null;
466 }
467 DefaultChannelHandlerContext ctx = head;
468 for (;;) {
469 if (handlerType.isAssignableFrom(ctx.getHandler().getClass())) {
470 return ctx;
471 }
472
473 ctx = ctx.next;
474 if (ctx == null) {
475 break;
476 }
477 }
478 return null;
479 }
480
481 public List<String> getNames() {
482 List<String> list = new ArrayList<String>();
483 if (name2ctx.isEmpty()) {
484 return list;
485 }
486
487 DefaultChannelHandlerContext ctx = head;
488 for (;;) {
489 list.add(ctx.getName());
490 ctx = ctx.next;
491 if (ctx == null) {
492 break;
493 }
494 }
495 return list;
496 }
497
498 public Map<String, ChannelHandler> toMap() {
499 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
500 if (name2ctx.isEmpty()) {
501 return map;
502 }
503
504 DefaultChannelHandlerContext ctx = head;
505 for (;;) {
506 map.put(ctx.getName(), ctx.getHandler());
507 ctx = ctx.next;
508 if (ctx == null) {
509 break;
510 }
511 }
512 return map;
513 }
514
515
516
517
518 @Override
519 public String toString() {
520 StringBuilder buf = new StringBuilder();
521 buf.append(getClass().getSimpleName());
522 buf.append('{');
523 DefaultChannelHandlerContext ctx = head;
524 if (ctx != null) {
525 for (;;) {
526 buf.append('(');
527 buf.append(ctx.getName());
528 buf.append(" = ");
529 buf.append(ctx.getHandler().getClass().getName());
530 buf.append(')');
531 ctx = ctx.next;
532 if (ctx == null) {
533 break;
534 }
535 buf.append(", ");
536 }
537 }
538 buf.append('}');
539 return buf.toString();
540 }
541
542 public void sendUpstream(ChannelEvent e) {
543 DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
544 if (head == null) {
545 if (logger.isWarnEnabled()) {
546 logger.warn(
547 "The pipeline contains no upstream handlers; discarding: " + e);
548 }
549
550 return;
551 }
552
553 sendUpstream(head, e);
554 }
555
556 void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
557 try {
558 ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
559 } catch (Throwable t) {
560 notifyHandlerException(e, t);
561 }
562 }
563
564 public void sendDownstream(ChannelEvent e) {
565 DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
566 if (tail == null) {
567 try {
568 getSink().eventSunk(this, e);
569 return;
570 } catch (Throwable t) {
571 notifyHandlerException(e, t);
572 return;
573 }
574 }
575
576 sendDownstream(tail, e);
577 }
578
579 void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
580 if (e instanceof UpstreamMessageEvent) {
581 throw new IllegalArgumentException("cannot send an upstream event to downstream");
582 }
583
584 try {
585 ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
586 } catch (Throwable t) {
587
588
589
590
591
592 e.getFuture().setFailure(t);
593 notifyHandlerException(e, t);
594 }
595 }
596
597 DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
598 if (ctx == null) {
599 return null;
600 }
601
602 DefaultChannelHandlerContext realCtx = ctx;
603 while (!realCtx.canHandleUpstream()) {
604 realCtx = realCtx.next;
605 if (realCtx == null) {
606 return null;
607 }
608 }
609
610 return realCtx;
611 }
612
613 DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
614 if (ctx == null) {
615 return null;
616 }
617
618 DefaultChannelHandlerContext realCtx = ctx;
619 while (!realCtx.canHandleDownstream()) {
620 realCtx = realCtx.prev;
621 if (realCtx == null) {
622 return null;
623 }
624 }
625
626 return realCtx;
627 }
628
629 public ChannelFuture execute(Runnable task) {
630 return getSink().execute(this, task);
631 }
632
633 protected void notifyHandlerException(ChannelEvent e, Throwable t) {
634 if (e instanceof ExceptionEvent) {
635 if (logger.isWarnEnabled()) {
636 logger.warn(
637 "An exception was thrown by a user handler " +
638 "while handling an exception event (" + e + ')', t);
639 }
640
641 return;
642 }
643
644 ChannelPipelineException pe;
645 if (t instanceof ChannelPipelineException) {
646 pe = (ChannelPipelineException) t;
647 } else {
648 pe = new ChannelPipelineException(t);
649 }
650
651 try {
652 sink.exceptionCaught(this, e, pe);
653 } catch (Exception e1) {
654 if (logger.isWarnEnabled()) {
655 logger.warn("An exception was thrown by an exception handler.", e1);
656 }
657 }
658 }
659
660 private void init(String name, ChannelHandler handler) {
661 DefaultChannelHandlerContext ctx = new DefaultChannelHandlerContext(null, null, name, handler);
662 callBeforeAdd(ctx);
663 head = tail = ctx;
664 name2ctx.clear();
665 name2ctx.put(name, ctx);
666 callAfterAdd(ctx);
667 }
668
669 private void checkDuplicateName(String name) {
670 if (name2ctx.containsKey(name)) {
671 throw new IllegalArgumentException("Duplicate handler name: " + name);
672 }
673 }
674
675 private DefaultChannelHandlerContext getContextOrDie(String name) {
676 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(name);
677 if (ctx == null) {
678 throw new NoSuchElementException(name);
679 } else {
680 return ctx;
681 }
682 }
683
684 private DefaultChannelHandlerContext getContextOrDie(ChannelHandler handler) {
685 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handler);
686 if (ctx == null) {
687 throw new NoSuchElementException(handler.getClass().getName());
688 } else {
689 return ctx;
690 }
691 }
692
693 private DefaultChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
694 DefaultChannelHandlerContext ctx = (DefaultChannelHandlerContext) getContext(handlerType);
695 if (ctx == null) {
696 throw new NoSuchElementException(handlerType.getName());
697 } else {
698 return ctx;
699 }
700 }
701
702 private final class DefaultChannelHandlerContext implements ChannelHandlerContext {
703 volatile DefaultChannelHandlerContext next;
704 volatile DefaultChannelHandlerContext prev;
705 private final String name;
706 private final ChannelHandler handler;
707 private final boolean canHandleUpstream;
708 private final boolean canHandleDownstream;
709 private volatile Object attachment;
710
711 DefaultChannelHandlerContext(
712 DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
713 String name, ChannelHandler handler) {
714
715 if (name == null) {
716 throw new NullPointerException("name");
717 }
718 if (handler == null) {
719 throw new NullPointerException("handler");
720 }
721 canHandleUpstream = handler instanceof ChannelUpstreamHandler;
722 canHandleDownstream = handler instanceof ChannelDownstreamHandler;
723
724
725 if (!canHandleUpstream && !canHandleDownstream) {
726 throw new IllegalArgumentException(
727 "handler must be either " +
728 ChannelUpstreamHandler.class.getName() + " or " +
729 ChannelDownstreamHandler.class.getName() + '.');
730 }
731
732 this.prev = prev;
733 this.next = next;
734 this.name = name;
735 this.handler = handler;
736 }
737
738 public Channel getChannel() {
739 return getPipeline().getChannel();
740 }
741
742 public ChannelPipeline getPipeline() {
743 return DefaultChannelPipeline.this;
744 }
745
746 public boolean canHandleDownstream() {
747 return canHandleDownstream;
748 }
749
750 public boolean canHandleUpstream() {
751 return canHandleUpstream;
752 }
753
754 public ChannelHandler getHandler() {
755 return handler;
756 }
757
758 public String getName() {
759 return name;
760 }
761
762 public Object getAttachment() {
763 return attachment;
764 }
765
766 public void setAttachment(Object attachment) {
767 this.attachment = attachment;
768 }
769
770 public void sendDownstream(ChannelEvent e) {
771 DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
772 if (prev == null) {
773 try {
774 getSink().eventSunk(DefaultChannelPipeline.this, e);
775 } catch (Throwable t) {
776 notifyHandlerException(e, t);
777 }
778 } else {
779 DefaultChannelPipeline.this.sendDownstream(prev, e);
780 }
781 }
782
783 public void sendUpstream(ChannelEvent e) {
784 DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
785 if (next != null) {
786 DefaultChannelPipeline.this.sendUpstream(next, e);
787 }
788 }
789 }
790
791 private static final class DiscardingChannelSink implements ChannelSink {
792 DiscardingChannelSink() {
793 }
794
795 public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) {
796 if (logger.isWarnEnabled()) {
797 logger.warn("Not attached yet; discarding: " + e);
798 }
799
800 }
801
802 public void exceptionCaught(ChannelPipeline pipeline,
803 ChannelEvent e, ChannelPipelineException cause) throws Exception {
804 throw cause;
805 }
806
807
808 public ChannelFuture execute(ChannelPipeline pipeline, Runnable task) {
809 if (logger.isWarnEnabled()) {
810 logger.warn("Not attached yet; rejecting: " + task);
811 }
812 return Channels.failedFuture(pipeline.getChannel(), new RejectedExecutionException("Not attached yet"));
813 }
814 }
815 }