View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.UnstableApi;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayList;
32  import java.util.IdentityHashMap;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NoSuchElementException;
38  import java.util.WeakHashMap;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41  
42  /**
43   * The default {@link ChannelPipeline} implementation.  It is usually created
44   * by a {@link Channel} implementation when the {@link Channel} is created.
45   */
46  public class DefaultChannelPipeline implements ChannelPipeline {
47  
48      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
49  
50      private static final String HEAD_NAME = generateName0(HeadContext.class);
51      private static final String TAIL_NAME = generateName0(TailContext.class);
52  
53      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
54              new FastThreadLocal<Map<Class<?>, String>>() {
55          @Override
56          protected Map<Class<?>, String> initialValue() throws Exception {
57              return new WeakHashMap<Class<?>, String>();
58          }
59      };
60  
61      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
62              AtomicReferenceFieldUpdater.newUpdater(
63                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
64      final AbstractChannelHandlerContext head;
65      final AbstractChannelHandlerContext tail;
66  
67      private final Channel channel;
68      private final ChannelFuture succeededFuture;
69      private final VoidChannelPromise voidPromise;
70      private final boolean touch = ResourceLeakDetector.isEnabled();
71  
72      private Map<EventExecutorGroup, EventExecutor> childExecutors;
73      private volatile MessageSizeEstimator.Handle estimatorHandle;
74      private boolean firstRegistration = true;
75  
    /**
     * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()}, which in turn
     * processes all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)} calls.
     *
     * We only keep the head because it is expected that the list is used infrequently and its size is small.
     * Thus full iterations to do insertions are assumed to be a good compromise for saving memory and avoiding
     * tail management complexity.
     */
84      private PendingHandlerCallback pendingHandlerCallbackHead;
85  
    /**
     * Set to {@code true} once the {@link AbstractChannel} is registered. Once set to {@code true} the value will
     * never change.
     */
90      private boolean registered;
91  
92      protected DefaultChannelPipeline(Channel channel) {
93          this.channel = ObjectUtil.checkNotNull(channel, "channel");
94          succeededFuture = new SucceededChannelFuture(channel, null);
95          voidPromise =  new VoidChannelPromise(channel, true);
96  
97          tail = new TailContext(this);
98          head = new HeadContext(this);
99  
100         head.next = tail;
101         tail.prev = head;
102     }
103 
104     final MessageSizeEstimator.Handle estimatorHandle() {
105         MessageSizeEstimator.Handle handle = estimatorHandle;
106         if (handle == null) {
107             handle = channel.config().getMessageSizeEstimator().newHandle();
108             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
109                 handle = estimatorHandle;
110             }
111         }
112         return handle;
113     }
114 
115     final Object touch(Object msg, AbstractChannelHandlerContext next) {
116         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
117     }
118 
119     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
120         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
121     }
122 
123     private EventExecutor childExecutor(EventExecutorGroup group) {
124         if (group == null) {
125             return null;
126         }
127         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
128         if (pinEventExecutor != null && !pinEventExecutor) {
129             return group.next();
130         }
131         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
132         if (childExecutors == null) {
133             // Use size of 4 as most people only use one extra EventExecutor.
134             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
135         }
136         // Pin one of the child executors once and remember it so that the same child executor
137         // is used to fire events for the same channel.
138         EventExecutor childExecutor = childExecutors.get(group);
139         if (childExecutor == null) {
140             childExecutor = group.next();
141             childExecutors.put(group, childExecutor);
142         }
143         return childExecutor;
144     }
145     @Override
146     public final Channel channel() {
147         return channel;
148     }
149 
150     @Override
151     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
152         return addFirst(null, name, handler);
153     }
154 
155     @Override
156     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
157         final AbstractChannelHandlerContext newCtx;
158         synchronized (this) {
159             checkMultiplicity(handler);
160             name = filterName(name, handler);
161 
162             newCtx = newContext(group, name, handler);
163 
164             addFirst0(newCtx);
165 
166             // If the registered is false it means that the channel was not registered on an eventloop yet.
167             // In this case we add the context to the pipeline and add a task that will call
168             // ChannelHandler.handlerAdded(...) once the channel is registered.
169             if (!registered) {
170                 newCtx.setAddPending();
171                 callHandlerCallbackLater(newCtx, true);
172                 return this;
173             }
174 
175             EventExecutor executor = newCtx.executor();
176             if (!executor.inEventLoop()) {
177                 newCtx.setAddPending();
178                 executor.execute(new Runnable() {
179                     @Override
180                     public void run() {
181                         callHandlerAdded0(newCtx);
182                     }
183                 });
184                 return this;
185             }
186         }
187         callHandlerAdded0(newCtx);
188         return this;
189     }
190 
191     private void addFirst0(AbstractChannelHandlerContext newCtx) {
192         AbstractChannelHandlerContext nextCtx = head.next;
193         newCtx.prev = head;
194         newCtx.next = nextCtx;
195         head.next = newCtx;
196         nextCtx.prev = newCtx;
197     }
198 
199     @Override
200     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
201         return addLast(null, name, handler);
202     }
203 
204     @Override
205     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
206         final AbstractChannelHandlerContext newCtx;
207         synchronized (this) {
208             checkMultiplicity(handler);
209 
210             newCtx = newContext(group, filterName(name, handler), handler);
211 
212             addLast0(newCtx);
213 
214             // If the registered is false it means that the channel was not registered on an eventloop yet.
215             // In this case we add the context to the pipeline and add a task that will call
216             // ChannelHandler.handlerAdded(...) once the channel is registered.
217             if (!registered) {
218                 newCtx.setAddPending();
219                 callHandlerCallbackLater(newCtx, true);
220                 return this;
221             }
222 
223             EventExecutor executor = newCtx.executor();
224             if (!executor.inEventLoop()) {
225                 newCtx.setAddPending();
226                 executor.execute(new Runnable() {
227                     @Override
228                     public void run() {
229                         callHandlerAdded0(newCtx);
230                     }
231                 });
232                 return this;
233             }
234         }
235         callHandlerAdded0(newCtx);
236         return this;
237     }
238 
239     private void addLast0(AbstractChannelHandlerContext newCtx) {
240         AbstractChannelHandlerContext prev = tail.prev;
241         newCtx.prev = prev;
242         newCtx.next = tail;
243         prev.next = newCtx;
244         tail.prev = newCtx;
245     }
246 
247     @Override
248     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
249         return addBefore(null, baseName, name, handler);
250     }
251 
252     @Override
253     public final ChannelPipeline addBefore(
254             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
255         final AbstractChannelHandlerContext newCtx;
256         final AbstractChannelHandlerContext ctx;
257         synchronized (this) {
258             checkMultiplicity(handler);
259             name = filterName(name, handler);
260             ctx = getContextOrDie(baseName);
261 
262             newCtx = newContext(group, name, handler);
263 
264             addBefore0(ctx, newCtx);
265 
266             // If the registered is false it means that the channel was not registered on an eventloop yet.
267             // In this case we add the context to the pipeline and add a task that will call
268             // ChannelHandler.handlerAdded(...) once the channel is registered.
269             if (!registered) {
270                 newCtx.setAddPending();
271                 callHandlerCallbackLater(newCtx, true);
272                 return this;
273             }
274 
275             EventExecutor executor = newCtx.executor();
276             if (!executor.inEventLoop()) {
277                 newCtx.setAddPending();
278                 executor.execute(new Runnable() {
279                     @Override
280                     public void run() {
281                         callHandlerAdded0(newCtx);
282                     }
283                 });
284                 return this;
285             }
286         }
287         callHandlerAdded0(newCtx);
288         return this;
289     }
290 
291     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
292         newCtx.prev = ctx.prev;
293         newCtx.next = ctx;
294         ctx.prev.next = newCtx;
295         ctx.prev = newCtx;
296     }
297 
298     private String filterName(String name, ChannelHandler handler) {
299         if (name == null) {
300             return generateName(handler);
301         }
302         checkDuplicateName(name);
303         return name;
304     }
305 
306     @Override
307     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
308         return addAfter(null, baseName, name, handler);
309     }
310 
311     @Override
312     public final ChannelPipeline addAfter(
313             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
314         final AbstractChannelHandlerContext newCtx;
315         final AbstractChannelHandlerContext ctx;
316 
317         synchronized (this) {
318             checkMultiplicity(handler);
319             name = filterName(name, handler);
320             ctx = getContextOrDie(baseName);
321 
322             newCtx = newContext(group, name, handler);
323 
324             addAfter0(ctx, newCtx);
325 
326             // If the registered is false it means that the channel was not registered on an eventloop yet.
327             // In this case we remove the context from the pipeline and add a task that will call
328             // ChannelHandler.handlerRemoved(...) once the channel is registered.
329             if (!registered) {
330                 newCtx.setAddPending();
331                 callHandlerCallbackLater(newCtx, true);
332                 return this;
333             }
334             EventExecutor executor = newCtx.executor();
335             if (!executor.inEventLoop()) {
336                 newCtx.setAddPending();
337                 executor.execute(new Runnable() {
338                     @Override
339                     public void run() {
340                         callHandlerAdded0(newCtx);
341                     }
342                 });
343                 return this;
344             }
345         }
346         callHandlerAdded0(newCtx);
347         return this;
348     }
349 
350     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
351         newCtx.prev = ctx;
352         newCtx.next = ctx.next;
353         ctx.next.prev = newCtx;
354         ctx.next = newCtx;
355     }
356 
357     public final ChannelPipeline addFirst(ChannelHandler handler) {
358         return addFirst(null, handler);
359     }
360 
361     @Override
362     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
363         return addFirst(null, handlers);
364     }
365 
366     @Override
367     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
368         if (handlers == null) {
369             throw new NullPointerException("handlers");
370         }
371         if (handlers.length == 0 || handlers[0] == null) {
372             return this;
373         }
374 
375         int size;
376         for (size = 1; size < handlers.length; size ++) {
377             if (handlers[size] == null) {
378                 break;
379             }
380         }
381 
382         for (int i = size - 1; i >= 0; i --) {
383             ChannelHandler h = handlers[i];
384             addFirst(executor, null, h);
385         }
386 
387         return this;
388     }
389 
390     public final ChannelPipeline addLast(ChannelHandler handler) {
391         return addLast(null, handler);
392     }
393 
394     @Override
395     public final ChannelPipeline addLast(ChannelHandler... handlers) {
396         return addLast(null, handlers);
397     }
398 
399     @Override
400     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
401         if (handlers == null) {
402             throw new NullPointerException("handlers");
403         }
404 
405         for (ChannelHandler h: handlers) {
406             if (h == null) {
407                 break;
408             }
409             addLast(executor, null, h);
410         }
411 
412         return this;
413     }
414 
415     private String generateName(ChannelHandler handler) {
416         Map<Class<?>, String> cache = nameCaches.get();
417         Class<?> handlerType = handler.getClass();
418         String name = cache.get(handlerType);
419         if (name == null) {
420             name = generateName0(handlerType);
421             cache.put(handlerType, name);
422         }
423 
424         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
425         // any name conflicts.  Note that we don't cache the names generated here.
426         if (context0(name) != null) {
427             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
428             for (int i = 1;; i ++) {
429                 String newName = baseName + i;
430                 if (context0(newName) == null) {
431                     name = newName;
432                     break;
433                 }
434             }
435         }
436         return name;
437     }
438 
439     private static String generateName0(Class<?> handlerType) {
440         return StringUtil.simpleClassName(handlerType) + "#0";
441     }
442 
443     @Override
444     public final ChannelPipeline remove(ChannelHandler handler) {
445         remove(getContextOrDie(handler));
446         return this;
447     }
448 
449     @Override
450     public final ChannelHandler remove(String name) {
451         return remove(getContextOrDie(name)).handler();
452     }
453 
454     @SuppressWarnings("unchecked")
455     @Override
456     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
457         return (T) remove(getContextOrDie(handlerType)).handler();
458     }
459 
460     public final <T extends ChannelHandler> T removeIfExists(String name) {
461         return removeIfExists(context(name));
462     }
463 
464     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
465         return removeIfExists(context(handlerType));
466     }
467 
468     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
469         return removeIfExists(context(handler));
470     }
471 
472     @SuppressWarnings("unchecked")
473     private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
474         if (ctx == null) {
475             return null;
476         }
477         return (T) remove((AbstractChannelHandlerContext) ctx).handler();
478     }
479 
480     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
481         assert ctx != head && ctx != tail;
482 
483         synchronized (this) {
484             remove0(ctx);
485 
486             // If the registered is false it means that the channel was not registered on an eventloop yet.
487             // In this case we remove the context from the pipeline and add a task that will call
488             // ChannelHandler.handlerRemoved(...) once the channel is registered.
489             if (!registered) {
490                 callHandlerCallbackLater(ctx, false);
491                 return ctx;
492             }
493 
494             EventExecutor executor = ctx.executor();
495             if (!executor.inEventLoop()) {
496                 executor.execute(new Runnable() {
497                     @Override
498                     public void run() {
499                         callHandlerRemoved0(ctx);
500                     }
501                 });
502                 return ctx;
503             }
504         }
505         callHandlerRemoved0(ctx);
506         return ctx;
507     }
508 
509     private static void remove0(AbstractChannelHandlerContext ctx) {
510         AbstractChannelHandlerContext prev = ctx.prev;
511         AbstractChannelHandlerContext next = ctx.next;
512         prev.next = next;
513         next.prev = prev;
514     }
515 
516     @Override
517     public final ChannelHandler removeFirst() {
518         if (head.next == tail) {
519             throw new NoSuchElementException();
520         }
521         return remove(head.next).handler();
522     }
523 
524     @Override
525     public final ChannelHandler removeLast() {
526         if (head.next == tail) {
527             throw new NoSuchElementException();
528         }
529         return remove(tail.prev).handler();
530     }
531 
532     @Override
533     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
534         replace(getContextOrDie(oldHandler), newName, newHandler);
535         return this;
536     }
537 
538     @Override
539     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
540         return replace(getContextOrDie(oldName), newName, newHandler);
541     }
542 
543     @Override
544     @SuppressWarnings("unchecked")
545     public final <T extends ChannelHandler> T replace(
546             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
547         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
548     }
549 
550     private ChannelHandler replace(
551             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
552         assert ctx != head && ctx != tail;
553 
554         final AbstractChannelHandlerContext newCtx;
555         synchronized (this) {
556             checkMultiplicity(newHandler);
557             if (newName == null) {
558                 newName = generateName(newHandler);
559             } else {
560                 boolean sameName = ctx.name().equals(newName);
561                 if (!sameName) {
562                     checkDuplicateName(newName);
563                 }
564             }
565 
566             newCtx = newContext(ctx.executor, newName, newHandler);
567 
568             replace0(ctx, newCtx);
569 
570             // If the registered is false it means that the channel was not registered on an eventloop yet.
571             // In this case we replace the context in the pipeline
572             // and add a task that will call ChannelHandler.handlerAdded(...) and
573             // ChannelHandler.handlerRemoved(...) once the channel is registered.
574             if (!registered) {
575                 callHandlerCallbackLater(newCtx, true);
576                 callHandlerCallbackLater(ctx, false);
577                 return ctx.handler();
578             }
579             EventExecutor executor = ctx.executor();
580             if (!executor.inEventLoop()) {
581                 executor.execute(new Runnable() {
582                     @Override
583                     public void run() {
584                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
585                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
586                         // those event handlers must be called after handlerAdded().
587                         callHandlerAdded0(newCtx);
588                         callHandlerRemoved0(ctx);
589                     }
590                 });
591                 return ctx.handler();
592             }
593         }
594         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
595         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
596         // event handlers must be called after handlerAdded().
597         callHandlerAdded0(newCtx);
598         callHandlerRemoved0(ctx);
599         return ctx.handler();
600     }
601 
602     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
603         AbstractChannelHandlerContext prev = oldCtx.prev;
604         AbstractChannelHandlerContext next = oldCtx.next;
605         newCtx.prev = prev;
606         newCtx.next = next;
607 
608         // Finish the replacement of oldCtx with newCtx in the linked list.
609         // Note that this doesn't mean events will be sent to the new handler immediately
610         // because we are currently at the event handler thread and no more than one handler methods can be invoked
611         // at the same time (we ensured that in replace().)
612         prev.next = newCtx;
613         next.prev = newCtx;
614 
615         // update the reference to the replacement so forward of buffered content will work correctly
616         oldCtx.prev = newCtx;
617         oldCtx.next = newCtx;
618     }
619 
620     private static void checkMultiplicity(ChannelHandler handler) {
621         if (handler instanceof ChannelHandlerAdapter) {
622             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
623             if (!h.isSharable() && h.added) {
624                 throw new ChannelPipelineException(
625                         h.getClass().getName() +
626                         " is not a @Sharable handler, so can't be added or removed multiple times.");
627             }
628             h.added = true;
629         }
630     }
631 
632     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
633         try {
634             // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
635             // any pipeline events ctx.handler() will miss them because the state will not allow it.
636             ctx.setAddComplete();
637             ctx.handler().handlerAdded(ctx);
638         } catch (Throwable t) {
639             boolean removed = false;
640             try {
641                 remove0(ctx);
642                 try {
643                     ctx.handler().handlerRemoved(ctx);
644                 } finally {
645                     ctx.setRemoved();
646                 }
647                 removed = true;
648             } catch (Throwable t2) {
649                 if (logger.isWarnEnabled()) {
650                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
651                 }
652             }
653 
654             if (removed) {
655                 fireExceptionCaught(new ChannelPipelineException(
656                         ctx.handler().getClass().getName() +
657                         ".handlerAdded() has thrown an exception; removed.", t));
658             } else {
659                 fireExceptionCaught(new ChannelPipelineException(
660                         ctx.handler().getClass().getName() +
661                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
662             }
663         }
664     }
665 
666     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
667         // Notify the complete removal.
668         try {
669             try {
670                 ctx.handler().handlerRemoved(ctx);
671             } finally {
672                 ctx.setRemoved();
673             }
674         } catch (Throwable t) {
675             fireExceptionCaught(new ChannelPipelineException(
676                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
677         }
678     }
679 
680     final void invokeHandlerAddedIfNeeded() {
681         assert channel.eventLoop().inEventLoop();
682         if (firstRegistration) {
683             firstRegistration = false;
684             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
685             // that were added before the registration was done.
686             callHandlerAddedForAllHandlers();
687         }
688     }
689 
690     @Override
691     public final ChannelHandler first() {
692         ChannelHandlerContext first = firstContext();
693         if (first == null) {
694             return null;
695         }
696         return first.handler();
697     }
698 
699     @Override
700     public final ChannelHandlerContext firstContext() {
701         AbstractChannelHandlerContext first = head.next;
702         if (first == tail) {
703             return null;
704         }
705         return head.next;
706     }
707 
708     @Override
709     public final ChannelHandler last() {
710         AbstractChannelHandlerContext last = tail.prev;
711         if (last == head) {
712             return null;
713         }
714         return last.handler();
715     }
716 
717     @Override
718     public final ChannelHandlerContext lastContext() {
719         AbstractChannelHandlerContext last = tail.prev;
720         if (last == head) {
721             return null;
722         }
723         return last;
724     }
725 
726     @Override
727     public final ChannelHandler get(String name) {
728         ChannelHandlerContext ctx = context(name);
729         if (ctx == null) {
730             return null;
731         } else {
732             return ctx.handler();
733         }
734     }
735 
736     @SuppressWarnings("unchecked")
737     @Override
738     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
739         ChannelHandlerContext ctx = context(handlerType);
740         if (ctx == null) {
741             return null;
742         } else {
743             return (T) ctx.handler();
744         }
745     }
746 
747     @Override
748     public final ChannelHandlerContext context(String name) {
749         if (name == null) {
750             throw new NullPointerException("name");
751         }
752 
753         return context0(name);
754     }
755 
756     @Override
757     public final ChannelHandlerContext context(ChannelHandler handler) {
758         if (handler == null) {
759             throw new NullPointerException("handler");
760         }
761 
762         AbstractChannelHandlerContext ctx = head.next;
763         for (;;) {
764 
765             if (ctx == null) {
766                 return null;
767             }
768 
769             if (ctx.handler() == handler) {
770                 return ctx;
771             }
772 
773             ctx = ctx.next;
774         }
775     }
776 
777     @Override
778     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
779         if (handlerType == null) {
780             throw new NullPointerException("handlerType");
781         }
782 
783         AbstractChannelHandlerContext ctx = head.next;
784         for (;;) {
785             if (ctx == null) {
786                 return null;
787             }
788             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
789                 return ctx;
790             }
791             ctx = ctx.next;
792         }
793     }
794 
795     @Override
796     public final List<String> names() {
797         List<String> list = new ArrayList<String>();
798         AbstractChannelHandlerContext ctx = head.next;
799         for (;;) {
800             if (ctx == null) {
801                 return list;
802             }
803             list.add(ctx.name());
804             ctx = ctx.next;
805         }
806     }
807 
808     @Override
809     public final Map<String, ChannelHandler> toMap() {
810         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
811         AbstractChannelHandlerContext ctx = head.next;
812         for (;;) {
813             if (ctx == tail) {
814                 return map;
815             }
816             map.put(ctx.name(), ctx.handler());
817             ctx = ctx.next;
818         }
819     }
820 
821     @Override
822     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
823         return toMap().entrySet().iterator();
824     }
825 
826     /**
827      * Returns the {@link String} representation of this pipeline.
828      */
829     @Override
830     public final String toString() {
831         StringBuilder buf = new StringBuilder()
832             .append(StringUtil.simpleClassName(this))
833             .append('{');
834         AbstractChannelHandlerContext ctx = head.next;
835         for (;;) {
836             if (ctx == tail) {
837                 break;
838             }
839 
840             buf.append('(')
841                .append(ctx.name())
842                .append(" = ")
843                .append(ctx.handler().getClass().getName())
844                .append(')');
845 
846             ctx = ctx.next;
847             if (ctx == tail) {
848                 break;
849             }
850 
851             buf.append(", ");
852         }
853         buf.append('}');
854         return buf.toString();
855     }
856 
857     @Override
858     public final ChannelPipeline fireChannelRegistered() {
859         AbstractChannelHandlerContext.invokeChannelRegistered(head);
860         return this;
861     }
862 
863     @Override
864     public final ChannelPipeline fireChannelUnregistered() {
865         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
866         return this;
867     }
868 
869     /**
870      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
871      * handlerRemoved().
872      *
873      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
874      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
875      * the handlers are removed after all events are handled.
876      *
877      * See: https://github.com/netty/netty/issues/3156
878      */
879     private synchronized void destroy() {
880         destroyUp(head.next, false);
881     }
882 
883     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
884         final Thread currentThread = Thread.currentThread();
885         final AbstractChannelHandlerContext tail = this.tail;
886         for (;;) {
887             if (ctx == tail) {
888                 destroyDown(currentThread, tail.prev, inEventLoop);
889                 break;
890             }
891 
892             final EventExecutor executor = ctx.executor();
893             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
894                 final AbstractChannelHandlerContext finalCtx = ctx;
895                 executor.execute(new Runnable() {
896                     @Override
897                     public void run() {
898                         destroyUp(finalCtx, true);
899                     }
900                 });
901                 break;
902             }
903 
904             ctx = ctx.next;
905             inEventLoop = false;
906         }
907     }
908 
909     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
910         // We have reached at tail; now traverse backwards.
911         final AbstractChannelHandlerContext head = this.head;
912         for (;;) {
913             if (ctx == head) {
914                 break;
915             }
916 
917             final EventExecutor executor = ctx.executor();
918             if (inEventLoop || executor.inEventLoop(currentThread)) {
919                 synchronized (this) {
920                     remove0(ctx);
921                 }
922                 callHandlerRemoved0(ctx);
923             } else {
924                 final AbstractChannelHandlerContext finalCtx = ctx;
925                 executor.execute(new Runnable() {
926                     @Override
927                     public void run() {
928                         destroyDown(Thread.currentThread(), finalCtx, true);
929                     }
930                 });
931                 break;
932             }
933 
934             ctx = ctx.prev;
935             inEventLoop = false;
936         }
937     }
938 
939     @Override
940     public final ChannelPipeline fireChannelActive() {
941         AbstractChannelHandlerContext.invokeChannelActive(head);
942         return this;
943     }
944 
945     @Override
946     public final ChannelPipeline fireChannelInactive() {
947         AbstractChannelHandlerContext.invokeChannelInactive(head);
948         return this;
949     }
950 
951     @Override
952     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
953         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
954         return this;
955     }
956 
957     @Override
958     public final ChannelPipeline fireUserEventTriggered(Object event) {
959         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
960         return this;
961     }
962 
963     @Override
964     public final ChannelPipeline fireChannelRead(Object msg) {
965         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
966         return this;
967     }
968 
969     @Override
970     public final ChannelPipeline fireChannelReadComplete() {
971         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
972         return this;
973     }
974 
975     @Override
976     public final ChannelPipeline fireChannelWritabilityChanged() {
977         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
978         return this;
979     }
980 
981     @Override
982     public final ChannelFuture bind(SocketAddress localAddress) {
983         return tail.bind(localAddress);
984     }
985 
986     @Override
987     public final ChannelFuture connect(SocketAddress remoteAddress) {
988         return tail.connect(remoteAddress);
989     }
990 
991     @Override
992     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
993         return tail.connect(remoteAddress, localAddress);
994     }
995 
996     @Override
997     public final ChannelFuture disconnect() {
998         return tail.disconnect();
999     }
1000 
1001     @Override
1002     public final ChannelFuture close() {
1003         return tail.close();
1004     }
1005 
1006     @Override
1007     public final ChannelFuture deregister() {
1008         return tail.deregister();
1009     }
1010 
1011     @Override
1012     public final ChannelPipeline flush() {
1013         tail.flush();
1014         return this;
1015     }
1016 
1017     @Override
1018     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
1019         return tail.bind(localAddress, promise);
1020     }
1021 
1022     @Override
1023     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
1024         return tail.connect(remoteAddress, promise);
1025     }
1026 
1027     @Override
1028     public final ChannelFuture connect(
1029             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1030         return tail.connect(remoteAddress, localAddress, promise);
1031     }
1032 
1033     @Override
1034     public final ChannelFuture disconnect(ChannelPromise promise) {
1035         return tail.disconnect(promise);
1036     }
1037 
1038     @Override
1039     public final ChannelFuture close(ChannelPromise promise) {
1040         return tail.close(promise);
1041     }
1042 
1043     @Override
1044     public final ChannelFuture deregister(final ChannelPromise promise) {
1045         return tail.deregister(promise);
1046     }
1047 
1048     @Override
1049     public final ChannelPipeline read() {
1050         tail.read();
1051         return this;
1052     }
1053 
1054     @Override
1055     public final ChannelFuture write(Object msg) {
1056         return tail.write(msg);
1057     }
1058 
1059     @Override
1060     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1061         return tail.write(msg, promise);
1062     }
1063 
1064     @Override
1065     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1066         return tail.writeAndFlush(msg, promise);
1067     }
1068 
1069     @Override
1070     public final ChannelFuture writeAndFlush(Object msg) {
1071         return tail.writeAndFlush(msg);
1072     }
1073 
1074     @Override
1075     public final ChannelPromise newPromise() {
1076         return new DefaultChannelPromise(channel);
1077     }
1078 
1079     @Override
1080     public final ChannelProgressivePromise newProgressivePromise() {
1081         return new DefaultChannelProgressivePromise(channel);
1082     }
1083 
1084     @Override
1085     public final ChannelFuture newSucceededFuture() {
1086         return succeededFuture;
1087     }
1088 
1089     @Override
1090     public final ChannelFuture newFailedFuture(Throwable cause) {
1091         return new FailedChannelFuture(channel, null, cause);
1092     }
1093 
1094     @Override
1095     public final ChannelPromise voidPromise() {
1096         return voidPromise;
1097     }
1098 
1099     private void checkDuplicateName(String name) {
1100         if (context0(name) != null) {
1101             throw new IllegalArgumentException("Duplicate handler name: " + name);
1102         }
1103     }
1104 
1105     private AbstractChannelHandlerContext context0(String name) {
1106         AbstractChannelHandlerContext context = head.next;
1107         while (context != tail) {
1108             if (context.name().equals(name)) {
1109                 return context;
1110             }
1111             context = context.next;
1112         }
1113         return null;
1114     }
1115 
1116     private AbstractChannelHandlerContext getContextOrDie(String name) {
1117         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1118         if (ctx == null) {
1119             throw new NoSuchElementException(name);
1120         } else {
1121             return ctx;
1122         }
1123     }
1124 
1125     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1126         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1127         if (ctx == null) {
1128             throw new NoSuchElementException(handler.getClass().getName());
1129         } else {
1130             return ctx;
1131         }
1132     }
1133 
1134     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1135         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1136         if (ctx == null) {
1137             throw new NoSuchElementException(handlerType.getName());
1138         } else {
1139             return ctx;
1140         }
1141     }
1142 
1143     private void callHandlerAddedForAllHandlers() {
1144         final PendingHandlerCallback pendingHandlerCallbackHead;
1145         synchronized (this) {
1146             assert !registered;
1147 
1148             // This Channel itself was registered.
1149             registered = true;
1150 
1151             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1152             // Null out so it can be GC'ed.
1153             this.pendingHandlerCallbackHead = null;
1154         }
1155 
1156         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1157         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1158         // the EventLoop.
1159         PendingHandlerCallback task = pendingHandlerCallbackHead;
1160         while (task != null) {
1161             task.execute();
1162             task = task.next;
1163         }
1164     }
1165 
1166     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1167         assert !registered;
1168 
1169         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1170         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1171         if (pending == null) {
1172             pendingHandlerCallbackHead = task;
1173         } else {
1174             // Find the tail of the linked-list.
1175             while (pending.next != null) {
1176                 pending = pending.next;
1177             }
1178             pending.next = task;
1179         }
1180     }
1181 
1182     /**
1183      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1184      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1185      */
1186     protected void onUnhandledInboundException(Throwable cause) {
1187         try {
1188             logger.warn(
1189                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1190                             "It usually means the last handler in the pipeline did not handle the exception.",
1191                     cause);
1192         } finally {
1193             ReferenceCountUtil.release(cause);
1194         }
1195     }
1196 
1197     /**
1198      * Called once the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}event hit
1199      * the end of the {@link ChannelPipeline}.
1200      */
1201     protected void onUnhandledInboundChannelActive() {
1202     }
1203 
1204     /**
1205      * Called once the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} event hit
1206      * the end of the {@link ChannelPipeline}.
1207      */
1208     protected void onUnhandledInboundChannelInactive() {
1209     }
1210 
1211     /**
1212      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1213      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1214      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1215      */
1216     protected void onUnhandledInboundMessage(Object msg) {
1217         try {
1218             logger.debug(
1219                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1220                             "Please check your pipeline configuration.", msg);
1221         } finally {
1222             ReferenceCountUtil.release(msg);
1223         }
1224     }
1225 
1226     /**
1227      * Called once the {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} event hit
1228      * the end of the {@link ChannelPipeline}.
1229      */
1230     protected void onUnhandledInboundChannelReadComplete() {
1231     }
1232 
1233     /**
1234      * Called once an user event hit the end of the {@link ChannelPipeline} without been handled by the user
1235      * in {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is responsible
1236      * to call {@link ReferenceCountUtil#release(Object)} on the given event at some point.
1237      */
1238     protected void onUnhandledInboundUserEventTriggered(Object evt) {
1239         // This may not be a configuration error and so don't log anything.
1240         // The event may be superfluous for the current pipeline configuration.
1241         ReferenceCountUtil.release(evt);
1242     }
1243 
1244     /**
1245      * Called once the {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
1246      * the end of the {@link ChannelPipeline}.
1247      */
1248     protected void onUnhandledChannelWritabilityChanged() {
1249     }
1250 
1251     @UnstableApi
1252     protected void incrementPendingOutboundBytes(long size) {
1253         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1254         if (buffer != null) {
1255             buffer.incrementPendingOutboundBytes(size);
1256         }
1257     }
1258 
1259     @UnstableApi
1260     protected void decrementPendingOutboundBytes(long size) {
1261         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1262         if (buffer != null) {
1263             buffer.decrementPendingOutboundBytes(size);
1264         }
1265     }
1266 
1267     // A special catch-all handler that handles both bytes and messages.
1268     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1269 
1270         TailContext(DefaultChannelPipeline pipeline) {
1271             super(pipeline, null, TAIL_NAME, true, false);
1272             setAddComplete();
1273         }
1274 
1275         @Override
1276         public ChannelHandler handler() {
1277             return this;
1278         }
1279 
1280         @Override
1281         public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1282 
1283         @Override
1284         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1285 
1286         @Override
1287         public void channelActive(ChannelHandlerContext ctx) throws Exception {
1288             onUnhandledInboundChannelActive();
1289         }
1290 
1291         @Override
1292         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1293             onUnhandledInboundChannelInactive();
1294         }
1295 
1296         @Override
1297         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1298             onUnhandledChannelWritabilityChanged();
1299         }
1300 
1301         @Override
1302         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1303 
1304         @Override
1305         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1306 
1307         @Override
1308         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1309             onUnhandledInboundUserEventTriggered(evt);
1310         }
1311 
1312         @Override
1313         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1314             onUnhandledInboundException(cause);
1315         }
1316 
1317         @Override
1318         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1319             onUnhandledInboundMessage(msg);
1320         }
1321 
1322         @Override
1323         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1324             onUnhandledInboundChannelReadComplete();
1325         }
1326     }
1327 
1328     final class HeadContext extends AbstractChannelHandlerContext
1329             implements ChannelOutboundHandler, ChannelInboundHandler {
1330 
1331         private final Unsafe unsafe;
1332 
1333         HeadContext(DefaultChannelPipeline pipeline) {
1334             super(pipeline, null, HEAD_NAME, false, true);
1335             unsafe = pipeline.channel().unsafe();
1336             setAddComplete();
1337         }
1338 
1339         @Override
1340         public ChannelHandler handler() {
1341             return this;
1342         }
1343 
1344         @Override
1345         public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
1346             // NOOP
1347         }
1348 
1349         @Override
1350         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
1351             // NOOP
1352         }
1353 
1354         @Override
1355         public void bind(
1356                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1357                 throws Exception {
1358             unsafe.bind(localAddress, promise);
1359         }
1360 
1361         @Override
1362         public void connect(
1363                 ChannelHandlerContext ctx,
1364                 SocketAddress remoteAddress, SocketAddress localAddress,
1365                 ChannelPromise promise) throws Exception {
1366             unsafe.connect(remoteAddress, localAddress, promise);
1367         }
1368 
1369         @Override
1370         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1371             unsafe.disconnect(promise);
1372         }
1373 
1374         @Override
1375         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1376             unsafe.close(promise);
1377         }
1378 
1379         @Override
1380         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1381             unsafe.deregister(promise);
1382         }
1383 
1384         @Override
1385         public void read(ChannelHandlerContext ctx) {
1386             unsafe.beginRead();
1387         }
1388 
1389         @Override
1390         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1391             unsafe.write(msg, promise);
1392         }
1393 
1394         @Override
1395         public void flush(ChannelHandlerContext ctx) throws Exception {
1396             unsafe.flush();
1397         }
1398 
1399         @Override
1400         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1401             ctx.fireExceptionCaught(cause);
1402         }
1403 
1404         @Override
1405         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1406             invokeHandlerAddedIfNeeded();
1407             ctx.fireChannelRegistered();
1408         }
1409 
1410         @Override
1411         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
1412             ctx.fireChannelUnregistered();
1413 
1414             // Remove all handlers sequentially if channel is closed and unregistered.
1415             if (!channel.isOpen()) {
1416                 destroy();
1417             }
1418         }
1419 
1420         @Override
1421         public void channelActive(ChannelHandlerContext ctx) throws Exception {
1422             ctx.fireChannelActive();
1423 
1424             readIfIsAutoRead();
1425         }
1426 
1427         @Override
1428         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1429             ctx.fireChannelInactive();
1430         }
1431 
1432         @Override
1433         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1434             ctx.fireChannelRead(msg);
1435         }
1436 
1437         @Override
1438         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1439             ctx.fireChannelReadComplete();
1440 
1441             readIfIsAutoRead();
1442         }
1443 
1444         private void readIfIsAutoRead() {
1445             if (channel.config().isAutoRead()) {
1446                 channel.read();
1447             }
1448         }
1449 
1450         @Override
1451         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1452             ctx.fireUserEventTriggered(evt);
1453         }
1454 
1455         @Override
1456         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1457             ctx.fireChannelWritabilityChanged();
1458         }
1459     }
1460 
1461     private abstract static class PendingHandlerCallback implements Runnable {
1462         final AbstractChannelHandlerContext ctx;
1463         PendingHandlerCallback next;
1464 
1465         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1466             this.ctx = ctx;
1467         }
1468 
1469         abstract void execute();
1470     }
1471 
1472     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1473 
1474         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1475             super(ctx);
1476         }
1477 
1478         @Override
1479         public void run() {
1480             callHandlerAdded0(ctx);
1481         }
1482 
1483         @Override
1484         void execute() {
1485             EventExecutor executor = ctx.executor();
1486             if (executor.inEventLoop()) {
1487                 callHandlerAdded0(ctx);
1488             } else {
1489                 try {
1490                     executor.execute(this);
1491                 } catch (RejectedExecutionException e) {
1492                     if (logger.isWarnEnabled()) {
1493                         logger.warn(
1494                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1495                                 executor, ctx.name(), e);
1496                     }
1497                     remove0(ctx);
1498                     ctx.setRemoved();
1499                 }
1500             }
1501         }
1502     }
1503 
1504     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1505 
1506         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1507             super(ctx);
1508         }
1509 
1510         @Override
1511         public void run() {
1512             callHandlerRemoved0(ctx);
1513         }
1514 
1515         @Override
1516         void execute() {
1517             EventExecutor executor = ctx.executor();
1518             if (executor.inEventLoop()) {
1519                 callHandlerRemoved0(ctx);
1520             } else {
1521                 try {
1522                     executor.execute(this);
1523                 } catch (RejectedExecutionException e) {
1524                     if (logger.isWarnEnabled()) {
1525                         logger.warn(
1526                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1527                                         " removing handler {}.", executor, ctx.name(), e);
1528                     }
1529                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1530                     ctx.setRemoved();
1531                 }
1532             }
1533         }
1534     }
1535 }