1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.channel.Channel.Unsafe;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.ResourceLeakDetector;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.concurrent.EventExecutorGroup;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.StringUtil;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.net.SocketAddress;
30 import java.util.ArrayList;
31 import java.util.IdentityHashMap;
32 import java.util.Iterator;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NoSuchElementException;
37 import java.util.WeakHashMap;
38 import java.util.concurrent.RejectedExecutionException;
39 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40
41
42
43
44
45 public class DefaultChannelPipeline implements ChannelPipeline {
46
47 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
48
49 private static final String HEAD_NAME = generateName0(HeadContext.class);
50 private static final String TAIL_NAME = generateName0(TailContext.class);
51
52 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
53 new FastThreadLocal<Map<Class<?>, String>>() {
54 @Override
55 protected Map<Class<?>, String> initialValue() {
56 return new WeakHashMap<Class<?>, String>();
57 }
58 };
59
60 private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
61 AtomicReferenceFieldUpdater.newUpdater(
62 DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
63 final HeadContext head;
64 final TailContext tail;
65
66 private final Channel channel;
67 private final ChannelFuture succeededFuture;
68 private final VoidChannelPromise voidPromise;
69 private final boolean touch = ResourceLeakDetector.isEnabled();
70
71 private Map<EventExecutorGroup, EventExecutor> childExecutors;
72 private volatile MessageSizeEstimator.Handle estimatorHandle;
73 private boolean firstRegistration = true;
74
75
76
77
78
79
80
81
82
83 private PendingHandlerCallback pendingHandlerCallbackHead;
84
85
86
87
88
89 private boolean registered;
90
/**
 * Creates a pipeline for {@code channel} containing only the head and tail sentinels,
 * linked to each other.
 *
 * @param channel the owning channel; validated with {@code ObjectUtil.checkNotNull}
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, null);
    this.voidPromise = new VoidChannelPromise(channel, true);

    this.tail = new TailContext(this);
    this.head = new HeadContext(this);

    // Empty pipeline: head <-> tail.
    this.tail.prev = this.head;
    this.head.next = this.tail;
}
102
103 final MessageSizeEstimator.Handle estimatorHandle() {
104 MessageSizeEstimator.Handle handle = estimatorHandle;
105 if (handle == null) {
106 handle = channel.config().getMessageSizeEstimator().newHandle();
107 if (!ESTIMATOR.compareAndSet(this, null, handle)) {
108 handle = estimatorHandle;
109 }
110 }
111 return handle;
112 }
113
114 final Object touch(Object msg, AbstractChannelHandlerContext next) {
115 return touch ? ReferenceCountUtil.touch(msg, next) : msg;
116 }
117
118 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
119 return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
120 }
121
122 private EventExecutor childExecutor(EventExecutorGroup group) {
123 if (group == null) {
124 return null;
125 }
126 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
127 if (pinEventExecutor != null && !pinEventExecutor) {
128 return group.next();
129 }
130 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
131 if (childExecutors == null) {
132
133 childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
134 }
135
136
137 EventExecutor childExecutor = childExecutors.get(group);
138 if (childExecutor == null) {
139 childExecutor = group.next();
140 childExecutors.put(group, childExecutor);
141 }
142 return childExecutor;
143 }
144 @Override
145 public final Channel channel() {
146 return channel;
147 }
148
149 @Override
150 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
151 return addFirst(null, name, handler);
152 }
153
154 private enum AddStrategy {
155 ADD_FIRST,
156 ADD_LAST,
157 ADD_BEFORE,
158 ADD_AFTER;
159 }
160
161 private ChannelPipeline internalAdd(EventExecutorGroup group, String name,
162 ChannelHandler handler, String baseName,
163 AddStrategy addStrategy) {
164 final AbstractChannelHandlerContext newCtx;
165 synchronized (this) {
166 checkMultiplicity(handler);
167 name = filterName(name, handler);
168
169 newCtx = newContext(group, name, handler);
170
171 switch (addStrategy) {
172 case ADD_FIRST:
173 addFirst0(newCtx);
174 break;
175 case ADD_LAST:
176 addLast0(newCtx);
177 break;
178 case ADD_BEFORE:
179 addBefore0(getContextOrDie(baseName), newCtx);
180 break;
181 case ADD_AFTER:
182 addAfter0(getContextOrDie(baseName), newCtx);
183 break;
184 default:
185 throw new IllegalArgumentException("unknown add strategy: " + addStrategy);
186 }
187
188
189
190
191 if (!registered) {
192 newCtx.setAddPending();
193 callHandlerCallbackLater(newCtx, true);
194 return this;
195 }
196
197 EventExecutor executor = newCtx.executor();
198 if (!executor.inEventLoop()) {
199 callHandlerAddedInEventLoop(newCtx, executor);
200 return this;
201 }
202 }
203 callHandlerAdded0(newCtx);
204 return this;
205 }
206
207 @Override
208 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
209 return internalAdd(group, name, handler, null, AddStrategy.ADD_FIRST);
210 }
211
212 private void addFirst0(AbstractChannelHandlerContext newCtx) {
213 AbstractChannelHandlerContext nextCtx = head.next;
214 newCtx.prev = head;
215 newCtx.next = nextCtx;
216 head.next = newCtx;
217 nextCtx.prev = newCtx;
218 }
219
220 @Override
221 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
222 return addLast(null, name, handler);
223 }
224
225 @Override
226 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
227 return internalAdd(group, name, handler, null, AddStrategy.ADD_LAST);
228 }
229
230 private void addLast0(AbstractChannelHandlerContext newCtx) {
231 AbstractChannelHandlerContext prev = tail.prev;
232 newCtx.prev = prev;
233 newCtx.next = tail;
234 prev.next = newCtx;
235 tail.prev = newCtx;
236 }
237
238 @Override
239 public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
240 return addBefore(null, baseName, name, handler);
241 }
242
243 @Override
244 public final ChannelPipeline addBefore(
245 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
246 return internalAdd(group, name, handler, baseName, AddStrategy.ADD_BEFORE);
247 }
248
249 private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
250 newCtx.prev = ctx.prev;
251 newCtx.next = ctx;
252 ctx.prev.next = newCtx;
253 ctx.prev = newCtx;
254 }
255
256 private String filterName(String name, ChannelHandler handler) {
257 if (name == null) {
258 return generateName(handler);
259 }
260 checkDuplicateName(name);
261 return name;
262 }
263
264 @Override
265 public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
266 return addAfter(null, baseName, name, handler);
267 }
268
269 @Override
270 public final ChannelPipeline addAfter(
271 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
272 return internalAdd(group, name, handler, baseName, AddStrategy.ADD_AFTER);
273 }
274
275 private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
276 newCtx.prev = ctx;
277 newCtx.next = ctx.next;
278 ctx.next.prev = newCtx;
279 ctx.next = newCtx;
280 }
281
282 public final ChannelPipeline addFirst(ChannelHandler handler) {
283 return addFirst(null, handler);
284 }
285
286 @Override
287 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
288 return addFirst(null, handlers);
289 }
290
291 @Override
292 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
293 ObjectUtil.checkNotNull(handlers, "handlers");
294 if (handlers.length == 0 || handlers[0] == null) {
295 return this;
296 }
297
298 int size;
299 for (size = 1; size < handlers.length; size ++) {
300 if (handlers[size] == null) {
301 break;
302 }
303 }
304
305 for (int i = size - 1; i >= 0; i --) {
306 ChannelHandler h = handlers[i];
307 addFirst(executor, null, h);
308 }
309
310 return this;
311 }
312
313 public final ChannelPipeline addLast(ChannelHandler handler) {
314 return addLast(null, handler);
315 }
316
317 @Override
318 public final ChannelPipeline addLast(ChannelHandler... handlers) {
319 return addLast(null, handlers);
320 }
321
322 @Override
323 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
324 ObjectUtil.checkNotNull(handlers, "handlers");
325
326 for (ChannelHandler h: handlers) {
327 if (h == null) {
328 break;
329 }
330 addLast(executor, null, h);
331 }
332
333 return this;
334 }
335
336 private String generateName(ChannelHandler handler) {
337 Map<Class<?>, String> cache = nameCaches.get();
338 Class<?> handlerType = handler.getClass();
339 String name = cache.get(handlerType);
340 if (name == null) {
341 name = generateName0(handlerType);
342 cache.put(handlerType, name);
343 }
344
345
346
347 if (context0(name) != null) {
348 String baseName = name.substring(0, name.length() - 1);
349 for (int i = 1;; i ++) {
350 String newName = baseName + i;
351 if (context0(newName) == null) {
352 name = newName;
353 break;
354 }
355 }
356 }
357 return name;
358 }
359
360 private static String generateName0(Class<?> handlerType) {
361 return StringUtil.simpleClassName(handlerType) + "#0";
362 }
363
364 @Override
365 public final ChannelPipeline remove(ChannelHandler handler) {
366 remove(getContextOrDie(handler));
367 return this;
368 }
369
370 @Override
371 public final ChannelHandler remove(String name) {
372 return remove(getContextOrDie(name)).handler();
373 }
374
375 @SuppressWarnings("unchecked")
376 @Override
377 public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
378 return (T) remove(getContextOrDie(handlerType)).handler();
379 }
380
381 public final <T extends ChannelHandler> T removeIfExists(String name) {
382 return removeIfExists(context(name));
383 }
384
385 public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
386 return removeIfExists(context(handlerType));
387 }
388
389 public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
390 return removeIfExists(context(handler));
391 }
392
393 @SuppressWarnings("unchecked")
394 private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
395 if (ctx == null) {
396 return null;
397 }
398 return (T) remove((AbstractChannelHandlerContext) ctx).handler();
399 }
400
401 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
402 assert ctx != head && ctx != tail;
403
404 synchronized (this) {
405 atomicRemoveFromHandlerList(ctx);
406
407
408
409
410 if (!registered) {
411 callHandlerCallbackLater(ctx, false);
412 return ctx;
413 }
414
415 EventExecutor executor = ctx.executor();
416 if (!executor.inEventLoop()) {
417 executor.execute(new Runnable() {
418 @Override
419 public void run() {
420 callHandlerRemoved0(ctx);
421 }
422 });
423 return ctx;
424 }
425 }
426 callHandlerRemoved0(ctx);
427 return ctx;
428 }
429
430
431
432
433 private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
434 AbstractChannelHandlerContext prev = ctx.prev;
435 AbstractChannelHandlerContext next = ctx.next;
436 prev.next = next;
437 next.prev = prev;
438 }
439
440 @Override
441 public final ChannelHandler removeFirst() {
442 if (head.next == tail) {
443 throw new NoSuchElementException();
444 }
445 return remove(head.next).handler();
446 }
447
448 @Override
449 public final ChannelHandler removeLast() {
450 if (head.next == tail) {
451 throw new NoSuchElementException();
452 }
453 return remove(tail.prev).handler();
454 }
455
456 @Override
457 public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
458 replace(getContextOrDie(oldHandler), newName, newHandler);
459 return this;
460 }
461
462 @Override
463 public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
464 return replace(getContextOrDie(oldName), newName, newHandler);
465 }
466
467 @Override
468 @SuppressWarnings("unchecked")
469 public final <T extends ChannelHandler> T replace(
470 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
471 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
472 }
473
474 private ChannelHandler replace(
475 final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
476 assert ctx != head && ctx != tail;
477
478 final AbstractChannelHandlerContext newCtx;
479 synchronized (this) {
480 checkMultiplicity(newHandler);
481 if (newName == null) {
482 newName = generateName(newHandler);
483 } else {
484 boolean sameName = ctx.name().equals(newName);
485 if (!sameName) {
486 checkDuplicateName(newName);
487 }
488 }
489
490 newCtx = newContext(ctx.executor, newName, newHandler);
491
492 replace0(ctx, newCtx);
493
494
495
496
497
498 if (!registered) {
499 callHandlerCallbackLater(newCtx, true);
500 callHandlerCallbackLater(ctx, false);
501 return ctx.handler();
502 }
503 EventExecutor executor = ctx.executor();
504 if (!executor.inEventLoop()) {
505 executor.execute(new Runnable() {
506 @Override
507 public void run() {
508
509
510
511 callHandlerAdded0(newCtx);
512 callHandlerRemoved0(ctx);
513 }
514 });
515 return ctx.handler();
516 }
517 }
518
519
520
521 callHandlerAdded0(newCtx);
522 callHandlerRemoved0(ctx);
523 return ctx.handler();
524 }
525
526 private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
527 AbstractChannelHandlerContext prev = oldCtx.prev;
528 AbstractChannelHandlerContext next = oldCtx.next;
529 newCtx.prev = prev;
530 newCtx.next = next;
531
532
533
534
535
536 prev.next = newCtx;
537 next.prev = newCtx;
538
539
540 oldCtx.prev = newCtx;
541 oldCtx.next = newCtx;
542 }
543
544 private static void checkMultiplicity(ChannelHandler handler) {
545 if (handler instanceof ChannelHandlerAdapter) {
546 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
547 if (!h.isSharable() && h.added) {
548 throw new ChannelPipelineException(
549 h.getClass().getName() +
550 " is not a @Sharable handler, so can't be added or removed multiple times.");
551 }
552 h.added = true;
553 }
554 }
555
556 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
557 try {
558 ctx.callHandlerAdded();
559 } catch (Throwable t) {
560 boolean removed = false;
561 try {
562 atomicRemoveFromHandlerList(ctx);
563 ctx.callHandlerRemoved();
564 removed = true;
565 } catch (Throwable t2) {
566 if (logger.isWarnEnabled()) {
567 logger.warn("Failed to remove a handler: " + ctx.name(), t2);
568 }
569 }
570
571 if (removed) {
572 fireExceptionCaught(new ChannelPipelineException(
573 ctx.handler().getClass().getName() +
574 ".handlerAdded() has thrown an exception; removed.", t));
575 } else {
576 fireExceptionCaught(new ChannelPipelineException(
577 ctx.handler().getClass().getName() +
578 ".handlerAdded() has thrown an exception; also failed to remove.", t));
579 }
580 }
581 }
582
583 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
584
585 try {
586 ctx.callHandlerRemoved();
587 } catch (Throwable t) {
588 fireExceptionCaught(new ChannelPipelineException(
589 ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
590 }
591 }
592
593 final void invokeHandlerAddedIfNeeded() {
594 assert channel.eventLoop().inEventLoop();
595 if (firstRegistration) {
596 firstRegistration = false;
597
598
599 callHandlerAddedForAllHandlers();
600 }
601 }
602
603 @Override
604 public final ChannelHandler first() {
605 ChannelHandlerContext first = firstContext();
606 if (first == null) {
607 return null;
608 }
609 return first.handler();
610 }
611
612 @Override
613 public final ChannelHandlerContext firstContext() {
614 AbstractChannelHandlerContext first = head.next;
615 if (first == tail) {
616 return null;
617 }
618 return head.next;
619 }
620
621 @Override
622 public final ChannelHandler last() {
623 AbstractChannelHandlerContext last = tail.prev;
624 if (last == head) {
625 return null;
626 }
627 return last.handler();
628 }
629
630 @Override
631 public final ChannelHandlerContext lastContext() {
632 AbstractChannelHandlerContext last = tail.prev;
633 if (last == head) {
634 return null;
635 }
636 return last;
637 }
638
639 @Override
640 public final ChannelHandler get(String name) {
641 ChannelHandlerContext ctx = context(name);
642 if (ctx == null) {
643 return null;
644 } else {
645 return ctx.handler();
646 }
647 }
648
649 @SuppressWarnings("unchecked")
650 @Override
651 public final <T extends ChannelHandler> T get(Class<T> handlerType) {
652 ChannelHandlerContext ctx = context(handlerType);
653 if (ctx == null) {
654 return null;
655 } else {
656 return (T) ctx.handler();
657 }
658 }
659
660 @Override
661 public final ChannelHandlerContext context(String name) {
662 return context0(ObjectUtil.checkNotNull(name, "name"));
663 }
664
665 @Override
666 public final ChannelHandlerContext context(ChannelHandler handler) {
667 ObjectUtil.checkNotNull(handler, "handler");
668
669 AbstractChannelHandlerContext ctx = head.next;
670 for (;;) {
671
672 if (ctx == null) {
673 return null;
674 }
675
676 if (ctx.handler() == handler) {
677 return ctx;
678 }
679
680 ctx = ctx.next;
681 }
682 }
683
684 @Override
685 public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
686 ObjectUtil.checkNotNull(handlerType, "handlerType");
687
688 AbstractChannelHandlerContext ctx = head.next;
689 for (;;) {
690 if (ctx == null) {
691 return null;
692 }
693 if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
694 return ctx;
695 }
696 ctx = ctx.next;
697 }
698 }
699
700 @Override
701 public final List<String> names() {
702 List<String> list = new ArrayList<String>();
703 AbstractChannelHandlerContext ctx = head.next;
704 for (;;) {
705 if (ctx == null) {
706 return list;
707 }
708 list.add(ctx.name());
709 ctx = ctx.next;
710 }
711 }
712
713 @Override
714 public final Map<String, ChannelHandler> toMap() {
715 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
716 AbstractChannelHandlerContext ctx = head.next;
717 for (;;) {
718 if (ctx == tail) {
719 return map;
720 }
721 map.put(ctx.name(), ctx.handler());
722 ctx = ctx.next;
723 }
724 }
725
726 @Override
727 public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
728 return toMap().entrySet().iterator();
729 }
730
731
732
733
734 @Override
735 public final String toString() {
736 StringBuilder buf = new StringBuilder()
737 .append(StringUtil.simpleClassName(this))
738 .append('{');
739 AbstractChannelHandlerContext ctx = head.next;
740 for (;;) {
741 if (ctx == tail) {
742 break;
743 }
744
745 buf.append('(')
746 .append(ctx.name())
747 .append(" = ")
748 .append(ctx.handler().getClass().getName())
749 .append(')');
750
751 ctx = ctx.next;
752 if (ctx == tail) {
753 break;
754 }
755
756 buf.append(", ");
757 }
758 buf.append('}');
759 return buf.toString();
760 }
761
762 @Override
763 public final ChannelPipeline fireChannelRegistered() {
764 AbstractChannelHandlerContext.invokeChannelRegistered(head);
765 return this;
766 }
767
768 @Override
769 public final ChannelPipeline fireChannelUnregistered() {
770 AbstractChannelHandlerContext.invokeChannelUnregistered(head);
771 return this;
772 }
773
774
775
776
777
778
779
780
781
782
783
784 private synchronized void destroy() {
785 destroyUp(head.next, false);
786 }
787
788 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
789 final Thread currentThread = Thread.currentThread();
790 final AbstractChannelHandlerContext tail = this.tail;
791 for (;;) {
792 if (ctx == tail) {
793 destroyDown(currentThread, tail.prev, inEventLoop);
794 break;
795 }
796
797 final EventExecutor executor = ctx.executor();
798 if (!inEventLoop && !executor.inEventLoop(currentThread)) {
799 final AbstractChannelHandlerContext finalCtx = ctx;
800 executor.execute(new Runnable() {
801 @Override
802 public void run() {
803 destroyUp(finalCtx, true);
804 }
805 });
806 break;
807 }
808
809 ctx = ctx.next;
810 inEventLoop = false;
811 }
812 }
813
814 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
815
816 final AbstractChannelHandlerContext head = this.head;
817 for (;;) {
818 if (ctx == head) {
819 break;
820 }
821
822 final EventExecutor executor = ctx.executor();
823 if (inEventLoop || executor.inEventLoop(currentThread)) {
824 atomicRemoveFromHandlerList(ctx);
825 callHandlerRemoved0(ctx);
826 } else {
827 final AbstractChannelHandlerContext finalCtx = ctx;
828 executor.execute(new Runnable() {
829 @Override
830 public void run() {
831 destroyDown(Thread.currentThread(), finalCtx, true);
832 }
833 });
834 break;
835 }
836
837 ctx = ctx.prev;
838 inEventLoop = false;
839 }
840 }
841
842 @Override
843 public final ChannelPipeline fireChannelActive() {
844 AbstractChannelHandlerContext.invokeChannelActive(head);
845 return this;
846 }
847
848 @Override
849 public final ChannelPipeline fireChannelInactive() {
850 AbstractChannelHandlerContext.invokeChannelInactive(head);
851 return this;
852 }
853
854 @Override
855 public final ChannelPipeline fireExceptionCaught(Throwable cause) {
856 AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
857 return this;
858 }
859
860 @Override
861 public final ChannelPipeline fireUserEventTriggered(Object event) {
862 AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
863 return this;
864 }
865
866 @Override
867 public final ChannelPipeline fireChannelRead(Object msg) {
868 AbstractChannelHandlerContext.invokeChannelRead(head, msg);
869 return this;
870 }
871
872 @Override
873 public final ChannelPipeline fireChannelReadComplete() {
874 AbstractChannelHandlerContext.invokeChannelReadComplete(head);
875 return this;
876 }
877
878 @Override
879 public final ChannelPipeline fireChannelWritabilityChanged() {
880 AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
881 return this;
882 }
883
884 @Override
885 public final ChannelFuture bind(SocketAddress localAddress) {
886 return tail.bind(localAddress);
887 }
888
889 @Override
890 public final ChannelFuture connect(SocketAddress remoteAddress) {
891 return tail.connect(remoteAddress);
892 }
893
894 @Override
895 public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
896 return tail.connect(remoteAddress, localAddress);
897 }
898
899 @Override
900 public final ChannelFuture disconnect() {
901 return tail.disconnect();
902 }
903
904 @Override
905 public final ChannelFuture close() {
906 return tail.close();
907 }
908
909 @Override
910 public final ChannelFuture deregister() {
911 return tail.deregister();
912 }
913
914 @Override
915 public final ChannelPipeline flush() {
916 tail.flush();
917 return this;
918 }
919
920 @Override
921 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
922 return tail.bind(localAddress, promise);
923 }
924
925 @Override
926 public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
927 return tail.connect(remoteAddress, promise);
928 }
929
930 @Override
931 public final ChannelFuture connect(
932 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
933 return tail.connect(remoteAddress, localAddress, promise);
934 }
935
936 @Override
937 public final ChannelFuture disconnect(ChannelPromise promise) {
938 return tail.disconnect(promise);
939 }
940
941 @Override
942 public final ChannelFuture close(ChannelPromise promise) {
943 return tail.close(promise);
944 }
945
946 @Override
947 public final ChannelFuture deregister(final ChannelPromise promise) {
948 return tail.deregister(promise);
949 }
950
951 @Override
952 public final ChannelPipeline read() {
953 tail.read();
954 return this;
955 }
956
957 @Override
958 public final ChannelFuture write(Object msg) {
959 return tail.write(msg);
960 }
961
962 @Override
963 public final ChannelFuture write(Object msg, ChannelPromise promise) {
964 return tail.write(msg, promise);
965 }
966
967 @Override
968 public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
969 return tail.writeAndFlush(msg, promise);
970 }
971
972 @Override
973 public final ChannelFuture writeAndFlush(Object msg) {
974 return tail.writeAndFlush(msg);
975 }
976
977 @Override
978 public final ChannelPromise newPromise() {
979 return new DefaultChannelPromise(channel);
980 }
981
982 @Override
983 public final ChannelProgressivePromise newProgressivePromise() {
984 return new DefaultChannelProgressivePromise(channel);
985 }
986
987 @Override
988 public final ChannelFuture newSucceededFuture() {
989 return succeededFuture;
990 }
991
992 @Override
993 public final ChannelFuture newFailedFuture(Throwable cause) {
994 return new FailedChannelFuture(channel, null, cause);
995 }
996
997 @Override
998 public final ChannelPromise voidPromise() {
999 return voidPromise;
1000 }
1001
1002 private void checkDuplicateName(String name) {
1003 if (context0(name) != null) {
1004 throw new IllegalArgumentException("Duplicate handler name: " + name);
1005 }
1006 }
1007
1008 private AbstractChannelHandlerContext context0(String name) {
1009 AbstractChannelHandlerContext context = head.next;
1010 while (context != tail) {
1011 if (context.name().equals(name)) {
1012 return context;
1013 }
1014 context = context.next;
1015 }
1016 return null;
1017 }
1018
1019 private AbstractChannelHandlerContext getContextOrDie(String name) {
1020 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1021 if (ctx == null) {
1022 throw new NoSuchElementException(name);
1023 } else {
1024 return ctx;
1025 }
1026 }
1027
1028 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1029 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1030 if (ctx == null) {
1031 throw new NoSuchElementException(handler.getClass().getName());
1032 } else {
1033 return ctx;
1034 }
1035 }
1036
1037 private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1038 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1039 if (ctx == null) {
1040 throw new NoSuchElementException(handlerType.getName());
1041 } else {
1042 return ctx;
1043 }
1044 }
1045
1046 private void callHandlerAddedForAllHandlers() {
1047 final PendingHandlerCallback pendingHandlerCallbackHead;
1048 synchronized (this) {
1049 assert !registered;
1050
1051
1052 registered = true;
1053
1054 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1055
1056 this.pendingHandlerCallbackHead = null;
1057 }
1058
1059
1060
1061
1062 PendingHandlerCallback task = pendingHandlerCallbackHead;
1063 while (task != null) {
1064 task.execute();
1065 task = task.next;
1066 }
1067 }
1068
1069 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1070 assert !registered;
1071
1072 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1073 PendingHandlerCallback pending = pendingHandlerCallbackHead;
1074 if (pending == null) {
1075 pendingHandlerCallbackHead = task;
1076 } else {
1077
1078 while (pending.next != null) {
1079 pending = pending.next;
1080 }
1081 pending.next = task;
1082 }
1083 }
1084
1085 private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1086 newCtx.setAddPending();
1087 executor.execute(new Runnable() {
1088 @Override
1089 public void run() {
1090 callHandlerAdded0(newCtx);
1091 }
1092 });
1093 }
1094
1095
1096
1097
1098
1099 protected void onUnhandledInboundException(Throwable cause) {
1100 try {
1101 logger.warn(
1102 "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1103 "It usually means the last handler in the pipeline did not handle the exception.",
1104 cause);
1105 } finally {
1106 ReferenceCountUtil.release(cause);
1107 }
1108 }
1109
1110
1111
1112
1113
1114 protected void onUnhandledInboundChannelActive() {
1115 }
1116
1117
1118
1119
1120
1121 protected void onUnhandledInboundChannelInactive() {
1122 }
1123
1124
1125
1126
1127
1128
1129 protected void onUnhandledInboundMessage(Object msg) {
1130 try {
1131 logger.debug(
1132 "Discarded inbound message {} that reached at the tail of the pipeline. " +
1133 "Please check your pipeline configuration.", msg);
1134 } finally {
1135 ReferenceCountUtil.release(msg);
1136 }
1137 }
1138
1139
1140
1141
1142
1143
1144 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1145 onUnhandledInboundMessage(msg);
1146 if (logger.isDebugEnabled()) {
1147 logger.debug("Discarded message pipeline : {}. Channel : {}.",
1148 ctx.pipeline().names(), ctx.channel());
1149 }
1150 }
1151
1152
1153
1154
1155
1156 protected void onUnhandledInboundChannelReadComplete() {
1157 }
1158
1159
1160
1161
1162
1163
1164 protected void onUnhandledInboundUserEventTriggered(Object evt) {
1165
1166
1167 ReferenceCountUtil.release(evt);
1168 }
1169
1170
1171
1172
1173
1174 protected void onUnhandledChannelWritabilityChanged() {
1175 }
1176
1177 protected void incrementPendingOutboundBytes(long size) {
1178 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1179 if (buffer != null) {
1180 buffer.incrementPendingOutboundBytes(size);
1181 }
1182 }
1183
1184 protected void decrementPendingOutboundBytes(long size) {
1185 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1186 if (buffer != null) {
1187 buffer.decrementPendingOutboundBytes(size);
1188 }
1189 }
1190
1191
1192 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1193
1194 TailContext(DefaultChannelPipeline pipeline) {
1195 super(pipeline, null, TAIL_NAME, TailContext.class);
1196 setAddComplete();
1197 }
1198
1199 @Override
1200 public ChannelHandler handler() {
1201 return this;
1202 }
1203
1204 @Override
1205 public void channelRegistered(ChannelHandlerContext ctx) { }
1206
1207 @Override
1208 public void channelUnregistered(ChannelHandlerContext ctx) { }
1209
1210 @Override
1211 public void channelActive(ChannelHandlerContext ctx) {
1212 onUnhandledInboundChannelActive();
1213 }
1214
1215 @Override
1216 public void channelInactive(ChannelHandlerContext ctx) {
1217 onUnhandledInboundChannelInactive();
1218 }
1219
1220 @Override
1221 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1222 onUnhandledChannelWritabilityChanged();
1223 }
1224
1225 @Override
1226 public void handlerAdded(ChannelHandlerContext ctx) { }
1227
1228 @Override
1229 public void handlerRemoved(ChannelHandlerContext ctx) { }
1230
1231 @Override
1232 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1233 onUnhandledInboundUserEventTriggered(evt);
1234 }
1235
1236 @Override
1237 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1238 onUnhandledInboundException(cause);
1239 }
1240
1241 @Override
1242 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1243 onUnhandledInboundMessage(ctx, msg);
1244 }
1245
1246 @Override
1247 public void channelReadComplete(ChannelHandlerContext ctx) {
1248 onUnhandledInboundChannelReadComplete();
1249 }
1250 }
1251
1252 final class HeadContext extends AbstractChannelHandlerContext
1253 implements ChannelOutboundHandler, ChannelInboundHandler {
1254
1255 private final Unsafe unsafe;
1256
1257 HeadContext(DefaultChannelPipeline pipeline) {
1258 super(pipeline, null, HEAD_NAME, HeadContext.class);
1259 unsafe = pipeline.channel().unsafe();
1260 setAddComplete();
1261 }
1262
1263 @Override
1264 public ChannelHandler handler() {
1265 return this;
1266 }
1267
1268 @Override
1269 public void handlerAdded(ChannelHandlerContext ctx) {
1270
1271 }
1272
1273 @Override
1274 public void handlerRemoved(ChannelHandlerContext ctx) {
1275
1276 }
1277
1278 @Override
1279 public void bind(
1280 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1281 unsafe.bind(localAddress, promise);
1282 }
1283
1284 @Override
1285 public void connect(
1286 ChannelHandlerContext ctx,
1287 SocketAddress remoteAddress, SocketAddress localAddress,
1288 ChannelPromise promise) {
1289 unsafe.connect(remoteAddress, localAddress, promise);
1290 }
1291
1292 @Override
1293 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1294 unsafe.disconnect(promise);
1295 }
1296
1297 @Override
1298 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1299 unsafe.close(promise);
1300 }
1301
1302 @Override
1303 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1304 unsafe.deregister(promise);
1305 }
1306
1307 @Override
1308 public void read(ChannelHandlerContext ctx) {
1309 unsafe.beginRead();
1310 }
1311
1312 @Override
1313 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1314 unsafe.write(msg, promise);
1315 }
1316
1317 @Override
1318 public void flush(ChannelHandlerContext ctx) {
1319 unsafe.flush();
1320 }
1321
1322 @Override
1323 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1324 ctx.fireExceptionCaught(cause);
1325 }
1326
1327 @Override
1328 public void channelRegistered(ChannelHandlerContext ctx) {
1329 invokeHandlerAddedIfNeeded();
1330 ctx.fireChannelRegistered();
1331 }
1332
1333 @Override
1334 public void channelUnregistered(ChannelHandlerContext ctx) {
1335 ctx.fireChannelUnregistered();
1336
1337
1338 if (!channel.isOpen()) {
1339 destroy();
1340 }
1341 }
1342
1343 @Override
1344 public void channelActive(ChannelHandlerContext ctx) {
1345 ctx.fireChannelActive();
1346
1347 readIfIsAutoRead();
1348 }
1349
1350 @Override
1351 public void channelInactive(ChannelHandlerContext ctx) {
1352 ctx.fireChannelInactive();
1353 }
1354
1355 @Override
1356 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1357 ctx.fireChannelRead(msg);
1358 }
1359
1360 @Override
1361 public void channelReadComplete(ChannelHandlerContext ctx) {
1362 ctx.fireChannelReadComplete();
1363
1364 readIfIsAutoRead();
1365 }
1366
1367 private void readIfIsAutoRead() {
1368 if (channel.config().isAutoRead()) {
1369 channel.read();
1370 }
1371 }
1372
1373 @Override
1374 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1375 ctx.fireUserEventTriggered(evt);
1376 }
1377
1378 @Override
1379 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1380 ctx.fireChannelWritabilityChanged();
1381 }
1382 }
1383
1384 private abstract static class PendingHandlerCallback implements Runnable {
1385 final AbstractChannelHandlerContext ctx;
1386 PendingHandlerCallback next;
1387
1388 PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1389 this.ctx = ctx;
1390 }
1391
1392 abstract void execute();
1393 }
1394
1395 private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1396
1397 PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1398 super(ctx);
1399 }
1400
1401 @Override
1402 public void run() {
1403 callHandlerAdded0(ctx);
1404 }
1405
1406 @Override
1407 void execute() {
1408 EventExecutor executor = ctx.executor();
1409 if (executor.inEventLoop()) {
1410 callHandlerAdded0(ctx);
1411 } else {
1412 try {
1413 executor.execute(this);
1414 } catch (RejectedExecutionException e) {
1415 if (logger.isWarnEnabled()) {
1416 logger.warn(
1417 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1418 executor, ctx.name(), e);
1419 }
1420 atomicRemoveFromHandlerList(ctx);
1421 ctx.setRemoved();
1422 }
1423 }
1424 }
1425 }
1426
1427 private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1428
1429 PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1430 super(ctx);
1431 }
1432
1433 @Override
1434 public void run() {
1435 callHandlerRemoved0(ctx);
1436 }
1437
1438 @Override
1439 void execute() {
1440 EventExecutor executor = ctx.executor();
1441 if (executor.inEventLoop()) {
1442 callHandlerRemoved0(ctx);
1443 } else {
1444 try {
1445 executor.execute(this);
1446 } catch (RejectedExecutionException e) {
1447 if (logger.isWarnEnabled()) {
1448 logger.warn(
1449 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1450 " removing handler {}.", executor, ctx.name(), e);
1451 }
1452
1453 ctx.setRemoved();
1454 }
1455 }
1456 }
1457 }
1458 }