View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.net.SocketAddress;
30  import java.util.ArrayList;
31  import java.util.IdentityHashMap;
32  import java.util.Iterator;
33  import java.util.LinkedHashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NoSuchElementException;
37  import java.util.WeakHashMap;
38  import java.util.concurrent.RejectedExecutionException;
39  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40  
41  /**
42   * The default {@link ChannelPipeline} implementation.  It is usually created
43   * by a {@link Channel} implementation when the {@link Channel} is created.
44   */
45  public class DefaultChannelPipeline implements ChannelPipeline {
46  
47      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
48  
49      private static final String HEAD_NAME = generateName0(HeadContext.class);
50      private static final String TAIL_NAME = generateName0(TailContext.class);
51  
52      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
53              new FastThreadLocal<Map<Class<?>, String>>() {
54          @Override
55          protected Map<Class<?>, String> initialValue() {
56              return new WeakHashMap<Class<?>, String>();
57          }
58      };
59  
60      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
61              AtomicReferenceFieldUpdater.newUpdater(
62                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
63      final HeadContext head;
64      final TailContext tail;
65  
66      private final Channel channel;
67      private final ChannelFuture succeededFuture;
68      private final VoidChannelPromise voidPromise;
69      private final boolean touch = ResourceLeakDetector.isEnabled();
70  
71      private Map<EventExecutorGroup, EventExecutor> childExecutors;
72      private volatile MessageSizeEstimator.Handle estimatorHandle;
73      private boolean firstRegistration = true;
74  
75      /**
76       * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
77       * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
78       *
79       * We only keep the head because it is expected that the list is used infrequently and its size is small.
80       * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
81       * complexity.
82       */
83      private PendingHandlerCallback pendingHandlerCallbackHead;
84  
85      /**
86       * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
87       * change.
88       */
89      private boolean registered;
90  
91      protected DefaultChannelPipeline(Channel channel) {
92          this.channel = ObjectUtil.checkNotNull(channel, "channel");
93          succeededFuture = new SucceededChannelFuture(channel, null);
94          voidPromise =  new VoidChannelPromise(channel, true);
95  
96          tail = new TailContext(this);
97          head = new HeadContext(this);
98  
99          head.next = tail;
100         tail.prev = head;
101     }
102 
103     final MessageSizeEstimator.Handle estimatorHandle() {
104         MessageSizeEstimator.Handle handle = estimatorHandle;
105         if (handle == null) {
106             handle = channel.config().getMessageSizeEstimator().newHandle();
107             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
108                 handle = estimatorHandle;
109             }
110         }
111         return handle;
112     }
113 
114     final Object touch(Object msg, AbstractChannelHandlerContext next) {
115         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
116     }
117 
118     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
119         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
120     }
121 
122     private EventExecutor childExecutor(EventExecutorGroup group) {
123         if (group == null) {
124             return null;
125         }
126         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
127         if (pinEventExecutor != null && !pinEventExecutor) {
128             return group.next();
129         }
130         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
131         if (childExecutors == null) {
132             // Use size of 4 as most people only use one extra EventExecutor.
133             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
134         }
135         // Pin one of the child executors once and remember it so that the same child executor
136         // is used to fire events for the same channel.
137         EventExecutor childExecutor = childExecutors.get(group);
138         if (childExecutor == null) {
139             childExecutor = group.next();
140             childExecutors.put(group, childExecutor);
141         }
142         return childExecutor;
143     }
144     @Override
145     public final Channel channel() {
146         return channel;
147     }
148 
149     @Override
150     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
151         return addFirst(null, name, handler);
152     }
153 
154     @Override
155     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
156         final AbstractChannelHandlerContext newCtx;
157         synchronized (this) {
158             checkMultiplicity(handler);
159             name = filterName(name, handler);
160 
161             newCtx = newContext(group, name, handler);
162 
163             addFirst0(newCtx);
164 
165             // If the registered is false it means that the channel was not registered on an eventLoop yet.
166             // In this case we add the context to the pipeline and add a task that will call
167             // ChannelHandler.handlerAdded(...) once the channel is registered.
168             if (!registered) {
169                 newCtx.setAddPending();
170                 callHandlerCallbackLater(newCtx, true);
171                 return this;
172             }
173 
174             EventExecutor executor = newCtx.executor();
175             if (!executor.inEventLoop()) {
176                 callHandlerAddedInEventLoop(newCtx, executor);
177                 return this;
178             }
179         }
180         callHandlerAdded0(newCtx);
181         return this;
182     }
183 
184     private void addFirst0(AbstractChannelHandlerContext newCtx) {
185         AbstractChannelHandlerContext nextCtx = head.next;
186         newCtx.prev = head;
187         newCtx.next = nextCtx;
188         head.next = newCtx;
189         nextCtx.prev = newCtx;
190     }
191 
192     @Override
193     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
194         return addLast(null, name, handler);
195     }
196 
197     @Override
198     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
199         final AbstractChannelHandlerContext newCtx;
200         synchronized (this) {
201             checkMultiplicity(handler);
202 
203             newCtx = newContext(group, filterName(name, handler), handler);
204 
205             addLast0(newCtx);
206 
207             // If the registered is false it means that the channel was not registered on an eventLoop yet.
208             // In this case we add the context to the pipeline and add a task that will call
209             // ChannelHandler.handlerAdded(...) once the channel is registered.
210             if (!registered) {
211                 newCtx.setAddPending();
212                 callHandlerCallbackLater(newCtx, true);
213                 return this;
214             }
215 
216             EventExecutor executor = newCtx.executor();
217             if (!executor.inEventLoop()) {
218                 callHandlerAddedInEventLoop(newCtx, executor);
219                 return this;
220             }
221         }
222         callHandlerAdded0(newCtx);
223         return this;
224     }
225 
226     private void addLast0(AbstractChannelHandlerContext newCtx) {
227         AbstractChannelHandlerContext prev = tail.prev;
228         newCtx.prev = prev;
229         newCtx.next = tail;
230         prev.next = newCtx;
231         tail.prev = newCtx;
232     }
233 
234     @Override
235     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
236         return addBefore(null, baseName, name, handler);
237     }
238 
239     @Override
240     public final ChannelPipeline addBefore(
241             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
242         final AbstractChannelHandlerContext newCtx;
243         final AbstractChannelHandlerContext ctx;
244         synchronized (this) {
245             checkMultiplicity(handler);
246             name = filterName(name, handler);
247             ctx = getContextOrDie(baseName);
248 
249             newCtx = newContext(group, name, handler);
250 
251             addBefore0(ctx, newCtx);
252 
253             // If the registered is false it means that the channel was not registered on an eventLoop yet.
254             // In this case we add the context to the pipeline and add a task that will call
255             // ChannelHandler.handlerAdded(...) once the channel is registered.
256             if (!registered) {
257                 newCtx.setAddPending();
258                 callHandlerCallbackLater(newCtx, true);
259                 return this;
260             }
261 
262             EventExecutor executor = newCtx.executor();
263             if (!executor.inEventLoop()) {
264                 callHandlerAddedInEventLoop(newCtx, executor);
265                 return this;
266             }
267         }
268         callHandlerAdded0(newCtx);
269         return this;
270     }
271 
272     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
273         newCtx.prev = ctx.prev;
274         newCtx.next = ctx;
275         ctx.prev.next = newCtx;
276         ctx.prev = newCtx;
277     }
278 
279     private String filterName(String name, ChannelHandler handler) {
280         if (name == null) {
281             return generateName(handler);
282         }
283         checkDuplicateName(name);
284         return name;
285     }
286 
287     @Override
288     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
289         return addAfter(null, baseName, name, handler);
290     }
291 
292     @Override
293     public final ChannelPipeline addAfter(
294             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
295         final AbstractChannelHandlerContext newCtx;
296         final AbstractChannelHandlerContext ctx;
297 
298         synchronized (this) {
299             checkMultiplicity(handler);
300             name = filterName(name, handler);
301             ctx = getContextOrDie(baseName);
302 
303             newCtx = newContext(group, name, handler);
304 
305             addAfter0(ctx, newCtx);
306 
307             // If the registered is false it means that the channel was not registered on an eventLoop yet.
308             // In this case we remove the context from the pipeline and add a task that will call
309             // ChannelHandler.handlerRemoved(...) once the channel is registered.
310             if (!registered) {
311                 newCtx.setAddPending();
312                 callHandlerCallbackLater(newCtx, true);
313                 return this;
314             }
315             EventExecutor executor = newCtx.executor();
316             if (!executor.inEventLoop()) {
317                 callHandlerAddedInEventLoop(newCtx, executor);
318                 return this;
319             }
320         }
321         callHandlerAdded0(newCtx);
322         return this;
323     }
324 
325     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
326         newCtx.prev = ctx;
327         newCtx.next = ctx.next;
328         ctx.next.prev = newCtx;
329         ctx.next = newCtx;
330     }
331 
332     public final ChannelPipeline addFirst(ChannelHandler handler) {
333         return addFirst(null, handler);
334     }
335 
336     @Override
337     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
338         return addFirst(null, handlers);
339     }
340 
341     @Override
342     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
343         ObjectUtil.checkNotNull(handlers, "handlers");
344         if (handlers.length == 0 || handlers[0] == null) {
345             return this;
346         }
347 
348         int size;
349         for (size = 1; size < handlers.length; size ++) {
350             if (handlers[size] == null) {
351                 break;
352             }
353         }
354 
355         for (int i = size - 1; i >= 0; i --) {
356             ChannelHandler h = handlers[i];
357             addFirst(executor, null, h);
358         }
359 
360         return this;
361     }
362 
363     public final ChannelPipeline addLast(ChannelHandler handler) {
364         return addLast(null, handler);
365     }
366 
367     @Override
368     public final ChannelPipeline addLast(ChannelHandler... handlers) {
369         return addLast(null, handlers);
370     }
371 
372     @Override
373     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
374         ObjectUtil.checkNotNull(handlers, "handlers");
375 
376         for (ChannelHandler h: handlers) {
377             if (h == null) {
378                 break;
379             }
380             addLast(executor, null, h);
381         }
382 
383         return this;
384     }
385 
386     private String generateName(ChannelHandler handler) {
387         Map<Class<?>, String> cache = nameCaches.get();
388         Class<?> handlerType = handler.getClass();
389         String name = cache.get(handlerType);
390         if (name == null) {
391             name = generateName0(handlerType);
392             cache.put(handlerType, name);
393         }
394 
395         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
396         // any name conflicts.  Note that we don't cache the names generated here.
397         if (context0(name) != null) {
398             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
399             for (int i = 1;; i ++) {
400                 String newName = baseName + i;
401                 if (context0(newName) == null) {
402                     name = newName;
403                     break;
404                 }
405             }
406         }
407         return name;
408     }
409 
410     private static String generateName0(Class<?> handlerType) {
411         return StringUtil.simpleClassName(handlerType) + "#0";
412     }
413 
414     @Override
415     public final ChannelPipeline remove(ChannelHandler handler) {
416         remove(getContextOrDie(handler));
417         return this;
418     }
419 
420     @Override
421     public final ChannelHandler remove(String name) {
422         return remove(getContextOrDie(name)).handler();
423     }
424 
425     @SuppressWarnings("unchecked")
426     @Override
427     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
428         return (T) remove(getContextOrDie(handlerType)).handler();
429     }
430 
431     public final <T extends ChannelHandler> T removeIfExists(String name) {
432         return removeIfExists(context(name));
433     }
434 
435     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
436         return removeIfExists(context(handlerType));
437     }
438 
439     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
440         return removeIfExists(context(handler));
441     }
442 
443     @SuppressWarnings("unchecked")
444     private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
445         if (ctx == null) {
446             return null;
447         }
448         return (T) remove((AbstractChannelHandlerContext) ctx).handler();
449     }
450 
451     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
452         assert ctx != head && ctx != tail;
453 
454         synchronized (this) {
455             atomicRemoveFromHandlerList(ctx);
456 
457             // If the registered is false it means that the channel was not registered on an eventloop yet.
458             // In this case we remove the context from the pipeline and add a task that will call
459             // ChannelHandler.handlerRemoved(...) once the channel is registered.
460             if (!registered) {
461                 callHandlerCallbackLater(ctx, false);
462                 return ctx;
463             }
464 
465             EventExecutor executor = ctx.executor();
466             if (!executor.inEventLoop()) {
467                 executor.execute(new Runnable() {
468                     @Override
469                     public void run() {
470                         callHandlerRemoved0(ctx);
471                     }
472                 });
473                 return ctx;
474             }
475         }
476         callHandlerRemoved0(ctx);
477         return ctx;
478     }
479 
480     /**
481      * Method is synchronized to make the handler removal from the double linked list atomic.
482      */
483     private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
484         AbstractChannelHandlerContext prev = ctx.prev;
485         AbstractChannelHandlerContext next = ctx.next;
486         prev.next = next;
487         next.prev = prev;
488     }
489 
490     @Override
491     public final ChannelHandler removeFirst() {
492         if (head.next == tail) {
493             throw new NoSuchElementException();
494         }
495         return remove(head.next).handler();
496     }
497 
498     @Override
499     public final ChannelHandler removeLast() {
500         if (head.next == tail) {
501             throw new NoSuchElementException();
502         }
503         return remove(tail.prev).handler();
504     }
505 
506     @Override
507     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
508         replace(getContextOrDie(oldHandler), newName, newHandler);
509         return this;
510     }
511 
512     @Override
513     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
514         return replace(getContextOrDie(oldName), newName, newHandler);
515     }
516 
517     @Override
518     @SuppressWarnings("unchecked")
519     public final <T extends ChannelHandler> T replace(
520             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
521         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
522     }
523 
524     private ChannelHandler replace(
525             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
526         assert ctx != head && ctx != tail;
527 
528         final AbstractChannelHandlerContext newCtx;
529         synchronized (this) {
530             checkMultiplicity(newHandler);
531             if (newName == null) {
532                 newName = generateName(newHandler);
533             } else {
534                 boolean sameName = ctx.name().equals(newName);
535                 if (!sameName) {
536                     checkDuplicateName(newName);
537                 }
538             }
539 
540             newCtx = newContext(ctx.executor, newName, newHandler);
541 
542             replace0(ctx, newCtx);
543 
544             // If the registered is false it means that the channel was not registered on an eventloop yet.
545             // In this case we replace the context in the pipeline
546             // and add a task that will call ChannelHandler.handlerAdded(...) and
547             // ChannelHandler.handlerRemoved(...) once the channel is registered.
548             if (!registered) {
549                 callHandlerCallbackLater(newCtx, true);
550                 callHandlerCallbackLater(ctx, false);
551                 return ctx.handler();
552             }
553             EventExecutor executor = ctx.executor();
554             if (!executor.inEventLoop()) {
555                 executor.execute(new Runnable() {
556                     @Override
557                     public void run() {
558                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
559                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
560                         // those event handlers must be called after handlerAdded().
561                         callHandlerAdded0(newCtx);
562                         callHandlerRemoved0(ctx);
563                     }
564                 });
565                 return ctx.handler();
566             }
567         }
568         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
569         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
570         // event handlers must be called after handlerAdded().
571         callHandlerAdded0(newCtx);
572         callHandlerRemoved0(ctx);
573         return ctx.handler();
574     }
575 
576     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
577         AbstractChannelHandlerContext prev = oldCtx.prev;
578         AbstractChannelHandlerContext next = oldCtx.next;
579         newCtx.prev = prev;
580         newCtx.next = next;
581 
582         // Finish the replacement of oldCtx with newCtx in the linked list.
583         // Note that this doesn't mean events will be sent to the new handler immediately
584         // because we are currently at the event handler thread and no more than one handler methods can be invoked
585         // at the same time (we ensured that in replace().)
586         prev.next = newCtx;
587         next.prev = newCtx;
588 
589         // update the reference to the replacement so forward of buffered content will work correctly
590         oldCtx.prev = newCtx;
591         oldCtx.next = newCtx;
592     }
593 
594     private static void checkMultiplicity(ChannelHandler handler) {
595         if (handler instanceof ChannelHandlerAdapter) {
596             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
597             if (!h.isSharable() && h.added) {
598                 throw new ChannelPipelineException(
599                         h.getClass().getName() +
600                         " is not a @Sharable handler, so can't be added or removed multiple times.");
601             }
602             h.added = true;
603         }
604     }
605 
606     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
607         try {
608             ctx.callHandlerAdded();
609         } catch (Throwable t) {
610             boolean removed = false;
611             try {
612                 atomicRemoveFromHandlerList(ctx);
613                 ctx.callHandlerRemoved();
614                 removed = true;
615             } catch (Throwable t2) {
616                 if (logger.isWarnEnabled()) {
617                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
618                 }
619             }
620 
621             if (removed) {
622                 fireExceptionCaught(new ChannelPipelineException(
623                         ctx.handler().getClass().getName() +
624                         ".handlerAdded() has thrown an exception; removed.", t));
625             } else {
626                 fireExceptionCaught(new ChannelPipelineException(
627                         ctx.handler().getClass().getName() +
628                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
629             }
630         }
631     }
632 
633     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
634         // Notify the complete removal.
635         try {
636             ctx.callHandlerRemoved();
637         } catch (Throwable t) {
638             fireExceptionCaught(new ChannelPipelineException(
639                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
640         }
641     }
642 
643     final void invokeHandlerAddedIfNeeded() {
644         assert channel.eventLoop().inEventLoop();
645         if (firstRegistration) {
646             firstRegistration = false;
647             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
648             // that were added before the registration was done.
649             callHandlerAddedForAllHandlers();
650         }
651     }
652 
653     @Override
654     public final ChannelHandler first() {
655         ChannelHandlerContext first = firstContext();
656         if (first == null) {
657             return null;
658         }
659         return first.handler();
660     }
661 
662     @Override
663     public final ChannelHandlerContext firstContext() {
664         AbstractChannelHandlerContext first = head.next;
665         if (first == tail) {
666             return null;
667         }
668         return head.next;
669     }
670 
671     @Override
672     public final ChannelHandler last() {
673         AbstractChannelHandlerContext last = tail.prev;
674         if (last == head) {
675             return null;
676         }
677         return last.handler();
678     }
679 
680     @Override
681     public final ChannelHandlerContext lastContext() {
682         AbstractChannelHandlerContext last = tail.prev;
683         if (last == head) {
684             return null;
685         }
686         return last;
687     }
688 
689     @Override
690     public final ChannelHandler get(String name) {
691         ChannelHandlerContext ctx = context(name);
692         if (ctx == null) {
693             return null;
694         } else {
695             return ctx.handler();
696         }
697     }
698 
699     @SuppressWarnings("unchecked")
700     @Override
701     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
702         ChannelHandlerContext ctx = context(handlerType);
703         if (ctx == null) {
704             return null;
705         } else {
706             return (T) ctx.handler();
707         }
708     }
709 
710     @Override
711     public final ChannelHandlerContext context(String name) {
712         return context0(ObjectUtil.checkNotNull(name, "name"));
713     }
714 
715     @Override
716     public final ChannelHandlerContext context(ChannelHandler handler) {
717         ObjectUtil.checkNotNull(handler, "handler");
718 
719         AbstractChannelHandlerContext ctx = head.next;
720         for (;;) {
721 
722             if (ctx == null) {
723                 return null;
724             }
725 
726             if (ctx.handler() == handler) {
727                 return ctx;
728             }
729 
730             ctx = ctx.next;
731         }
732     }
733 
734     @Override
735     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
736         ObjectUtil.checkNotNull(handlerType, "handlerType");
737 
738         AbstractChannelHandlerContext ctx = head.next;
739         for (;;) {
740             if (ctx == null) {
741                 return null;
742             }
743             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
744                 return ctx;
745             }
746             ctx = ctx.next;
747         }
748     }
749 
750     @Override
751     public final List<String> names() {
752         List<String> list = new ArrayList<String>();
753         AbstractChannelHandlerContext ctx = head.next;
754         for (;;) {
755             if (ctx == null) {
756                 return list;
757             }
758             list.add(ctx.name());
759             ctx = ctx.next;
760         }
761     }
762 
763     @Override
764     public final Map<String, ChannelHandler> toMap() {
765         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
766         AbstractChannelHandlerContext ctx = head.next;
767         for (;;) {
768             if (ctx == tail) {
769                 return map;
770             }
771             map.put(ctx.name(), ctx.handler());
772             ctx = ctx.next;
773         }
774     }
775 
776     @Override
777     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
778         return toMap().entrySet().iterator();
779     }
780 
781     /**
782      * Returns the {@link String} representation of this pipeline.
783      */
784     @Override
785     public final String toString() {
786         StringBuilder buf = new StringBuilder()
787             .append(StringUtil.simpleClassName(this))
788             .append('{');
789         AbstractChannelHandlerContext ctx = head.next;
790         for (;;) {
791             if (ctx == tail) {
792                 break;
793             }
794 
795             buf.append('(')
796                .append(ctx.name())
797                .append(" = ")
798                .append(ctx.handler().getClass().getName())
799                .append(')');
800 
801             ctx = ctx.next;
802             if (ctx == tail) {
803                 break;
804             }
805 
806             buf.append(", ");
807         }
808         buf.append('}');
809         return buf.toString();
810     }
811 
812     @Override
813     public final ChannelPipeline fireChannelRegistered() {
814         AbstractChannelHandlerContext.invokeChannelRegistered(head);
815         return this;
816     }
817 
818     @Override
819     public final ChannelPipeline fireChannelUnregistered() {
820         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
821         return this;
822     }
823 
824     /**
825      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
826      * handlerRemoved().
827      *
828      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
829      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
830      * the handlers are removed after all events are handled.
831      *
832      * See: https://github.com/netty/netty/issues/3156
833      */
834     private synchronized void destroy() {
835         destroyUp(head.next, false);
836     }
837 
838     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
839         final Thread currentThread = Thread.currentThread();
840         final AbstractChannelHandlerContext tail = this.tail;
841         for (;;) {
842             if (ctx == tail) {
843                 destroyDown(currentThread, tail.prev, inEventLoop);
844                 break;
845             }
846 
847             final EventExecutor executor = ctx.executor();
848             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
849                 final AbstractChannelHandlerContext finalCtx = ctx;
850                 executor.execute(new Runnable() {
851                     @Override
852                     public void run() {
853                         destroyUp(finalCtx, true);
854                     }
855                 });
856                 break;
857             }
858 
859             ctx = ctx.next;
860             inEventLoop = false;
861         }
862     }
863 
864     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
865         // We have reached at tail; now traverse backwards.
866         final AbstractChannelHandlerContext head = this.head;
867         for (;;) {
868             if (ctx == head) {
869                 break;
870             }
871 
872             final EventExecutor executor = ctx.executor();
873             if (inEventLoop || executor.inEventLoop(currentThread)) {
874                 atomicRemoveFromHandlerList(ctx);
875                 callHandlerRemoved0(ctx);
876             } else {
877                 final AbstractChannelHandlerContext finalCtx = ctx;
878                 executor.execute(new Runnable() {
879                     @Override
880                     public void run() {
881                         destroyDown(Thread.currentThread(), finalCtx, true);
882                     }
883                 });
884                 break;
885             }
886 
887             ctx = ctx.prev;
888             inEventLoop = false;
889         }
890     }
891 
892     @Override
893     public final ChannelPipeline fireChannelActive() {
894         AbstractChannelHandlerContext.invokeChannelActive(head);
895         return this;
896     }
897 
898     @Override
899     public final ChannelPipeline fireChannelInactive() {
900         AbstractChannelHandlerContext.invokeChannelInactive(head);
901         return this;
902     }
903 
904     @Override
905     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
906         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
907         return this;
908     }
909 
910     @Override
911     public final ChannelPipeline fireUserEventTriggered(Object event) {
912         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
913         return this;
914     }
915 
916     @Override
917     public final ChannelPipeline fireChannelRead(Object msg) {
918         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
919         return this;
920     }
921 
922     @Override
923     public final ChannelPipeline fireChannelReadComplete() {
924         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
925         return this;
926     }
927 
928     @Override
929     public final ChannelPipeline fireChannelWritabilityChanged() {
930         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
931         return this;
932     }
933 
934     @Override
935     public final ChannelFuture bind(SocketAddress localAddress) {
936         return tail.bind(localAddress);
937     }
938 
939     @Override
940     public final ChannelFuture connect(SocketAddress remoteAddress) {
941         return tail.connect(remoteAddress);
942     }
943 
944     @Override
945     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
946         return tail.connect(remoteAddress, localAddress);
947     }
948 
949     @Override
950     public final ChannelFuture disconnect() {
951         return tail.disconnect();
952     }
953 
954     @Override
955     public final ChannelFuture close() {
956         return tail.close();
957     }
958 
959     @Override
960     public final ChannelFuture deregister() {
961         return tail.deregister();
962     }
963 
964     @Override
965     public final ChannelPipeline flush() {
966         tail.flush();
967         return this;
968     }
969 
970     @Override
971     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
972         return tail.bind(localAddress, promise);
973     }
974 
975     @Override
976     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
977         return tail.connect(remoteAddress, promise);
978     }
979 
980     @Override
981     public final ChannelFuture connect(
982             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
983         return tail.connect(remoteAddress, localAddress, promise);
984     }
985 
986     @Override
987     public final ChannelFuture disconnect(ChannelPromise promise) {
988         return tail.disconnect(promise);
989     }
990 
991     @Override
992     public final ChannelFuture close(ChannelPromise promise) {
993         return tail.close(promise);
994     }
995 
996     @Override
997     public final ChannelFuture deregister(final ChannelPromise promise) {
998         return tail.deregister(promise);
999     }
1000 
1001     @Override
1002     public final ChannelPipeline read() {
1003         tail.read();
1004         return this;
1005     }
1006 
1007     @Override
1008     public final ChannelFuture write(Object msg) {
1009         return tail.write(msg);
1010     }
1011 
1012     @Override
1013     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1014         return tail.write(msg, promise);
1015     }
1016 
1017     @Override
1018     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1019         return tail.writeAndFlush(msg, promise);
1020     }
1021 
1022     @Override
1023     public final ChannelFuture writeAndFlush(Object msg) {
1024         return tail.writeAndFlush(msg);
1025     }
1026 
1027     @Override
1028     public final ChannelPromise newPromise() {
1029         return new DefaultChannelPromise(channel);
1030     }
1031 
1032     @Override
1033     public final ChannelProgressivePromise newProgressivePromise() {
1034         return new DefaultChannelProgressivePromise(channel);
1035     }
1036 
1037     @Override
1038     public final ChannelFuture newSucceededFuture() {
1039         return succeededFuture;
1040     }
1041 
1042     @Override
1043     public final ChannelFuture newFailedFuture(Throwable cause) {
1044         return new FailedChannelFuture(channel, null, cause);
1045     }
1046 
1047     @Override
1048     public final ChannelPromise voidPromise() {
1049         return voidPromise;
1050     }
1051 
1052     private void checkDuplicateName(String name) {
1053         if (context0(name) != null) {
1054             throw new IllegalArgumentException("Duplicate handler name: " + name);
1055         }
1056     }
1057 
1058     private AbstractChannelHandlerContext context0(String name) {
1059         AbstractChannelHandlerContext context = head.next;
1060         while (context != tail) {
1061             if (context.name().equals(name)) {
1062                 return context;
1063             }
1064             context = context.next;
1065         }
1066         return null;
1067     }
1068 
1069     private AbstractChannelHandlerContext getContextOrDie(String name) {
1070         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1071         if (ctx == null) {
1072             throw new NoSuchElementException(name);
1073         } else {
1074             return ctx;
1075         }
1076     }
1077 
1078     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1079         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1080         if (ctx == null) {
1081             throw new NoSuchElementException(handler.getClass().getName());
1082         } else {
1083             return ctx;
1084         }
1085     }
1086 
1087     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1088         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1089         if (ctx == null) {
1090             throw new NoSuchElementException(handlerType.getName());
1091         } else {
1092             return ctx;
1093         }
1094     }
1095 
1096     private void callHandlerAddedForAllHandlers() {
1097         final PendingHandlerCallback pendingHandlerCallbackHead;
1098         synchronized (this) {
1099             assert !registered;
1100 
1101             // This Channel itself was registered.
1102             registered = true;
1103 
1104             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1105             // Null out so it can be GC'ed.
1106             this.pendingHandlerCallbackHead = null;
1107         }
1108 
1109         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1110         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1111         // the EventLoop.
1112         PendingHandlerCallback task = pendingHandlerCallbackHead;
1113         while (task != null) {
1114             task.execute();
1115             task = task.next;
1116         }
1117     }
1118 
1119     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1120         assert !registered;
1121 
1122         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1123         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1124         if (pending == null) {
1125             pendingHandlerCallbackHead = task;
1126         } else {
1127             // Find the tail of the linked-list.
1128             while (pending.next != null) {
1129                 pending = pending.next;
1130             }
1131             pending.next = task;
1132         }
1133     }
1134 
1135     private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1136         newCtx.setAddPending();
1137         executor.execute(new Runnable() {
1138             @Override
1139             public void run() {
1140                 callHandlerAdded0(newCtx);
1141             }
1142         });
1143     }
1144 
1145     /**
1146      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1147      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1148      */
1149     protected void onUnhandledInboundException(Throwable cause) {
1150         try {
1151             logger.warn(
1152                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1153                             "It usually means the last handler in the pipeline did not handle the exception.",
1154                     cause);
1155         } finally {
1156             ReferenceCountUtil.release(cause);
1157         }
1158     }
1159 
1160     /**
1161      * Called once the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}event hit
1162      * the end of the {@link ChannelPipeline}.
1163      */
1164     protected void onUnhandledInboundChannelActive() {
1165     }
1166 
1167     /**
1168      * Called once the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} event hit
1169      * the end of the {@link ChannelPipeline}.
1170      */
1171     protected void onUnhandledInboundChannelInactive() {
1172     }
1173 
1174     /**
1175      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1176      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1177      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1178      */
1179     protected void onUnhandledInboundMessage(Object msg) {
1180         try {
1181             logger.debug(
1182                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1183                             "Please check your pipeline configuration.", msg);
1184         } finally {
1185             ReferenceCountUtil.release(msg);
1186         }
1187     }
1188 
1189     /**
1190      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1191      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1192      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1193      */
1194     protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1195         onUnhandledInboundMessage(msg);
1196         if (logger.isDebugEnabled()) {
1197             logger.debug("Discarded message pipeline : {}. Channel : {}.",
1198                          ctx.pipeline().names(), ctx.channel());
1199         }
1200     }
1201 
1202     /**
1203      * Called once the {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} event hit
1204      * the end of the {@link ChannelPipeline}.
1205      */
1206     protected void onUnhandledInboundChannelReadComplete() {
1207     }
1208 
1209     /**
1210      * Called once an user event hit the end of the {@link ChannelPipeline} without been handled by the user
1211      * in {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is responsible
1212      * to call {@link ReferenceCountUtil#release(Object)} on the given event at some point.
1213      */
1214     protected void onUnhandledInboundUserEventTriggered(Object evt) {
1215         // This may not be a configuration error and so don't log anything.
1216         // The event may be superfluous for the current pipeline configuration.
1217         ReferenceCountUtil.release(evt);
1218     }
1219 
1220     /**
1221      * Called once the {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
1222      * the end of the {@link ChannelPipeline}.
1223      */
1224     protected void onUnhandledChannelWritabilityChanged() {
1225     }
1226 
1227     protected void incrementPendingOutboundBytes(long size) {
1228         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1229         if (buffer != null) {
1230             buffer.incrementPendingOutboundBytes(size);
1231         }
1232     }
1233 
1234     protected void decrementPendingOutboundBytes(long size) {
1235         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1236         if (buffer != null) {
1237             buffer.decrementPendingOutboundBytes(size);
1238         }
1239     }
1240 
1241     // A special catch-all handler that handles both bytes and messages.
1242     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1243 
1244         TailContext(DefaultChannelPipeline pipeline) {
1245             super(pipeline, null, TAIL_NAME, TailContext.class);
1246             setAddComplete();
1247         }
1248 
1249         @Override
1250         public ChannelHandler handler() {
1251             return this;
1252         }
1253 
1254         @Override
1255         public void channelRegistered(ChannelHandlerContext ctx) { }
1256 
1257         @Override
1258         public void channelUnregistered(ChannelHandlerContext ctx) { }
1259 
1260         @Override
1261         public void channelActive(ChannelHandlerContext ctx) {
1262             onUnhandledInboundChannelActive();
1263         }
1264 
1265         @Override
1266         public void channelInactive(ChannelHandlerContext ctx) {
1267             onUnhandledInboundChannelInactive();
1268         }
1269 
1270         @Override
1271         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1272             onUnhandledChannelWritabilityChanged();
1273         }
1274 
1275         @Override
1276         public void handlerAdded(ChannelHandlerContext ctx) { }
1277 
1278         @Override
1279         public void handlerRemoved(ChannelHandlerContext ctx) { }
1280 
1281         @Override
1282         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1283             onUnhandledInboundUserEventTriggered(evt);
1284         }
1285 
1286         @Override
1287         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1288             onUnhandledInboundException(cause);
1289         }
1290 
1291         @Override
1292         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1293             onUnhandledInboundMessage(ctx, msg);
1294         }
1295 
1296         @Override
1297         public void channelReadComplete(ChannelHandlerContext ctx) {
1298             onUnhandledInboundChannelReadComplete();
1299         }
1300     }
1301 
1302     final class HeadContext extends AbstractChannelHandlerContext
1303             implements ChannelOutboundHandler, ChannelInboundHandler {
1304 
1305         private final Unsafe unsafe;
1306 
1307         HeadContext(DefaultChannelPipeline pipeline) {
1308             super(pipeline, null, HEAD_NAME, HeadContext.class);
1309             unsafe = pipeline.channel().unsafe();
1310             setAddComplete();
1311         }
1312 
1313         @Override
1314         public ChannelHandler handler() {
1315             return this;
1316         }
1317 
1318         @Override
1319         public void handlerAdded(ChannelHandlerContext ctx) {
1320             // NOOP
1321         }
1322 
1323         @Override
1324         public void handlerRemoved(ChannelHandlerContext ctx) {
1325             // NOOP
1326         }
1327 
1328         @Override
1329         public void bind(
1330                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1331             unsafe.bind(localAddress, promise);
1332         }
1333 
1334         @Override
1335         public void connect(
1336                 ChannelHandlerContext ctx,
1337                 SocketAddress remoteAddress, SocketAddress localAddress,
1338                 ChannelPromise promise) {
1339             unsafe.connect(remoteAddress, localAddress, promise);
1340         }
1341 
1342         @Override
1343         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1344             unsafe.disconnect(promise);
1345         }
1346 
1347         @Override
1348         public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1349             unsafe.close(promise);
1350         }
1351 
1352         @Override
1353         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1354             unsafe.deregister(promise);
1355         }
1356 
1357         @Override
1358         public void read(ChannelHandlerContext ctx) {
1359             unsafe.beginRead();
1360         }
1361 
1362         @Override
1363         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1364             unsafe.write(msg, promise);
1365         }
1366 
1367         @Override
1368         public void flush(ChannelHandlerContext ctx) {
1369             unsafe.flush();
1370         }
1371 
1372         @Override
1373         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1374             ctx.fireExceptionCaught(cause);
1375         }
1376 
1377         @Override
1378         public void channelRegistered(ChannelHandlerContext ctx) {
1379             invokeHandlerAddedIfNeeded();
1380             ctx.fireChannelRegistered();
1381         }
1382 
1383         @Override
1384         public void channelUnregistered(ChannelHandlerContext ctx) {
1385             ctx.fireChannelUnregistered();
1386 
1387             // Remove all handlers sequentially if channel is closed and unregistered.
1388             if (!channel.isOpen()) {
1389                 destroy();
1390             }
1391         }
1392 
1393         @Override
1394         public void channelActive(ChannelHandlerContext ctx) {
1395             ctx.fireChannelActive();
1396 
1397             readIfIsAutoRead();
1398         }
1399 
1400         @Override
1401         public void channelInactive(ChannelHandlerContext ctx) {
1402             ctx.fireChannelInactive();
1403         }
1404 
1405         @Override
1406         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1407             ctx.fireChannelRead(msg);
1408         }
1409 
1410         @Override
1411         public void channelReadComplete(ChannelHandlerContext ctx) {
1412             ctx.fireChannelReadComplete();
1413 
1414             readIfIsAutoRead();
1415         }
1416 
1417         private void readIfIsAutoRead() {
1418             if (channel.config().isAutoRead()) {
1419                 channel.read();
1420             }
1421         }
1422 
1423         @Override
1424         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1425             ctx.fireUserEventTriggered(evt);
1426         }
1427 
1428         @Override
1429         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1430             ctx.fireChannelWritabilityChanged();
1431         }
1432     }
1433 
1434     private abstract static class PendingHandlerCallback implements Runnable {
1435         final AbstractChannelHandlerContext ctx;
1436         PendingHandlerCallback next;
1437 
1438         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1439             this.ctx = ctx;
1440         }
1441 
1442         abstract void execute();
1443     }
1444 
1445     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1446 
1447         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1448             super(ctx);
1449         }
1450 
1451         @Override
1452         public void run() {
1453             callHandlerAdded0(ctx);
1454         }
1455 
1456         @Override
1457         void execute() {
1458             EventExecutor executor = ctx.executor();
1459             if (executor.inEventLoop()) {
1460                 callHandlerAdded0(ctx);
1461             } else {
1462                 try {
1463                     executor.execute(this);
1464                 } catch (RejectedExecutionException e) {
1465                     if (logger.isWarnEnabled()) {
1466                         logger.warn(
1467                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1468                                 executor, ctx.name(), e);
1469                     }
1470                     atomicRemoveFromHandlerList(ctx);
1471                     ctx.setRemoved();
1472                 }
1473             }
1474         }
1475     }
1476 
1477     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1478 
1479         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1480             super(ctx);
1481         }
1482 
1483         @Override
1484         public void run() {
1485             callHandlerRemoved0(ctx);
1486         }
1487 
1488         @Override
1489         void execute() {
1490             EventExecutor executor = ctx.executor();
1491             if (executor.inEventLoop()) {
1492                 callHandlerRemoved0(ctx);
1493             } else {
1494                 try {
1495                     executor.execute(this);
1496                 } catch (RejectedExecutionException e) {
1497                     if (logger.isWarnEnabled()) {
1498                         logger.warn(
1499                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1500                                         " removing handler {}.", executor, ctx.name(), e);
1501                     }
1502                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1503                     ctx.setRemoved();
1504                 }
1505             }
1506         }
1507     }
1508 }