1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.channel.Channel.Unsafe;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.ResourceLeakDetector;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.concurrent.EventExecutorGroup;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.StringUtil;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.net.SocketAddress;
30 import java.util.ArrayList;
31 import java.util.IdentityHashMap;
32 import java.util.Iterator;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NoSuchElementException;
37 import java.util.WeakHashMap;
38 import java.util.concurrent.RejectedExecutionException;
39 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40
41
42
43
44
45 public class DefaultChannelPipeline implements ChannelPipeline {
46
47 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
48
49 private static final String HEAD_NAME = generateName0(HeadContext.class);
50 private static final String TAIL_NAME = generateName0(TailContext.class);
51
52 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
53 new FastThreadLocal<Map<Class<?>, String>>() {
54 @Override
55 protected Map<Class<?>, String> initialValue() {
56 return new WeakHashMap<Class<?>, String>();
57 }
58 };
59
60 private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
61 AtomicReferenceFieldUpdater.newUpdater(
62 DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
63 final HeadContext head;
64 final TailContext tail;
65
66 private final Channel channel;
67 private final ChannelFuture succeededFuture;
68 private final VoidChannelPromise voidPromise;
69 private final boolean touch = ResourceLeakDetector.isEnabled();
70
71 private Map<EventExecutorGroup, EventExecutor> childExecutors;
72 private volatile MessageSizeEstimator.Handle estimatorHandle;
73 private boolean firstRegistration = true;
74
75
76
77
78
79
80
81
82
83 private PendingHandlerCallback pendingHandlerCallbackHead;
84
85
86
87
88
89 private boolean registered;
90
/**
 * Creates a pipeline for {@code channel} that initially contains only the head and tail sentinels.
 *
 * @param channel the owning channel; rejected by {@code ObjectUtil.checkNotNull} when {@code null}
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, null);
    this.voidPromise = new VoidChannelPromise(channel, true);

    // Build both sentinels (tail first, as before) and link them into an empty list.
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);
    this.head.next = this.tail;
    this.tail.prev = this.head;
}
102
103 final MessageSizeEstimator.Handle estimatorHandle() {
104 MessageSizeEstimator.Handle handle = estimatorHandle;
105 if (handle == null) {
106 handle = channel.config().getMessageSizeEstimator().newHandle();
107 if (!ESTIMATOR.compareAndSet(this, null, handle)) {
108 handle = estimatorHandle;
109 }
110 }
111 return handle;
112 }
113
114 final Object touch(Object msg, AbstractChannelHandlerContext next) {
115 return touch ? ReferenceCountUtil.touch(msg, next) : msg;
116 }
117
118 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
119 return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
120 }
121
122 private EventExecutor childExecutor(EventExecutorGroup group) {
123 if (group == null) {
124 return null;
125 }
126 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
127 if (pinEventExecutor != null && !pinEventExecutor) {
128 return group.next();
129 }
130 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
131 if (childExecutors == null) {
132
133 childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
134 }
135
136
137 EventExecutor childExecutor = childExecutors.get(group);
138 if (childExecutor == null) {
139 childExecutor = group.next();
140 childExecutors.put(group, childExecutor);
141 }
142 return childExecutor;
143 }
144 @Override
145 public final Channel channel() {
146 return channel;
147 }
148
149 @Override
150 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
151 return addFirst(null, name, handler);
152 }
153
154 @Override
155 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
156 final AbstractChannelHandlerContext newCtx;
157 synchronized (this) {
158 checkMultiplicity(handler);
159 name = filterName(name, handler);
160
161 newCtx = newContext(group, name, handler);
162
163 addFirst0(newCtx);
164
165
166
167
168 if (!registered) {
169 newCtx.setAddPending();
170 callHandlerCallbackLater(newCtx, true);
171 return this;
172 }
173
174 EventExecutor executor = newCtx.executor();
175 if (!executor.inEventLoop()) {
176 callHandlerAddedInEventLoop(newCtx, executor);
177 return this;
178 }
179 }
180 callHandlerAdded0(newCtx);
181 return this;
182 }
183
184 private void addFirst0(AbstractChannelHandlerContext newCtx) {
185 AbstractChannelHandlerContext nextCtx = head.next;
186 newCtx.prev = head;
187 newCtx.next = nextCtx;
188 head.next = newCtx;
189 nextCtx.prev = newCtx;
190 }
191
192 @Override
193 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
194 return addLast(null, name, handler);
195 }
196
197 @Override
198 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
199 final AbstractChannelHandlerContext newCtx;
200 synchronized (this) {
201 checkMultiplicity(handler);
202
203 newCtx = newContext(group, filterName(name, handler), handler);
204
205 addLast0(newCtx);
206
207
208
209
210 if (!registered) {
211 newCtx.setAddPending();
212 callHandlerCallbackLater(newCtx, true);
213 return this;
214 }
215
216 EventExecutor executor = newCtx.executor();
217 if (!executor.inEventLoop()) {
218 callHandlerAddedInEventLoop(newCtx, executor);
219 return this;
220 }
221 }
222 callHandlerAdded0(newCtx);
223 return this;
224 }
225
226 private void addLast0(AbstractChannelHandlerContext newCtx) {
227 AbstractChannelHandlerContext prev = tail.prev;
228 newCtx.prev = prev;
229 newCtx.next = tail;
230 prev.next = newCtx;
231 tail.prev = newCtx;
232 }
233
234 @Override
235 public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
236 return addBefore(null, baseName, name, handler);
237 }
238
239 @Override
240 public final ChannelPipeline addBefore(
241 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
242 final AbstractChannelHandlerContext newCtx;
243 final AbstractChannelHandlerContext ctx;
244 synchronized (this) {
245 checkMultiplicity(handler);
246 name = filterName(name, handler);
247 ctx = getContextOrDie(baseName);
248
249 newCtx = newContext(group, name, handler);
250
251 addBefore0(ctx, newCtx);
252
253
254
255
256 if (!registered) {
257 newCtx.setAddPending();
258 callHandlerCallbackLater(newCtx, true);
259 return this;
260 }
261
262 EventExecutor executor = newCtx.executor();
263 if (!executor.inEventLoop()) {
264 callHandlerAddedInEventLoop(newCtx, executor);
265 return this;
266 }
267 }
268 callHandlerAdded0(newCtx);
269 return this;
270 }
271
272 private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
273 newCtx.prev = ctx.prev;
274 newCtx.next = ctx;
275 ctx.prev.next = newCtx;
276 ctx.prev = newCtx;
277 }
278
279 private String filterName(String name, ChannelHandler handler) {
280 if (name == null) {
281 return generateName(handler);
282 }
283 checkDuplicateName(name);
284 return name;
285 }
286
287 @Override
288 public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
289 return addAfter(null, baseName, name, handler);
290 }
291
292 @Override
293 public final ChannelPipeline addAfter(
294 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
295 final AbstractChannelHandlerContext newCtx;
296 final AbstractChannelHandlerContext ctx;
297
298 synchronized (this) {
299 checkMultiplicity(handler);
300 name = filterName(name, handler);
301 ctx = getContextOrDie(baseName);
302
303 newCtx = newContext(group, name, handler);
304
305 addAfter0(ctx, newCtx);
306
307
308
309
310 if (!registered) {
311 newCtx.setAddPending();
312 callHandlerCallbackLater(newCtx, true);
313 return this;
314 }
315 EventExecutor executor = newCtx.executor();
316 if (!executor.inEventLoop()) {
317 callHandlerAddedInEventLoop(newCtx, executor);
318 return this;
319 }
320 }
321 callHandlerAdded0(newCtx);
322 return this;
323 }
324
325 private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
326 newCtx.prev = ctx;
327 newCtx.next = ctx.next;
328 ctx.next.prev = newCtx;
329 ctx.next = newCtx;
330 }
331
332 public final ChannelPipeline addFirst(ChannelHandler handler) {
333 return addFirst(null, handler);
334 }
335
336 @Override
337 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
338 return addFirst(null, handlers);
339 }
340
341 @Override
342 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
343 ObjectUtil.checkNotNull(handlers, "handlers");
344 if (handlers.length == 0 || handlers[0] == null) {
345 return this;
346 }
347
348 int size;
349 for (size = 1; size < handlers.length; size ++) {
350 if (handlers[size] == null) {
351 break;
352 }
353 }
354
355 for (int i = size - 1; i >= 0; i --) {
356 ChannelHandler h = handlers[i];
357 addFirst(executor, null, h);
358 }
359
360 return this;
361 }
362
363 public final ChannelPipeline addLast(ChannelHandler handler) {
364 return addLast(null, handler);
365 }
366
367 @Override
368 public final ChannelPipeline addLast(ChannelHandler... handlers) {
369 return addLast(null, handlers);
370 }
371
372 @Override
373 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
374 ObjectUtil.checkNotNull(handlers, "handlers");
375
376 for (ChannelHandler h: handlers) {
377 if (h == null) {
378 break;
379 }
380 addLast(executor, null, h);
381 }
382
383 return this;
384 }
385
386 private String generateName(ChannelHandler handler) {
387 Map<Class<?>, String> cache = nameCaches.get();
388 Class<?> handlerType = handler.getClass();
389 String name = cache.get(handlerType);
390 if (name == null) {
391 name = generateName0(handlerType);
392 cache.put(handlerType, name);
393 }
394
395
396
397 if (context0(name) != null) {
398 String baseName = name.substring(0, name.length() - 1);
399 for (int i = 1;; i ++) {
400 String newName = baseName + i;
401 if (context0(newName) == null) {
402 name = newName;
403 break;
404 }
405 }
406 }
407 return name;
408 }
409
410 private static String generateName0(Class<?> handlerType) {
411 return StringUtil.simpleClassName(handlerType) + "#0";
412 }
413
414 @Override
415 public final ChannelPipeline remove(ChannelHandler handler) {
416 remove(getContextOrDie(handler));
417 return this;
418 }
419
420 @Override
421 public final ChannelHandler remove(String name) {
422 return remove(getContextOrDie(name)).handler();
423 }
424
425 @SuppressWarnings("unchecked")
426 @Override
427 public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
428 return (T) remove(getContextOrDie(handlerType)).handler();
429 }
430
431 public final <T extends ChannelHandler> T removeIfExists(String name) {
432 return removeIfExists(context(name));
433 }
434
435 public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
436 return removeIfExists(context(handlerType));
437 }
438
439 public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
440 return removeIfExists(context(handler));
441 }
442
443 @SuppressWarnings("unchecked")
444 private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
445 if (ctx == null) {
446 return null;
447 }
448 return (T) remove((AbstractChannelHandlerContext) ctx).handler();
449 }
450
451 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
452 assert ctx != head && ctx != tail;
453
454 synchronized (this) {
455 atomicRemoveFromHandlerList(ctx);
456
457
458
459
460 if (!registered) {
461 callHandlerCallbackLater(ctx, false);
462 return ctx;
463 }
464
465 EventExecutor executor = ctx.executor();
466 if (!executor.inEventLoop()) {
467 executor.execute(new Runnable() {
468 @Override
469 public void run() {
470 callHandlerRemoved0(ctx);
471 }
472 });
473 return ctx;
474 }
475 }
476 callHandlerRemoved0(ctx);
477 return ctx;
478 }
479
480
481
482
483 private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
484 AbstractChannelHandlerContext prev = ctx.prev;
485 AbstractChannelHandlerContext next = ctx.next;
486 prev.next = next;
487 next.prev = prev;
488 }
489
490 @Override
491 public final ChannelHandler removeFirst() {
492 if (head.next == tail) {
493 throw new NoSuchElementException();
494 }
495 return remove(head.next).handler();
496 }
497
498 @Override
499 public final ChannelHandler removeLast() {
500 if (head.next == tail) {
501 throw new NoSuchElementException();
502 }
503 return remove(tail.prev).handler();
504 }
505
506 @Override
507 public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
508 replace(getContextOrDie(oldHandler), newName, newHandler);
509 return this;
510 }
511
512 @Override
513 public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
514 return replace(getContextOrDie(oldName), newName, newHandler);
515 }
516
517 @Override
518 @SuppressWarnings("unchecked")
519 public final <T extends ChannelHandler> T replace(
520 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
521 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
522 }
523
524 private ChannelHandler replace(
525 final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
526 assert ctx != head && ctx != tail;
527
528 final AbstractChannelHandlerContext newCtx;
529 synchronized (this) {
530 checkMultiplicity(newHandler);
531 if (newName == null) {
532 newName = generateName(newHandler);
533 } else {
534 boolean sameName = ctx.name().equals(newName);
535 if (!sameName) {
536 checkDuplicateName(newName);
537 }
538 }
539
540 newCtx = newContext(ctx.executor, newName, newHandler);
541
542 replace0(ctx, newCtx);
543
544
545
546
547
548 if (!registered) {
549 callHandlerCallbackLater(newCtx, true);
550 callHandlerCallbackLater(ctx, false);
551 return ctx.handler();
552 }
553 EventExecutor executor = ctx.executor();
554 if (!executor.inEventLoop()) {
555 executor.execute(new Runnable() {
556 @Override
557 public void run() {
558
559
560
561 callHandlerAdded0(newCtx);
562 callHandlerRemoved0(ctx);
563 }
564 });
565 return ctx.handler();
566 }
567 }
568
569
570
571 callHandlerAdded0(newCtx);
572 callHandlerRemoved0(ctx);
573 return ctx.handler();
574 }
575
576 private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
577 AbstractChannelHandlerContext prev = oldCtx.prev;
578 AbstractChannelHandlerContext next = oldCtx.next;
579 newCtx.prev = prev;
580 newCtx.next = next;
581
582
583
584
585
586 prev.next = newCtx;
587 next.prev = newCtx;
588
589
590 oldCtx.prev = newCtx;
591 oldCtx.next = newCtx;
592 }
593
594 private static void checkMultiplicity(ChannelHandler handler) {
595 if (handler instanceof ChannelHandlerAdapter) {
596 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
597 if (!h.isSharable() && h.added) {
598 throw new ChannelPipelineException(
599 h.getClass().getName() +
600 " is not a @Sharable handler, so can't be added or removed multiple times.");
601 }
602 h.added = true;
603 }
604 }
605
606 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
607 try {
608 ctx.callHandlerAdded();
609 } catch (Throwable t) {
610 boolean removed = false;
611 try {
612 atomicRemoveFromHandlerList(ctx);
613 ctx.callHandlerRemoved();
614 removed = true;
615 } catch (Throwable t2) {
616 if (logger.isWarnEnabled()) {
617 logger.warn("Failed to remove a handler: " + ctx.name(), t2);
618 }
619 }
620
621 if (removed) {
622 fireExceptionCaught(new ChannelPipelineException(
623 ctx.handler().getClass().getName() +
624 ".handlerAdded() has thrown an exception; removed.", t));
625 } else {
626 fireExceptionCaught(new ChannelPipelineException(
627 ctx.handler().getClass().getName() +
628 ".handlerAdded() has thrown an exception; also failed to remove.", t));
629 }
630 }
631 }
632
633 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
634
635 try {
636 ctx.callHandlerRemoved();
637 } catch (Throwable t) {
638 fireExceptionCaught(new ChannelPipelineException(
639 ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
640 }
641 }
642
643 final void invokeHandlerAddedIfNeeded() {
644 assert channel.eventLoop().inEventLoop();
645 if (firstRegistration) {
646 firstRegistration = false;
647
648
649 callHandlerAddedForAllHandlers();
650 }
651 }
652
653 @Override
654 public final ChannelHandler first() {
655 ChannelHandlerContext first = firstContext();
656 if (first == null) {
657 return null;
658 }
659 return first.handler();
660 }
661
662 @Override
663 public final ChannelHandlerContext firstContext() {
664 AbstractChannelHandlerContext first = head.next;
665 if (first == tail) {
666 return null;
667 }
668 return head.next;
669 }
670
671 @Override
672 public final ChannelHandler last() {
673 AbstractChannelHandlerContext last = tail.prev;
674 if (last == head) {
675 return null;
676 }
677 return last.handler();
678 }
679
680 @Override
681 public final ChannelHandlerContext lastContext() {
682 AbstractChannelHandlerContext last = tail.prev;
683 if (last == head) {
684 return null;
685 }
686 return last;
687 }
688
689 @Override
690 public final ChannelHandler get(String name) {
691 ChannelHandlerContext ctx = context(name);
692 if (ctx == null) {
693 return null;
694 } else {
695 return ctx.handler();
696 }
697 }
698
699 @SuppressWarnings("unchecked")
700 @Override
701 public final <T extends ChannelHandler> T get(Class<T> handlerType) {
702 ChannelHandlerContext ctx = context(handlerType);
703 if (ctx == null) {
704 return null;
705 } else {
706 return (T) ctx.handler();
707 }
708 }
709
710 @Override
711 public final ChannelHandlerContext context(String name) {
712 return context0(ObjectUtil.checkNotNull(name, "name"));
713 }
714
715 @Override
716 public final ChannelHandlerContext context(ChannelHandler handler) {
717 ObjectUtil.checkNotNull(handler, "handler");
718
719 AbstractChannelHandlerContext ctx = head.next;
720 for (;;) {
721
722 if (ctx == null) {
723 return null;
724 }
725
726 if (ctx.handler() == handler) {
727 return ctx;
728 }
729
730 ctx = ctx.next;
731 }
732 }
733
734 @Override
735 public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
736 ObjectUtil.checkNotNull(handlerType, "handlerType");
737
738 AbstractChannelHandlerContext ctx = head.next;
739 for (;;) {
740 if (ctx == null) {
741 return null;
742 }
743 if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
744 return ctx;
745 }
746 ctx = ctx.next;
747 }
748 }
749
750 @Override
751 public final List<String> names() {
752 List<String> list = new ArrayList<String>();
753 AbstractChannelHandlerContext ctx = head.next;
754 for (;;) {
755 if (ctx == null) {
756 return list;
757 }
758 list.add(ctx.name());
759 ctx = ctx.next;
760 }
761 }
762
763 @Override
764 public final Map<String, ChannelHandler> toMap() {
765 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
766 AbstractChannelHandlerContext ctx = head.next;
767 for (;;) {
768 if (ctx == tail) {
769 return map;
770 }
771 map.put(ctx.name(), ctx.handler());
772 ctx = ctx.next;
773 }
774 }
775
776 @Override
777 public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
778 return toMap().entrySet().iterator();
779 }
780
781
782
783
784 @Override
785 public final String toString() {
786 StringBuilder buf = new StringBuilder()
787 .append(StringUtil.simpleClassName(this))
788 .append('{');
789 AbstractChannelHandlerContext ctx = head.next;
790 for (;;) {
791 if (ctx == tail) {
792 break;
793 }
794
795 buf.append('(')
796 .append(ctx.name())
797 .append(" = ")
798 .append(ctx.handler().getClass().getName())
799 .append(')');
800
801 ctx = ctx.next;
802 if (ctx == tail) {
803 break;
804 }
805
806 buf.append(", ");
807 }
808 buf.append('}');
809 return buf.toString();
810 }
811
812 @Override
813 public final ChannelPipeline fireChannelRegistered() {
814 AbstractChannelHandlerContext.invokeChannelRegistered(head);
815 return this;
816 }
817
818 @Override
819 public final ChannelPipeline fireChannelUnregistered() {
820 AbstractChannelHandlerContext.invokeChannelUnregistered(head);
821 return this;
822 }
823
824
825
826
827
828
829
830
831
832
833
834 private synchronized void destroy() {
835 destroyUp(head.next, false);
836 }
837
838 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
839 final Thread currentThread = Thread.currentThread();
840 final AbstractChannelHandlerContext tail = this.tail;
841 for (;;) {
842 if (ctx == tail) {
843 destroyDown(currentThread, tail.prev, inEventLoop);
844 break;
845 }
846
847 final EventExecutor executor = ctx.executor();
848 if (!inEventLoop && !executor.inEventLoop(currentThread)) {
849 final AbstractChannelHandlerContext finalCtx = ctx;
850 executor.execute(new Runnable() {
851 @Override
852 public void run() {
853 destroyUp(finalCtx, true);
854 }
855 });
856 break;
857 }
858
859 ctx = ctx.next;
860 inEventLoop = false;
861 }
862 }
863
864 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
865
866 final AbstractChannelHandlerContext head = this.head;
867 for (;;) {
868 if (ctx == head) {
869 break;
870 }
871
872 final EventExecutor executor = ctx.executor();
873 if (inEventLoop || executor.inEventLoop(currentThread)) {
874 atomicRemoveFromHandlerList(ctx);
875 callHandlerRemoved0(ctx);
876 } else {
877 final AbstractChannelHandlerContext finalCtx = ctx;
878 executor.execute(new Runnable() {
879 @Override
880 public void run() {
881 destroyDown(Thread.currentThread(), finalCtx, true);
882 }
883 });
884 break;
885 }
886
887 ctx = ctx.prev;
888 inEventLoop = false;
889 }
890 }
891
892 @Override
893 public final ChannelPipeline fireChannelActive() {
894 AbstractChannelHandlerContext.invokeChannelActive(head);
895 return this;
896 }
897
898 @Override
899 public final ChannelPipeline fireChannelInactive() {
900 AbstractChannelHandlerContext.invokeChannelInactive(head);
901 return this;
902 }
903
904 @Override
905 public final ChannelPipeline fireExceptionCaught(Throwable cause) {
906 AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
907 return this;
908 }
909
910 @Override
911 public final ChannelPipeline fireUserEventTriggered(Object event) {
912 AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
913 return this;
914 }
915
916 @Override
917 public final ChannelPipeline fireChannelRead(Object msg) {
918 AbstractChannelHandlerContext.invokeChannelRead(head, msg);
919 return this;
920 }
921
922 @Override
923 public final ChannelPipeline fireChannelReadComplete() {
924 AbstractChannelHandlerContext.invokeChannelReadComplete(head);
925 return this;
926 }
927
928 @Override
929 public final ChannelPipeline fireChannelWritabilityChanged() {
930 AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
931 return this;
932 }
933
934 @Override
935 public final ChannelFuture bind(SocketAddress localAddress) {
936 return tail.bind(localAddress);
937 }
938
939 @Override
940 public final ChannelFuture connect(SocketAddress remoteAddress) {
941 return tail.connect(remoteAddress);
942 }
943
944 @Override
945 public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
946 return tail.connect(remoteAddress, localAddress);
947 }
948
949 @Override
950 public final ChannelFuture disconnect() {
951 return tail.disconnect();
952 }
953
954 @Override
955 public final ChannelFuture close() {
956 return tail.close();
957 }
958
959 @Override
960 public final ChannelFuture deregister() {
961 return tail.deregister();
962 }
963
964 @Override
965 public final ChannelPipeline flush() {
966 tail.flush();
967 return this;
968 }
969
970 @Override
971 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
972 return tail.bind(localAddress, promise);
973 }
974
975 @Override
976 public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
977 return tail.connect(remoteAddress, promise);
978 }
979
980 @Override
981 public final ChannelFuture connect(
982 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
983 return tail.connect(remoteAddress, localAddress, promise);
984 }
985
986 @Override
987 public final ChannelFuture disconnect(ChannelPromise promise) {
988 return tail.disconnect(promise);
989 }
990
991 @Override
992 public final ChannelFuture close(ChannelPromise promise) {
993 return tail.close(promise);
994 }
995
996 @Override
997 public final ChannelFuture deregister(final ChannelPromise promise) {
998 return tail.deregister(promise);
999 }
1000
1001 @Override
1002 public final ChannelPipeline read() {
1003 tail.read();
1004 return this;
1005 }
1006
1007 @Override
1008 public final ChannelFuture write(Object msg) {
1009 return tail.write(msg);
1010 }
1011
1012 @Override
1013 public final ChannelFuture write(Object msg, ChannelPromise promise) {
1014 return tail.write(msg, promise);
1015 }
1016
1017 @Override
1018 public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1019 return tail.writeAndFlush(msg, promise);
1020 }
1021
1022 @Override
1023 public final ChannelFuture writeAndFlush(Object msg) {
1024 return tail.writeAndFlush(msg);
1025 }
1026
1027 @Override
1028 public final ChannelPromise newPromise() {
1029 return new DefaultChannelPromise(channel);
1030 }
1031
1032 @Override
1033 public final ChannelProgressivePromise newProgressivePromise() {
1034 return new DefaultChannelProgressivePromise(channel);
1035 }
1036
1037 @Override
1038 public final ChannelFuture newSucceededFuture() {
1039 return succeededFuture;
1040 }
1041
1042 @Override
1043 public final ChannelFuture newFailedFuture(Throwable cause) {
1044 return new FailedChannelFuture(channel, null, cause);
1045 }
1046
1047 @Override
1048 public final ChannelPromise voidPromise() {
1049 return voidPromise;
1050 }
1051
1052 private void checkDuplicateName(String name) {
1053 if (context0(name) != null) {
1054 throw new IllegalArgumentException("Duplicate handler name: " + name);
1055 }
1056 }
1057
1058 private AbstractChannelHandlerContext context0(String name) {
1059 AbstractChannelHandlerContext context = head.next;
1060 while (context != tail) {
1061 if (context.name().equals(name)) {
1062 return context;
1063 }
1064 context = context.next;
1065 }
1066 return null;
1067 }
1068
1069 private AbstractChannelHandlerContext getContextOrDie(String name) {
1070 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1071 if (ctx == null) {
1072 throw new NoSuchElementException(name);
1073 } else {
1074 return ctx;
1075 }
1076 }
1077
1078 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1079 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1080 if (ctx == null) {
1081 throw new NoSuchElementException(handler.getClass().getName());
1082 } else {
1083 return ctx;
1084 }
1085 }
1086
1087 private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1088 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1089 if (ctx == null) {
1090 throw new NoSuchElementException(handlerType.getName());
1091 } else {
1092 return ctx;
1093 }
1094 }
1095
1096 private void callHandlerAddedForAllHandlers() {
1097 final PendingHandlerCallback pendingHandlerCallbackHead;
1098 synchronized (this) {
1099 assert !registered;
1100
1101
1102 registered = true;
1103
1104 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1105
1106 this.pendingHandlerCallbackHead = null;
1107 }
1108
1109
1110
1111
1112 PendingHandlerCallback task = pendingHandlerCallbackHead;
1113 while (task != null) {
1114 task.execute();
1115 task = task.next;
1116 }
1117 }
1118
1119 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1120 assert !registered;
1121
1122 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1123 PendingHandlerCallback pending = pendingHandlerCallbackHead;
1124 if (pending == null) {
1125 pendingHandlerCallbackHead = task;
1126 } else {
1127
1128 while (pending.next != null) {
1129 pending = pending.next;
1130 }
1131 pending.next = task;
1132 }
1133 }
1134
1135 private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1136 newCtx.setAddPending();
1137 executor.execute(new Runnable() {
1138 @Override
1139 public void run() {
1140 callHandlerAdded0(newCtx);
1141 }
1142 });
1143 }
1144
1145
1146
1147
1148
1149 protected void onUnhandledInboundException(Throwable cause) {
1150 try {
1151 logger.warn(
1152 "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1153 "It usually means the last handler in the pipeline did not handle the exception.",
1154 cause);
1155 } finally {
1156 ReferenceCountUtil.release(cause);
1157 }
1158 }
1159
1160
1161
1162
1163
1164 protected void onUnhandledInboundChannelActive() {
1165 }
1166
1167
1168
1169
1170
1171 protected void onUnhandledInboundChannelInactive() {
1172 }
1173
1174
1175
1176
1177
1178
1179 protected void onUnhandledInboundMessage(Object msg) {
1180 try {
1181 logger.debug(
1182 "Discarded inbound message {} that reached at the tail of the pipeline. " +
1183 "Please check your pipeline configuration.", msg);
1184 } finally {
1185 ReferenceCountUtil.release(msg);
1186 }
1187 }
1188
1189
1190
1191
1192
1193
1194 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1195 onUnhandledInboundMessage(msg);
1196 if (logger.isDebugEnabled()) {
1197 logger.debug("Discarded message pipeline : {}. Channel : {}.",
1198 ctx.pipeline().names(), ctx.channel());
1199 }
1200 }
1201
1202
1203
1204
1205
1206 protected void onUnhandledInboundChannelReadComplete() {
1207 }
1208
1209
1210
1211
1212
1213
1214 protected void onUnhandledInboundUserEventTriggered(Object evt) {
1215
1216
1217 ReferenceCountUtil.release(evt);
1218 }
1219
1220
1221
1222
1223
1224 protected void onUnhandledChannelWritabilityChanged() {
1225 }
1226
1227 protected void incrementPendingOutboundBytes(long size) {
1228 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1229 if (buffer != null) {
1230 buffer.incrementPendingOutboundBytes(size);
1231 }
1232 }
1233
1234 protected void decrementPendingOutboundBytes(long size) {
1235 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1236 if (buffer != null) {
1237 buffer.decrementPendingOutboundBytes(size);
1238 }
1239 }
1240
1241
1242 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1243
1244 TailContext(DefaultChannelPipeline pipeline) {
1245 super(pipeline, null, TAIL_NAME, TailContext.class);
1246 setAddComplete();
1247 }
1248
1249 @Override
1250 public ChannelHandler handler() {
1251 return this;
1252 }
1253
1254 @Override
1255 public void channelRegistered(ChannelHandlerContext ctx) { }
1256
1257 @Override
1258 public void channelUnregistered(ChannelHandlerContext ctx) { }
1259
1260 @Override
1261 public void channelActive(ChannelHandlerContext ctx) {
1262 onUnhandledInboundChannelActive();
1263 }
1264
1265 @Override
1266 public void channelInactive(ChannelHandlerContext ctx) {
1267 onUnhandledInboundChannelInactive();
1268 }
1269
1270 @Override
1271 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1272 onUnhandledChannelWritabilityChanged();
1273 }
1274
1275 @Override
1276 public void handlerAdded(ChannelHandlerContext ctx) { }
1277
1278 @Override
1279 public void handlerRemoved(ChannelHandlerContext ctx) { }
1280
1281 @Override
1282 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1283 onUnhandledInboundUserEventTriggered(evt);
1284 }
1285
1286 @Override
1287 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1288 onUnhandledInboundException(cause);
1289 }
1290
1291 @Override
1292 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1293 onUnhandledInboundMessage(ctx, msg);
1294 }
1295
1296 @Override
1297 public void channelReadComplete(ChannelHandlerContext ctx) {
1298 onUnhandledInboundChannelReadComplete();
1299 }
1300 }
1301
1302 final class HeadContext extends AbstractChannelHandlerContext
1303 implements ChannelOutboundHandler, ChannelInboundHandler {
1304
1305 private final Unsafe unsafe;
1306
1307 HeadContext(DefaultChannelPipeline pipeline) {
1308 super(pipeline, null, HEAD_NAME, HeadContext.class);
1309 unsafe = pipeline.channel().unsafe();
1310 setAddComplete();
1311 }
1312
1313 @Override
1314 public ChannelHandler handler() {
1315 return this;
1316 }
1317
1318 @Override
1319 public void handlerAdded(ChannelHandlerContext ctx) {
1320
1321 }
1322
1323 @Override
1324 public void handlerRemoved(ChannelHandlerContext ctx) {
1325
1326 }
1327
1328 @Override
1329 public void bind(
1330 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1331 unsafe.bind(localAddress, promise);
1332 }
1333
1334 @Override
1335 public void connect(
1336 ChannelHandlerContext ctx,
1337 SocketAddress remoteAddress, SocketAddress localAddress,
1338 ChannelPromise promise) {
1339 unsafe.connect(remoteAddress, localAddress, promise);
1340 }
1341
1342 @Override
1343 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1344 unsafe.disconnect(promise);
1345 }
1346
1347 @Override
1348 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1349 unsafe.close(promise);
1350 }
1351
1352 @Override
1353 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1354 unsafe.deregister(promise);
1355 }
1356
1357 @Override
1358 public void read(ChannelHandlerContext ctx) {
1359 unsafe.beginRead();
1360 }
1361
1362 @Override
1363 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1364 unsafe.write(msg, promise);
1365 }
1366
1367 @Override
1368 public void flush(ChannelHandlerContext ctx) {
1369 unsafe.flush();
1370 }
1371
1372 @Override
1373 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1374 ctx.fireExceptionCaught(cause);
1375 }
1376
1377 @Override
1378 public void channelRegistered(ChannelHandlerContext ctx) {
1379 invokeHandlerAddedIfNeeded();
1380 ctx.fireChannelRegistered();
1381 }
1382
1383 @Override
1384 public void channelUnregistered(ChannelHandlerContext ctx) {
1385 ctx.fireChannelUnregistered();
1386
1387
1388 if (!channel.isOpen()) {
1389 destroy();
1390 }
1391 }
1392
1393 @Override
1394 public void channelActive(ChannelHandlerContext ctx) {
1395 ctx.fireChannelActive();
1396
1397 readIfIsAutoRead();
1398 }
1399
1400 @Override
1401 public void channelInactive(ChannelHandlerContext ctx) {
1402 ctx.fireChannelInactive();
1403 }
1404
1405 @Override
1406 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1407 ctx.fireChannelRead(msg);
1408 }
1409
1410 @Override
1411 public void channelReadComplete(ChannelHandlerContext ctx) {
1412 ctx.fireChannelReadComplete();
1413
1414 readIfIsAutoRead();
1415 }
1416
1417 private void readIfIsAutoRead() {
1418 if (channel.config().isAutoRead()) {
1419 channel.read();
1420 }
1421 }
1422
1423 @Override
1424 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1425 ctx.fireUserEventTriggered(evt);
1426 }
1427
1428 @Override
1429 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1430 ctx.fireChannelWritabilityChanged();
1431 }
1432 }
1433
1434 private abstract static class PendingHandlerCallback implements Runnable {
1435 final AbstractChannelHandlerContext ctx;
1436 PendingHandlerCallback next;
1437
1438 PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1439 this.ctx = ctx;
1440 }
1441
1442 abstract void execute();
1443 }
1444
1445 private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1446
1447 PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1448 super(ctx);
1449 }
1450
1451 @Override
1452 public void run() {
1453 callHandlerAdded0(ctx);
1454 }
1455
1456 @Override
1457 void execute() {
1458 EventExecutor executor = ctx.executor();
1459 if (executor.inEventLoop()) {
1460 callHandlerAdded0(ctx);
1461 } else {
1462 try {
1463 executor.execute(this);
1464 } catch (RejectedExecutionException e) {
1465 if (logger.isWarnEnabled()) {
1466 logger.warn(
1467 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1468 executor, ctx.name(), e);
1469 }
1470 atomicRemoveFromHandlerList(ctx);
1471 ctx.setRemoved();
1472 }
1473 }
1474 }
1475 }
1476
1477 private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1478
1479 PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1480 super(ctx);
1481 }
1482
1483 @Override
1484 public void run() {
1485 callHandlerRemoved0(ctx);
1486 }
1487
1488 @Override
1489 void execute() {
1490 EventExecutor executor = ctx.executor();
1491 if (executor.inEventLoop()) {
1492 callHandlerRemoved0(ctx);
1493 } else {
1494 try {
1495 executor.execute(this);
1496 } catch (RejectedExecutionException e) {
1497 if (logger.isWarnEnabled()) {
1498 logger.warn(
1499 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1500 " removing handler {}.", executor, ctx.name(), e);
1501 }
1502
1503 ctx.setRemoved();
1504 }
1505 }
1506 }
1507 }
1508 }