View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.UnstableApi;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayList;
32  import java.util.IdentityHashMap;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NoSuchElementException;
38  import java.util.WeakHashMap;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41  
42  /**
43   * The default {@link ChannelPipeline} implementation.  It is usually created
44   * by a {@link Channel} implementation when the {@link Channel} is created.
45   */
46  public class DefaultChannelPipeline implements ChannelPipeline {
47  
    /** Logger of this class; used below to report a failure while rolling back a handler addition. */
    static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);

    // Names produced by generateName0(...) for the HeadContext and TailContext types.
    private static final String HEAD_NAME = generateName0(HeadContext.class);
    private static final String TAIL_NAME = generateName0(TailContext.class);

    /**
     * Cache, held in a {@link FastThreadLocal}, that maps a handler type to the default name generated for it by
     * {@code generateName(ChannelHandler)}. Each value starts out as an empty {@link WeakHashMap}.
     */
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };

    /** Updater used by {@link #estimatorHandle()} to install the {@code estimatorHandle} field via compare-and-set. */
    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
    // Sentinel contexts at both ends of the doubly linked context list; created and linked in the constructor.
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    // The channel passed to the constructor, plus a future and a promise created from it there.
    private final Channel channel;
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    // Captured once at construction; decides whether touch(Object, ...) delegates to ReferenceCountUtil.touch(...).
    private final boolean touch = ResourceLeakDetector.isEnabled();

    // Created lazily by childExecutor(...); remembers which executor was picked for each group.
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    // Created lazily by estimatorHandle().
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    // Set to false by invokeHandlerAddedIfNeeded() the first time it runs.
    private boolean firstRegistration = true;

    /**
     * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so
     * processes all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)} calls.
     *
     * We only keep the head because it is expected that the list is used infrequently and its size is small.
     * Thus a full iteration to do an insertion is assumed to be a good compromise between saving memory and
     * tail management complexity.
     */
    private PendingHandlerCallback pendingHandlerCallbackHead;

    /**
     * Set to {@code true} once the {@link AbstractChannel} is registered. Once set to {@code true} the value will
     * never change.
     */
    private boolean registered;
91  
92      protected DefaultChannelPipeline(Channel channel) {
93          this.channel = ObjectUtil.checkNotNull(channel, "channel");
94          succeededFuture = new SucceededChannelFuture(channel, null);
95          voidPromise =  new VoidChannelPromise(channel, true);
96  
97          tail = new TailContext(this);
98          head = new HeadContext(this);
99  
100         head.next = tail;
101         tail.prev = head;
102     }
103 
104     final MessageSizeEstimator.Handle estimatorHandle() {
105         MessageSizeEstimator.Handle handle = estimatorHandle;
106         if (handle == null) {
107             handle = channel.config().getMessageSizeEstimator().newHandle();
108             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
109                 handle = estimatorHandle;
110             }
111         }
112         return handle;
113     }
114 
115     final Object touch(Object msg, AbstractChannelHandlerContext next) {
116         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
117     }
118 
119     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
120         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
121     }
122 
123     private EventExecutor childExecutor(EventExecutorGroup group) {
124         if (group == null) {
125             return null;
126         }
127         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
128         if (pinEventExecutor != null && !pinEventExecutor) {
129             return group.next();
130         }
131         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
132         if (childExecutors == null) {
133             // Use size of 4 as most people only use one extra EventExecutor.
134             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
135         }
136         // Pin one of the child executors once and remember it so that the same child executor
137         // is used to fire events for the same channel.
138         EventExecutor childExecutor = childExecutors.get(group);
139         if (childExecutor == null) {
140             childExecutor = group.next();
141             childExecutors.put(group, childExecutor);
142         }
143         return childExecutor;
144     }
145     @Override
146     public final Channel channel() {
147         return channel;
148     }
149 
150     @Override
151     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
152         return addFirst(null, name, handler);
153     }
154 
155     @Override
156     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
157         final AbstractChannelHandlerContext newCtx;
158         synchronized (this) {
159             checkMultiplicity(handler);
160             name = filterName(name, handler);
161 
162             newCtx = newContext(group, name, handler);
163 
164             addFirst0(newCtx);
165 
166             // If the registered is false it means that the channel was not registered on an eventLoop yet.
167             // In this case we add the context to the pipeline and add a task that will call
168             // ChannelHandler.handlerAdded(...) once the channel is registered.
169             if (!registered) {
170                 newCtx.setAddPending();
171                 callHandlerCallbackLater(newCtx, true);
172                 return this;
173             }
174 
175             EventExecutor executor = newCtx.executor();
176             if (!executor.inEventLoop()) {
177                 callHandlerAddedInEventLoop(newCtx, executor);
178                 return this;
179             }
180         }
181         callHandlerAdded0(newCtx);
182         return this;
183     }
184 
185     private void addFirst0(AbstractChannelHandlerContext newCtx) {
186         AbstractChannelHandlerContext nextCtx = head.next;
187         newCtx.prev = head;
188         newCtx.next = nextCtx;
189         head.next = newCtx;
190         nextCtx.prev = newCtx;
191     }
192 
193     @Override
194     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
195         return addLast(null, name, handler);
196     }
197 
198     @Override
199     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
200         final AbstractChannelHandlerContext newCtx;
201         synchronized (this) {
202             checkMultiplicity(handler);
203 
204             newCtx = newContext(group, filterName(name, handler), handler);
205 
206             addLast0(newCtx);
207 
208             // If the registered is false it means that the channel was not registered on an eventLoop yet.
209             // In this case we add the context to the pipeline and add a task that will call
210             // ChannelHandler.handlerAdded(...) once the channel is registered.
211             if (!registered) {
212                 newCtx.setAddPending();
213                 callHandlerCallbackLater(newCtx, true);
214                 return this;
215             }
216 
217             EventExecutor executor = newCtx.executor();
218             if (!executor.inEventLoop()) {
219                 callHandlerAddedInEventLoop(newCtx, executor);
220                 return this;
221             }
222         }
223         callHandlerAdded0(newCtx);
224         return this;
225     }
226 
227     private void addLast0(AbstractChannelHandlerContext newCtx) {
228         AbstractChannelHandlerContext prev = tail.prev;
229         newCtx.prev = prev;
230         newCtx.next = tail;
231         prev.next = newCtx;
232         tail.prev = newCtx;
233     }
234 
235     @Override
236     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
237         return addBefore(null, baseName, name, handler);
238     }
239 
240     @Override
241     public final ChannelPipeline addBefore(
242             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
243         final AbstractChannelHandlerContext newCtx;
244         final AbstractChannelHandlerContext ctx;
245         synchronized (this) {
246             checkMultiplicity(handler);
247             name = filterName(name, handler);
248             ctx = getContextOrDie(baseName);
249 
250             newCtx = newContext(group, name, handler);
251 
252             addBefore0(ctx, newCtx);
253 
254             // If the registered is false it means that the channel was not registered on an eventLoop yet.
255             // In this case we add the context to the pipeline and add a task that will call
256             // ChannelHandler.handlerAdded(...) once the channel is registered.
257             if (!registered) {
258                 newCtx.setAddPending();
259                 callHandlerCallbackLater(newCtx, true);
260                 return this;
261             }
262 
263             EventExecutor executor = newCtx.executor();
264             if (!executor.inEventLoop()) {
265                 callHandlerAddedInEventLoop(newCtx, executor);
266                 return this;
267             }
268         }
269         callHandlerAdded0(newCtx);
270         return this;
271     }
272 
273     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
274         newCtx.prev = ctx.prev;
275         newCtx.next = ctx;
276         ctx.prev.next = newCtx;
277         ctx.prev = newCtx;
278     }
279 
280     private String filterName(String name, ChannelHandler handler) {
281         if (name == null) {
282             return generateName(handler);
283         }
284         checkDuplicateName(name);
285         return name;
286     }
287 
288     @Override
289     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
290         return addAfter(null, baseName, name, handler);
291     }
292 
293     @Override
294     public final ChannelPipeline addAfter(
295             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
296         final AbstractChannelHandlerContext newCtx;
297         final AbstractChannelHandlerContext ctx;
298 
299         synchronized (this) {
300             checkMultiplicity(handler);
301             name = filterName(name, handler);
302             ctx = getContextOrDie(baseName);
303 
304             newCtx = newContext(group, name, handler);
305 
306             addAfter0(ctx, newCtx);
307 
308             // If the registered is false it means that the channel was not registered on an eventLoop yet.
309             // In this case we remove the context from the pipeline and add a task that will call
310             // ChannelHandler.handlerRemoved(...) once the channel is registered.
311             if (!registered) {
312                 newCtx.setAddPending();
313                 callHandlerCallbackLater(newCtx, true);
314                 return this;
315             }
316             EventExecutor executor = newCtx.executor();
317             if (!executor.inEventLoop()) {
318                 callHandlerAddedInEventLoop(newCtx, executor);
319                 return this;
320             }
321         }
322         callHandlerAdded0(newCtx);
323         return this;
324     }
325 
326     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
327         newCtx.prev = ctx;
328         newCtx.next = ctx.next;
329         ctx.next.prev = newCtx;
330         ctx.next = newCtx;
331     }
332 
333     public final ChannelPipeline addFirst(ChannelHandler handler) {
334         return addFirst(null, handler);
335     }
336 
337     @Override
338     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
339         return addFirst(null, handlers);
340     }
341 
342     @Override
343     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
344         ObjectUtil.checkNotNull(handlers, "handlers");
345         if (handlers.length == 0 || handlers[0] == null) {
346             return this;
347         }
348 
349         int size;
350         for (size = 1; size < handlers.length; size ++) {
351             if (handlers[size] == null) {
352                 break;
353             }
354         }
355 
356         for (int i = size - 1; i >= 0; i --) {
357             ChannelHandler h = handlers[i];
358             addFirst(executor, null, h);
359         }
360 
361         return this;
362     }
363 
364     public final ChannelPipeline addLast(ChannelHandler handler) {
365         return addLast(null, handler);
366     }
367 
368     @Override
369     public final ChannelPipeline addLast(ChannelHandler... handlers) {
370         return addLast(null, handlers);
371     }
372 
373     @Override
374     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
375         ObjectUtil.checkNotNull(handlers, "handlers");
376 
377         for (ChannelHandler h: handlers) {
378             if (h == null) {
379                 break;
380             }
381             addLast(executor, null, h);
382         }
383 
384         return this;
385     }
386 
387     private String generateName(ChannelHandler handler) {
388         Map<Class<?>, String> cache = nameCaches.get();
389         Class<?> handlerType = handler.getClass();
390         String name = cache.get(handlerType);
391         if (name == null) {
392             name = generateName0(handlerType);
393             cache.put(handlerType, name);
394         }
395 
396         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
397         // any name conflicts.  Note that we don't cache the names generated here.
398         if (context0(name) != null) {
399             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
400             for (int i = 1;; i ++) {
401                 String newName = baseName + i;
402                 if (context0(newName) == null) {
403                     name = newName;
404                     break;
405                 }
406             }
407         }
408         return name;
409     }
410 
411     private static String generateName0(Class<?> handlerType) {
412         return StringUtil.simpleClassName(handlerType) + "#0";
413     }
414 
415     @Override
416     public final ChannelPipeline remove(ChannelHandler handler) {
417         remove(getContextOrDie(handler));
418         return this;
419     }
420 
421     @Override
422     public final ChannelHandler remove(String name) {
423         return remove(getContextOrDie(name)).handler();
424     }
425 
426     @SuppressWarnings("unchecked")
427     @Override
428     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
429         return (T) remove(getContextOrDie(handlerType)).handler();
430     }
431 
432     public final <T extends ChannelHandler> T removeIfExists(String name) {
433         return removeIfExists(context(name));
434     }
435 
436     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
437         return removeIfExists(context(handlerType));
438     }
439 
440     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
441         return removeIfExists(context(handler));
442     }
443 
444     @SuppressWarnings("unchecked")
445     private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
446         if (ctx == null) {
447             return null;
448         }
449         return (T) remove((AbstractChannelHandlerContext) ctx).handler();
450     }
451 
452     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
453         assert ctx != head && ctx != tail;
454 
455         synchronized (this) {
456             atomicRemoveFromHandlerList(ctx);
457 
458             // If the registered is false it means that the channel was not registered on an eventloop yet.
459             // In this case we remove the context from the pipeline and add a task that will call
460             // ChannelHandler.handlerRemoved(...) once the channel is registered.
461             if (!registered) {
462                 callHandlerCallbackLater(ctx, false);
463                 return ctx;
464             }
465 
466             EventExecutor executor = ctx.executor();
467             if (!executor.inEventLoop()) {
468                 executor.execute(new Runnable() {
469                     @Override
470                     public void run() {
471                         callHandlerRemoved0(ctx);
472                     }
473                 });
474                 return ctx;
475             }
476         }
477         callHandlerRemoved0(ctx);
478         return ctx;
479     }
480 
481     /**
482      * Method is synchronized to make the handler removal from the double linked list atomic.
483      */
484     private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
485         AbstractChannelHandlerContext prev = ctx.prev;
486         AbstractChannelHandlerContext next = ctx.next;
487         prev.next = next;
488         next.prev = prev;
489     }
490 
491     @Override
492     public final ChannelHandler removeFirst() {
493         if (head.next == tail) {
494             throw new NoSuchElementException();
495         }
496         return remove(head.next).handler();
497     }
498 
499     @Override
500     public final ChannelHandler removeLast() {
501         if (head.next == tail) {
502             throw new NoSuchElementException();
503         }
504         return remove(tail.prev).handler();
505     }
506 
507     @Override
508     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
509         replace(getContextOrDie(oldHandler), newName, newHandler);
510         return this;
511     }
512 
513     @Override
514     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
515         return replace(getContextOrDie(oldName), newName, newHandler);
516     }
517 
518     @Override
519     @SuppressWarnings("unchecked")
520     public final <T extends ChannelHandler> T replace(
521             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
522         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
523     }
524 
525     private ChannelHandler replace(
526             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
527         assert ctx != head && ctx != tail;
528 
529         final AbstractChannelHandlerContext newCtx;
530         synchronized (this) {
531             checkMultiplicity(newHandler);
532             if (newName == null) {
533                 newName = generateName(newHandler);
534             } else {
535                 boolean sameName = ctx.name().equals(newName);
536                 if (!sameName) {
537                     checkDuplicateName(newName);
538                 }
539             }
540 
541             newCtx = newContext(ctx.executor, newName, newHandler);
542 
543             replace0(ctx, newCtx);
544 
545             // If the registered is false it means that the channel was not registered on an eventloop yet.
546             // In this case we replace the context in the pipeline
547             // and add a task that will call ChannelHandler.handlerAdded(...) and
548             // ChannelHandler.handlerRemoved(...) once the channel is registered.
549             if (!registered) {
550                 callHandlerCallbackLater(newCtx, true);
551                 callHandlerCallbackLater(ctx, false);
552                 return ctx.handler();
553             }
554             EventExecutor executor = ctx.executor();
555             if (!executor.inEventLoop()) {
556                 executor.execute(new Runnable() {
557                     @Override
558                     public void run() {
559                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
560                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
561                         // those event handlers must be called after handlerAdded().
562                         callHandlerAdded0(newCtx);
563                         callHandlerRemoved0(ctx);
564                     }
565                 });
566                 return ctx.handler();
567             }
568         }
569         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
570         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
571         // event handlers must be called after handlerAdded().
572         callHandlerAdded0(newCtx);
573         callHandlerRemoved0(ctx);
574         return ctx.handler();
575     }
576 
577     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
578         AbstractChannelHandlerContext prev = oldCtx.prev;
579         AbstractChannelHandlerContext next = oldCtx.next;
580         newCtx.prev = prev;
581         newCtx.next = next;
582 
583         // Finish the replacement of oldCtx with newCtx in the linked list.
584         // Note that this doesn't mean events will be sent to the new handler immediately
585         // because we are currently at the event handler thread and no more than one handler methods can be invoked
586         // at the same time (we ensured that in replace().)
587         prev.next = newCtx;
588         next.prev = newCtx;
589 
590         // update the reference to the replacement so forward of buffered content will work correctly
591         oldCtx.prev = newCtx;
592         oldCtx.next = newCtx;
593     }
594 
595     private static void checkMultiplicity(ChannelHandler handler) {
596         if (handler instanceof ChannelHandlerAdapter) {
597             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
598             if (!h.isSharable() && h.added) {
599                 throw new ChannelPipelineException(
600                         h.getClass().getName() +
601                         " is not a @Sharable handler, so can't be added or removed multiple times.");
602             }
603             h.added = true;
604         }
605     }
606 
607     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
608         try {
609             ctx.callHandlerAdded();
610         } catch (Throwable t) {
611             boolean removed = false;
612             try {
613                 atomicRemoveFromHandlerList(ctx);
614                 ctx.callHandlerRemoved();
615                 removed = true;
616             } catch (Throwable t2) {
617                 if (logger.isWarnEnabled()) {
618                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
619                 }
620             }
621 
622             if (removed) {
623                 fireExceptionCaught(new ChannelPipelineException(
624                         ctx.handler().getClass().getName() +
625                         ".handlerAdded() has thrown an exception; removed.", t));
626             } else {
627                 fireExceptionCaught(new ChannelPipelineException(
628                         ctx.handler().getClass().getName() +
629                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
630             }
631         }
632     }
633 
634     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
635         // Notify the complete removal.
636         try {
637             ctx.callHandlerRemoved();
638         } catch (Throwable t) {
639             fireExceptionCaught(new ChannelPipelineException(
640                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
641         }
642     }
643 
644     final void invokeHandlerAddedIfNeeded() {
645         assert channel.eventLoop().inEventLoop();
646         if (firstRegistration) {
647             firstRegistration = false;
648             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
649             // that were added before the registration was done.
650             callHandlerAddedForAllHandlers();
651         }
652     }
653 
654     @Override
655     public final ChannelHandler first() {
656         ChannelHandlerContext first = firstContext();
657         if (first == null) {
658             return null;
659         }
660         return first.handler();
661     }
662 
663     @Override
664     public final ChannelHandlerContext firstContext() {
665         AbstractChannelHandlerContext first = head.next;
666         if (first == tail) {
667             return null;
668         }
669         return head.next;
670     }
671 
672     @Override
673     public final ChannelHandler last() {
674         AbstractChannelHandlerContext last = tail.prev;
675         if (last == head) {
676             return null;
677         }
678         return last.handler();
679     }
680 
681     @Override
682     public final ChannelHandlerContext lastContext() {
683         AbstractChannelHandlerContext last = tail.prev;
684         if (last == head) {
685             return null;
686         }
687         return last;
688     }
689 
690     @Override
691     public final ChannelHandler get(String name) {
692         ChannelHandlerContext ctx = context(name);
693         if (ctx == null) {
694             return null;
695         } else {
696             return ctx.handler();
697         }
698     }
699 
700     @SuppressWarnings("unchecked")
701     @Override
702     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
703         ChannelHandlerContext ctx = context(handlerType);
704         if (ctx == null) {
705             return null;
706         } else {
707             return (T) ctx.handler();
708         }
709     }
710 
711     @Override
712     public final ChannelHandlerContext context(String name) {
713         return context0(ObjectUtil.checkNotNull(name, "name"));
714     }
715 
716     @Override
717     public final ChannelHandlerContext context(ChannelHandler handler) {
718         ObjectUtil.checkNotNull(handler, "handler");
719 
720         AbstractChannelHandlerContext ctx = head.next;
721         for (;;) {
722 
723             if (ctx == null) {
724                 return null;
725             }
726 
727             if (ctx.handler() == handler) {
728                 return ctx;
729             }
730 
731             ctx = ctx.next;
732         }
733     }
734 
735     @Override
736     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
737         ObjectUtil.checkNotNull(handlerType, "handlerType");
738 
739         AbstractChannelHandlerContext ctx = head.next;
740         for (;;) {
741             if (ctx == null) {
742                 return null;
743             }
744             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
745                 return ctx;
746             }
747             ctx = ctx.next;
748         }
749     }
750 
751     @Override
752     public final List<String> names() {
753         List<String> list = new ArrayList<String>();
754         AbstractChannelHandlerContext ctx = head.next;
755         for (;;) {
756             if (ctx == null) {
757                 return list;
758             }
759             list.add(ctx.name());
760             ctx = ctx.next;
761         }
762     }
763 
764     @Override
765     public final Map<String, ChannelHandler> toMap() {
766         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
767         AbstractChannelHandlerContext ctx = head.next;
768         for (;;) {
769             if (ctx == tail) {
770                 return map;
771             }
772             map.put(ctx.name(), ctx.handler());
773             ctx = ctx.next;
774         }
775     }
776 
777     @Override
778     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
779         return toMap().entrySet().iterator();
780     }
781 
782     /**
783      * Returns the {@link String} representation of this pipeline.
784      */
785     @Override
786     public final String toString() {
787         StringBuilder buf = new StringBuilder()
788             .append(StringUtil.simpleClassName(this))
789             .append('{');
790         AbstractChannelHandlerContext ctx = head.next;
791         for (;;) {
792             if (ctx == tail) {
793                 break;
794             }
795 
796             buf.append('(')
797                .append(ctx.name())
798                .append(" = ")
799                .append(ctx.handler().getClass().getName())
800                .append(')');
801 
802             ctx = ctx.next;
803             if (ctx == tail) {
804                 break;
805             }
806 
807             buf.append(", ");
808         }
809         buf.append('}');
810         return buf.toString();
811     }
812 
813     @Override
814     public final ChannelPipeline fireChannelRegistered() {
815         AbstractChannelHandlerContext.invokeChannelRegistered(head);
816         return this;
817     }
818 
    /**
     * Fires a channelUnregistered event by handing {@code head} to
     * {@code AbstractChannelHandlerContext.invokeChannelUnregistered(...)}.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireChannelUnregistered() {
        AbstractChannelHandlerContext.invokeChannelUnregistered(head);
        return this;
    }
824 
    /**
     * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
     * handlerRemoved().
     *
     * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
     * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
     * the handlers are removed after all events are handled.
     *
     * See: https://github.com/netty/netty/issues/3156
     */
    private synchronized void destroy() {
        // Start the upward pass at the first context after head; the executor check is not skipped for it.
        destroyUp(head.next, false);
    }
838 
    /**
     * Walks the contexts forward from {@code ctx} towards {@code tail}. Once {@code tail} is reached, the
     * backward pass is started via {@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}.
     * If a context's executor does not run on the current thread, the walk is re-submitted to that executor
     * and this invocation stops.
     *
     * @param ctx         the context to start (or resume) the forward walk from
     * @param inEventLoop {@code true} to skip the executor check for {@code ctx} itself; the continuation
     *                    scheduled on a context's executor passes {@code true}
     */
    private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
        final Thread currentThread = Thread.currentThread();
        final AbstractChannelHandlerContext tail = this.tail;
        for (;;) {
            if (ctx == tail) {
                // Forward walk finished: begin removing handlers from the context just before tail.
                destroyDown(currentThread, tail.prev, inEventLoop);
                break;
            }

            final EventExecutor executor = ctx.executor();
            if (!inEventLoop && !executor.inEventLoop(currentThread)) {
                // This context's executor is not the current thread: resume the walk from there instead.
                final AbstractChannelHandlerContext finalCtx = ctx;
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        destroyUp(finalCtx, true);
                    }
                });
                break;
            }

            ctx = ctx.next;
            // The flag only applies to the context we were called with; re-check every following context.
            inEventLoop = false;
        }
    }
864 
    /**
     * Walks the contexts backward from {@code ctx} towards {@code head}, unlinking each context and
     * invoking its removal callback. If a context's executor does not run on {@code currentThread}, the
     * walk is re-submitted to that executor and this invocation stops.
     *
     * @param currentThread the thread this pass is running on, used for the executor check
     * @param ctx           the context to start (or resume) the backward walk from
     * @param inEventLoop   {@code true} to skip the executor check for {@code ctx} itself
     */
    private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
        // We have reached the tail; now traverse backwards.
        final AbstractChannelHandlerContext head = this.head;
        for (;;) {
            if (ctx == head) {
                break;
            }

            final EventExecutor executor = ctx.executor();
            if (inEventLoop || executor.inEventLoop(currentThread)) {
                // Unlink the context first, then notify it of the removal.
                atomicRemoveFromHandlerList(ctx);
                callHandlerRemoved0(ctx);
            } else {
                // Continue on the context's own executor; that task re-reads its current thread.
                final AbstractChannelHandlerContext finalCtx = ctx;
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        destroyDown(Thread.currentThread(), finalCtx, true);
                    }
                });
                break;
            }

            ctx = ctx.prev;
            // The flag only applies to the context we were called with; re-check every preceding context.
            inEventLoop = false;
        }
    }
892 
    /**
     * Fires a channelActive event by handing {@code head} to
     * {@code AbstractChannelHandlerContext.invokeChannelActive(...)}.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
898 
    /**
     * Fires a channelInactive event by handing {@code head} to
     * {@code AbstractChannelHandlerContext.invokeChannelInactive(...)}.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireChannelInactive() {
        AbstractChannelHandlerContext.invokeChannelInactive(head);
        return this;
    }
904 
    /**
     * Fires an exceptionCaught event by handing {@code head} and {@code cause} to
     * {@code AbstractChannelHandlerContext.invokeExceptionCaught(...)}.
     *
     * @param cause the {@link Throwable} to report
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireExceptionCaught(Throwable cause) {
        AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
        return this;
    }
910 
    /**
     * Fires a userEventTriggered event by handing {@code head} and {@code event} to
     * {@code AbstractChannelHandlerContext.invokeUserEventTriggered(...)}.
     *
     * @param event the user event object
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireUserEventTriggered(Object event) {
        AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
        return this;
    }
916 
    /**
     * Fires a channelRead event by handing {@code head} and {@code msg} to
     * {@code AbstractChannelHandlerContext.invokeChannelRead(...)}.
     *
     * @param msg the message that was read
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
922 
    /**
     * Fires a channelReadComplete event by handing {@code head} to
     * {@code AbstractChannelHandlerContext.invokeChannelReadComplete(...)}.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireChannelReadComplete() {
        AbstractChannelHandlerContext.invokeChannelReadComplete(head);
        return this;
    }
928 
    /**
     * Fires a channelWritabilityChanged event by handing {@code head} to
     * {@code AbstractChannelHandlerContext.invokeChannelWritabilityChanged(...)}.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline fireChannelWritabilityChanged() {
        AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
        return this;
    }
934 
    /**
     * Requests a bind to {@code localAddress} by delegating to the {@code tail} context.
     *
     * @param localAddress the address to bind to
     * @return the future returned by {@code tail.bind(localAddress)}
     */
    @Override
    public final ChannelFuture bind(SocketAddress localAddress) {
        return tail.bind(localAddress);
    }
939 
    /**
     * Requests a connect to {@code remoteAddress} by delegating to the {@code tail} context.
     *
     * @param remoteAddress the address to connect to
     * @return the future returned by {@code tail.connect(remoteAddress)}
     */
    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }
944 
    /**
     * Requests a connect to {@code remoteAddress} from {@code localAddress} by delegating to the
     * {@code tail} context.
     *
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to use
     * @return the future returned by {@code tail.connect(remoteAddress, localAddress)}
     */
    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return tail.connect(remoteAddress, localAddress);
    }
949 
    /**
     * Requests a disconnect by delegating to the {@code tail} context.
     *
     * @return the future returned by {@code tail.disconnect()}
     */
    @Override
    public final ChannelFuture disconnect() {
        return tail.disconnect();
    }
954 
    /**
     * Requests a close by delegating to the {@code tail} context.
     *
     * @return the future returned by {@code tail.close()}
     */
    @Override
    public final ChannelFuture close() {
        return tail.close();
    }
959 
    /**
     * Requests a deregister by delegating to the {@code tail} context.
     *
     * @return the future returned by {@code tail.deregister()}
     */
    @Override
    public final ChannelFuture deregister() {
        return tail.deregister();
    }
964 
    /**
     * Requests a flush by delegating to the {@code tail} context.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline flush() {
        tail.flush();
        return this;
    }
970 
    /**
     * Requests a bind to {@code localAddress} by delegating to the {@code tail} context, using the
     * supplied promise.
     *
     * @param localAddress the address to bind to
     * @param promise      the promise passed on to {@code tail.bind(...)}
     * @return the future returned by {@code tail.bind(localAddress, promise)}
     */
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return tail.bind(localAddress, promise);
    }
975 
    /**
     * Requests a connect to {@code remoteAddress} by delegating to the {@code tail} context, using the
     * supplied promise.
     *
     * @param remoteAddress the address to connect to
     * @param promise       the promise passed on to {@code tail.connect(...)}
     * @return the future returned by {@code tail.connect(remoteAddress, promise)}
     */
    @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }
980 
    /**
     * Requests a connect to {@code remoteAddress} from {@code localAddress} by delegating to the
     * {@code tail} context, using the supplied promise.
     *
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to use
     * @param promise       the promise passed on to {@code tail.connect(...)}
     * @return the future returned by {@code tail.connect(remoteAddress, localAddress, promise)}
     */
    @Override
    public final ChannelFuture connect(
            SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, localAddress, promise);
    }
986 
    /**
     * Requests a disconnect by delegating to the {@code tail} context, using the supplied promise.
     *
     * @param promise the promise passed on to {@code tail.disconnect(...)}
     * @return the future returned by {@code tail.disconnect(promise)}
     */
    @Override
    public final ChannelFuture disconnect(ChannelPromise promise) {
        return tail.disconnect(promise);
    }
991 
    /**
     * Requests a close by delegating to the {@code tail} context, using the supplied promise.
     *
     * @param promise the promise passed on to {@code tail.close(...)}
     * @return the future returned by {@code tail.close(promise)}
     */
    @Override
    public final ChannelFuture close(ChannelPromise promise) {
        return tail.close(promise);
    }
996 
    /**
     * Requests a deregister by delegating to the {@code tail} context, using the supplied promise.
     *
     * @param promise the promise passed on to {@code tail.deregister(...)}
     * @return the future returned by {@code tail.deregister(promise)}
     */
    @Override
    public final ChannelFuture deregister(final ChannelPromise promise) {
        return tail.deregister(promise);
    }
1001 
    /**
     * Requests a read by delegating to the {@code tail} context.
     *
     * @return this pipeline
     */
    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }
1007 
    /**
     * Requests a write of {@code msg} by delegating to the {@code tail} context.
     *
     * @param msg the message to write
     * @return the future returned by {@code tail.write(msg)}
     */
    @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }
1012 
    /**
     * Requests a write of {@code msg} by delegating to the {@code tail} context, using the supplied promise.
     *
     * @param msg     the message to write
     * @param promise the promise passed on to {@code tail.write(...)}
     * @return the future returned by {@code tail.write(msg, promise)}
     */
    @Override
    public final ChannelFuture write(Object msg, ChannelPromise promise) {
        return tail.write(msg, promise);
    }
1017 
    /**
     * Requests a write-and-flush of {@code msg} by delegating to the {@code tail} context, using the
     * supplied promise.
     *
     * @param msg     the message to write
     * @param promise the promise passed on to {@code tail.writeAndFlush(...)}
     * @return the future returned by {@code tail.writeAndFlush(msg, promise)}
     */
    @Override
    public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return tail.writeAndFlush(msg, promise);
    }
1022 
    /**
     * Requests a write-and-flush of {@code msg} by delegating to the {@code tail} context.
     *
     * @param msg the message to write
     * @return the future returned by {@code tail.writeAndFlush(msg)}
     */
    @Override
    public final ChannelFuture writeAndFlush(Object msg) {
        return tail.writeAndFlush(msg);
    }
1027 
    /**
     * Creates a new {@link DefaultChannelPromise} for this pipeline's channel.
     *
     * @return a newly created promise
     */
    @Override
    public final ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel);
    }
1032 
    /**
     * Creates a new {@link DefaultChannelProgressivePromise} for this pipeline's channel.
     *
     * @return a newly created progressive promise
     */
    @Override
    public final ChannelProgressivePromise newProgressivePromise() {
        return new DefaultChannelProgressivePromise(channel);
    }
1037 
    /**
     * Returns the pipeline's {@code succeededFuture} field; no new object is created by this call.
     *
     * @return the {@code succeededFuture} instance held by this pipeline
     */
    @Override
    public final ChannelFuture newSucceededFuture() {
        return succeededFuture;
    }
1042 
    /**
     * Creates a new {@link FailedChannelFuture} for this pipeline's channel that carries {@code cause}.
     *
     * @param cause the failure to expose through the returned future
     * @return a newly created failed future
     */
    @Override
    public final ChannelFuture newFailedFuture(Throwable cause) {
        return new FailedChannelFuture(channel, null, cause);
    }
1047 
    /**
     * Returns the pipeline's {@code voidPromise} field; no new object is created by this call.
     *
     * @return the {@code voidPromise} instance held by this pipeline
     */
    @Override
    public final ChannelPromise voidPromise() {
        return voidPromise;
    }
1052 
1053     private void checkDuplicateName(String name) {
1054         if (context0(name) != null) {
1055             throw new IllegalArgumentException("Duplicate handler name: " + name);
1056         }
1057     }
1058 
1059     private AbstractChannelHandlerContext context0(String name) {
1060         AbstractChannelHandlerContext context = head.next;
1061         while (context != tail) {
1062             if (context.name().equals(name)) {
1063                 return context;
1064             }
1065             context = context.next;
1066         }
1067         return null;
1068     }
1069 
1070     private AbstractChannelHandlerContext getContextOrDie(String name) {
1071         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1072         if (ctx == null) {
1073             throw new NoSuchElementException(name);
1074         } else {
1075             return ctx;
1076         }
1077     }
1078 
1079     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1080         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1081         if (ctx == null) {
1082             throw new NoSuchElementException(handler.getClass().getName());
1083         } else {
1084             return ctx;
1085         }
1086     }
1087 
1088     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1089         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1090         if (ctx == null) {
1091             throw new NoSuchElementException(handlerType.getName());
1092         } else {
1093             return ctx;
1094         }
1095     }
1096 
1097     private void callHandlerAddedForAllHandlers() {
1098         final PendingHandlerCallback pendingHandlerCallbackHead;
1099         synchronized (this) {
1100             assert !registered;
1101 
1102             // This Channel itself was registered.
1103             registered = true;
1104 
1105             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1106             // Null out so it can be GC'ed.
1107             this.pendingHandlerCallbackHead = null;
1108         }
1109 
1110         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1111         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1112         // the EventLoop.
1113         PendingHandlerCallback task = pendingHandlerCallbackHead;
1114         while (task != null) {
1115             task.execute();
1116             task = task.next;
1117         }
1118     }
1119 
1120     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1121         assert !registered;
1122 
1123         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1124         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1125         if (pending == null) {
1126             pendingHandlerCallbackHead = task;
1127         } else {
1128             // Find the tail of the linked-list.
1129             while (pending.next != null) {
1130                 pending = pending.next;
1131             }
1132             pending.next = task;
1133         }
1134     }
1135 
1136     private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1137         newCtx.setAddPending();
1138         executor.execute(new Runnable() {
1139             @Override
1140             public void run() {
1141                 callHandlerAdded0(newCtx);
1142             }
1143         });
1144     }
1145 
    /**
     * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without being handled by the user
     * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
     *
     * This implementation logs the cause at WARN level and afterwards passes it to
     * {@link ReferenceCountUtil#release(Object)}, even if logging throws.
     *
     * @param cause the unhandled {@link Throwable}
     */
    protected void onUnhandledInboundException(Throwable cause) {
        try {
            logger.warn(
                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                            "It usually means the last handler in the pipeline did not handle the exception.",
                    cause);
        } finally {
            ReferenceCountUtil.release(cause);
        }
    }
1160 
    /**
     * Called once the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}. This implementation does nothing.
     */
    protected void onUnhandledInboundChannelActive() {
    }
1167 
    /**
     * Called once the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}. This implementation does nothing.
     */
    protected void onUnhandledInboundChannelInactive() {
    }
1174 
    /**
     * Called once a message hit the end of the {@link ChannelPipeline} without being handled by the user
     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
     * for calling {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
     *
     * This implementation logs the message at DEBUG level and afterwards releases it, even if logging throws.
     *
     * @param msg the unhandled inbound message
     */
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
1189 
    /**
     * Called once a message hit the end of the {@link ChannelPipeline} without being handled by the user
     * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}.
     *
     * This implementation delegates to {@link #onUnhandledInboundMessage(Object)}, which is responsible for
     * calling {@link ReferenceCountUtil#release(Object)} on the given msg. If debug logging is enabled, it
     * additionally logs the handler names of the context's pipeline and the context's channel.
     *
     * @param ctx the context the message arrived at
     * @param msg the unhandled inbound message
     */
    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        onUnhandledInboundMessage(msg);
        // msg is not touched after this point; only pipeline/channel details are logged.
        if (logger.isDebugEnabled()) {
            logger.debug("Discarded message pipeline : {}. Channel : {}.",
                         ctx.pipeline().names(), ctx.channel());
        }
    }
1202 
    /**
     * Called once the {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}. This implementation does nothing.
     */
    protected void onUnhandledInboundChannelReadComplete() {
    }
1209 
    /**
     * Called once a user event hit the end of the {@link ChannelPipeline} without being handled by the user
     * in {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is responsible
     * for calling {@link ReferenceCountUtil#release(Object)} on the given event at some point.
     *
     * @param evt the unhandled user event
     */
    protected void onUnhandledInboundUserEventTriggered(Object evt) {
        // This may not be a configuration error and so don't log anything.
        // The event may be superfluous for the current pipeline configuration.
        ReferenceCountUtil.release(evt);
    }
1220 
    /**
     * Called once the {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}. This implementation does nothing.
     */
    protected void onUnhandledChannelWritabilityChanged() {
    }
1227 
1228     @UnstableApi
1229     protected void incrementPendingOutboundBytes(long size) {
1230         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1231         if (buffer != null) {
1232             buffer.incrementPendingOutboundBytes(size);
1233         }
1234     }
1235 
1236     @UnstableApi
1237     protected void decrementPendingOutboundBytes(long size) {
1238         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1239         if (buffer != null) {
1240             buffer.decrementPendingOutboundBytes(size);
1241         }
1242     }
1243 
    // A special catch-all handler that handles both bytes and messages.
    // It is its own handler (see handler()) and routes inbound events that reach it to the pipeline's
    // onUnhandled*() hooks; registration and handler lifecycle callbacks are no-ops.
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            // The tail is marked add-complete right at construction time.
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            // This context doubles as the handler.
            return this;
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) { }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) { }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelActive();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelInactive();
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            onUnhandledChannelWritabilityChanged();
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) { }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) { }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            onUnhandledInboundUserEventTriggered(evt);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            onUnhandledInboundException(cause);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            // Uses the two-argument hook so the context is available for logging.
            onUnhandledInboundMessage(ctx, msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            onUnhandledInboundChannelReadComplete();
        }
    }
1304 
    /**
     * The context at the head of the pipeline. It is its own handler and implements both the outbound and
     * the inbound handler interface: outbound operations are handed to the channel's {@link Unsafe}, while
     * inbound events are passed on through the corresponding {@code ctx.fireXxx()} call.
     */
    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        // The Unsafe of the pipeline's channel, captured once at construction time.
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, HeadContext.class);
            unsafe = pipeline.channel().unsafe();
            // The head is marked add-complete right at construction time.
            setAddComplete();
        }

        @Override
        public ChannelHandler handler() {
            // This context doubles as the handler.
            return this;
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) {
            // NOOP
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) {
            // NOOP
        }

        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
            unsafe.bind(localAddress, promise);
        }

        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
            unsafe.disconnect(promise);
        }

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
            unsafe.close(promise);
        }

        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
            unsafe.deregister(promise);
        }

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            unsafe.write(msg, promise);
        }

        @Override
        public void flush(ChannelHandlerContext ctx) {
            unsafe.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            ctx.fireExceptionCaught(cause);
        }

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            // invokeHandlerAddedIfNeeded() runs before the event is passed on.
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            ctx.fireChannelUnregistered();

            // Remove all handlers sequentially if channel is closed and unregistered.
            if (!channel.isOpen()) {
                destroy();
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.fireChannelActive();

            // After the event has been passed on, request a read if auto-read is enabled.
            readIfIsAutoRead();
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) {
            ctx.fireChannelInactive();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.fireChannelRead(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.fireChannelReadComplete();

            // After the event has been passed on, request the next read if auto-read is enabled.
            readIfIsAutoRead();
        }

        /**
         * Calls {@code channel.read()} when the channel's config reports auto-read as enabled.
         */
        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
            ctx.fireUserEventTriggered(evt);
        }

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            ctx.fireChannelWritabilityChanged();
        }
    }
1436 
    /**
     * Node of a singly linked list of deferred handler callbacks. Each node holds the affected context and
     * a link to the next pending callback.
     */
    private abstract static class PendingHandlerCallback implements Runnable {
        final AbstractChannelHandlerContext ctx;
        // Next callback in the list, or null if this is the last one.
        PendingHandlerCallback next;

        PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        /**
         * Performs the deferred callback for {@link #ctx}.
         */
        abstract void execute();
    }
1447 
1448     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1449 
1450         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1451             super(ctx);
1452         }
1453 
1454         @Override
1455         public void run() {
1456             callHandlerAdded0(ctx);
1457         }
1458 
1459         @Override
1460         void execute() {
1461             EventExecutor executor = ctx.executor();
1462             if (executor.inEventLoop()) {
1463                 callHandlerAdded0(ctx);
1464             } else {
1465                 try {
1466                     executor.execute(this);
1467                 } catch (RejectedExecutionException e) {
1468                     if (logger.isWarnEnabled()) {
1469                         logger.warn(
1470                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1471                                 executor, ctx.name(), e);
1472                     }
1473                     atomicRemoveFromHandlerList(ctx);
1474                     ctx.setRemoved();
1475                 }
1476             }
1477         }
1478     }
1479 
1480     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1481 
1482         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1483             super(ctx);
1484         }
1485 
1486         @Override
1487         public void run() {
1488             callHandlerRemoved0(ctx);
1489         }
1490 
1491         @Override
1492         void execute() {
1493             EventExecutor executor = ctx.executor();
1494             if (executor.inEventLoop()) {
1495                 callHandlerRemoved0(ctx);
1496             } else {
1497                 try {
1498                     executor.execute(this);
1499                 } catch (RejectedExecutionException e) {
1500                     if (logger.isWarnEnabled()) {
1501                         logger.warn(
1502                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1503                                         " removing handler {}.", executor, ctx.name(), e);
1504                     }
1505                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1506                     ctx.setRemoved();
1507                 }
1508             }
1509         }
1510     }
1511 }