View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.UnstableApi;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayList;
32  import java.util.IdentityHashMap;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NoSuchElementException;
38  import java.util.WeakHashMap;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41  
42  /**
43   * The default {@link ChannelPipeline} implementation.  It is usually created
44   * by a {@link Channel} implementation when the {@link Channel} is created.
45   */
46  public class DefaultChannelPipeline implements ChannelPipeline {
47  
48      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
49  
50      private static final String HEAD_NAME = generateName0(HeadContext.class);
51      private static final String TAIL_NAME = generateName0(TailContext.class);
52  
53      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
54              new FastThreadLocal<Map<Class<?>, String>>() {
55          @Override
56          protected Map<Class<?>, String> initialValue() {
57              return new WeakHashMap<Class<?>, String>();
58          }
59      };
60  
61      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
62              AtomicReferenceFieldUpdater.newUpdater(
63                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
64      final AbstractChannelHandlerContext head;
65      final AbstractChannelHandlerContext tail;
66  
67      private final Channel channel;
68      private final ChannelFuture succeededFuture;
69      private final VoidChannelPromise voidPromise;
70      private final boolean touch = ResourceLeakDetector.isEnabled();
71  
72      private Map<EventExecutorGroup, EventExecutor> childExecutors;
73      private volatile MessageSizeEstimator.Handle estimatorHandle;
74      private boolean firstRegistration = true;
75  
76      /**
77       * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
78       * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
79       *
80       * We only keep the head because it is expected that the list is used infrequently and its size is small.
81       * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
82       * complexity.
83       */
84      private PendingHandlerCallback pendingHandlerCallbackHead;
85  
86      /**
87       * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
88       * change.
89       */
90      private boolean registered;
91  
/**
 * Creates a pipeline for the given channel. The new pipeline contains only the
 * head and tail contexts, linked directly to each other.
 *
 * @param channel the owning channel; must not be {@code null}
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, null);
    this.voidPromise = new VoidChannelPromise(channel, true);

    // The tail sentinel is created before the head sentinel, then both are wired together.
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);

    this.head.next = this.tail;
    this.tail.prev = this.head;
}
103 
104     final MessageSizeEstimator.Handle estimatorHandle() {
105         MessageSizeEstimator.Handle handle = estimatorHandle;
106         if (handle == null) {
107             handle = channel.config().getMessageSizeEstimator().newHandle();
108             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
109                 handle = estimatorHandle;
110             }
111         }
112         return handle;
113     }
114 
115     final Object touch(Object msg, AbstractChannelHandlerContext next) {
116         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
117     }
118 
119     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
120         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
121     }
122 
123     private EventExecutor childExecutor(EventExecutorGroup group) {
124         if (group == null) {
125             return null;
126         }
127         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
128         if (pinEventExecutor != null && !pinEventExecutor) {
129             return group.next();
130         }
131         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
132         if (childExecutors == null) {
133             // Use size of 4 as most people only use one extra EventExecutor.
134             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
135         }
136         // Pin one of the child executors once and remember it so that the same child executor
137         // is used to fire events for the same channel.
138         EventExecutor childExecutor = childExecutors.get(group);
139         if (childExecutor == null) {
140             childExecutor = group.next();
141             childExecutors.put(group, childExecutor);
142         }
143         return childExecutor;
144     }
145     @Override
146     public final Channel channel() {
147         return channel;
148     }
149 
150     @Override
151     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
152         return addFirst(null, name, handler);
153     }
154 
155     @Override
156     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
157         final AbstractChannelHandlerContext newCtx;
158         synchronized (this) {
159             checkMultiplicity(handler);
160             name = filterName(name, handler);
161 
162             newCtx = newContext(group, name, handler);
163 
164             addFirst0(newCtx);
165 
166             // If the registered is false it means that the channel was not registered on an eventLoop yet.
167             // In this case we add the context to the pipeline and add a task that will call
168             // ChannelHandler.handlerAdded(...) once the channel is registered.
169             if (!registered) {
170                 newCtx.setAddPending();
171                 callHandlerCallbackLater(newCtx, true);
172                 return this;
173             }
174 
175             EventExecutor executor = newCtx.executor();
176             if (!executor.inEventLoop()) {
177                 callHandlerAddedInEventLoop(newCtx, executor);
178                 return this;
179             }
180         }
181         callHandlerAdded0(newCtx);
182         return this;
183     }
184 
185     private void addFirst0(AbstractChannelHandlerContext newCtx) {
186         AbstractChannelHandlerContext nextCtx = head.next;
187         newCtx.prev = head;
188         newCtx.next = nextCtx;
189         head.next = newCtx;
190         nextCtx.prev = newCtx;
191     }
192 
193     @Override
194     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
195         return addLast(null, name, handler);
196     }
197 
198     @Override
199     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
200         final AbstractChannelHandlerContext newCtx;
201         synchronized (this) {
202             checkMultiplicity(handler);
203 
204             newCtx = newContext(group, filterName(name, handler), handler);
205 
206             addLast0(newCtx);
207 
208             // If the registered is false it means that the channel was not registered on an eventLoop yet.
209             // In this case we add the context to the pipeline and add a task that will call
210             // ChannelHandler.handlerAdded(...) once the channel is registered.
211             if (!registered) {
212                 newCtx.setAddPending();
213                 callHandlerCallbackLater(newCtx, true);
214                 return this;
215             }
216 
217             EventExecutor executor = newCtx.executor();
218             if (!executor.inEventLoop()) {
219                 callHandlerAddedInEventLoop(newCtx, executor);
220                 return this;
221             }
222         }
223         callHandlerAdded0(newCtx);
224         return this;
225     }
226 
227     private void addLast0(AbstractChannelHandlerContext newCtx) {
228         AbstractChannelHandlerContext prev = tail.prev;
229         newCtx.prev = prev;
230         newCtx.next = tail;
231         prev.next = newCtx;
232         tail.prev = newCtx;
233     }
234 
235     @Override
236     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
237         return addBefore(null, baseName, name, handler);
238     }
239 
240     @Override
241     public final ChannelPipeline addBefore(
242             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
243         final AbstractChannelHandlerContext newCtx;
244         final AbstractChannelHandlerContext ctx;
245         synchronized (this) {
246             checkMultiplicity(handler);
247             name = filterName(name, handler);
248             ctx = getContextOrDie(baseName);
249 
250             newCtx = newContext(group, name, handler);
251 
252             addBefore0(ctx, newCtx);
253 
254             // If the registered is false it means that the channel was not registered on an eventLoop yet.
255             // In this case we add the context to the pipeline and add a task that will call
256             // ChannelHandler.handlerAdded(...) once the channel is registered.
257             if (!registered) {
258                 newCtx.setAddPending();
259                 callHandlerCallbackLater(newCtx, true);
260                 return this;
261             }
262 
263             EventExecutor executor = newCtx.executor();
264             if (!executor.inEventLoop()) {
265                 callHandlerAddedInEventLoop(newCtx, executor);
266                 return this;
267             }
268         }
269         callHandlerAdded0(newCtx);
270         return this;
271     }
272 
273     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
274         newCtx.prev = ctx.prev;
275         newCtx.next = ctx;
276         ctx.prev.next = newCtx;
277         ctx.prev = newCtx;
278     }
279 
280     private String filterName(String name, ChannelHandler handler) {
281         if (name == null) {
282             return generateName(handler);
283         }
284         checkDuplicateName(name);
285         return name;
286     }
287 
288     @Override
289     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
290         return addAfter(null, baseName, name, handler);
291     }
292 
293     @Override
294     public final ChannelPipeline addAfter(
295             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
296         final AbstractChannelHandlerContext newCtx;
297         final AbstractChannelHandlerContext ctx;
298 
299         synchronized (this) {
300             checkMultiplicity(handler);
301             name = filterName(name, handler);
302             ctx = getContextOrDie(baseName);
303 
304             newCtx = newContext(group, name, handler);
305 
306             addAfter0(ctx, newCtx);
307 
308             // If the registered is false it means that the channel was not registered on an eventLoop yet.
309             // In this case we remove the context from the pipeline and add a task that will call
310             // ChannelHandler.handlerRemoved(...) once the channel is registered.
311             if (!registered) {
312                 newCtx.setAddPending();
313                 callHandlerCallbackLater(newCtx, true);
314                 return this;
315             }
316             EventExecutor executor = newCtx.executor();
317             if (!executor.inEventLoop()) {
318                 callHandlerAddedInEventLoop(newCtx, executor);
319                 return this;
320             }
321         }
322         callHandlerAdded0(newCtx);
323         return this;
324     }
325 
326     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
327         newCtx.prev = ctx;
328         newCtx.next = ctx.next;
329         ctx.next.prev = newCtx;
330         ctx.next = newCtx;
331     }
332 
333     public final ChannelPipeline addFirst(ChannelHandler handler) {
334         return addFirst(null, handler);
335     }
336 
337     @Override
338     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
339         return addFirst(null, handlers);
340     }
341 
342     @Override
343     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
344         if (handlers == null) {
345             throw new NullPointerException("handlers");
346         }
347         if (handlers.length == 0 || handlers[0] == null) {
348             return this;
349         }
350 
351         int size;
352         for (size = 1; size < handlers.length; size ++) {
353             if (handlers[size] == null) {
354                 break;
355             }
356         }
357 
358         for (int i = size - 1; i >= 0; i --) {
359             ChannelHandler h = handlers[i];
360             addFirst(executor, null, h);
361         }
362 
363         return this;
364     }
365 
366     public final ChannelPipeline addLast(ChannelHandler handler) {
367         return addLast(null, handler);
368     }
369 
370     @Override
371     public final ChannelPipeline addLast(ChannelHandler... handlers) {
372         return addLast(null, handlers);
373     }
374 
375     @Override
376     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
377         if (handlers == null) {
378             throw new NullPointerException("handlers");
379         }
380 
381         for (ChannelHandler h: handlers) {
382             if (h == null) {
383                 break;
384             }
385             addLast(executor, null, h);
386         }
387 
388         return this;
389     }
390 
391     private String generateName(ChannelHandler handler) {
392         Map<Class<?>, String> cache = nameCaches.get();
393         Class<?> handlerType = handler.getClass();
394         String name = cache.get(handlerType);
395         if (name == null) {
396             name = generateName0(handlerType);
397             cache.put(handlerType, name);
398         }
399 
400         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
401         // any name conflicts.  Note that we don't cache the names generated here.
402         if (context0(name) != null) {
403             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
404             for (int i = 1;; i ++) {
405                 String newName = baseName + i;
406                 if (context0(newName) == null) {
407                     name = newName;
408                     break;
409                 }
410             }
411         }
412         return name;
413     }
414 
415     private static String generateName0(Class<?> handlerType) {
416         return StringUtil.simpleClassName(handlerType) + "#0";
417     }
418 
419     @Override
420     public final ChannelPipeline remove(ChannelHandler handler) {
421         remove(getContextOrDie(handler));
422         return this;
423     }
424 
425     @Override
426     public final ChannelHandler remove(String name) {
427         return remove(getContextOrDie(name)).handler();
428     }
429 
430     @SuppressWarnings("unchecked")
431     @Override
432     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
433         return (T) remove(getContextOrDie(handlerType)).handler();
434     }
435 
436     public final <T extends ChannelHandler> T removeIfExists(String name) {
437         return removeIfExists(context(name));
438     }
439 
440     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
441         return removeIfExists(context(handlerType));
442     }
443 
444     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
445         return removeIfExists(context(handler));
446     }
447 
448     @SuppressWarnings("unchecked")
449     private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
450         if (ctx == null) {
451             return null;
452         }
453         return (T) remove((AbstractChannelHandlerContext) ctx).handler();
454     }
455 
456     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
457         assert ctx != head && ctx != tail;
458 
459         synchronized (this) {
460             remove0(ctx);
461 
462             // If the registered is false it means that the channel was not registered on an eventloop yet.
463             // In this case we remove the context from the pipeline and add a task that will call
464             // ChannelHandler.handlerRemoved(...) once the channel is registered.
465             if (!registered) {
466                 callHandlerCallbackLater(ctx, false);
467                 return ctx;
468             }
469 
470             EventExecutor executor = ctx.executor();
471             if (!executor.inEventLoop()) {
472                 executor.execute(new Runnable() {
473                     @Override
474                     public void run() {
475                         callHandlerRemoved0(ctx);
476                     }
477                 });
478                 return ctx;
479             }
480         }
481         callHandlerRemoved0(ctx);
482         return ctx;
483     }
484 
485     private static void remove0(AbstractChannelHandlerContext ctx) {
486         AbstractChannelHandlerContext prev = ctx.prev;
487         AbstractChannelHandlerContext next = ctx.next;
488         prev.next = next;
489         next.prev = prev;
490     }
491 
492     @Override
493     public final ChannelHandler removeFirst() {
494         if (head.next == tail) {
495             throw new NoSuchElementException();
496         }
497         return remove(head.next).handler();
498     }
499 
500     @Override
501     public final ChannelHandler removeLast() {
502         if (head.next == tail) {
503             throw new NoSuchElementException();
504         }
505         return remove(tail.prev).handler();
506     }
507 
508     @Override
509     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
510         replace(getContextOrDie(oldHandler), newName, newHandler);
511         return this;
512     }
513 
514     @Override
515     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
516         return replace(getContextOrDie(oldName), newName, newHandler);
517     }
518 
519     @Override
520     @SuppressWarnings("unchecked")
521     public final <T extends ChannelHandler> T replace(
522             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
523         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
524     }
525 
526     private ChannelHandler replace(
527             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
528         assert ctx != head && ctx != tail;
529 
530         final AbstractChannelHandlerContext newCtx;
531         synchronized (this) {
532             checkMultiplicity(newHandler);
533             if (newName == null) {
534                 newName = generateName(newHandler);
535             } else {
536                 boolean sameName = ctx.name().equals(newName);
537                 if (!sameName) {
538                     checkDuplicateName(newName);
539                 }
540             }
541 
542             newCtx = newContext(ctx.executor, newName, newHandler);
543 
544             replace0(ctx, newCtx);
545 
546             // If the registered is false it means that the channel was not registered on an eventloop yet.
547             // In this case we replace the context in the pipeline
548             // and add a task that will call ChannelHandler.handlerAdded(...) and
549             // ChannelHandler.handlerRemoved(...) once the channel is registered.
550             if (!registered) {
551                 callHandlerCallbackLater(newCtx, true);
552                 callHandlerCallbackLater(ctx, false);
553                 return ctx.handler();
554             }
555             EventExecutor executor = ctx.executor();
556             if (!executor.inEventLoop()) {
557                 executor.execute(new Runnable() {
558                     @Override
559                     public void run() {
560                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
561                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
562                         // those event handlers must be called after handlerAdded().
563                         callHandlerAdded0(newCtx);
564                         callHandlerRemoved0(ctx);
565                     }
566                 });
567                 return ctx.handler();
568             }
569         }
570         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
571         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
572         // event handlers must be called after handlerAdded().
573         callHandlerAdded0(newCtx);
574         callHandlerRemoved0(ctx);
575         return ctx.handler();
576     }
577 
578     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
579         AbstractChannelHandlerContext prev = oldCtx.prev;
580         AbstractChannelHandlerContext next = oldCtx.next;
581         newCtx.prev = prev;
582         newCtx.next = next;
583 
584         // Finish the replacement of oldCtx with newCtx in the linked list.
585         // Note that this doesn't mean events will be sent to the new handler immediately
586         // because we are currently at the event handler thread and no more than one handler methods can be invoked
587         // at the same time (we ensured that in replace().)
588         prev.next = newCtx;
589         next.prev = newCtx;
590 
591         // update the reference to the replacement so forward of buffered content will work correctly
592         oldCtx.prev = newCtx;
593         oldCtx.next = newCtx;
594     }
595 
596     private static void checkMultiplicity(ChannelHandler handler) {
597         if (handler instanceof ChannelHandlerAdapter) {
598             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
599             if (!h.isSharable() && h.added) {
600                 throw new ChannelPipelineException(
601                         h.getClass().getName() +
602                         " is not a @Sharable handler, so can't be added or removed multiple times.");
603             }
604             h.added = true;
605         }
606     }
607 
608     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
609         try {
610             ctx.callHandlerAdded();
611         } catch (Throwable t) {
612             boolean removed = false;
613             try {
614                 remove0(ctx);
615                 ctx.callHandlerRemoved();
616                 removed = true;
617             } catch (Throwable t2) {
618                 if (logger.isWarnEnabled()) {
619                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
620                 }
621             }
622 
623             if (removed) {
624                 fireExceptionCaught(new ChannelPipelineException(
625                         ctx.handler().getClass().getName() +
626                         ".handlerAdded() has thrown an exception; removed.", t));
627             } else {
628                 fireExceptionCaught(new ChannelPipelineException(
629                         ctx.handler().getClass().getName() +
630                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
631             }
632         }
633     }
634 
635     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
636         // Notify the complete removal.
637         try {
638             ctx.callHandlerRemoved();
639         } catch (Throwable t) {
640             fireExceptionCaught(new ChannelPipelineException(
641                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
642         }
643     }
644 
645     final void invokeHandlerAddedIfNeeded() {
646         assert channel.eventLoop().inEventLoop();
647         if (firstRegistration) {
648             firstRegistration = false;
649             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
650             // that were added before the registration was done.
651             callHandlerAddedForAllHandlers();
652         }
653     }
654 
655     @Override
656     public final ChannelHandler first() {
657         ChannelHandlerContext first = firstContext();
658         if (first == null) {
659             return null;
660         }
661         return first.handler();
662     }
663 
664     @Override
665     public final ChannelHandlerContext firstContext() {
666         AbstractChannelHandlerContext first = head.next;
667         if (first == tail) {
668             return null;
669         }
670         return head.next;
671     }
672 
673     @Override
674     public final ChannelHandler last() {
675         AbstractChannelHandlerContext last = tail.prev;
676         if (last == head) {
677             return null;
678         }
679         return last.handler();
680     }
681 
682     @Override
683     public final ChannelHandlerContext lastContext() {
684         AbstractChannelHandlerContext last = tail.prev;
685         if (last == head) {
686             return null;
687         }
688         return last;
689     }
690 
691     @Override
692     public final ChannelHandler get(String name) {
693         ChannelHandlerContext ctx = context(name);
694         if (ctx == null) {
695             return null;
696         } else {
697             return ctx.handler();
698         }
699     }
700 
701     @SuppressWarnings("unchecked")
702     @Override
703     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
704         ChannelHandlerContext ctx = context(handlerType);
705         if (ctx == null) {
706             return null;
707         } else {
708             return (T) ctx.handler();
709         }
710     }
711 
712     @Override
713     public final ChannelHandlerContext context(String name) {
714         if (name == null) {
715             throw new NullPointerException("name");
716         }
717 
718         return context0(name);
719     }
720 
721     @Override
722     public final ChannelHandlerContext context(ChannelHandler handler) {
723         if (handler == null) {
724             throw new NullPointerException("handler");
725         }
726 
727         AbstractChannelHandlerContext ctx = head.next;
728         for (;;) {
729 
730             if (ctx == null) {
731                 return null;
732             }
733 
734             if (ctx.handler() == handler) {
735                 return ctx;
736             }
737 
738             ctx = ctx.next;
739         }
740     }
741 
742     @Override
743     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
744         if (handlerType == null) {
745             throw new NullPointerException("handlerType");
746         }
747 
748         AbstractChannelHandlerContext ctx = head.next;
749         for (;;) {
750             if (ctx == null) {
751                 return null;
752             }
753             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
754                 return ctx;
755             }
756             ctx = ctx.next;
757         }
758     }
759 
760     @Override
761     public final List<String> names() {
762         List<String> list = new ArrayList<String>();
763         AbstractChannelHandlerContext ctx = head.next;
764         for (;;) {
765             if (ctx == null) {
766                 return list;
767             }
768             list.add(ctx.name());
769             ctx = ctx.next;
770         }
771     }
772 
773     @Override
774     public final Map<String, ChannelHandler> toMap() {
775         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
776         AbstractChannelHandlerContext ctx = head.next;
777         for (;;) {
778             if (ctx == tail) {
779                 return map;
780             }
781             map.put(ctx.name(), ctx.handler());
782             ctx = ctx.next;
783         }
784     }
785 
786     @Override
787     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
788         return toMap().entrySet().iterator();
789     }
790 
791     /**
792      * Returns the {@link String} representation of this pipeline.
793      */
794     @Override
795     public final String toString() {
796         StringBuilder buf = new StringBuilder()
797             .append(StringUtil.simpleClassName(this))
798             .append('{');
799         AbstractChannelHandlerContext ctx = head.next;
800         for (;;) {
801             if (ctx == tail) {
802                 break;
803             }
804 
805             buf.append('(')
806                .append(ctx.name())
807                .append(" = ")
808                .append(ctx.handler().getClass().getName())
809                .append(')');
810 
811             ctx = ctx.next;
812             if (ctx == tail) {
813                 break;
814             }
815 
816             buf.append(", ");
817         }
818         buf.append('}');
819         return buf.toString();
820     }
821 
822     @Override
823     public final ChannelPipeline fireChannelRegistered() {
824         AbstractChannelHandlerContext.invokeChannelRegistered(head);
825         return this;
826     }
827 
828     @Override
829     public final ChannelPipeline fireChannelUnregistered() {
830         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
831         return this;
832     }
833 
834     /**
835      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
836      * handlerRemoved().
837      *
838      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
839      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
840      * the handlers are removed after all events are handled.
841      *
842      * See: https://github.com/netty/netty/issues/3156
843      */
844     private synchronized void destroy() {
845         destroyUp(head.next, false);
846     }
847 
848     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
849         final Thread currentThread = Thread.currentThread();
850         final AbstractChannelHandlerContext tail = this.tail;
851         for (;;) {
852             if (ctx == tail) {
853                 destroyDown(currentThread, tail.prev, inEventLoop);
854                 break;
855             }
856 
857             final EventExecutor executor = ctx.executor();
858             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
859                 final AbstractChannelHandlerContext finalCtx = ctx;
860                 executor.execute(new Runnable() {
861                     @Override
862                     public void run() {
863                         destroyUp(finalCtx, true);
864                     }
865                 });
866                 break;
867             }
868 
869             ctx = ctx.next;
870             inEventLoop = false;
871         }
872     }
873 
874     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
875         // We have reached at tail; now traverse backwards.
876         final AbstractChannelHandlerContext head = this.head;
877         for (;;) {
878             if (ctx == head) {
879                 break;
880             }
881 
882             final EventExecutor executor = ctx.executor();
883             if (inEventLoop || executor.inEventLoop(currentThread)) {
884                 synchronized (this) {
885                     remove0(ctx);
886                 }
887                 callHandlerRemoved0(ctx);
888             } else {
889                 final AbstractChannelHandlerContext finalCtx = ctx;
890                 executor.execute(new Runnable() {
891                     @Override
892                     public void run() {
893                         destroyDown(Thread.currentThread(), finalCtx, true);
894                     }
895                 });
896                 break;
897             }
898 
899             ctx = ctx.prev;
900             inEventLoop = false;
901         }
902     }
903 
904     @Override
905     public final ChannelPipeline fireChannelActive() {
906         AbstractChannelHandlerContext.invokeChannelActive(head);
907         return this;
908     }
909 
910     @Override
911     public final ChannelPipeline fireChannelInactive() {
912         AbstractChannelHandlerContext.invokeChannelInactive(head);
913         return this;
914     }
915 
916     @Override
917     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
918         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
919         return this;
920     }
921 
922     @Override
923     public final ChannelPipeline fireUserEventTriggered(Object event) {
924         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
925         return this;
926     }
927 
928     @Override
929     public final ChannelPipeline fireChannelRead(Object msg) {
930         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
931         return this;
932     }
933 
934     @Override
935     public final ChannelPipeline fireChannelReadComplete() {
936         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
937         return this;
938     }
939 
940     @Override
941     public final ChannelPipeline fireChannelWritabilityChanged() {
942         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
943         return this;
944     }
945 
946     @Override
947     public final ChannelFuture bind(SocketAddress localAddress) {
948         return tail.bind(localAddress);
949     }
950 
951     @Override
952     public final ChannelFuture connect(SocketAddress remoteAddress) {
953         return tail.connect(remoteAddress);
954     }
955 
956     @Override
957     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
958         return tail.connect(remoteAddress, localAddress);
959     }
960 
961     @Override
962     public final ChannelFuture disconnect() {
963         return tail.disconnect();
964     }
965 
966     @Override
967     public final ChannelFuture close() {
968         return tail.close();
969     }
970 
971     @Override
972     public final ChannelFuture deregister() {
973         return tail.deregister();
974     }
975 
976     @Override
977     public final ChannelPipeline flush() {
978         tail.flush();
979         return this;
980     }
981 
982     @Override
983     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
984         return tail.bind(localAddress, promise);
985     }
986 
987     @Override
988     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
989         return tail.connect(remoteAddress, promise);
990     }
991 
992     @Override
993     public final ChannelFuture connect(
994             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
995         return tail.connect(remoteAddress, localAddress, promise);
996     }
997 
998     @Override
999     public final ChannelFuture disconnect(ChannelPromise promise) {
1000         return tail.disconnect(promise);
1001     }
1002 
1003     @Override
1004     public final ChannelFuture close(ChannelPromise promise) {
1005         return tail.close(promise);
1006     }
1007 
1008     @Override
1009     public final ChannelFuture deregister(final ChannelPromise promise) {
1010         return tail.deregister(promise);
1011     }
1012 
1013     @Override
1014     public final ChannelPipeline read() {
1015         tail.read();
1016         return this;
1017     }
1018 
1019     @Override
1020     public final ChannelFuture write(Object msg) {
1021         return tail.write(msg);
1022     }
1023 
1024     @Override
1025     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1026         return tail.write(msg, promise);
1027     }
1028 
1029     @Override
1030     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1031         return tail.writeAndFlush(msg, promise);
1032     }
1033 
1034     @Override
1035     public final ChannelFuture writeAndFlush(Object msg) {
1036         return tail.writeAndFlush(msg);
1037     }
1038 
1039     @Override
1040     public final ChannelPromise newPromise() {
1041         return new DefaultChannelPromise(channel);
1042     }
1043 
1044     @Override
1045     public final ChannelProgressivePromise newProgressivePromise() {
1046         return new DefaultChannelProgressivePromise(channel);
1047     }
1048 
1049     @Override
1050     public final ChannelFuture newSucceededFuture() {
1051         return succeededFuture;
1052     }
1053 
1054     @Override
1055     public final ChannelFuture newFailedFuture(Throwable cause) {
1056         return new FailedChannelFuture(channel, null, cause);
1057     }
1058 
1059     @Override
1060     public final ChannelPromise voidPromise() {
1061         return voidPromise;
1062     }
1063 
1064     private void checkDuplicateName(String name) {
1065         if (context0(name) != null) {
1066             throw new IllegalArgumentException("Duplicate handler name: " + name);
1067         }
1068     }
1069 
1070     private AbstractChannelHandlerContext context0(String name) {
1071         AbstractChannelHandlerContext context = head.next;
1072         while (context != tail) {
1073             if (context.name().equals(name)) {
1074                 return context;
1075             }
1076             context = context.next;
1077         }
1078         return null;
1079     }
1080 
1081     private AbstractChannelHandlerContext getContextOrDie(String name) {
1082         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1083         if (ctx == null) {
1084             throw new NoSuchElementException(name);
1085         } else {
1086             return ctx;
1087         }
1088     }
1089 
1090     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1091         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1092         if (ctx == null) {
1093             throw new NoSuchElementException(handler.getClass().getName());
1094         } else {
1095             return ctx;
1096         }
1097     }
1098 
1099     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1100         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1101         if (ctx == null) {
1102             throw new NoSuchElementException(handlerType.getName());
1103         } else {
1104             return ctx;
1105         }
1106     }
1107 
1108     private void callHandlerAddedForAllHandlers() {
1109         final PendingHandlerCallback pendingHandlerCallbackHead;
1110         synchronized (this) {
1111             assert !registered;
1112 
1113             // This Channel itself was registered.
1114             registered = true;
1115 
1116             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1117             // Null out so it can be GC'ed.
1118             this.pendingHandlerCallbackHead = null;
1119         }
1120 
1121         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1122         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1123         // the EventLoop.
1124         PendingHandlerCallback task = pendingHandlerCallbackHead;
1125         while (task != null) {
1126             task.execute();
1127             task = task.next;
1128         }
1129     }
1130 
1131     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1132         assert !registered;
1133 
1134         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1135         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1136         if (pending == null) {
1137             pendingHandlerCallbackHead = task;
1138         } else {
1139             // Find the tail of the linked-list.
1140             while (pending.next != null) {
1141                 pending = pending.next;
1142             }
1143             pending.next = task;
1144         }
1145     }
1146 
1147     private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1148         newCtx.setAddPending();
1149         executor.execute(new Runnable() {
1150             @Override
1151             public void run() {
1152                 callHandlerAdded0(newCtx);
1153             }
1154         });
1155     }
1156 
1157     /**
1158      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1159      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1160      */
1161     protected void onUnhandledInboundException(Throwable cause) {
1162         try {
1163             logger.warn(
1164                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1165                             "It usually means the last handler in the pipeline did not handle the exception.",
1166                     cause);
1167         } finally {
1168             ReferenceCountUtil.release(cause);
1169         }
1170     }
1171 
1172     /**
1173      * Called once the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}event hit
1174      * the end of the {@link ChannelPipeline}.
1175      */
1176     protected void onUnhandledInboundChannelActive() {
1177     }
1178 
1179     /**
1180      * Called once the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} event hit
1181      * the end of the {@link ChannelPipeline}.
1182      */
1183     protected void onUnhandledInboundChannelInactive() {
1184     }
1185 
1186     /**
1187      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1188      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1189      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1190      */
1191     protected void onUnhandledInboundMessage(Object msg) {
1192         try {
1193             logger.debug(
1194                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1195                             "Please check your pipeline configuration.", msg);
1196         } finally {
1197             ReferenceCountUtil.release(msg);
1198         }
1199     }
1200 
1201     /**
1202      * Called once the {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} event hit
1203      * the end of the {@link ChannelPipeline}.
1204      */
1205     protected void onUnhandledInboundChannelReadComplete() {
1206     }
1207 
1208     /**
1209      * Called once an user event hit the end of the {@link ChannelPipeline} without been handled by the user
1210      * in {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is responsible
1211      * to call {@link ReferenceCountUtil#release(Object)} on the given event at some point.
1212      */
1213     protected void onUnhandledInboundUserEventTriggered(Object evt) {
1214         // This may not be a configuration error and so don't log anything.
1215         // The event may be superfluous for the current pipeline configuration.
1216         ReferenceCountUtil.release(evt);
1217     }
1218 
1219     /**
1220      * Called once the {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
1221      * the end of the {@link ChannelPipeline}.
1222      */
1223     protected void onUnhandledChannelWritabilityChanged() {
1224     }
1225 
1226     @UnstableApi
1227     protected void incrementPendingOutboundBytes(long size) {
1228         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1229         if (buffer != null) {
1230             buffer.incrementPendingOutboundBytes(size);
1231         }
1232     }
1233 
1234     @UnstableApi
1235     protected void decrementPendingOutboundBytes(long size) {
1236         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1237         if (buffer != null) {
1238             buffer.decrementPendingOutboundBytes(size);
1239         }
1240     }
1241 
1242     // A special catch-all handler that handles both bytes and messages.
1243     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1244 
1245         TailContext(DefaultChannelPipeline pipeline) {
1246             super(pipeline, null, TAIL_NAME, TailContext.class);
1247             setAddComplete();
1248         }
1249 
1250         @Override
1251         public ChannelHandler handler() {
1252             return this;
1253         }
1254 
1255         @Override
1256         public void channelRegistered(ChannelHandlerContext ctx) { }
1257 
1258         @Override
1259         public void channelUnregistered(ChannelHandlerContext ctx) { }
1260 
1261         @Override
1262         public void channelActive(ChannelHandlerContext ctx) {
1263             onUnhandledInboundChannelActive();
1264         }
1265 
1266         @Override
1267         public void channelInactive(ChannelHandlerContext ctx) {
1268             onUnhandledInboundChannelInactive();
1269         }
1270 
1271         @Override
1272         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1273             onUnhandledChannelWritabilityChanged();
1274         }
1275 
1276         @Override
1277         public void handlerAdded(ChannelHandlerContext ctx) { }
1278 
1279         @Override
1280         public void handlerRemoved(ChannelHandlerContext ctx) { }
1281 
1282         @Override
1283         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1284             onUnhandledInboundUserEventTriggered(evt);
1285         }
1286 
1287         @Override
1288         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1289             onUnhandledInboundException(cause);
1290         }
1291 
1292         @Override
1293         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1294             onUnhandledInboundMessage(msg);
1295         }
1296 
1297         @Override
1298         public void channelReadComplete(ChannelHandlerContext ctx) {
1299             onUnhandledInboundChannelReadComplete();
1300         }
1301     }
1302 
1303     final class HeadContext extends AbstractChannelHandlerContext
1304             implements ChannelOutboundHandler, ChannelInboundHandler {
1305 
1306         private final Unsafe unsafe;
1307 
1308         HeadContext(DefaultChannelPipeline pipeline) {
1309             super(pipeline, null, HEAD_NAME, HeadContext.class);
1310             unsafe = pipeline.channel().unsafe();
1311             setAddComplete();
1312         }
1313 
1314         @Override
1315         public ChannelHandler handler() {
1316             return this;
1317         }
1318 
1319         @Override
1320         public void handlerAdded(ChannelHandlerContext ctx) {
1321             // NOOP
1322         }
1323 
1324         @Override
1325         public void handlerRemoved(ChannelHandlerContext ctx) {
1326             // NOOP
1327         }
1328 
1329         @Override
1330         public void bind(
1331                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1332             unsafe.bind(localAddress, promise);
1333         }
1334 
1335         @Override
1336         public void connect(
1337                 ChannelHandlerContext ctx,
1338                 SocketAddress remoteAddress, SocketAddress localAddress,
1339                 ChannelPromise promise) {
1340             unsafe.connect(remoteAddress, localAddress, promise);
1341         }
1342 
1343         @Override
1344         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1345             unsafe.disconnect(promise);
1346         }
1347 
1348         @Override
1349         public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1350             unsafe.close(promise);
1351         }
1352 
1353         @Override
1354         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1355             unsafe.deregister(promise);
1356         }
1357 
1358         @Override
1359         public void read(ChannelHandlerContext ctx) {
1360             unsafe.beginRead();
1361         }
1362 
1363         @Override
1364         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1365             unsafe.write(msg, promise);
1366         }
1367 
1368         @Override
1369         public void flush(ChannelHandlerContext ctx) {
1370             unsafe.flush();
1371         }
1372 
1373         @Override
1374         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1375             ctx.fireExceptionCaught(cause);
1376         }
1377 
1378         @Override
1379         public void channelRegistered(ChannelHandlerContext ctx) {
1380             invokeHandlerAddedIfNeeded();
1381             ctx.fireChannelRegistered();
1382         }
1383 
1384         @Override
1385         public void channelUnregistered(ChannelHandlerContext ctx) {
1386             ctx.fireChannelUnregistered();
1387 
1388             // Remove all handlers sequentially if channel is closed and unregistered.
1389             if (!channel.isOpen()) {
1390                 destroy();
1391             }
1392         }
1393 
1394         @Override
1395         public void channelActive(ChannelHandlerContext ctx) {
1396             ctx.fireChannelActive();
1397 
1398             readIfIsAutoRead();
1399         }
1400 
1401         @Override
1402         public void channelInactive(ChannelHandlerContext ctx) {
1403             ctx.fireChannelInactive();
1404         }
1405 
1406         @Override
1407         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1408             ctx.fireChannelRead(msg);
1409         }
1410 
1411         @Override
1412         public void channelReadComplete(ChannelHandlerContext ctx) {
1413             ctx.fireChannelReadComplete();
1414 
1415             readIfIsAutoRead();
1416         }
1417 
1418         private void readIfIsAutoRead() {
1419             if (channel.config().isAutoRead()) {
1420                 channel.read();
1421             }
1422         }
1423 
1424         @Override
1425         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1426             ctx.fireUserEventTriggered(evt);
1427         }
1428 
1429         @Override
1430         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1431             ctx.fireChannelWritabilityChanged();
1432         }
1433     }
1434 
1435     private abstract static class PendingHandlerCallback implements Runnable {
1436         final AbstractChannelHandlerContext ctx;
1437         PendingHandlerCallback next;
1438 
1439         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1440             this.ctx = ctx;
1441         }
1442 
1443         abstract void execute();
1444     }
1445 
1446     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1447 
1448         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1449             super(ctx);
1450         }
1451 
1452         @Override
1453         public void run() {
1454             callHandlerAdded0(ctx);
1455         }
1456 
1457         @Override
1458         void execute() {
1459             EventExecutor executor = ctx.executor();
1460             if (executor.inEventLoop()) {
1461                 callHandlerAdded0(ctx);
1462             } else {
1463                 try {
1464                     executor.execute(this);
1465                 } catch (RejectedExecutionException e) {
1466                     if (logger.isWarnEnabled()) {
1467                         logger.warn(
1468                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1469                                 executor, ctx.name(), e);
1470                     }
1471                     remove0(ctx);
1472                     ctx.setRemoved();
1473                 }
1474             }
1475         }
1476     }
1477 
1478     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1479 
1480         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1481             super(ctx);
1482         }
1483 
1484         @Override
1485         public void run() {
1486             callHandlerRemoved0(ctx);
1487         }
1488 
1489         @Override
1490         void execute() {
1491             EventExecutor executor = ctx.executor();
1492             if (executor.inEventLoop()) {
1493                 callHandlerRemoved0(ctx);
1494             } else {
1495                 try {
1496                     executor.execute(this);
1497                 } catch (RejectedExecutionException e) {
1498                     if (logger.isWarnEnabled()) {
1499                         logger.warn(
1500                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1501                                         " removing handler {}.", executor, ctx.name(), e);
1502                     }
1503                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1504                     ctx.setRemoved();
1505                 }
1506             }
1507         }
1508     }
1509 }