View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.UnstableApi;
27  import io.netty.util.internal.logging.InternalLogger;
28  import io.netty.util.internal.logging.InternalLoggerFactory;
29  
30  import java.net.SocketAddress;
31  import java.util.ArrayList;
32  import java.util.IdentityHashMap;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NoSuchElementException;
38  import java.util.WeakHashMap;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
41  
42  /**
43   * The default {@link ChannelPipeline} implementation.  It is usually created
44   * by a {@link Channel} implementation when the {@link Channel} is created.
45   */
46  public class DefaultChannelPipeline implements ChannelPipeline {
47  
48      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
49  
50      private static final String HEAD_NAME = generateName0(HeadContext.class);
51      private static final String TAIL_NAME = generateName0(TailContext.class);
52  
53      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
54              new FastThreadLocal<Map<Class<?>, String>>() {
55          @Override
56          protected Map<Class<?>, String> initialValue() throws Exception {
57              return new WeakHashMap<Class<?>, String>();
58          }
59      };
60  
61      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
62              AtomicReferenceFieldUpdater.newUpdater(
63                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
64      final AbstractChannelHandlerContext head;
65      final AbstractChannelHandlerContext tail;
66  
67      private final Channel channel;
68      private final ChannelFuture succeededFuture;
69      private final VoidChannelPromise voidPromise;
70      private final boolean touch = ResourceLeakDetector.isEnabled();
71  
72      private Map<EventExecutorGroup, EventExecutor> childExecutors;
73      private volatile MessageSizeEstimator.Handle estimatorHandle;
74      private boolean firstRegistration = true;
75  
76      /**
77       * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
78       * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
79       *
80       * We only keep the head because it is expected that the list is used infrequently and its size is small.
81       * Thus a full iteration for each insertion is assumed to be a good compromise that saves memory and avoids
82       * tail management complexity.
83       */
84      private PendingHandlerCallback pendingHandlerCallbackHead;
85  
86      /**
87       * Set to {@code true} once the {@link AbstractChannel} is registered. Once set to {@code true}, the value will
88       * never change.
89       */
90      private boolean registered;
91  
92      protected DefaultChannelPipeline(Channel channel) {
93          this.channel = ObjectUtil.checkNotNull(channel, "channel");
94          succeededFuture = new SucceededChannelFuture(channel, null);
95          voidPromise =  new VoidChannelPromise(channel, true);
96  
97          tail = new TailContext(this);
98          head = new HeadContext(this);
99  
100         head.next = tail;
101         tail.prev = head;
102     }
103 
104     final MessageSizeEstimator.Handle estimatorHandle() {
105         MessageSizeEstimator.Handle handle = estimatorHandle;
106         if (handle == null) {
107             handle = channel.config().getMessageSizeEstimator().newHandle();
108             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
109                 handle = estimatorHandle;
110             }
111         }
112         return handle;
113     }
114 
115     final Object touch(Object msg, AbstractChannelHandlerContext next) {
116         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
117     }
118 
119     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
120         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
121     }
122 
123     private EventExecutor childExecutor(EventExecutorGroup group) {
124         if (group == null) {
125             return null;
126         }
127         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
128         if (pinEventExecutor != null && !pinEventExecutor) {
129             return group.next();
130         }
131         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
132         if (childExecutors == null) {
133             // Use size of 4 as most people only use one extra EventExecutor.
134             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
135         }
136         // Pin one of the child executors once and remember it so that the same child executor
137         // is used to fire events for the same channel.
138         EventExecutor childExecutor = childExecutors.get(group);
139         if (childExecutor == null) {
140             childExecutor = group.next();
141             childExecutors.put(group, childExecutor);
142         }
143         return childExecutor;
144     }
145     @Override
146     public final Channel channel() {
147         return channel;
148     }
149 
150     @Override
151     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
152         return addFirst(null, name, handler);
153     }
154 
155     @Override
156     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
157         final AbstractChannelHandlerContext newCtx;
158         synchronized (this) {
159             checkMultiplicity(handler);
160             name = filterName(name, handler);
161 
162             newCtx = newContext(group, name, handler);
163 
164             addFirst0(newCtx);
165 
166             // If the registered is false it means that the channel was not registered on an eventloop yet.
167             // In this case we add the context to the pipeline and add a task that will call
168             // ChannelHandler.handlerAdded(...) once the channel is registered.
169             if (!registered) {
170                 newCtx.setAddPending();
171                 callHandlerCallbackLater(newCtx, true);
172                 return this;
173             }
174 
175             EventExecutor executor = newCtx.executor();
176             if (!executor.inEventLoop()) {
177                 newCtx.setAddPending();
178                 executor.execute(new Runnable() {
179                     @Override
180                     public void run() {
181                         callHandlerAdded0(newCtx);
182                     }
183                 });
184                 return this;
185             }
186         }
187         callHandlerAdded0(newCtx);
188         return this;
189     }
190 
191     private void addFirst0(AbstractChannelHandlerContext newCtx) {
192         AbstractChannelHandlerContext nextCtx = head.next;
193         newCtx.prev = head;
194         newCtx.next = nextCtx;
195         head.next = newCtx;
196         nextCtx.prev = newCtx;
197     }
198 
199     @Override
200     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
201         return addLast(null, name, handler);
202     }
203 
204     @Override
205     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
206         final AbstractChannelHandlerContext newCtx;
207         synchronized (this) {
208             checkMultiplicity(handler);
209 
210             newCtx = newContext(group, filterName(name, handler), handler);
211 
212             addLast0(newCtx);
213 
214             // If the registered is false it means that the channel was not registered on an eventloop yet.
215             // In this case we add the context to the pipeline and add a task that will call
216             // ChannelHandler.handlerAdded(...) once the channel is registered.
217             if (!registered) {
218                 newCtx.setAddPending();
219                 callHandlerCallbackLater(newCtx, true);
220                 return this;
221             }
222 
223             EventExecutor executor = newCtx.executor();
224             if (!executor.inEventLoop()) {
225                 newCtx.setAddPending();
226                 executor.execute(new Runnable() {
227                     @Override
228                     public void run() {
229                         callHandlerAdded0(newCtx);
230                     }
231                 });
232                 return this;
233             }
234         }
235         callHandlerAdded0(newCtx);
236         return this;
237     }
238 
239     private void addLast0(AbstractChannelHandlerContext newCtx) {
240         AbstractChannelHandlerContext prev = tail.prev;
241         newCtx.prev = prev;
242         newCtx.next = tail;
243         prev.next = newCtx;
244         tail.prev = newCtx;
245     }
246 
247     @Override
248     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
249         return addBefore(null, baseName, name, handler);
250     }
251 
252     @Override
253     public final ChannelPipeline addBefore(
254             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
255         final AbstractChannelHandlerContext newCtx;
256         final AbstractChannelHandlerContext ctx;
257         synchronized (this) {
258             checkMultiplicity(handler);
259             name = filterName(name, handler);
260             ctx = getContextOrDie(baseName);
261 
262             newCtx = newContext(group, name, handler);
263 
264             addBefore0(ctx, newCtx);
265 
266             // If the registered is false it means that the channel was not registered on an eventloop yet.
267             // In this case we add the context to the pipeline and add a task that will call
268             // ChannelHandler.handlerAdded(...) once the channel is registered.
269             if (!registered) {
270                 newCtx.setAddPending();
271                 callHandlerCallbackLater(newCtx, true);
272                 return this;
273             }
274 
275             EventExecutor executor = newCtx.executor();
276             if (!executor.inEventLoop()) {
277                 newCtx.setAddPending();
278                 executor.execute(new Runnable() {
279                     @Override
280                     public void run() {
281                         callHandlerAdded0(newCtx);
282                     }
283                 });
284                 return this;
285             }
286         }
287         callHandlerAdded0(newCtx);
288         return this;
289     }
290 
291     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
292         newCtx.prev = ctx.prev;
293         newCtx.next = ctx;
294         ctx.prev.next = newCtx;
295         ctx.prev = newCtx;
296     }
297 
298     private String filterName(String name, ChannelHandler handler) {
299         if (name == null) {
300             return generateName(handler);
301         }
302         checkDuplicateName(name);
303         return name;
304     }
305 
306     @Override
307     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
308         return addAfter(null, baseName, name, handler);
309     }
310 
311     @Override
312     public final ChannelPipeline addAfter(
313             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
314         final AbstractChannelHandlerContext newCtx;
315         final AbstractChannelHandlerContext ctx;
316 
317         synchronized (this) {
318             checkMultiplicity(handler);
319             name = filterName(name, handler);
320             ctx = getContextOrDie(baseName);
321 
322             newCtx = newContext(group, name, handler);
323 
324             addAfter0(ctx, newCtx);
325 
326             // If the registered is false it means that the channel was not registered on an eventloop yet.
327             // In this case we remove the context from the pipeline and add a task that will call
328             // ChannelHandler.handlerRemoved(...) once the channel is registered.
329             if (!registered) {
330                 newCtx.setAddPending();
331                 callHandlerCallbackLater(newCtx, true);
332                 return this;
333             }
334             EventExecutor executor = newCtx.executor();
335             if (!executor.inEventLoop()) {
336                 newCtx.setAddPending();
337                 executor.execute(new Runnable() {
338                     @Override
339                     public void run() {
340                         callHandlerAdded0(newCtx);
341                     }
342                 });
343                 return this;
344             }
345         }
346         callHandlerAdded0(newCtx);
347         return this;
348     }
349 
350     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
351         newCtx.prev = ctx;
352         newCtx.next = ctx.next;
353         ctx.next.prev = newCtx;
354         ctx.next = newCtx;
355     }
356 
357     @Override
358     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
359         return addFirst(null, handlers);
360     }
361 
362     @Override
363     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
364         if (handlers == null) {
365             throw new NullPointerException("handlers");
366         }
367         if (handlers.length == 0 || handlers[0] == null) {
368             return this;
369         }
370 
371         int size;
372         for (size = 1; size < handlers.length; size ++) {
373             if (handlers[size] == null) {
374                 break;
375             }
376         }
377 
378         for (int i = size - 1; i >= 0; i --) {
379             ChannelHandler h = handlers[i];
380             addFirst(executor, null, h);
381         }
382 
383         return this;
384     }
385 
386     @Override
387     public final ChannelPipeline addLast(ChannelHandler... handlers) {
388         return addLast(null, handlers);
389     }
390 
391     @Override
392     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
393         if (handlers == null) {
394             throw new NullPointerException("handlers");
395         }
396 
397         for (ChannelHandler h: handlers) {
398             if (h == null) {
399                 break;
400             }
401             addLast(executor, null, h);
402         }
403 
404         return this;
405     }
406 
407     private String generateName(ChannelHandler handler) {
408         Map<Class<?>, String> cache = nameCaches.get();
409         Class<?> handlerType = handler.getClass();
410         String name = cache.get(handlerType);
411         if (name == null) {
412             name = generateName0(handlerType);
413             cache.put(handlerType, name);
414         }
415 
416         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
417         // any name conflicts.  Note that we don't cache the names generated here.
418         if (context0(name) != null) {
419             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
420             for (int i = 1;; i ++) {
421                 String newName = baseName + i;
422                 if (context0(newName) == null) {
423                     name = newName;
424                     break;
425                 }
426             }
427         }
428         return name;
429     }
430 
431     private static String generateName0(Class<?> handlerType) {
432         return StringUtil.simpleClassName(handlerType) + "#0";
433     }
434 
435     @Override
436     public final ChannelPipeline remove(ChannelHandler handler) {
437         remove(getContextOrDie(handler));
438         return this;
439     }
440 
441     @Override
442     public final ChannelHandler remove(String name) {
443         return remove(getContextOrDie(name)).handler();
444     }
445 
446     @SuppressWarnings("unchecked")
447     @Override
448     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
449         return (T) remove(getContextOrDie(handlerType)).handler();
450     }
451 
452     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
453         assert ctx != head && ctx != tail;
454 
455         synchronized (this) {
456             remove0(ctx);
457 
458             // If the registered is false it means that the channel was not registered on an eventloop yet.
459             // In this case we remove the context from the pipeline and add a task that will call
460             // ChannelHandler.handlerRemoved(...) once the channel is registered.
461             if (!registered) {
462                 callHandlerCallbackLater(ctx, false);
463                 return ctx;
464             }
465 
466             EventExecutor executor = ctx.executor();
467             if (!executor.inEventLoop()) {
468                 executor.execute(new Runnable() {
469                     @Override
470                     public void run() {
471                         callHandlerRemoved0(ctx);
472                     }
473                 });
474                 return ctx;
475             }
476         }
477         callHandlerRemoved0(ctx);
478         return ctx;
479     }
480 
481     private static void remove0(AbstractChannelHandlerContext ctx) {
482         AbstractChannelHandlerContext prev = ctx.prev;
483         AbstractChannelHandlerContext next = ctx.next;
484         prev.next = next;
485         next.prev = prev;
486     }
487 
488     @Override
489     public final ChannelHandler removeFirst() {
490         if (head.next == tail) {
491             throw new NoSuchElementException();
492         }
493         return remove(head.next).handler();
494     }
495 
496     @Override
497     public final ChannelHandler removeLast() {
498         if (head.next == tail) {
499             throw new NoSuchElementException();
500         }
501         return remove(tail.prev).handler();
502     }
503 
504     @Override
505     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
506         replace(getContextOrDie(oldHandler), newName, newHandler);
507         return this;
508     }
509 
510     @Override
511     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
512         return replace(getContextOrDie(oldName), newName, newHandler);
513     }
514 
515     @Override
516     @SuppressWarnings("unchecked")
517     public final <T extends ChannelHandler> T replace(
518             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
519         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
520     }
521 
522     private ChannelHandler replace(
523             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
524         assert ctx != head && ctx != tail;
525 
526         final AbstractChannelHandlerContext newCtx;
527         synchronized (this) {
528             checkMultiplicity(newHandler);
529             if (newName == null) {
530                 newName = generateName(newHandler);
531             } else {
532                 boolean sameName = ctx.name().equals(newName);
533                 if (!sameName) {
534                     checkDuplicateName(newName);
535                 }
536             }
537 
538             newCtx = newContext(ctx.executor, newName, newHandler);
539 
540             replace0(ctx, newCtx);
541 
542             // If the registered is false it means that the channel was not registered on an eventloop yet.
543             // In this case we replace the context in the pipeline
544             // and add a task that will call ChannelHandler.handlerAdded(...) and
545             // ChannelHandler.handlerRemoved(...) once the channel is registered.
546             if (!registered) {
547                 callHandlerCallbackLater(newCtx, true);
548                 callHandlerCallbackLater(ctx, false);
549                 return ctx.handler();
550             }
551             EventExecutor executor = ctx.executor();
552             if (!executor.inEventLoop()) {
553                 executor.execute(new Runnable() {
554                     @Override
555                     public void run() {
556                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
557                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
558                         // those event handlers must be called after handlerAdded().
559                         callHandlerAdded0(newCtx);
560                         callHandlerRemoved0(ctx);
561                     }
562                 });
563                 return ctx.handler();
564             }
565         }
566         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
567         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
568         // event handlers must be called after handlerAdded().
569         callHandlerAdded0(newCtx);
570         callHandlerRemoved0(ctx);
571         return ctx.handler();
572     }
573 
574     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
575         AbstractChannelHandlerContext prev = oldCtx.prev;
576         AbstractChannelHandlerContext next = oldCtx.next;
577         newCtx.prev = prev;
578         newCtx.next = next;
579 
580         // Finish the replacement of oldCtx with newCtx in the linked list.
581         // Note that this doesn't mean events will be sent to the new handler immediately
582         // because we are currently at the event handler thread and no more than one handler methods can be invoked
583         // at the same time (we ensured that in replace().)
584         prev.next = newCtx;
585         next.prev = newCtx;
586 
587         // update the reference to the replacement so forward of buffered content will work correctly
588         oldCtx.prev = newCtx;
589         oldCtx.next = newCtx;
590     }
591 
592     private static void checkMultiplicity(ChannelHandler handler) {
593         if (handler instanceof ChannelHandlerAdapter) {
594             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
595             if (!h.isSharable() && h.added) {
596                 throw new ChannelPipelineException(
597                         h.getClass().getName() +
598                         " is not a @Sharable handler, so can't be added or removed multiple times.");
599             }
600             h.added = true;
601         }
602     }
603 
604     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
605         try {
606             ctx.handler().handlerAdded(ctx);
607             ctx.setAddComplete();
608         } catch (Throwable t) {
609             boolean removed = false;
610             try {
611                 remove0(ctx);
612                 try {
613                     ctx.handler().handlerRemoved(ctx);
614                 } finally {
615                     ctx.setRemoved();
616                 }
617                 removed = true;
618             } catch (Throwable t2) {
619                 if (logger.isWarnEnabled()) {
620                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
621                 }
622             }
623 
624             if (removed) {
625                 fireExceptionCaught(new ChannelPipelineException(
626                         ctx.handler().getClass().getName() +
627                         ".handlerAdded() has thrown an exception; removed.", t));
628             } else {
629                 fireExceptionCaught(new ChannelPipelineException(
630                         ctx.handler().getClass().getName() +
631                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
632             }
633         }
634     }
635 
636     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
637         // Notify the complete removal.
638         try {
639             try {
640                 ctx.handler().handlerRemoved(ctx);
641             } finally {
642                 ctx.setRemoved();
643             }
644         } catch (Throwable t) {
645             fireExceptionCaught(new ChannelPipelineException(
646                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
647         }
648     }
649 
650     final void invokeHandlerAddedIfNeeded() {
651         assert channel.eventLoop().inEventLoop();
652         if (firstRegistration) {
653             firstRegistration = false;
654             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
655             // that were added before the registration was done.
656             callHandlerAddedForAllHandlers();
657         }
658     }
659 
660     @Override
661     public final ChannelHandler first() {
662         ChannelHandlerContext first = firstContext();
663         if (first == null) {
664             return null;
665         }
666         return first.handler();
667     }
668 
669     @Override
670     public final ChannelHandlerContext firstContext() {
671         AbstractChannelHandlerContext first = head.next;
672         if (first == tail) {
673             return null;
674         }
675         return head.next;
676     }
677 
678     @Override
679     public final ChannelHandler last() {
680         AbstractChannelHandlerContext last = tail.prev;
681         if (last == head) {
682             return null;
683         }
684         return last.handler();
685     }
686 
687     @Override
688     public final ChannelHandlerContext lastContext() {
689         AbstractChannelHandlerContext last = tail.prev;
690         if (last == head) {
691             return null;
692         }
693         return last;
694     }
695 
696     @Override
697     public final ChannelHandler get(String name) {
698         ChannelHandlerContext ctx = context(name);
699         if (ctx == null) {
700             return null;
701         } else {
702             return ctx.handler();
703         }
704     }
705 
706     @SuppressWarnings("unchecked")
707     @Override
708     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
709         ChannelHandlerContext ctx = context(handlerType);
710         if (ctx == null) {
711             return null;
712         } else {
713             return (T) ctx.handler();
714         }
715     }
716 
717     @Override
718     public final ChannelHandlerContext context(String name) {
719         if (name == null) {
720             throw new NullPointerException("name");
721         }
722 
723         return context0(name);
724     }
725 
726     @Override
727     public final ChannelHandlerContext context(ChannelHandler handler) {
728         if (handler == null) {
729             throw new NullPointerException("handler");
730         }
731 
732         AbstractChannelHandlerContext ctx = head.next;
733         for (;;) {
734 
735             if (ctx == null) {
736                 return null;
737             }
738 
739             if (ctx.handler() == handler) {
740                 return ctx;
741             }
742 
743             ctx = ctx.next;
744         }
745     }
746 
747     @Override
748     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
749         if (handlerType == null) {
750             throw new NullPointerException("handlerType");
751         }
752 
753         AbstractChannelHandlerContext ctx = head.next;
754         for (;;) {
755             if (ctx == null) {
756                 return null;
757             }
758             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
759                 return ctx;
760             }
761             ctx = ctx.next;
762         }
763     }
764 
765     @Override
766     public final List<String> names() {
767         List<String> list = new ArrayList<String>();
768         AbstractChannelHandlerContext ctx = head.next;
769         for (;;) {
770             if (ctx == null) {
771                 return list;
772             }
773             list.add(ctx.name());
774             ctx = ctx.next;
775         }
776     }
777 
778     @Override
779     public final Map<String, ChannelHandler> toMap() {
780         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
781         AbstractChannelHandlerContext ctx = head.next;
782         for (;;) {
783             if (ctx == tail) {
784                 return map;
785             }
786             map.put(ctx.name(), ctx.handler());
787             ctx = ctx.next;
788         }
789     }
790 
791     @Override
792     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
793         return toMap().entrySet().iterator();
794     }
795 
796     /**
797      * Returns the {@link String} representation of this pipeline.
798      */
799     @Override
800     public final String toString() {
801         StringBuilder buf = new StringBuilder()
802             .append(StringUtil.simpleClassName(this))
803             .append('{');
804         AbstractChannelHandlerContext ctx = head.next;
805         for (;;) {
806             if (ctx == tail) {
807                 break;
808             }
809 
810             buf.append('(')
811                .append(ctx.name())
812                .append(" = ")
813                .append(ctx.handler().getClass().getName())
814                .append(')');
815 
816             ctx = ctx.next;
817             if (ctx == tail) {
818                 break;
819             }
820 
821             buf.append(", ");
822         }
823         buf.append('}');
824         return buf.toString();
825     }
826 
827     @Override
828     public final ChannelPipeline fireChannelRegistered() {
829         AbstractChannelHandlerContext.invokeChannelRegistered(head);
830         return this;
831     }
832 
833     @Override
834     public final ChannelPipeline fireChannelUnregistered() {
835         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
836         return this;
837     }
838 
839     /**
840      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
841      * handlerRemoved().
842      *
843      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
844      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
845      * the handlers are removed after all events are handled.
846      *
847      * See: https://github.com/netty/netty/issues/3156
848      */
849     private synchronized void destroy() {
850         destroyUp(head.next, false);
851     }
852 
853     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
854         final Thread currentThread = Thread.currentThread();
855         final AbstractChannelHandlerContext tail = this.tail;
856         for (;;) {
857             if (ctx == tail) {
858                 destroyDown(currentThread, tail.prev, inEventLoop);
859                 break;
860             }
861 
862             final EventExecutor executor = ctx.executor();
863             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
864                 final AbstractChannelHandlerContext finalCtx = ctx;
865                 executor.execute(new Runnable() {
866                     @Override
867                     public void run() {
868                         destroyUp(finalCtx, true);
869                     }
870                 });
871                 break;
872             }
873 
874             ctx = ctx.next;
875             inEventLoop = false;
876         }
877     }
878 
879     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
880         // We have reached at tail; now traverse backwards.
881         final AbstractChannelHandlerContext head = this.head;
882         for (;;) {
883             if (ctx == head) {
884                 break;
885             }
886 
887             final EventExecutor executor = ctx.executor();
888             if (inEventLoop || executor.inEventLoop(currentThread)) {
889                 synchronized (this) {
890                     remove0(ctx);
891                 }
892                 callHandlerRemoved0(ctx);
893             } else {
894                 final AbstractChannelHandlerContext finalCtx = ctx;
895                 executor.execute(new Runnable() {
896                     @Override
897                     public void run() {
898                         destroyDown(Thread.currentThread(), finalCtx, true);
899                     }
900                 });
901                 break;
902             }
903 
904             ctx = ctx.prev;
905             inEventLoop = false;
906         }
907     }
908 
909     @Override
910     public final ChannelPipeline fireChannelActive() {
911         AbstractChannelHandlerContext.invokeChannelActive(head);
912         return this;
913     }
914 
915     @Override
916     public final ChannelPipeline fireChannelInactive() {
917         AbstractChannelHandlerContext.invokeChannelInactive(head);
918         return this;
919     }
920 
921     @Override
922     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
923         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
924         return this;
925     }
926 
927     @Override
928     public final ChannelPipeline fireUserEventTriggered(Object event) {
929         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
930         return this;
931     }
932 
933     @Override
934     public final ChannelPipeline fireChannelRead(Object msg) {
935         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
936         return this;
937     }
938 
939     @Override
940     public final ChannelPipeline fireChannelReadComplete() {
941         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
942         return this;
943     }
944 
945     @Override
946     public final ChannelPipeline fireChannelWritabilityChanged() {
947         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
948         return this;
949     }
950 
951     @Override
952     public final ChannelFuture bind(SocketAddress localAddress) {
953         return tail.bind(localAddress);
954     }
955 
956     @Override
957     public final ChannelFuture connect(SocketAddress remoteAddress) {
958         return tail.connect(remoteAddress);
959     }
960 
961     @Override
962     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
963         return tail.connect(remoteAddress, localAddress);
964     }
965 
966     @Override
967     public final ChannelFuture disconnect() {
968         return tail.disconnect();
969     }
970 
971     @Override
972     public final ChannelFuture close() {
973         return tail.close();
974     }
975 
976     @Override
977     public final ChannelFuture deregister() {
978         return tail.deregister();
979     }
980 
981     @Override
982     public final ChannelPipeline flush() {
983         tail.flush();
984         return this;
985     }
986 
987     @Override
988     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
989         return tail.bind(localAddress, promise);
990     }
991 
992     @Override
993     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
994         return tail.connect(remoteAddress, promise);
995     }
996 
997     @Override
998     public final ChannelFuture connect(
999             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1000         return tail.connect(remoteAddress, localAddress, promise);
1001     }
1002 
1003     @Override
1004     public final ChannelFuture disconnect(ChannelPromise promise) {
1005         return tail.disconnect(promise);
1006     }
1007 
1008     @Override
1009     public final ChannelFuture close(ChannelPromise promise) {
1010         return tail.close(promise);
1011     }
1012 
1013     @Override
1014     public final ChannelFuture deregister(final ChannelPromise promise) {
1015         return tail.deregister(promise);
1016     }
1017 
1018     @Override
1019     public final ChannelPipeline read() {
1020         tail.read();
1021         return this;
1022     }
1023 
1024     @Override
1025     public final ChannelFuture write(Object msg) {
1026         return tail.write(msg);
1027     }
1028 
1029     @Override
1030     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1031         return tail.write(msg, promise);
1032     }
1033 
1034     @Override
1035     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1036         return tail.writeAndFlush(msg, promise);
1037     }
1038 
1039     @Override
1040     public final ChannelFuture writeAndFlush(Object msg) {
1041         return tail.writeAndFlush(msg);
1042     }
1043 
1044     @Override
1045     public final ChannelPromise newPromise() {
1046         return new DefaultChannelPromise(channel);
1047     }
1048 
1049     @Override
1050     public final ChannelProgressivePromise newProgressivePromise() {
1051         return new DefaultChannelProgressivePromise(channel);
1052     }
1053 
1054     @Override
1055     public final ChannelFuture newSucceededFuture() {
1056         return succeededFuture;
1057     }
1058 
1059     @Override
1060     public final ChannelFuture newFailedFuture(Throwable cause) {
1061         return new FailedChannelFuture(channel, null, cause);
1062     }
1063 
1064     @Override
1065     public final ChannelPromise voidPromise() {
1066         return voidPromise;
1067     }
1068 
1069     private void checkDuplicateName(String name) {
1070         if (context0(name) != null) {
1071             throw new IllegalArgumentException("Duplicate handler name: " + name);
1072         }
1073     }
1074 
1075     private AbstractChannelHandlerContext context0(String name) {
1076         AbstractChannelHandlerContext context = head.next;
1077         while (context != tail) {
1078             if (context.name().equals(name)) {
1079                 return context;
1080             }
1081             context = context.next;
1082         }
1083         return null;
1084     }
1085 
1086     private AbstractChannelHandlerContext getContextOrDie(String name) {
1087         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1088         if (ctx == null) {
1089             throw new NoSuchElementException(name);
1090         } else {
1091             return ctx;
1092         }
1093     }
1094 
1095     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1096         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1097         if (ctx == null) {
1098             throw new NoSuchElementException(handler.getClass().getName());
1099         } else {
1100             return ctx;
1101         }
1102     }
1103 
1104     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1105         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1106         if (ctx == null) {
1107             throw new NoSuchElementException(handlerType.getName());
1108         } else {
1109             return ctx;
1110         }
1111     }
1112 
1113     private void callHandlerAddedForAllHandlers() {
1114         final PendingHandlerCallback pendingHandlerCallbackHead;
1115         synchronized (this) {
1116             assert !registered;
1117 
1118             // This Channel itself was registered.
1119             registered = true;
1120 
1121             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1122             // Null out so it can be GC'ed.
1123             this.pendingHandlerCallbackHead = null;
1124         }
1125 
1126         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1127         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1128         // the EventLoop.
1129         PendingHandlerCallback task = pendingHandlerCallbackHead;
1130         while (task != null) {
1131             task.execute();
1132             task = task.next;
1133         }
1134     }
1135 
1136     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1137         assert !registered;
1138 
1139         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1140         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1141         if (pending == null) {
1142             pendingHandlerCallbackHead = task;
1143         } else {
1144             // Find the tail of the linked-list.
1145             while (pending.next != null) {
1146                 pending = pending.next;
1147             }
1148             pending.next = task;
1149         }
1150     }
1151 
1152     /**
1153      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1154      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1155      */
1156     protected void onUnhandledInboundException(Throwable cause) {
1157         try {
1158             logger.warn(
1159                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1160                             "It usually means the last handler in the pipeline did not handle the exception.",
1161                     cause);
1162         } finally {
1163             ReferenceCountUtil.release(cause);
1164         }
1165     }
1166 
1167     /**
1168      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1169      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1170      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1171      */
1172     protected void onUnhandledInboundMessage(Object msg) {
1173         try {
1174             logger.debug(
1175                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1176                             "Please check your pipeline configuration.", msg);
1177         } finally {
1178             ReferenceCountUtil.release(msg);
1179         }
1180     }
1181 
1182     @UnstableApi
1183     protected void incrementPendingOutboundBytes(long size) {
1184         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1185         if (buffer != null) {
1186             buffer.incrementPendingOutboundBytes(size);
1187         }
1188     }
1189 
1190     @UnstableApi
1191     protected void decrementPendingOutboundBytes(long size) {
1192         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1193         if (buffer != null) {
1194             buffer.decrementPendingOutboundBytes(size);
1195         }
1196     }
1197 
1198     // A special catch-all handler that handles both bytes and messages.
1199     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1200 
1201         TailContext(DefaultChannelPipeline pipeline) {
1202             super(pipeline, null, TAIL_NAME, true, false);
1203             setAddComplete();
1204         }
1205 
1206         @Override
1207         public ChannelHandler handler() {
1208             return this;
1209         }
1210 
1211         @Override
1212         public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1213 
1214         @Override
1215         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1216 
1217         @Override
1218         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1219 
1220         @Override
1221         public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1222 
1223         @Override
1224         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1225 
1226         @Override
1227         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1228 
1229         @Override
1230         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1231 
1232         @Override
1233         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1234             // This may not be a configuration error and so don't log anything.
1235             // The event may be superfluous for the current pipeline configuration.
1236             ReferenceCountUtil.release(evt);
1237         }
1238 
1239         @Override
1240         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1241             onUnhandledInboundException(cause);
1242         }
1243 
1244         @Override
1245         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1246             onUnhandledInboundMessage(msg);
1247         }
1248 
1249         @Override
1250         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1251     }
1252 
1253     final class HeadContext extends AbstractChannelHandlerContext
1254             implements ChannelOutboundHandler, ChannelInboundHandler {
1255 
1256         private final Unsafe unsafe;
1257 
1258         HeadContext(DefaultChannelPipeline pipeline) {
1259             super(pipeline, null, HEAD_NAME, false, true);
1260             unsafe = pipeline.channel().unsafe();
1261             setAddComplete();
1262         }
1263 
1264         @Override
1265         public ChannelHandler handler() {
1266             return this;
1267         }
1268 
1269         @Override
1270         public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
1271             // NOOP
1272         }
1273 
1274         @Override
1275         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
1276             // NOOP
1277         }
1278 
1279         @Override
1280         public void bind(
1281                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1282                 throws Exception {
1283             unsafe.bind(localAddress, promise);
1284         }
1285 
1286         @Override
1287         public void connect(
1288                 ChannelHandlerContext ctx,
1289                 SocketAddress remoteAddress, SocketAddress localAddress,
1290                 ChannelPromise promise) throws Exception {
1291             unsafe.connect(remoteAddress, localAddress, promise);
1292         }
1293 
1294         @Override
1295         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1296             unsafe.disconnect(promise);
1297         }
1298 
1299         @Override
1300         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1301             unsafe.close(promise);
1302         }
1303 
1304         @Override
1305         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1306             unsafe.deregister(promise);
1307         }
1308 
1309         @Override
1310         public void read(ChannelHandlerContext ctx) {
1311             unsafe.beginRead();
1312         }
1313 
1314         @Override
1315         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1316             unsafe.write(msg, promise);
1317         }
1318 
1319         @Override
1320         public void flush(ChannelHandlerContext ctx) throws Exception {
1321             unsafe.flush();
1322         }
1323 
1324         @Override
1325         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1326             ctx.fireExceptionCaught(cause);
1327         }
1328 
1329         @Override
1330         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1331             invokeHandlerAddedIfNeeded();
1332             ctx.fireChannelRegistered();
1333         }
1334 
1335         @Override
1336         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
1337             ctx.fireChannelUnregistered();
1338 
1339             // Remove all handlers sequentially if channel is closed and unregistered.
1340             if (!channel.isOpen()) {
1341                 destroy();
1342             }
1343         }
1344 
1345         @Override
1346         public void channelActive(ChannelHandlerContext ctx) throws Exception {
1347             ctx.fireChannelActive();
1348 
1349             readIfIsAutoRead();
1350         }
1351 
1352         @Override
1353         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1354             ctx.fireChannelInactive();
1355         }
1356 
1357         @Override
1358         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1359             ctx.fireChannelRead(msg);
1360         }
1361 
1362         @Override
1363         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1364             ctx.fireChannelReadComplete();
1365 
1366             readIfIsAutoRead();
1367         }
1368 
1369         private void readIfIsAutoRead() {
1370             if (channel.config().isAutoRead()) {
1371                 channel.read();
1372             }
1373         }
1374 
1375         @Override
1376         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1377             ctx.fireUserEventTriggered(evt);
1378         }
1379 
1380         @Override
1381         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1382             ctx.fireChannelWritabilityChanged();
1383         }
1384     }
1385 
1386     private abstract static class PendingHandlerCallback implements Runnable {
1387         final AbstractChannelHandlerContext ctx;
1388         PendingHandlerCallback next;
1389 
1390         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1391             this.ctx = ctx;
1392         }
1393 
1394         abstract void execute();
1395     }
1396 
1397     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1398 
1399         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1400             super(ctx);
1401         }
1402 
1403         @Override
1404         public void run() {
1405             callHandlerAdded0(ctx);
1406         }
1407 
1408         @Override
1409         void execute() {
1410             EventExecutor executor = ctx.executor();
1411             if (executor.inEventLoop()) {
1412                 callHandlerAdded0(ctx);
1413             } else {
1414                 try {
1415                     executor.execute(this);
1416                 } catch (RejectedExecutionException e) {
1417                     if (logger.isWarnEnabled()) {
1418                         logger.warn(
1419                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1420                                 executor, ctx.name(), e);
1421                     }
1422                     remove0(ctx);
1423                     ctx.setRemoved();
1424                 }
1425             }
1426         }
1427     }
1428 
1429     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1430 
1431         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1432             super(ctx);
1433         }
1434 
1435         @Override
1436         public void run() {
1437             callHandlerRemoved0(ctx);
1438         }
1439 
1440         @Override
1441         void execute() {
1442             EventExecutor executor = ctx.executor();
1443             if (executor.inEventLoop()) {
1444                 callHandlerRemoved0(ctx);
1445             } else {
1446                 try {
1447                     executor.execute(this);
1448                 } catch (RejectedExecutionException e) {
1449                     if (logger.isWarnEnabled()) {
1450                         logger.warn(
1451                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1452                                         " removing handler {}.", executor, ctx.name(), e);
1453                     }
1454                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1455                     ctx.setRemoved();
1456                 }
1457             }
1458         }
1459     }
1460 }