View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.net.SocketAddress;
30  import java.util.ArrayList;
31  import java.util.IdentityHashMap;
32  import java.util.Iterator;
33  import java.util.LinkedHashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NoSuchElementException;
37  import java.util.WeakHashMap;
38  import java.util.concurrent.RejectedExecutionException;
39  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40  
41  /**
42   * The default {@link ChannelPipeline} implementation.  It is usually created
43   * by a {@link Channel} implementation when the {@link Channel} is created.
44   */
45  public class DefaultChannelPipeline implements ChannelPipeline {
46  
47      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
48  
49      private static final String HEAD_NAME = generateName0(HeadContext.class);
50      private static final String TAIL_NAME = generateName0(TailContext.class);
51  
52      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
53              new FastThreadLocal<Map<Class<?>, String>>() {
54          @Override
55          protected Map<Class<?>, String> initialValue() {
56              return new WeakHashMap<Class<?>, String>();
57          }
58      };
59  
60      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
61              AtomicReferenceFieldUpdater.newUpdater(
62                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
63      final HeadContext head;
64      final TailContext tail;
65  
66      private final Channel channel;
67      private final ChannelFuture succeededFuture;
68      private final VoidChannelPromise voidPromise;
69      private final boolean touch = ResourceLeakDetector.isEnabled();
70  
71      private Map<EventExecutorGroup, EventExecutor> childExecutors;
72      private volatile MessageSizeEstimator.Handle estimatorHandle;
73      private boolean firstRegistration = true;
74  
    /**
     * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()}, which in turn
     * processes all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)} calls.
     * <p>
     * We only keep the head because it is expected that the list is used infrequently and its size is small.
     * Thus a full iteration for each insertion is assumed to be a good compromise that saves memory and avoids
     * tail-management complexity.
     */
83      private PendingHandlerCallback pendingHandlerCallbackHead;
84  
    /**
     * Set to {@code true} once the {@link AbstractChannel} is registered. Once set to {@code true} the value will
     * never change.
     */
89      private boolean registered;
90  
91      protected DefaultChannelPipeline(Channel channel) {
92          this.channel = ObjectUtil.checkNotNull(channel, "channel");
93          succeededFuture = new SucceededChannelFuture(channel, null);
94          voidPromise = new VoidChannelPromise(channel, true);
95  
96          tail = new TailContext(this);
97          head = new HeadContext(this);
98  
99          head.next = tail;
100         tail.prev = head;
101     }
102 
103     final MessageSizeEstimator.Handle estimatorHandle() {
104         MessageSizeEstimator.Handle handle = estimatorHandle;
105         if (handle == null) {
106             handle = channel.config().getMessageSizeEstimator().newHandle();
107             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
108                 handle = estimatorHandle;
109             }
110         }
111         return handle;
112     }
113 
114     final Object touch(Object msg, AbstractChannelHandlerContext next) {
115         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
116     }
117 
118     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
119         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
120     }
121 
122     private EventExecutor childExecutor(EventExecutorGroup group) {
123         if (group == null) {
124             return null;
125         }
126         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
127         if (pinEventExecutor != null && !pinEventExecutor) {
128             return group.next();
129         }
130         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
131         if (childExecutors == null) {
132             // Use size of 4 as most people only use one extra EventExecutor.
133             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
134         }
135         // Pin one of the child executors once and remember it so that the same child executor
136         // is used to fire events for the same channel.
137         EventExecutor childExecutor = childExecutors.get(group);
138         if (childExecutor == null) {
139             childExecutor = group.next();
140             childExecutors.put(group, childExecutor);
141         }
142         return childExecutor;
143     }
144     @Override
145     public final Channel channel() {
146         return channel;
147     }
148 
149     @Override
150     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
151         return addFirst(null, name, handler);
152     }
153 
154     private enum AddStrategy {
155         ADD_FIRST,
156         ADD_LAST,
157         ADD_BEFORE,
158         ADD_AFTER;
159     }
160 
161     private ChannelPipeline internalAdd(EventExecutorGroup group, String name,
162                                         ChannelHandler handler, String baseName,
163                                         AddStrategy addStrategy) {
164         final AbstractChannelHandlerContext newCtx;
165         synchronized (this) {
166             checkMultiplicity(handler);
167             name = filterName(name, handler);
168 
169             newCtx = newContext(group, name, handler);
170 
171             switch (addStrategy) {
172                 case ADD_FIRST:
173                     addFirst0(newCtx);
174                     break;
175                 case ADD_LAST:
176                     addLast0(newCtx);
177                     break;
178                 case ADD_BEFORE:
179                     addBefore0(getContextOrDie(baseName), newCtx);
180                     break;
181                 case ADD_AFTER:
182                     addAfter0(getContextOrDie(baseName), newCtx);
183                     break;
184                 default:
185                     throw new IllegalArgumentException("unknown add strategy: " + addStrategy);
186             }
187 
188             // If the registered is false it means that the channel was not registered on an eventLoop yet.
189             // In this case we add the context to the pipeline and add a task that will call
190             // ChannelHandler.handlerAdded(...) once the channel is registered.
191             if (!registered) {
192                 newCtx.setAddPending();
193                 callHandlerCallbackLater(newCtx, true);
194                 return this;
195             }
196 
197             EventExecutor executor = newCtx.executor();
198             if (!executor.inEventLoop()) {
199                 callHandlerAddedInEventLoop(newCtx, executor);
200                 return this;
201             }
202         }
203         callHandlerAdded0(newCtx);
204         return this;
205     }
206 
207     @Override
208     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
209         return internalAdd(group, name, handler, null, AddStrategy.ADD_FIRST);
210     }
211 
212     private void addFirst0(AbstractChannelHandlerContext newCtx) {
213         AbstractChannelHandlerContext nextCtx = head.next;
214         newCtx.prev = head;
215         newCtx.next = nextCtx;
216         head.next = newCtx;
217         nextCtx.prev = newCtx;
218     }
219 
220     @Override
221     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
222         return addLast(null, name, handler);
223     }
224 
225     @Override
226     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
227         return internalAdd(group, name, handler, null, AddStrategy.ADD_LAST);
228     }
229 
230     private void addLast0(AbstractChannelHandlerContext newCtx) {
231         AbstractChannelHandlerContext prev = tail.prev;
232         newCtx.prev = prev;
233         newCtx.next = tail;
234         prev.next = newCtx;
235         tail.prev = newCtx;
236     }
237 
238     @Override
239     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
240         return addBefore(null, baseName, name, handler);
241     }
242 
243     @Override
244     public final ChannelPipeline addBefore(
245             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
246         return internalAdd(group, name, handler, baseName, AddStrategy.ADD_BEFORE);
247     }
248 
249     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
250         newCtx.prev = ctx.prev;
251         newCtx.next = ctx;
252         ctx.prev.next = newCtx;
253         ctx.prev = newCtx;
254     }
255 
256     private String filterName(String name, ChannelHandler handler) {
257         if (name == null) {
258             return generateName(handler);
259         }
260         checkDuplicateName(name);
261         return name;
262     }
263 
264     @Override
265     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
266         return addAfter(null, baseName, name, handler);
267     }
268 
269     @Override
270     public final ChannelPipeline addAfter(
271             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
272         return internalAdd(group, name, handler, baseName, AddStrategy.ADD_AFTER);
273     }
274 
275     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
276         newCtx.prev = ctx;
277         newCtx.next = ctx.next;
278         ctx.next.prev = newCtx;
279         ctx.next = newCtx;
280     }
281 
282     public final ChannelPipeline addFirst(ChannelHandler handler) {
283         return addFirst(null, handler);
284     }
285 
286     @Override
287     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
288         return addFirst(null, handlers);
289     }
290 
291     @Override
292     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
293         ObjectUtil.checkNotNull(handlers, "handlers");
294         if (handlers.length == 0 || handlers[0] == null) {
295             return this;
296         }
297 
298         int size;
299         for (size = 1; size < handlers.length; size ++) {
300             if (handlers[size] == null) {
301                 break;
302             }
303         }
304 
305         for (int i = size - 1; i >= 0; i --) {
306             ChannelHandler h = handlers[i];
307             addFirst(executor, null, h);
308         }
309 
310         return this;
311     }
312 
313     public final ChannelPipeline addLast(ChannelHandler handler) {
314         return addLast(null, handler);
315     }
316 
317     @Override
318     public final ChannelPipeline addLast(ChannelHandler... handlers) {
319         return addLast(null, handlers);
320     }
321 
322     @Override
323     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
324         ObjectUtil.checkNotNull(handlers, "handlers");
325 
326         for (ChannelHandler h: handlers) {
327             if (h == null) {
328                 break;
329             }
330             addLast(executor, null, h);
331         }
332 
333         return this;
334     }
335 
336     private String generateName(ChannelHandler handler) {
337         Map<Class<?>, String> cache = nameCaches.get();
338         Class<?> handlerType = handler.getClass();
339         String name = cache.get(handlerType);
340         if (name == null) {
341             name = generateName0(handlerType);
342             cache.put(handlerType, name);
343         }
344 
345         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
346         // any name conflicts.  Note that we don't cache the names generated here.
347         if (context0(name) != null) {
348             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
349             for (int i = 1;; i ++) {
350                 String newName = baseName + i;
351                 if (context0(newName) == null) {
352                     name = newName;
353                     break;
354                 }
355             }
356         }
357         return name;
358     }
359 
360     private static String generateName0(Class<?> handlerType) {
361         return StringUtil.simpleClassName(handlerType) + "#0";
362     }
363 
364     @Override
365     public final ChannelPipeline remove(ChannelHandler handler) {
366         remove(getContextOrDie(handler));
367         return this;
368     }
369 
370     @Override
371     public final ChannelHandler remove(String name) {
372         return remove(getContextOrDie(name)).handler();
373     }
374 
375     @SuppressWarnings("unchecked")
376     @Override
377     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
378         return (T) remove(getContextOrDie(handlerType)).handler();
379     }
380 
381     public final <T extends ChannelHandler> T removeIfExists(String name) {
382         return removeIfExists(context(name));
383     }
384 
385     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
386         return removeIfExists(context(handlerType));
387     }
388 
389     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
390         return removeIfExists(context(handler));
391     }
392 
393     @SuppressWarnings("unchecked")
394     private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
395         if (ctx == null) {
396             return null;
397         }
398         return (T) remove((AbstractChannelHandlerContext) ctx).handler();
399     }
400 
401     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
402         assert ctx != head && ctx != tail;
403 
404         synchronized (this) {
405             atomicRemoveFromHandlerList(ctx);
406 
407             // If the registered is false it means that the channel was not registered on an eventloop yet.
408             // In this case we remove the context from the pipeline and add a task that will call
409             // ChannelHandler.handlerRemoved(...) once the channel is registered.
410             if (!registered) {
411                 callHandlerCallbackLater(ctx, false);
412                 return ctx;
413             }
414 
415             EventExecutor executor = ctx.executor();
416             if (!executor.inEventLoop()) {
417                 executor.execute(new Runnable() {
418                     @Override
419                     public void run() {
420                         callHandlerRemoved0(ctx);
421                     }
422                 });
423                 return ctx;
424             }
425         }
426         callHandlerRemoved0(ctx);
427         return ctx;
428     }
429 
430     /**
431      * Method is synchronized to make the handler removal from the double linked list atomic.
432      */
433     private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
434         AbstractChannelHandlerContext prev = ctx.prev;
435         AbstractChannelHandlerContext next = ctx.next;
436         prev.next = next;
437         next.prev = prev;
438     }
439 
440     @Override
441     public final ChannelHandler removeFirst() {
442         if (head.next == tail) {
443             throw new NoSuchElementException();
444         }
445         return remove(head.next).handler();
446     }
447 
448     @Override
449     public final ChannelHandler removeLast() {
450         if (head.next == tail) {
451             throw new NoSuchElementException();
452         }
453         return remove(tail.prev).handler();
454     }
455 
456     @Override
457     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
458         replace(getContextOrDie(oldHandler), newName, newHandler);
459         return this;
460     }
461 
462     @Override
463     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
464         return replace(getContextOrDie(oldName), newName, newHandler);
465     }
466 
467     @Override
468     @SuppressWarnings("unchecked")
469     public final <T extends ChannelHandler> T replace(
470             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
471         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
472     }
473 
474     private ChannelHandler replace(
475             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
476         assert ctx != head && ctx != tail;
477 
478         final AbstractChannelHandlerContext newCtx;
479         synchronized (this) {
480             checkMultiplicity(newHandler);
481             if (newName == null) {
482                 newName = generateName(newHandler);
483             } else {
484                 boolean sameName = ctx.name().equals(newName);
485                 if (!sameName) {
486                     checkDuplicateName(newName);
487                 }
488             }
489 
490             newCtx = newContext(ctx.childExecutor, newName, newHandler);
491 
492             replace0(ctx, newCtx);
493 
494             // If the registered is false it means that the channel was not registered on an eventloop yet.
495             // In this case we replace the context in the pipeline
496             // and add a task that will call ChannelHandler.handlerAdded(...) and
497             // ChannelHandler.handlerRemoved(...) once the channel is registered.
498             if (!registered) {
499                 callHandlerCallbackLater(newCtx, true);
500                 callHandlerCallbackLater(ctx, false);
501                 return ctx.handler();
502             }
503             EventExecutor executor = ctx.executor();
504             if (!executor.inEventLoop()) {
505                 executor.execute(new Runnable() {
506                     @Override
507                     public void run() {
508                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
509                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
510                         // those event handlers must be called after handlerAdded().
511                         callHandlerAdded0(newCtx);
512                         callHandlerRemoved0(ctx);
513                     }
514                 });
515                 return ctx.handler();
516             }
517         }
518         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
519         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
520         // event handlers must be called after handlerAdded().
521         callHandlerAdded0(newCtx);
522         callHandlerRemoved0(ctx);
523         return ctx.handler();
524     }
525 
526     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
527         AbstractChannelHandlerContext prev = oldCtx.prev;
528         AbstractChannelHandlerContext next = oldCtx.next;
529         newCtx.prev = prev;
530         newCtx.next = next;
531 
532         // Finish the replacement of oldCtx with newCtx in the linked list.
533         // Note that this doesn't mean events will be sent to the new handler immediately
534         // because we are currently at the event handler thread and no more than one handler methods can be invoked
535         // at the same time (we ensured that in replace().)
536         prev.next = newCtx;
537         next.prev = newCtx;
538 
539         // update the reference to the replacement so forward of buffered content will work correctly
540         oldCtx.prev = newCtx;
541         oldCtx.next = newCtx;
542     }
543 
544     private static void checkMultiplicity(ChannelHandler handler) {
545         if (handler instanceof ChannelHandlerAdapter) {
546             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
547             if (!h.isSharable() && h.added) {
548                 throw new ChannelPipelineException(
549                         h.getClass().getName() +
550                         " is not a @Sharable handler, so can't be added or removed multiple times.");
551             }
552             h.added = true;
553         }
554     }
555 
556     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
557         try {
558             ctx.callHandlerAdded();
559         } catch (Throwable t) {
560             boolean removed = false;
561             try {
562                 atomicRemoveFromHandlerList(ctx);
563                 ctx.callHandlerRemoved();
564                 removed = true;
565             } catch (Throwable t2) {
566                 if (logger.isWarnEnabled()) {
567                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
568                 }
569             }
570 
571             if (removed) {
572                 fireExceptionCaught(new ChannelPipelineException(
573                         ctx.handler().getClass().getName() +
574                         ".handlerAdded() has thrown an exception; removed.", t));
575             } else {
576                 fireExceptionCaught(new ChannelPipelineException(
577                         ctx.handler().getClass().getName() +
578                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
579             }
580         }
581     }
582 
583     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
584         // Notify the complete removal.
585         try {
586             ctx.callHandlerRemoved();
587         } catch (Throwable t) {
588             fireExceptionCaught(new ChannelPipelineException(
589                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
590         }
591     }
592 
593     final void invokeHandlerAddedIfNeeded() {
594         assert channel.eventLoop().inEventLoop();
595         if (firstRegistration) {
596             firstRegistration = false;
597             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
598             // that were added before the registration was done.
599             callHandlerAddedForAllHandlers();
600         }
601     }
602 
603     @Override
604     public final ChannelHandler first() {
605         ChannelHandlerContext first = firstContext();
606         if (first == null) {
607             return null;
608         }
609         return first.handler();
610     }
611 
612     @Override
613     public final ChannelHandlerContext firstContext() {
614         AbstractChannelHandlerContext first = head.next;
615         if (first == tail) {
616             return null;
617         }
618         return head.next;
619     }
620 
621     @Override
622     public final ChannelHandler last() {
623         AbstractChannelHandlerContext last = tail.prev;
624         if (last == head) {
625             return null;
626         }
627         return last.handler();
628     }
629 
630     @Override
631     public final ChannelHandlerContext lastContext() {
632         AbstractChannelHandlerContext last = tail.prev;
633         if (last == head) {
634             return null;
635         }
636         return last;
637     }
638 
639     @Override
640     public final ChannelHandler get(String name) {
641         ChannelHandlerContext ctx = context(name);
642         if (ctx == null) {
643             return null;
644         } else {
645             return ctx.handler();
646         }
647     }
648 
649     @SuppressWarnings("unchecked")
650     @Override
651     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
652         ChannelHandlerContext ctx = context(handlerType);
653         if (ctx == null) {
654             return null;
655         } else {
656             return (T) ctx.handler();
657         }
658     }
659 
660     @Override
661     public final ChannelHandlerContext context(String name) {
662         return context0(ObjectUtil.checkNotNull(name, "name"));
663     }
664 
665     @Override
666     public final ChannelHandlerContext context(ChannelHandler handler) {
667         ObjectUtil.checkNotNull(handler, "handler");
668 
669         AbstractChannelHandlerContext ctx = head.next;
670         for (;;) {
671 
672             if (ctx == null) {
673                 return null;
674             }
675 
676             if (ctx.handler() == handler) {
677                 return ctx;
678             }
679 
680             ctx = ctx.next;
681         }
682     }
683 
684     @Override
685     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
686         ObjectUtil.checkNotNull(handlerType, "handlerType");
687 
688         AbstractChannelHandlerContext ctx = head.next;
689         for (;;) {
690             if (ctx == null) {
691                 return null;
692             }
693             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
694                 return ctx;
695             }
696             ctx = ctx.next;
697         }
698     }
699 
700     @Override
701     public final List<String> names() {
702         List<String> list = new ArrayList<String>();
703         AbstractChannelHandlerContext ctx = head.next;
704         for (;;) {
705             if (ctx == null) {
706                 return list;
707             }
708             list.add(ctx.name());
709             ctx = ctx.next;
710         }
711     }
712 
713     @Override
714     public final Map<String, ChannelHandler> toMap() {
715         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
716         AbstractChannelHandlerContext ctx = head.next;
717         for (;;) {
718             if (ctx == tail) {
719                 return map;
720             }
721             map.put(ctx.name(), ctx.handler());
722             ctx = ctx.next;
723         }
724     }
725 
726     @Override
727     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
728         return toMap().entrySet().iterator();
729     }
730 
731     /**
732      * Returns the {@link String} representation of this pipeline.
733      */
734     @Override
735     public final String toString() {
736         StringBuilder buf = new StringBuilder()
737             .append(StringUtil.simpleClassName(this))
738             .append('{');
739         AbstractChannelHandlerContext ctx = head.next;
740         for (;;) {
741             if (ctx == tail) {
742                 break;
743             }
744 
745             buf.append('(')
746                .append(ctx.name())
747                .append(" = ")
748                .append(ctx.handler().getClass().getName())
749                .append(')');
750 
751             ctx = ctx.next;
752             if (ctx == tail) {
753                 break;
754             }
755 
756             buf.append(", ");
757         }
758         buf.append('}');
759         return buf.toString();
760     }
761 
762     @Override
763     public final ChannelPipeline fireChannelRegistered() {
764         if (head.executor().inEventLoop()) {
765             if (head.invokeHandler()) {
766                 head.channelRegistered(head);
767             } else {
768                 head.fireChannelRegistered();
769             }
770         } else {
771             head.executor().execute(this::fireChannelRegistered);
772         }
773         return this;
774     }
775 
776     @Override
777     public final ChannelPipeline fireChannelUnregistered() {
778         if (head.executor().inEventLoop()) {
779             if (head.invokeHandler()) {
780                 head.channelUnregistered(head);
781             } else {
782                 head.fireChannelUnregistered();
783             }
784         } else {
785             head.executor().execute(this::fireChannelUnregistered);
786         }
787         return this;
788     }
789 
790     /**
791      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
792      * handlerRemoved().
793      *
794      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
795      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
796      * the handlers are removed after all events are handled.
797      *
798      * See: https://github.com/netty/netty/issues/3156
799      */
800     private synchronized void destroy() {
801         destroyUp(head.next, false);
802     }
803 
804     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
805         final Thread currentThread = Thread.currentThread();
806         final AbstractChannelHandlerContext tail = this.tail;
807         for (;;) {
808             if (ctx == tail) {
809                 destroyDown(currentThread, tail.prev, inEventLoop);
810                 break;
811             }
812 
813             final EventExecutor executor = ctx.executor();
814             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
815                 final AbstractChannelHandlerContext finalCtx = ctx;
816                 executor.execute(new Runnable() {
817                     @Override
818                     public void run() {
819                         destroyUp(finalCtx, true);
820                     }
821                 });
822                 break;
823             }
824 
825             ctx = ctx.next;
826             inEventLoop = false;
827         }
828     }
829 
830     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
831         // We have reached at tail; now traverse backwards.
832         final AbstractChannelHandlerContext head = this.head;
833         for (;;) {
834             if (ctx == head) {
835                 break;
836             }
837 
838             final EventExecutor executor = ctx.executor();
839             if (inEventLoop || executor.inEventLoop(currentThread)) {
840                 atomicRemoveFromHandlerList(ctx);
841                 callHandlerRemoved0(ctx);
842             } else {
843                 final AbstractChannelHandlerContext finalCtx = ctx;
844                 executor.execute(new Runnable() {
845                     @Override
846                     public void run() {
847                         destroyDown(Thread.currentThread(), finalCtx, true);
848                     }
849                 });
850                 break;
851             }
852 
853             ctx = ctx.prev;
854             inEventLoop = false;
855         }
856     }
857 
858     @Override
859     public final ChannelPipeline fireChannelActive() {
860         if (head.executor().inEventLoop()) {
861             if (head.invokeHandler()) {
862                 head.channelActive(head);
863             } else {
864                 head.fireChannelActive();
865             }
866         } else {
867             head.executor().execute(this::fireChannelActive);
868         }
869         return this;
870     }
871 
872     @Override
873     public final ChannelPipeline fireChannelInactive() {
874         if (head.executor().inEventLoop()) {
875             if (head.invokeHandler()) {
876                 head.channelInactive(head);
877             } else {
878                 head.fireChannelInactive();
879             }
880         } else {
881             head.executor().execute(this::fireChannelInactive);
882         }
883         return this;
884     }
885 
886     @Override
887     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
888         if (head.executor().inEventLoop()) {
889             if (head.invokeHandler()) {
890                 head.exceptionCaught(head, cause);
891             } else {
892                 head.fireExceptionCaught(cause);
893             }
894         } else {
895             head.executor().execute(() -> fireExceptionCaught(cause));
896         }
897         return this;
898     }
899 
900     @Override
901     public final ChannelPipeline fireUserEventTriggered(Object event) {
902         if (head.executor().inEventLoop()) {
903             if (head.invokeHandler()) {
904                 head.userEventTriggered(head, event);
905             } else {
906                 head.fireUserEventTriggered(event);
907             }
908         } else {
909             head.executor().execute(() -> fireUserEventTriggered(event));
910         }
911         return this;
912     }
913 
914     @Override
915     public final ChannelPipeline fireChannelRead(Object msg) {
916         if (head.executor().inEventLoop()) {
917             if (head.invokeHandler()) {
918                 head.channelRead(head, msg);
919             } else {
920                 head.fireChannelRead(msg);
921             }
922         } else {
923             head.executor().execute(() -> fireChannelRead(msg));
924         }
925         return this;
926     }
927 
928     @Override
929     public final ChannelPipeline fireChannelReadComplete() {
930         if (head.executor().inEventLoop()) {
931             if (head.invokeHandler()) {
932                 head.channelReadComplete(head);
933             } else {
934                 head.fireChannelReadComplete();
935             }
936         } else {
937             head.executor().execute(this::fireChannelReadComplete);
938         }
939         return this;
940     }
941 
942     @Override
943     public final ChannelPipeline fireChannelWritabilityChanged() {
944         if (head.executor().inEventLoop()) {
945             if (head.invokeHandler()) {
946                 head.channelWritabilityChanged(head);
947             } else {
948                 head.fireChannelWritabilityChanged();
949             }
950         } else {
951             head.executor().execute(this::fireChannelWritabilityChanged);
952         }
953         return this;
954     }
955 
956     @Override
957     public final ChannelFuture bind(SocketAddress localAddress) {
958         return tail.bind(localAddress);
959     }
960 
961     @Override
962     public final ChannelFuture connect(SocketAddress remoteAddress) {
963         return tail.connect(remoteAddress);
964     }
965 
966     @Override
967     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
968         return tail.connect(remoteAddress, localAddress);
969     }
970 
971     @Override
972     public final ChannelFuture disconnect() {
973         return tail.disconnect();
974     }
975 
976     @Override
977     public final ChannelFuture close() {
978         return tail.close();
979     }
980 
981     @Override
982     public final ChannelFuture deregister() {
983         return tail.deregister();
984     }
985 
986     @Override
987     public final ChannelPipeline flush() {
988         tail.flush();
989         return this;
990     }
991 
992     @Override
993     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
994         return tail.bind(localAddress, promise);
995     }
996 
997     @Override
998     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
999         return tail.connect(remoteAddress, promise);
1000     }
1001 
1002     @Override
1003     public final ChannelFuture connect(
1004             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1005         return tail.connect(remoteAddress, localAddress, promise);
1006     }
1007 
1008     @Override
1009     public final ChannelFuture disconnect(ChannelPromise promise) {
1010         return tail.disconnect(promise);
1011     }
1012 
1013     @Override
1014     public final ChannelFuture close(ChannelPromise promise) {
1015         return tail.close(promise);
1016     }
1017 
1018     @Override
1019     public final ChannelFuture deregister(final ChannelPromise promise) {
1020         return tail.deregister(promise);
1021     }
1022 
1023     @Override
1024     public final ChannelPipeline read() {
1025         tail.read();
1026         return this;
1027     }
1028 
1029     @Override
1030     public final ChannelFuture write(Object msg) {
1031         return tail.write(msg);
1032     }
1033 
1034     @Override
1035     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1036         return tail.write(msg, promise);
1037     }
1038 
1039     @Override
1040     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1041         return tail.writeAndFlush(msg, promise);
1042     }
1043 
1044     @Override
1045     public final ChannelFuture writeAndFlush(Object msg) {
1046         return tail.writeAndFlush(msg);
1047     }
1048 
1049     @Override
1050     public final ChannelPromise newPromise() {
1051         return new DefaultChannelPromise(channel);
1052     }
1053 
1054     @Override
1055     public final ChannelProgressivePromise newProgressivePromise() {
1056         return new DefaultChannelProgressivePromise(channel);
1057     }
1058 
1059     @Override
1060     public final ChannelFuture newSucceededFuture() {
1061         return succeededFuture;
1062     }
1063 
1064     @Override
1065     public final ChannelFuture newFailedFuture(Throwable cause) {
1066         return new FailedChannelFuture(channel, null, cause);
1067     }
1068 
1069     @Override
1070     public final ChannelPromise voidPromise() {
1071         return voidPromise;
1072     }
1073 
1074     private void checkDuplicateName(String name) {
1075         if (context0(name) != null) {
1076             throw new IllegalArgumentException("Duplicate handler name: " + name);
1077         }
1078     }
1079 
1080     private AbstractChannelHandlerContext context0(String name) {
1081         AbstractChannelHandlerContext context = head.next;
1082         while (context != tail) {
1083             if (context.name().equals(name)) {
1084                 return context;
1085             }
1086             context = context.next;
1087         }
1088         return null;
1089     }
1090 
1091     private AbstractChannelHandlerContext getContextOrDie(String name) {
1092         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1093         if (ctx == null) {
1094             throw new NoSuchElementException(name);
1095         } else {
1096             return ctx;
1097         }
1098     }
1099 
1100     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1101         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1102         if (ctx == null) {
1103             throw new NoSuchElementException(handler.getClass().getName());
1104         } else {
1105             return ctx;
1106         }
1107     }
1108 
1109     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1110         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1111         if (ctx == null) {
1112             throw new NoSuchElementException(handlerType.getName());
1113         } else {
1114             return ctx;
1115         }
1116     }
1117 
1118     private void callHandlerAddedForAllHandlers() {
1119         final PendingHandlerCallback pendingHandlerCallbackHead;
1120         synchronized (this) {
1121             assert !registered;
1122 
1123             // This Channel itself was registered.
1124             registered = true;
1125 
1126             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1127             // Null out so it can be GC'ed.
1128             this.pendingHandlerCallbackHead = null;
1129         }
1130 
1131         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1132         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1133         // the EventLoop.
1134         PendingHandlerCallback task = pendingHandlerCallbackHead;
1135         while (task != null) {
1136             task.execute();
1137             task = task.next;
1138         }
1139     }
1140 
1141     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1142         assert !registered;
1143 
1144         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1145         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1146         if (pending == null) {
1147             pendingHandlerCallbackHead = task;
1148         } else {
1149             // Find the tail of the linked-list.
1150             while (pending.next != null) {
1151                 pending = pending.next;
1152             }
1153             pending.next = task;
1154         }
1155     }
1156 
1157     private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1158         newCtx.setAddPending();
1159         executor.execute(new Runnable() {
1160             @Override
1161             public void run() {
1162                 callHandlerAdded0(newCtx);
1163             }
1164         });
1165     }
1166 
1167     /**
1168      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1169      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1170      */
1171     protected void onUnhandledInboundException(Throwable cause) {
1172         try {
1173             logger.warn(
1174                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1175                             "It usually means the last handler in the pipeline did not handle the exception.",
1176                     cause);
1177         } finally {
1178             ReferenceCountUtil.release(cause);
1179         }
1180     }
1181 
1182     /**
1183      * Called once the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}event hit
1184      * the end of the {@link ChannelPipeline}.
1185      */
1186     protected void onUnhandledInboundChannelActive() {
1187     }
1188 
1189     /**
1190      * Called once the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} event hit
1191      * the end of the {@link ChannelPipeline}.
1192      */
1193     protected void onUnhandledInboundChannelInactive() {
1194     }
1195 
1196     /**
1197      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1198      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1199      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1200      */
1201     protected void onUnhandledInboundMessage(Object msg) {
1202         try {
1203             logger.debug(
1204                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1205                             "Please check your pipeline configuration.", msg);
1206         } finally {
1207             ReferenceCountUtil.release(msg);
1208         }
1209     }
1210 
1211     /**
1212      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1213      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1214      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1215      */
1216     protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1217         onUnhandledInboundMessage(msg);
1218         if (logger.isDebugEnabled()) {
1219             logger.debug("Discarded message pipeline : {}. Channel : {}.",
1220                          ctx.pipeline().names(), ctx.channel());
1221         }
1222     }
1223 
1224     /**
1225      * Called once the {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} event hit
1226      * the end of the {@link ChannelPipeline}.
1227      */
1228     protected void onUnhandledInboundChannelReadComplete() {
1229     }
1230 
1231     /**
1232      * Called once an user event hit the end of the {@link ChannelPipeline} without been handled by the user
1233      * in {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is responsible
1234      * to call {@link ReferenceCountUtil#release(Object)} on the given event at some point.
1235      */
1236     protected void onUnhandledInboundUserEventTriggered(Object evt) {
1237         // This may not be a configuration error and so don't log anything.
1238         // The event may be superfluous for the current pipeline configuration.
1239         ReferenceCountUtil.release(evt);
1240     }
1241 
1242     /**
1243      * Called once the {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
1244      * the end of the {@link ChannelPipeline}.
1245      */
1246     protected void onUnhandledChannelWritabilityChanged() {
1247     }
1248 
1249     protected void incrementPendingOutboundBytes(long size) {
1250         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1251         if (buffer != null) {
1252             buffer.incrementPendingOutboundBytes(size);
1253         }
1254     }
1255 
1256     protected void decrementPendingOutboundBytes(long size) {
1257         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1258         if (buffer != null) {
1259             buffer.decrementPendingOutboundBytes(size);
1260         }
1261     }
1262 
1263     // A special catch-all handler that handles both bytes and messages.
1264     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1265 
1266         TailContext(DefaultChannelPipeline pipeline) {
1267             super(pipeline, null, TAIL_NAME, TailContext.class);
1268             setAddComplete();
1269         }
1270 
1271         @Override
1272         public ChannelHandler handler() {
1273             return this;
1274         }
1275 
1276         @Override
1277         public void channelRegistered(ChannelHandlerContext ctx) { }
1278 
1279         @Override
1280         public void channelUnregistered(ChannelHandlerContext ctx) { }
1281 
1282         @Override
1283         public void channelActive(ChannelHandlerContext ctx) {
1284             onUnhandledInboundChannelActive();
1285         }
1286 
1287         @Override
1288         public void channelInactive(ChannelHandlerContext ctx) {
1289             onUnhandledInboundChannelInactive();
1290         }
1291 
1292         @Override
1293         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1294             onUnhandledChannelWritabilityChanged();
1295         }
1296 
1297         @Override
1298         public void handlerAdded(ChannelHandlerContext ctx) { }
1299 
1300         @Override
1301         public void handlerRemoved(ChannelHandlerContext ctx) { }
1302 
1303         @Override
1304         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1305             onUnhandledInboundUserEventTriggered(evt);
1306         }
1307 
1308         @Override
1309         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1310             onUnhandledInboundException(cause);
1311         }
1312 
1313         @Override
1314         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1315             onUnhandledInboundMessage(ctx, msg);
1316         }
1317 
1318         @Override
1319         public void channelReadComplete(ChannelHandlerContext ctx) {
1320             onUnhandledInboundChannelReadComplete();
1321         }
1322     }
1323 
1324     final class HeadContext extends AbstractChannelHandlerContext
1325             implements ChannelOutboundHandler, ChannelInboundHandler {
1326 
1327         private final Unsafe unsafe;
1328 
1329         HeadContext(DefaultChannelPipeline pipeline) {
1330             super(pipeline, null, HEAD_NAME, HeadContext.class);
1331             unsafe = pipeline.channel().unsafe();
1332             setAddComplete();
1333         }
1334 
1335         @Override
1336         public ChannelHandler handler() {
1337             return this;
1338         }
1339 
1340         @Override
1341         public void handlerAdded(ChannelHandlerContext ctx) {
1342             // NOOP
1343         }
1344 
1345         @Override
1346         public void handlerRemoved(ChannelHandlerContext ctx) {
1347             // NOOP
1348         }
1349 
1350         @Override
1351         public void bind(
1352                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1353             unsafe.bind(localAddress, promise);
1354         }
1355 
1356         @Override
1357         public void connect(
1358                 ChannelHandlerContext ctx,
1359                 SocketAddress remoteAddress, SocketAddress localAddress,
1360                 ChannelPromise promise) {
1361             unsafe.connect(remoteAddress, localAddress, promise);
1362         }
1363 
1364         @Override
1365         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1366             unsafe.disconnect(promise);
1367         }
1368 
1369         @Override
1370         public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1371             unsafe.close(promise);
1372         }
1373 
1374         @Override
1375         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1376             unsafe.deregister(promise);
1377         }
1378 
1379         @Override
1380         public void read(ChannelHandlerContext ctx) {
1381             unsafe.beginRead();
1382         }
1383 
1384         @Override
1385         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1386             unsafe.write(msg, promise);
1387         }
1388 
1389         @Override
1390         public void flush(ChannelHandlerContext ctx) {
1391             unsafe.flush();
1392         }
1393 
1394         @Override
1395         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1396             ctx.fireExceptionCaught(cause);
1397         }
1398 
1399         @Override
1400         public void channelRegistered(ChannelHandlerContext ctx) {
1401             invokeHandlerAddedIfNeeded();
1402             ctx.fireChannelRegistered();
1403         }
1404 
1405         @Override
1406         public void channelUnregistered(ChannelHandlerContext ctx) {
1407             ctx.fireChannelUnregistered();
1408 
1409             // Remove all handlers sequentially if channel is closed and unregistered.
1410             if (!channel.isOpen()) {
1411                 destroy();
1412             }
1413         }
1414 
1415         @Override
1416         public void channelActive(ChannelHandlerContext ctx) {
1417             ctx.fireChannelActive();
1418 
1419             readIfIsAutoRead();
1420         }
1421 
1422         @Override
1423         public void channelInactive(ChannelHandlerContext ctx) {
1424             ctx.fireChannelInactive();
1425         }
1426 
1427         @Override
1428         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1429             ctx.fireChannelRead(msg);
1430         }
1431 
1432         @Override
1433         public void channelReadComplete(ChannelHandlerContext ctx) {
1434             ctx.fireChannelReadComplete();
1435 
1436             readIfIsAutoRead();
1437         }
1438 
1439         private void readIfIsAutoRead() {
1440             if (channel.config().isAutoRead()) {
1441                 channel.read();
1442             }
1443         }
1444 
1445         @Override
1446         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1447             ctx.fireUserEventTriggered(evt);
1448         }
1449 
1450         @Override
1451         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1452             ctx.fireChannelWritabilityChanged();
1453         }
1454     }
1455 
1456     private abstract static class PendingHandlerCallback implements Runnable {
1457         final AbstractChannelHandlerContext ctx;
1458         PendingHandlerCallback next;
1459 
1460         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1461             this.ctx = ctx;
1462         }
1463 
1464         abstract void execute();
1465     }
1466 
1467     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1468 
1469         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1470             super(ctx);
1471         }
1472 
1473         @Override
1474         public void run() {
1475             callHandlerAdded0(ctx);
1476         }
1477 
1478         @Override
1479         void execute() {
1480             EventExecutor executor = ctx.executor();
1481             if (executor.inEventLoop()) {
1482                 callHandlerAdded0(ctx);
1483             } else {
1484                 try {
1485                     executor.execute(this);
1486                 } catch (RejectedExecutionException e) {
1487                     if (logger.isWarnEnabled()) {
1488                         logger.warn(
1489                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1490                                 executor, ctx.name(), e);
1491                     }
1492                     atomicRemoveFromHandlerList(ctx);
1493                     ctx.setRemoved();
1494                 }
1495             }
1496         }
1497     }
1498 
1499     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1500 
1501         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1502             super(ctx);
1503         }
1504 
1505         @Override
1506         public void run() {
1507             callHandlerRemoved0(ctx);
1508         }
1509 
1510         @Override
1511         void execute() {
1512             EventExecutor executor = ctx.executor();
1513             if (executor.inEventLoop()) {
1514                 callHandlerRemoved0(ctx);
1515             } else {
1516                 try {
1517                     executor.execute(this);
1518                 } catch (RejectedExecutionException e) {
1519                     if (logger.isWarnEnabled()) {
1520                         logger.warn(
1521                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1522                                         " removing handler {}.", executor, ctx.name(), e);
1523                     }
1524                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1525                     ctx.setRemoved();
1526                 }
1527             }
1528         }
1529     }
1530 }