1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.channel.Channel.Unsafe;
19 import io.netty.util.ReferenceCountUtil;
20 import io.netty.util.ResourceLeakDetector;
21 import io.netty.util.concurrent.EventExecutor;
22 import io.netty.util.concurrent.EventExecutorGroup;
23 import io.netty.util.concurrent.FastThreadLocal;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.StringUtil;
26 import io.netty.util.internal.logging.InternalLogger;
27 import io.netty.util.internal.logging.InternalLoggerFactory;
28
29 import java.net.SocketAddress;
30 import java.util.ArrayList;
31 import java.util.IdentityHashMap;
32 import java.util.Iterator;
33 import java.util.LinkedHashMap;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.NoSuchElementException;
37 import java.util.WeakHashMap;
38 import java.util.concurrent.RejectedExecutionException;
39 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40
41
42
43
44
45 public class DefaultChannelPipeline implements ChannelPipeline {
46
47 static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
48
49 private static final String HEAD_NAME = generateName0(HeadContext.class);
50 private static final String TAIL_NAME = generateName0(TailContext.class);
51
52 private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
53 new FastThreadLocal<Map<Class<?>, String>>() {
54 @Override
55 protected Map<Class<?>, String> initialValue() {
56 return new WeakHashMap<Class<?>, String>();
57 }
58 };
59
60 private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
61 AtomicReferenceFieldUpdater.newUpdater(
62 DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
63 final HeadContext head;
64 final TailContext tail;
65
66 private final Channel channel;
67 private final ChannelFuture succeededFuture;
68 private final VoidChannelPromise voidPromise;
69 private final boolean touch = ResourceLeakDetector.isEnabled();
70
71 private Map<EventExecutorGroup, EventExecutor> childExecutors;
72 private volatile MessageSizeEstimator.Handle estimatorHandle;
73 private boolean firstRegistration = true;
74
75
76
77
78
79
80
81
82
83 private PendingHandlerCallback pendingHandlerCallbackHead;
84
85
86
87
88
89 private boolean registered;
90
91 protected DefaultChannelPipeline(Channel channel) {
92 this.channel = ObjectUtil.checkNotNull(channel, "channel");
93 succeededFuture = new SucceededChannelFuture(channel, null);
94 voidPromise = new VoidChannelPromise(channel, true);
95
96 tail = new TailContext(this);
97 head = new HeadContext(this);
98
99 head.next = tail;
100 tail.prev = head;
101 }
102
103 final MessageSizeEstimator.Handle estimatorHandle() {
104 MessageSizeEstimator.Handle handle = estimatorHandle;
105 if (handle == null) {
106 handle = channel.config().getMessageSizeEstimator().newHandle();
107 if (!ESTIMATOR.compareAndSet(this, null, handle)) {
108 handle = estimatorHandle;
109 }
110 }
111 return handle;
112 }
113
114 final Object touch(Object msg, AbstractChannelHandlerContext next) {
115 return touch ? ReferenceCountUtil.touch(msg, next) : msg;
116 }
117
118 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
119 return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
120 }
121
122 private EventExecutor childExecutor(EventExecutorGroup group) {
123 if (group == null) {
124 return null;
125 }
126 Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
127 if (pinEventExecutor != null && !pinEventExecutor) {
128 return group.next();
129 }
130 Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
131 if (childExecutors == null) {
132
133 childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
134 }
135
136
137 EventExecutor childExecutor = childExecutors.get(group);
138 if (childExecutor == null) {
139 childExecutor = group.next();
140 childExecutors.put(group, childExecutor);
141 }
142 return childExecutor;
143 }
144 @Override
145 public final Channel channel() {
146 return channel;
147 }
148
149 @Override
150 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
151 return addFirst(null, name, handler);
152 }
153
154 private enum AddStrategy {
155 ADD_FIRST,
156 ADD_LAST,
157 ADD_BEFORE,
158 ADD_AFTER;
159 }
160
161 private ChannelPipeline internalAdd(EventExecutorGroup group, String name,
162 ChannelHandler handler, String baseName,
163 AddStrategy addStrategy) {
164 final AbstractChannelHandlerContext newCtx;
165 synchronized (this) {
166 checkMultiplicity(handler);
167 name = filterName(name, handler);
168
169 newCtx = newContext(group, name, handler);
170
171 switch (addStrategy) {
172 case ADD_FIRST:
173 addFirst0(newCtx);
174 break;
175 case ADD_LAST:
176 addLast0(newCtx);
177 break;
178 case ADD_BEFORE:
179 addBefore0(getContextOrDie(baseName), newCtx);
180 break;
181 case ADD_AFTER:
182 addAfter0(getContextOrDie(baseName), newCtx);
183 break;
184 default:
185 throw new IllegalArgumentException("unknown add strategy: " + addStrategy);
186 }
187
188
189
190
191 if (!registered) {
192 newCtx.setAddPending();
193 callHandlerCallbackLater(newCtx, true);
194 return this;
195 }
196
197 EventExecutor executor = newCtx.executor();
198 if (!executor.inEventLoop()) {
199 callHandlerAddedInEventLoop(newCtx, executor);
200 return this;
201 }
202 }
203 callHandlerAdded0(newCtx);
204 return this;
205 }
206
207 @Override
208 public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
209 return internalAdd(group, name, handler, null, AddStrategy.ADD_FIRST);
210 }
211
212 private void addFirst0(AbstractChannelHandlerContext newCtx) {
213 AbstractChannelHandlerContext nextCtx = head.next;
214 newCtx.prev = head;
215 newCtx.next = nextCtx;
216 head.next = newCtx;
217 nextCtx.prev = newCtx;
218 }
219
220 @Override
221 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
222 return addLast(null, name, handler);
223 }
224
225 @Override
226 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
227 return internalAdd(group, name, handler, null, AddStrategy.ADD_LAST);
228 }
229
230 private void addLast0(AbstractChannelHandlerContext newCtx) {
231 AbstractChannelHandlerContext prev = tail.prev;
232 newCtx.prev = prev;
233 newCtx.next = tail;
234 prev.next = newCtx;
235 tail.prev = newCtx;
236 }
237
238 @Override
239 public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
240 return addBefore(null, baseName, name, handler);
241 }
242
243 @Override
244 public final ChannelPipeline addBefore(
245 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
246 return internalAdd(group, name, handler, baseName, AddStrategy.ADD_BEFORE);
247 }
248
249 private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
250 newCtx.prev = ctx.prev;
251 newCtx.next = ctx;
252 ctx.prev.next = newCtx;
253 ctx.prev = newCtx;
254 }
255
256 private String filterName(String name, ChannelHandler handler) {
257 if (name == null) {
258 return generateName(handler);
259 }
260 checkDuplicateName(name);
261 return name;
262 }
263
264 @Override
265 public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
266 return addAfter(null, baseName, name, handler);
267 }
268
269 @Override
270 public final ChannelPipeline addAfter(
271 EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
272 return internalAdd(group, name, handler, baseName, AddStrategy.ADD_AFTER);
273 }
274
275 private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
276 newCtx.prev = ctx;
277 newCtx.next = ctx.next;
278 ctx.next.prev = newCtx;
279 ctx.next = newCtx;
280 }
281
282 public final ChannelPipeline addFirst(ChannelHandler handler) {
283 return addFirst(null, handler);
284 }
285
286 @Override
287 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
288 return addFirst(null, handlers);
289 }
290
291 @Override
292 public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
293 ObjectUtil.checkNotNull(handlers, "handlers");
294 if (handlers.length == 0 || handlers[0] == null) {
295 return this;
296 }
297
298 int size;
299 for (size = 1; size < handlers.length; size ++) {
300 if (handlers[size] == null) {
301 break;
302 }
303 }
304
305 for (int i = size - 1; i >= 0; i --) {
306 ChannelHandler h = handlers[i];
307 addFirst(executor, null, h);
308 }
309
310 return this;
311 }
312
313 public final ChannelPipeline addLast(ChannelHandler handler) {
314 return addLast(null, handler);
315 }
316
317 @Override
318 public final ChannelPipeline addLast(ChannelHandler... handlers) {
319 return addLast(null, handlers);
320 }
321
322 @Override
323 public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
324 ObjectUtil.checkNotNull(handlers, "handlers");
325
326 for (ChannelHandler h: handlers) {
327 if (h == null) {
328 break;
329 }
330 addLast(executor, null, h);
331 }
332
333 return this;
334 }
335
336 private String generateName(ChannelHandler handler) {
337 Map<Class<?>, String> cache = nameCaches.get();
338 Class<?> handlerType = handler.getClass();
339 String name = cache.get(handlerType);
340 if (name == null) {
341 name = generateName0(handlerType);
342 cache.put(handlerType, name);
343 }
344
345
346
347 if (context0(name) != null) {
348 String baseName = name.substring(0, name.length() - 1);
349 for (int i = 1;; i ++) {
350 String newName = baseName + i;
351 if (context0(newName) == null) {
352 name = newName;
353 break;
354 }
355 }
356 }
357 return name;
358 }
359
360 private static String generateName0(Class<?> handlerType) {
361 return StringUtil.simpleClassName(handlerType) + "#0";
362 }
363
364 @Override
365 public final ChannelPipeline remove(ChannelHandler handler) {
366 remove(getContextOrDie(handler));
367 return this;
368 }
369
370 @Override
371 public final ChannelHandler remove(String name) {
372 return remove(getContextOrDie(name)).handler();
373 }
374
375 @SuppressWarnings("unchecked")
376 @Override
377 public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
378 return (T) remove(getContextOrDie(handlerType)).handler();
379 }
380
381 public final <T extends ChannelHandler> T removeIfExists(String name) {
382 return removeIfExists(context(name));
383 }
384
385 public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
386 return removeIfExists(context(handlerType));
387 }
388
389 public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
390 return removeIfExists(context(handler));
391 }
392
393 @SuppressWarnings("unchecked")
394 private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
395 if (ctx == null) {
396 return null;
397 }
398 return (T) remove((AbstractChannelHandlerContext) ctx).handler();
399 }
400
401 private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
402 assert ctx != head && ctx != tail;
403
404 synchronized (this) {
405 atomicRemoveFromHandlerList(ctx);
406
407
408
409
410 if (!registered) {
411 callHandlerCallbackLater(ctx, false);
412 return ctx;
413 }
414
415 EventExecutor executor = ctx.executor();
416 if (!executor.inEventLoop()) {
417 executor.execute(new Runnable() {
418 @Override
419 public void run() {
420 callHandlerRemoved0(ctx);
421 }
422 });
423 return ctx;
424 }
425 }
426 callHandlerRemoved0(ctx);
427 return ctx;
428 }
429
430
431
432
433 private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
434 AbstractChannelHandlerContext prev = ctx.prev;
435 AbstractChannelHandlerContext next = ctx.next;
436 prev.next = next;
437 next.prev = prev;
438 }
439
440 @Override
441 public final ChannelHandler removeFirst() {
442 if (head.next == tail) {
443 throw new NoSuchElementException();
444 }
445 return remove(head.next).handler();
446 }
447
448 @Override
449 public final ChannelHandler removeLast() {
450 if (head.next == tail) {
451 throw new NoSuchElementException();
452 }
453 return remove(tail.prev).handler();
454 }
455
456 @Override
457 public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
458 replace(getContextOrDie(oldHandler), newName, newHandler);
459 return this;
460 }
461
462 @Override
463 public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
464 return replace(getContextOrDie(oldName), newName, newHandler);
465 }
466
467 @Override
468 @SuppressWarnings("unchecked")
469 public final <T extends ChannelHandler> T replace(
470 Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
471 return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
472 }
473
474 private ChannelHandler replace(
475 final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
476 assert ctx != head && ctx != tail;
477
478 final AbstractChannelHandlerContext newCtx;
479 synchronized (this) {
480 checkMultiplicity(newHandler);
481 if (newName == null) {
482 newName = generateName(newHandler);
483 } else {
484 boolean sameName = ctx.name().equals(newName);
485 if (!sameName) {
486 checkDuplicateName(newName);
487 }
488 }
489
490 newCtx = newContext(ctx.childExecutor, newName, newHandler);
491
492 replace0(ctx, newCtx);
493
494
495
496
497
498 if (!registered) {
499 callHandlerCallbackLater(newCtx, true);
500 callHandlerCallbackLater(ctx, false);
501 return ctx.handler();
502 }
503 EventExecutor executor = ctx.executor();
504 if (!executor.inEventLoop()) {
505 executor.execute(new Runnable() {
506 @Override
507 public void run() {
508
509
510
511 callHandlerAdded0(newCtx);
512 callHandlerRemoved0(ctx);
513 }
514 });
515 return ctx.handler();
516 }
517 }
518
519
520
521 callHandlerAdded0(newCtx);
522 callHandlerRemoved0(ctx);
523 return ctx.handler();
524 }
525
526 private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
527 AbstractChannelHandlerContext prev = oldCtx.prev;
528 AbstractChannelHandlerContext next = oldCtx.next;
529 newCtx.prev = prev;
530 newCtx.next = next;
531
532
533
534
535
536 prev.next = newCtx;
537 next.prev = newCtx;
538
539
540 oldCtx.prev = newCtx;
541 oldCtx.next = newCtx;
542 }
543
544 private static void checkMultiplicity(ChannelHandler handler) {
545 if (handler instanceof ChannelHandlerAdapter) {
546 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
547 if (!h.isSharable() && h.added) {
548 throw new ChannelPipelineException(
549 h.getClass().getName() +
550 " is not a @Sharable handler, so can't be added or removed multiple times.");
551 }
552 h.added = true;
553 }
554 }
555
556 private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
557 try {
558 ctx.callHandlerAdded();
559 } catch (Throwable t) {
560 boolean removed = false;
561 try {
562 atomicRemoveFromHandlerList(ctx);
563 ctx.callHandlerRemoved();
564 removed = true;
565 } catch (Throwable t2) {
566 if (logger.isWarnEnabled()) {
567 logger.warn("Failed to remove a handler: " + ctx.name(), t2);
568 }
569 }
570
571 if (removed) {
572 fireExceptionCaught(new ChannelPipelineException(
573 ctx.handler().getClass().getName() +
574 ".handlerAdded() has thrown an exception; removed.", t));
575 } else {
576 fireExceptionCaught(new ChannelPipelineException(
577 ctx.handler().getClass().getName() +
578 ".handlerAdded() has thrown an exception; also failed to remove.", t));
579 }
580 }
581 }
582
583 private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
584
585 try {
586 ctx.callHandlerRemoved();
587 } catch (Throwable t) {
588 fireExceptionCaught(new ChannelPipelineException(
589 ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
590 }
591 }
592
593 final void invokeHandlerAddedIfNeeded() {
594 assert channel.eventLoop().inEventLoop();
595 if (firstRegistration) {
596 firstRegistration = false;
597
598
599 callHandlerAddedForAllHandlers();
600 }
601 }
602
603 @Override
604 public final ChannelHandler first() {
605 ChannelHandlerContext first = firstContext();
606 if (first == null) {
607 return null;
608 }
609 return first.handler();
610 }
611
612 @Override
613 public final ChannelHandlerContext firstContext() {
614 AbstractChannelHandlerContext first = head.next;
615 if (first == tail) {
616 return null;
617 }
618 return head.next;
619 }
620
621 @Override
622 public final ChannelHandler last() {
623 AbstractChannelHandlerContext last = tail.prev;
624 if (last == head) {
625 return null;
626 }
627 return last.handler();
628 }
629
630 @Override
631 public final ChannelHandlerContext lastContext() {
632 AbstractChannelHandlerContext last = tail.prev;
633 if (last == head) {
634 return null;
635 }
636 return last;
637 }
638
639 @Override
640 public final ChannelHandler get(String name) {
641 ChannelHandlerContext ctx = context(name);
642 if (ctx == null) {
643 return null;
644 } else {
645 return ctx.handler();
646 }
647 }
648
649 @SuppressWarnings("unchecked")
650 @Override
651 public final <T extends ChannelHandler> T get(Class<T> handlerType) {
652 ChannelHandlerContext ctx = context(handlerType);
653 if (ctx == null) {
654 return null;
655 } else {
656 return (T) ctx.handler();
657 }
658 }
659
660 @Override
661 public final ChannelHandlerContext context(String name) {
662 return context0(ObjectUtil.checkNotNull(name, "name"));
663 }
664
665 @Override
666 public final ChannelHandlerContext context(ChannelHandler handler) {
667 ObjectUtil.checkNotNull(handler, "handler");
668
669 AbstractChannelHandlerContext ctx = head.next;
670 for (;;) {
671
672 if (ctx == null) {
673 return null;
674 }
675
676 if (ctx.handler() == handler) {
677 return ctx;
678 }
679
680 ctx = ctx.next;
681 }
682 }
683
684 @Override
685 public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
686 ObjectUtil.checkNotNull(handlerType, "handlerType");
687
688 AbstractChannelHandlerContext ctx = head.next;
689 for (;;) {
690 if (ctx == null) {
691 return null;
692 }
693 if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
694 return ctx;
695 }
696 ctx = ctx.next;
697 }
698 }
699
700 @Override
701 public final List<String> names() {
702 List<String> list = new ArrayList<String>();
703 AbstractChannelHandlerContext ctx = head.next;
704 for (;;) {
705 if (ctx == null) {
706 return list;
707 }
708 list.add(ctx.name());
709 ctx = ctx.next;
710 }
711 }
712
713 @Override
714 public final Map<String, ChannelHandler> toMap() {
715 Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
716 AbstractChannelHandlerContext ctx = head.next;
717 for (;;) {
718 if (ctx == tail) {
719 return map;
720 }
721 map.put(ctx.name(), ctx.handler());
722 ctx = ctx.next;
723 }
724 }
725
726 @Override
727 public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
728 return toMap().entrySet().iterator();
729 }
730
731
732
733
734 @Override
735 public final String toString() {
736 StringBuilder buf = new StringBuilder()
737 .append(StringUtil.simpleClassName(this))
738 .append('{');
739 AbstractChannelHandlerContext ctx = head.next;
740 for (;;) {
741 if (ctx == tail) {
742 break;
743 }
744
745 buf.append('(')
746 .append(ctx.name())
747 .append(" = ")
748 .append(ctx.handler().getClass().getName())
749 .append(')');
750
751 ctx = ctx.next;
752 if (ctx == tail) {
753 break;
754 }
755
756 buf.append(", ");
757 }
758 buf.append('}');
759 return buf.toString();
760 }
761
762 @Override
763 public final ChannelPipeline fireChannelRegistered() {
764 if (head.executor().inEventLoop()) {
765 if (head.invokeHandler()) {
766 head.channelRegistered(head);
767 } else {
768 head.fireChannelRegistered();
769 }
770 } else {
771 head.executor().execute(this::fireChannelRegistered);
772 }
773 return this;
774 }
775
776 @Override
777 public final ChannelPipeline fireChannelUnregistered() {
778 if (head.executor().inEventLoop()) {
779 if (head.invokeHandler()) {
780 head.channelUnregistered(head);
781 } else {
782 head.fireChannelUnregistered();
783 }
784 } else {
785 head.executor().execute(this::fireChannelUnregistered);
786 }
787 return this;
788 }
789
790
791
792
793
794
795
796
797
798
799
800 private synchronized void destroy() {
801 destroyUp(head.next, false);
802 }
803
804 private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
805 final Thread currentThread = Thread.currentThread();
806 final AbstractChannelHandlerContext tail = this.tail;
807 for (;;) {
808 if (ctx == tail) {
809 destroyDown(currentThread, tail.prev, inEventLoop);
810 break;
811 }
812
813 final EventExecutor executor = ctx.executor();
814 if (!inEventLoop && !executor.inEventLoop(currentThread)) {
815 final AbstractChannelHandlerContext finalCtx = ctx;
816 executor.execute(new Runnable() {
817 @Override
818 public void run() {
819 destroyUp(finalCtx, true);
820 }
821 });
822 break;
823 }
824
825 ctx = ctx.next;
826 inEventLoop = false;
827 }
828 }
829
830 private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
831
832 final AbstractChannelHandlerContext head = this.head;
833 for (;;) {
834 if (ctx == head) {
835 break;
836 }
837
838 final EventExecutor executor = ctx.executor();
839 if (inEventLoop || executor.inEventLoop(currentThread)) {
840 atomicRemoveFromHandlerList(ctx);
841 callHandlerRemoved0(ctx);
842 } else {
843 final AbstractChannelHandlerContext finalCtx = ctx;
844 executor.execute(new Runnable() {
845 @Override
846 public void run() {
847 destroyDown(Thread.currentThread(), finalCtx, true);
848 }
849 });
850 break;
851 }
852
853 ctx = ctx.prev;
854 inEventLoop = false;
855 }
856 }
857
858 @Override
859 public final ChannelPipeline fireChannelActive() {
860 if (head.executor().inEventLoop()) {
861 if (head.invokeHandler()) {
862 head.channelActive(head);
863 } else {
864 head.fireChannelActive();
865 }
866 } else {
867 head.executor().execute(this::fireChannelActive);
868 }
869 return this;
870 }
871
872 @Override
873 public final ChannelPipeline fireChannelInactive() {
874 if (head.executor().inEventLoop()) {
875 if (head.invokeHandler()) {
876 head.channelInactive(head);
877 } else {
878 head.fireChannelInactive();
879 }
880 } else {
881 head.executor().execute(this::fireChannelInactive);
882 }
883 return this;
884 }
885
886 @Override
887 public final ChannelPipeline fireExceptionCaught(Throwable cause) {
888 if (head.executor().inEventLoop()) {
889 if (head.invokeHandler()) {
890 head.exceptionCaught(head, cause);
891 } else {
892 head.fireExceptionCaught(cause);
893 }
894 } else {
895 head.executor().execute(() -> fireExceptionCaught(cause));
896 }
897 return this;
898 }
899
900 @Override
901 public final ChannelPipeline fireUserEventTriggered(Object event) {
902 if (head.executor().inEventLoop()) {
903 if (head.invokeHandler()) {
904 head.userEventTriggered(head, event);
905 } else {
906 head.fireUserEventTriggered(event);
907 }
908 } else {
909 head.executor().execute(() -> fireUserEventTriggered(event));
910 }
911 return this;
912 }
913
914 @Override
915 public final ChannelPipeline fireChannelRead(Object msg) {
916 if (head.executor().inEventLoop()) {
917 if (head.invokeHandler()) {
918 head.channelRead(head, msg);
919 } else {
920 head.fireChannelRead(msg);
921 }
922 } else {
923 head.executor().execute(() -> fireChannelRead(msg));
924 }
925 return this;
926 }
927
928 @Override
929 public final ChannelPipeline fireChannelReadComplete() {
930 if (head.executor().inEventLoop()) {
931 if (head.invokeHandler()) {
932 head.channelReadComplete(head);
933 } else {
934 head.fireChannelReadComplete();
935 }
936 } else {
937 head.executor().execute(this::fireChannelReadComplete);
938 }
939 return this;
940 }
941
942 @Override
943 public final ChannelPipeline fireChannelWritabilityChanged() {
944 if (head.executor().inEventLoop()) {
945 if (head.invokeHandler()) {
946 head.channelWritabilityChanged(head);
947 } else {
948 head.fireChannelWritabilityChanged();
949 }
950 } else {
951 head.executor().execute(this::fireChannelWritabilityChanged);
952 }
953 return this;
954 }
955
956 @Override
957 public final ChannelFuture bind(SocketAddress localAddress) {
958 return tail.bind(localAddress);
959 }
960
961 @Override
962 public final ChannelFuture connect(SocketAddress remoteAddress) {
963 return tail.connect(remoteAddress);
964 }
965
966 @Override
967 public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
968 return tail.connect(remoteAddress, localAddress);
969 }
970
971 @Override
972 public final ChannelFuture disconnect() {
973 return tail.disconnect();
974 }
975
976 @Override
977 public final ChannelFuture close() {
978 return tail.close();
979 }
980
981 @Override
982 public final ChannelFuture deregister() {
983 return tail.deregister();
984 }
985
986 @Override
987 public final ChannelPipeline flush() {
988 tail.flush();
989 return this;
990 }
991
992 @Override
993 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
994 return tail.bind(localAddress, promise);
995 }
996
997 @Override
998 public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
999 return tail.connect(remoteAddress, promise);
1000 }
1001
1002 @Override
1003 public final ChannelFuture connect(
1004 SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1005 return tail.connect(remoteAddress, localAddress, promise);
1006 }
1007
1008 @Override
1009 public final ChannelFuture disconnect(ChannelPromise promise) {
1010 return tail.disconnect(promise);
1011 }
1012
1013 @Override
1014 public final ChannelFuture close(ChannelPromise promise) {
1015 return tail.close(promise);
1016 }
1017
1018 @Override
1019 public final ChannelFuture deregister(final ChannelPromise promise) {
1020 return tail.deregister(promise);
1021 }
1022
1023 @Override
1024 public final ChannelPipeline read() {
1025 tail.read();
1026 return this;
1027 }
1028
1029 @Override
1030 public final ChannelFuture write(Object msg) {
1031 return tail.write(msg);
1032 }
1033
1034 @Override
1035 public final ChannelFuture write(Object msg, ChannelPromise promise) {
1036 return tail.write(msg, promise);
1037 }
1038
1039 @Override
1040 public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1041 return tail.writeAndFlush(msg, promise);
1042 }
1043
1044 @Override
1045 public final ChannelFuture writeAndFlush(Object msg) {
1046 return tail.writeAndFlush(msg);
1047 }
1048
1049 @Override
1050 public final ChannelPromise newPromise() {
1051 return new DefaultChannelPromise(channel);
1052 }
1053
1054 @Override
1055 public final ChannelProgressivePromise newProgressivePromise() {
1056 return new DefaultChannelProgressivePromise(channel);
1057 }
1058
1059 @Override
1060 public final ChannelFuture newSucceededFuture() {
1061 return succeededFuture;
1062 }
1063
1064 @Override
1065 public final ChannelFuture newFailedFuture(Throwable cause) {
1066 return new FailedChannelFuture(channel, null, cause);
1067 }
1068
1069 @Override
1070 public final ChannelPromise voidPromise() {
1071 return voidPromise;
1072 }
1073
1074 private void checkDuplicateName(String name) {
1075 if (context0(name) != null) {
1076 throw new IllegalArgumentException("Duplicate handler name: " + name);
1077 }
1078 }
1079
1080 private AbstractChannelHandlerContext context0(String name) {
1081 AbstractChannelHandlerContext context = head.next;
1082 while (context != tail) {
1083 if (context.name().equals(name)) {
1084 return context;
1085 }
1086 context = context.next;
1087 }
1088 return null;
1089 }
1090
1091 private AbstractChannelHandlerContext getContextOrDie(String name) {
1092 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1093 if (ctx == null) {
1094 throw new NoSuchElementException(name);
1095 } else {
1096 return ctx;
1097 }
1098 }
1099
1100 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1101 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1102 if (ctx == null) {
1103 throw new NoSuchElementException(handler.getClass().getName());
1104 } else {
1105 return ctx;
1106 }
1107 }
1108
1109 private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1110 AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1111 if (ctx == null) {
1112 throw new NoSuchElementException(handlerType.getName());
1113 } else {
1114 return ctx;
1115 }
1116 }
1117
1118 private void callHandlerAddedForAllHandlers() {
1119 final PendingHandlerCallback pendingHandlerCallbackHead;
1120 synchronized (this) {
1121 assert !registered;
1122
1123
1124 registered = true;
1125
1126 pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1127
1128 this.pendingHandlerCallbackHead = null;
1129 }
1130
1131
1132
1133
1134 PendingHandlerCallback task = pendingHandlerCallbackHead;
1135 while (task != null) {
1136 task.execute();
1137 task = task.next;
1138 }
1139 }
1140
1141 private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1142 assert !registered;
1143
1144 PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1145 PendingHandlerCallback pending = pendingHandlerCallbackHead;
1146 if (pending == null) {
1147 pendingHandlerCallbackHead = task;
1148 } else {
1149
1150 while (pending.next != null) {
1151 pending = pending.next;
1152 }
1153 pending.next = task;
1154 }
1155 }
1156
1157 private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1158 newCtx.setAddPending();
1159 executor.execute(new Runnable() {
1160 @Override
1161 public void run() {
1162 callHandlerAdded0(newCtx);
1163 }
1164 });
1165 }
1166
1167
1168
1169
1170
1171 protected void onUnhandledInboundException(Throwable cause) {
1172 try {
1173 logger.warn(
1174 "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1175 "It usually means the last handler in the pipeline did not handle the exception.",
1176 cause);
1177 } finally {
1178 ReferenceCountUtil.release(cause);
1179 }
1180 }
1181
1182
1183
1184
1185
1186 protected void onUnhandledInboundChannelActive() {
1187 }
1188
1189
1190
1191
1192
1193 protected void onUnhandledInboundChannelInactive() {
1194 }
1195
1196
1197
1198
1199
1200
1201 protected void onUnhandledInboundMessage(Object msg) {
1202 try {
1203 logger.debug(
1204 "Discarded inbound message {} that reached at the tail of the pipeline. " +
1205 "Please check your pipeline configuration.", msg);
1206 } finally {
1207 ReferenceCountUtil.release(msg);
1208 }
1209 }
1210
1211
1212
1213
1214
1215
1216 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1217 onUnhandledInboundMessage(msg);
1218 if (logger.isDebugEnabled()) {
1219 logger.debug("Discarded message pipeline : {}. Channel : {}.",
1220 ctx.pipeline().names(), ctx.channel());
1221 }
1222 }
1223
1224
1225
1226
1227
1228 protected void onUnhandledInboundChannelReadComplete() {
1229 }
1230
1231
1232
1233
1234
1235
1236 protected void onUnhandledInboundUserEventTriggered(Object evt) {
1237
1238
1239 ReferenceCountUtil.release(evt);
1240 }
1241
1242
1243
1244
1245
1246 protected void onUnhandledChannelWritabilityChanged() {
1247 }
1248
1249 protected void incrementPendingOutboundBytes(long size) {
1250 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1251 if (buffer != null) {
1252 buffer.incrementPendingOutboundBytes(size);
1253 }
1254 }
1255
1256 protected void decrementPendingOutboundBytes(long size) {
1257 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1258 if (buffer != null) {
1259 buffer.decrementPendingOutboundBytes(size);
1260 }
1261 }
1262
1263
1264 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1265
1266 TailContext(DefaultChannelPipeline pipeline) {
1267 super(pipeline, null, TAIL_NAME, TailContext.class);
1268 setAddComplete();
1269 }
1270
1271 @Override
1272 public ChannelHandler handler() {
1273 return this;
1274 }
1275
1276 @Override
1277 public void channelRegistered(ChannelHandlerContext ctx) { }
1278
1279 @Override
1280 public void channelUnregistered(ChannelHandlerContext ctx) { }
1281
1282 @Override
1283 public void channelActive(ChannelHandlerContext ctx) {
1284 onUnhandledInboundChannelActive();
1285 }
1286
1287 @Override
1288 public void channelInactive(ChannelHandlerContext ctx) {
1289 onUnhandledInboundChannelInactive();
1290 }
1291
1292 @Override
1293 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1294 onUnhandledChannelWritabilityChanged();
1295 }
1296
1297 @Override
1298 public void handlerAdded(ChannelHandlerContext ctx) { }
1299
1300 @Override
1301 public void handlerRemoved(ChannelHandlerContext ctx) { }
1302
1303 @Override
1304 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1305 onUnhandledInboundUserEventTriggered(evt);
1306 }
1307
1308 @Override
1309 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1310 onUnhandledInboundException(cause);
1311 }
1312
1313 @Override
1314 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1315 onUnhandledInboundMessage(ctx, msg);
1316 }
1317
1318 @Override
1319 public void channelReadComplete(ChannelHandlerContext ctx) {
1320 onUnhandledInboundChannelReadComplete();
1321 }
1322 }
1323
1324 final class HeadContext extends AbstractChannelHandlerContext
1325 implements ChannelOutboundHandler, ChannelInboundHandler {
1326
1327 private final Unsafe unsafe;
1328
1329 HeadContext(DefaultChannelPipeline pipeline) {
1330 super(pipeline, null, HEAD_NAME, HeadContext.class);
1331 unsafe = pipeline.channel().unsafe();
1332 setAddComplete();
1333 }
1334
1335 @Override
1336 public ChannelHandler handler() {
1337 return this;
1338 }
1339
1340 @Override
1341 public void handlerAdded(ChannelHandlerContext ctx) {
1342
1343 }
1344
1345 @Override
1346 public void handlerRemoved(ChannelHandlerContext ctx) {
1347
1348 }
1349
1350 @Override
1351 public void bind(
1352 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1353 unsafe.bind(localAddress, promise);
1354 }
1355
1356 @Override
1357 public void connect(
1358 ChannelHandlerContext ctx,
1359 SocketAddress remoteAddress, SocketAddress localAddress,
1360 ChannelPromise promise) {
1361 unsafe.connect(remoteAddress, localAddress, promise);
1362 }
1363
1364 @Override
1365 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1366 unsafe.disconnect(promise);
1367 }
1368
1369 @Override
1370 public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1371 unsafe.close(promise);
1372 }
1373
1374 @Override
1375 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1376 unsafe.deregister(promise);
1377 }
1378
1379 @Override
1380 public void read(ChannelHandlerContext ctx) {
1381 unsafe.beginRead();
1382 }
1383
1384 @Override
1385 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1386 unsafe.write(msg, promise);
1387 }
1388
1389 @Override
1390 public void flush(ChannelHandlerContext ctx) {
1391 unsafe.flush();
1392 }
1393
1394 @Override
1395 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1396 ctx.fireExceptionCaught(cause);
1397 }
1398
1399 @Override
1400 public void channelRegistered(ChannelHandlerContext ctx) {
1401 invokeHandlerAddedIfNeeded();
1402 ctx.fireChannelRegistered();
1403 }
1404
1405 @Override
1406 public void channelUnregistered(ChannelHandlerContext ctx) {
1407 ctx.fireChannelUnregistered();
1408
1409
1410 if (!channel.isOpen()) {
1411 destroy();
1412 }
1413 }
1414
1415 @Override
1416 public void channelActive(ChannelHandlerContext ctx) {
1417 ctx.fireChannelActive();
1418
1419 readIfIsAutoRead();
1420 }
1421
1422 @Override
1423 public void channelInactive(ChannelHandlerContext ctx) {
1424 ctx.fireChannelInactive();
1425 }
1426
1427 @Override
1428 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1429 ctx.fireChannelRead(msg);
1430 }
1431
1432 @Override
1433 public void channelReadComplete(ChannelHandlerContext ctx) {
1434 ctx.fireChannelReadComplete();
1435
1436 readIfIsAutoRead();
1437 }
1438
1439 private void readIfIsAutoRead() {
1440 if (channel.config().isAutoRead()) {
1441 channel.read();
1442 }
1443 }
1444
1445 @Override
1446 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1447 ctx.fireUserEventTriggered(evt);
1448 }
1449
1450 @Override
1451 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1452 ctx.fireChannelWritabilityChanged();
1453 }
1454 }
1455
1456 private abstract static class PendingHandlerCallback implements Runnable {
1457 final AbstractChannelHandlerContext ctx;
1458 PendingHandlerCallback next;
1459
1460 PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1461 this.ctx = ctx;
1462 }
1463
1464 abstract void execute();
1465 }
1466
1467 private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1468
1469 PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1470 super(ctx);
1471 }
1472
1473 @Override
1474 public void run() {
1475 callHandlerAdded0(ctx);
1476 }
1477
1478 @Override
1479 void execute() {
1480 EventExecutor executor = ctx.executor();
1481 if (executor.inEventLoop()) {
1482 callHandlerAdded0(ctx);
1483 } else {
1484 try {
1485 executor.execute(this);
1486 } catch (RejectedExecutionException e) {
1487 if (logger.isWarnEnabled()) {
1488 logger.warn(
1489 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1490 executor, ctx.name(), e);
1491 }
1492 atomicRemoveFromHandlerList(ctx);
1493 ctx.setRemoved();
1494 }
1495 }
1496 }
1497 }
1498
1499 private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1500
1501 PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1502 super(ctx);
1503 }
1504
1505 @Override
1506 public void run() {
1507 callHandlerRemoved0(ctx);
1508 }
1509
1510 @Override
1511 void execute() {
1512 EventExecutor executor = ctx.executor();
1513 if (executor.inEventLoop()) {
1514 callHandlerRemoved0(ctx);
1515 } else {
1516 try {
1517 executor.execute(this);
1518 } catch (RejectedExecutionException e) {
1519 if (logger.isWarnEnabled()) {
1520 logger.warn(
1521 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1522 " removing handler {}.", executor, ctx.name(), e);
1523 }
1524
1525 ctx.setRemoved();
1526 }
1527 }
1528 }
1529 }
1530 }