View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.EventExecutorGroup;
22  import io.netty.util.concurrent.PausableEventExecutor;
23  import io.netty.util.internal.OneTimeTask;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.net.SocketAddress;
30  import java.util.ArrayList;
31  import java.util.HashMap;
32  import java.util.IdentityHashMap;
33  import java.util.Iterator;
34  import java.util.LinkedHashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NoSuchElementException;
38  import java.util.WeakHashMap;
39  import java.util.concurrent.ExecutionException;
40  import java.util.concurrent.Future;
41  
42  /**
43   * The default {@link ChannelPipeline} implementation.  It is usually created
44   * by a {@link Channel} implementation when the {@link Channel} is created.
45   */
46  final class DefaultChannelPipeline implements ChannelPipeline {
47  
48      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
49  
50      @SuppressWarnings("unchecked")
51      private static final WeakHashMap<Class<?>, String>[] nameCaches =
52              new WeakHashMap[Runtime.getRuntime().availableProcessors()];
53  
54      static {
55          for (int i = 0; i < nameCaches.length; i ++) {
56              nameCaches[i] = new WeakHashMap<Class<?>, String>();
57          }
58      }
59  
60      final AbstractChannel channel;
61  
62      final AbstractChannelHandlerContext head;
63      final AbstractChannelHandlerContext tail;
64  
65      private final Map<String, AbstractChannelHandlerContext> name2ctx =
66              new HashMap<String, AbstractChannelHandlerContext>(4);
67  
68      /**
69       * @see #findInvoker(EventExecutorGroup)
70       */
71      private Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers;
72  
73      DefaultChannelPipeline(AbstractChannel channel) {
74          if (channel == null) {
75              throw new NullPointerException("channel");
76          }
77          this.channel = channel;
78  
79          tail = new TailContext(this);
80          head = new HeadContext(this);
81  
82          head.next = tail;
83          tail.prev = head;
84      }
85  
86      @Override
87      public Channel channel() {
88          return channel;
89      }
90  
91      @Override
92      public ChannelPipeline addFirst(String name, ChannelHandler handler) {
93          return addFirst((ChannelHandlerInvoker) null, name, handler);
94      }
95  
96      @Override
97      public ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
98          synchronized (this) {
99              name = filterName(name, handler);
100             addFirst0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
101         }
102         return this;
103     }
104 
105     @Override
106     public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
107         synchronized (this) {
108             name = filterName(name, handler);
109             addFirst0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
110         }
111         return this;
112     }
113 
114     private void addFirst0(String name, AbstractChannelHandlerContext newCtx) {
115         checkMultiplicity(newCtx);
116 
117         AbstractChannelHandlerContext nextCtx = head.next;
118         newCtx.prev = head;
119         newCtx.next = nextCtx;
120         head.next = newCtx;
121         nextCtx.prev = newCtx;
122 
123         name2ctx.put(name, newCtx);
124 
125         callHandlerAdded(newCtx);
126     }
127 
128     @Override
129     public ChannelPipeline addLast(String name, ChannelHandler handler) {
130         return addLast((ChannelHandlerInvoker) null, name, handler);
131     }
132 
133     @Override
134     public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
135         synchronized (this) {
136             name = filterName(name, handler);
137             addLast0(name, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
138         }
139         return this;
140     }
141 
142     @Override
143     public ChannelPipeline addLast(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
144         synchronized (this) {
145             name = filterName(name, handler);
146             addLast0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
147         }
148         return this;
149     }
150 
151     private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
152         checkMultiplicity(newCtx);
153 
154         AbstractChannelHandlerContext prev = tail.prev;
155         newCtx.prev = prev;
156         newCtx.next = tail;
157         prev.next = newCtx;
158         tail.prev = newCtx;
159 
160         name2ctx.put(name, newCtx);
161 
162         callHandlerAdded(newCtx);
163     }
164 
165     @Override
166     public ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
167         return addBefore((ChannelHandlerInvoker) null, baseName, name, handler);
168     }
169 
170     @Override
171     public ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
172         synchronized (this) {
173             AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
174             name = filterName(name, handler);
175             addBefore0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
176         }
177         return this;
178     }
179 
180     @Override
181     public ChannelPipeline addBefore(
182             ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
183         synchronized (this) {
184             AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
185             name = filterName(name, handler);
186             addBefore0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
187         }
188         return this;
189     }
190 
191     private void addBefore0(
192             final String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
193         checkMultiplicity(newCtx);
194 
195         newCtx.prev = ctx.prev;
196         newCtx.next = ctx;
197         ctx.prev.next = newCtx;
198         ctx.prev = newCtx;
199 
200         name2ctx.put(name, newCtx);
201 
202         callHandlerAdded(newCtx);
203     }
204 
205     @Override
206     public ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
207         return addAfter((ChannelHandlerInvoker) null, baseName, name, handler);
208     }
209 
210     @Override
211     public ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
212         synchronized (this) {
213             AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
214             name = filterName(name, handler);
215             addAfter0(name, ctx, new DefaultChannelHandlerContext(this, findInvoker(group), name, handler));
216         }
217         return this;
218     }
219 
220     @Override
221     public ChannelPipeline addAfter(
222             ChannelHandlerInvoker invoker, String baseName, String name, ChannelHandler handler) {
223 
224         synchronized (this) {
225             AbstractChannelHandlerContext ctx = getContextOrDie(baseName);
226             name = filterName(name, handler);
227             addAfter0(name, ctx, new DefaultChannelHandlerContext(this, invoker, name, handler));
228         }
229         return this;
230     }
231 
232     private void addAfter0(String name, AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
233         checkMultiplicity(newCtx);
234 
235         newCtx.prev = ctx;
236         newCtx.next = ctx.next;
237         ctx.next.prev = newCtx;
238         ctx.next = newCtx;
239 
240         name2ctx.put(name, newCtx);
241 
242         callHandlerAdded(newCtx);
243     }
244 
245     @Override
246     public ChannelPipeline addFirst(ChannelHandler... handlers) {
247         return addFirst((ChannelHandlerInvoker) null, handlers);
248     }
249 
250     @Override
251     public ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers) {
252         if (handlers == null) {
253             throw new NullPointerException("handlers");
254         }
255         if (handlers.length == 0 || handlers[0] == null) {
256             return this;
257         }
258 
259         int size;
260         for (size = 1; size < handlers.length; size ++) {
261             if (handlers[size] == null) {
262                 break;
263             }
264         }
265 
266         for (int i = size - 1; i >= 0; i --) {
267             ChannelHandler h = handlers[i];
268             addFirst(group, generateName(h), h);
269         }
270 
271         return this;
272     }
273 
274     @Override
275     public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
276         if (handlers == null) {
277             throw new NullPointerException("handlers");
278         }
279         if (handlers.length == 0 || handlers[0] == null) {
280             return this;
281         }
282 
283         int size;
284         for (size = 1; size < handlers.length; size ++) {
285             if (handlers[size] == null) {
286                 break;
287             }
288         }
289 
290         for (int i = size - 1; i >= 0; i --) {
291             ChannelHandler h = handlers[i];
292             addFirst(invoker, null, h);
293         }
294 
295         return this;
296     }
297 
298     @Override
299     public ChannelPipeline addLast(ChannelHandler... handlers) {
300         return addLast((ChannelHandlerInvoker) null, handlers);
301     }
302 
303     @Override
304     public ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers) {
305         if (handlers == null) {
306             throw new NullPointerException("handlers");
307         }
308 
309         for (ChannelHandler h: handlers) {
310             if (h == null) {
311                 break;
312             }
313             addLast(group, generateName(h), h);
314         }
315 
316         return this;
317     }
318 
319     @Override
320     public ChannelPipeline addLast(ChannelHandlerInvoker invoker, ChannelHandler... handlers) {
321         if (handlers == null) {
322             throw new NullPointerException("handlers");
323         }
324 
325         for (ChannelHandler h: handlers) {
326             if (h == null) {
327                 break;
328             }
329             addLast(invoker, null, h);
330         }
331 
332         return this;
333     }
334 
335     // No need for synchronization because it is always executed in a synchronized(this) block.
336     private ChannelHandlerInvoker findInvoker(EventExecutorGroup group) {
337         if (group == null) {
338             return null;
339         }
340 
341         // Lazily initialize the data structure that maps an EventExecutorGroup to a ChannelHandlerInvoker.
342         Map<EventExecutorGroup, ChannelHandlerInvoker> childInvokers = this.childInvokers;
343         if (childInvokers == null) {
344             childInvokers = this.childInvokers = new IdentityHashMap<EventExecutorGroup, ChannelHandlerInvoker>(4);
345         }
346 
347         // Pick one of the child executors and remember its invoker
348         // so that the same invoker is used to fire events for the same channel.
349         ChannelHandlerInvoker  invoker = childInvokers.get(group);
350         if (invoker == null) {
351             EventExecutor executor = group.next();
352             if (executor instanceof EventLoop) {
353                 invoker = ((EventLoop) executor).asInvoker();
354             } else {
355                 invoker = new DefaultChannelHandlerInvoker(executor);
356             }
357             childInvokers.put(group, invoker);
358         }
359 
360         return invoker;
361     }
362 
363     String generateName(ChannelHandler handler) {
364         WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
365         Class<?> handlerType = handler.getClass();
366         String name;
367         synchronized (cache) {
368             name = cache.get(handlerType);
369             if (name == null) {
370                 name = generateName0(handlerType);
371                 cache.put(handlerType, name);
372             }
373         }
374 
375         synchronized (this) {
376             // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
377             // any name conflicts.  Note that we don't cache the names generated here.
378             if (name2ctx.containsKey(name)) {
379                 String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
380                 for (int i = 1;; i ++) {
381                     String newName = baseName + i;
382                     if (!name2ctx.containsKey(newName)) {
383                         name = newName;
384                         break;
385                     }
386                 }
387             }
388         }
389 
390         return name;
391     }
392 
393     private static String generateName0(Class<?> handlerType) {
394         return StringUtil.simpleClassName(handlerType) + "#0";
395     }
396 
397     @Override
398     public ChannelPipeline remove(ChannelHandler handler) {
399         remove(getContextOrDie(handler));
400         return this;
401     }
402 
403     @Override
404     public ChannelHandler remove(String name) {
405         return remove(getContextOrDie(name)).handler();
406     }
407 
408     @SuppressWarnings("unchecked")
409     @Override
410     public <T extends ChannelHandler> T remove(Class<T> handlerType) {
411         return (T) remove(getContextOrDie(handlerType)).handler();
412     }
413 
414     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
415         assert ctx != head && ctx != tail;
416 
417         AbstractChannelHandlerContext context;
418         Future<?> future;
419 
420         synchronized (this) {
421             if (!ctx.channel().isRegistered() || ctx.executor().inEventLoop()) {
422                 remove0(ctx);
423                 return ctx;
424             } else {
425                 future = ctx.executor().submit(new Runnable() {
426                     @Override
427                     public void run() {
428                         synchronized (DefaultChannelPipeline.this) {
429                             remove0(ctx);
430                         }
431                     }
432                 });
433                 context = ctx;
434             }
435         }
436 
437         // Run the following 'waiting' code outside of the above synchronized block
438         // in order to avoid deadlock
439 
440         waitForFuture(future);
441 
442         return context;
443     }
444 
445     void remove0(AbstractChannelHandlerContext ctx) {
446         AbstractChannelHandlerContext prev = ctx.prev;
447         AbstractChannelHandlerContext next = ctx.next;
448         prev.next = next;
449         next.prev = prev;
450         name2ctx.remove(ctx.name());
451         callHandlerRemoved(ctx);
452     }
453 
454     @Override
455     public ChannelHandler removeFirst() {
456         if (head.next == tail) {
457             throw new NoSuchElementException();
458         }
459         return remove(head.next).handler();
460     }
461 
462     @Override
463     public ChannelHandler removeLast() {
464         if (head.next == tail) {
465             throw new NoSuchElementException();
466         }
467         return remove(tail.prev).handler();
468     }
469 
470     @Override
471     public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
472         replace(getContextOrDie(oldHandler), newName, newHandler);
473         return this;
474     }
475 
476     @Override
477     public ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
478         return replace(getContextOrDie(oldName), newName, newHandler);
479     }
480 
481     @Override
482     @SuppressWarnings("unchecked")
483     public <T extends ChannelHandler> T replace(
484             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
485         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
486     }
487 
488     private ChannelHandler replace(
489             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
490 
491         assert ctx != head && ctx != tail;
492 
493         Future<?> future;
494         synchronized (this) {
495             if (newName == null) {
496                 newName = ctx.name();
497             } else if (!ctx.name().equals(newName)) {
498                 newName = filterName(newName, newHandler);
499             }
500 
501             final AbstractChannelHandlerContext newCtx =
502                     new DefaultChannelHandlerContext(this, ctx.invoker, newName, newHandler);
503 
504             if (!newCtx.channel().isRegistered() || newCtx.executor().inEventLoop()) {
505                 replace0(ctx, newName, newCtx);
506                 return ctx.handler();
507             } else {
508                 final String finalNewName = newName;
509                 future = newCtx.executor().submit(new Runnable() {
510                     @Override
511                     public void run() {
512                         synchronized (DefaultChannelPipeline.this) {
513                             replace0(ctx, finalNewName, newCtx);
514                         }
515                     }
516                 });
517             }
518         }
519 
520         // Run the following 'waiting' code outside of the above synchronized block
521         // in order to avoid deadlock
522 
523         waitForFuture(future);
524 
525         return ctx.handler();
526     }
527 
528     private void replace0(AbstractChannelHandlerContext oldCtx, String newName,
529                           AbstractChannelHandlerContext newCtx) {
530         checkMultiplicity(newCtx);
531 
532         AbstractChannelHandlerContext prev = oldCtx.prev;
533         AbstractChannelHandlerContext next = oldCtx.next;
534         newCtx.prev = prev;
535         newCtx.next = next;
536 
537         // Finish the replacement of oldCtx with newCtx in the linked list.
538         // Note that this doesn't mean events will be sent to the new handler immediately
539         // because we are currently at the event handler thread and no more than one handler methods can be invoked
540         // at the same time (we ensured that in replace().)
541         prev.next = newCtx;
542         next.prev = newCtx;
543 
544         if (!oldCtx.name().equals(newName)) {
545             name2ctx.remove(oldCtx.name());
546         }
547         name2ctx.put(newName, newCtx);
548 
549         // update the reference to the replacement so forward of buffered content will work correctly
550         oldCtx.prev = newCtx;
551         oldCtx.next = newCtx;
552 
553         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
554         // because callHandlerRemoved() will trigger inboundBufferUpdated() or flush() on newHandler and those
555         // event handlers must be called after handlerAdded().
556         callHandlerAdded(newCtx);
557         callHandlerRemoved(oldCtx);
558     }
559 
560     private static void checkMultiplicity(ChannelHandlerContext ctx) {
561         ChannelHandler handler = ctx.handler();
562         if (handler instanceof ChannelHandlerAdapter) {
563             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
564             if (!h.isSharable() && h.added) {
565                 throw new ChannelPipelineException(
566                         h.getClass().getName() +
567                         " is not a @Sharable handler, so can't be added or removed multiple times.");
568             }
569             h.added = true;
570         }
571     }
572 
573     private void callHandlerAdded(final AbstractChannelHandlerContext ctx) {
574         if ((ctx.skipFlags & AbstractChannelHandlerContext.MASK_HANDLER_ADDED) != 0) {
575             return;
576         }
577 
578         if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
579             ctx.executor().execute(new Runnable() {
580                 @Override
581                 public void run() {
582                     callHandlerAdded0(ctx);
583                 }
584             });
585             return;
586         }
587         callHandlerAdded0(ctx);
588     }
589 
590     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
591         try {
592             ctx.invokedThisChannelRead = false;
593             ctx.handler().handlerAdded(ctx);
594         } catch (Throwable t) {
595             boolean removed = false;
596             try {
597                 remove(ctx);
598                 removed = true;
599             } catch (Throwable t2) {
600                 if (logger.isWarnEnabled()) {
601                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
602                 }
603             }
604 
605             if (removed) {
606                 fireExceptionCaught(new ChannelPipelineException(
607                         ctx.handler().getClass().getName() +
608                         ".handlerAdded() has thrown an exception; removed.", t));
609             } else {
610                 fireExceptionCaught(new ChannelPipelineException(
611                         ctx.handler().getClass().getName() +
612                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
613             }
614         }
615     }
616 
617     private void callHandlerRemoved(final AbstractChannelHandlerContext ctx) {
618         if ((ctx.skipFlags & AbstractChannelHandlerContext.MASK_HANDLER_REMOVED) != 0) {
619             return;
620         }
621 
622         if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
623             ctx.executor().execute(new Runnable() {
624                 @Override
625                 public void run() {
626                     callHandlerRemoved0(ctx);
627                 }
628             });
629             return;
630         }
631         callHandlerRemoved0(ctx);
632     }
633 
634     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
635         // Notify the complete removal.
636         try {
637             ctx.handler().handlerRemoved(ctx);
638             ctx.setRemoved();
639         } catch (Throwable t) {
640             fireExceptionCaught(new ChannelPipelineException(
641                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
642         }
643     }
644 
645     /**
646      * Waits for a future to finish.  If the task is interrupted, then the current thread will be interrupted.
647      * It is expected that the task performs any appropriate locking.
648      * <p>
649      * If the internal call throws a {@link Throwable}, but it is not an instance of {@link Error} or
650      * {@link RuntimeException}, then it is wrapped inside a {@link ChannelPipelineException} and that is
651      * thrown instead.</p>
652      *
653      * @param future wait for this future
654      * @see Future#get()
655      * @throws Error if the task threw this.
656      * @throws RuntimeException if the task threw this.
657      * @throws ChannelPipelineException with a {@link Throwable} as a cause, if the task threw another type of
658      *         {@link Throwable}.
659      */
660     private static void waitForFuture(Future<?> future) {
661         try {
662             future.get();
663         } catch (ExecutionException ex) {
664             // In the arbitrary case, we can throw Error, RuntimeException, and Exception
665             PlatformDependent.throwException(ex.getCause());
666         } catch (InterruptedException ex) {
667             // Interrupt the calling thread (note that this method is not called from the event loop)
668             Thread.currentThread().interrupt();
669         }
670     }
671 
672     @Override
673     public ChannelHandler first() {
674         ChannelHandlerContext first = firstContext();
675         if (first == null) {
676             return null;
677         }
678         return first.handler();
679     }
680 
681     @Override
682     public ChannelHandlerContext firstContext() {
683         AbstractChannelHandlerContext first = head.next;
684         if (first == tail) {
685             return null;
686         }
687         return head.next;
688     }
689 
690     @Override
691     public ChannelHandler last() {
692         AbstractChannelHandlerContext last = tail.prev;
693         if (last == head) {
694             return null;
695         }
696         return last.handler();
697     }
698 
699     @Override
700     public ChannelHandlerContext lastContext() {
701         AbstractChannelHandlerContext last = tail.prev;
702         if (last == head) {
703             return null;
704         }
705         return last;
706     }
707 
708     @Override
709     public ChannelHandler get(String name) {
710         ChannelHandlerContext ctx = context(name);
711         if (ctx == null) {
712             return null;
713         } else {
714             return ctx.handler();
715         }
716     }
717 
718     @SuppressWarnings("unchecked")
719     @Override
720     public <T extends ChannelHandler> T get(Class<T> handlerType) {
721         ChannelHandlerContext ctx = context(handlerType);
722         if (ctx == null) {
723             return null;
724         } else {
725             return (T) ctx.handler();
726         }
727     }
728 
729     @Override
730     public ChannelHandlerContext context(String name) {
731         if (name == null) {
732             throw new NullPointerException("name");
733         }
734 
735         synchronized (this) {
736             return name2ctx.get(name);
737         }
738     }
739 
740     @Override
741     public ChannelHandlerContext context(ChannelHandler handler) {
742         if (handler == null) {
743             throw new NullPointerException("handler");
744         }
745 
746         AbstractChannelHandlerContext ctx = head.next;
747         for (;;) {
748 
749             if (ctx == null) {
750                 return null;
751             }
752 
753             if (ctx.handler() == handler) {
754                 return ctx;
755             }
756 
757             ctx = ctx.next;
758         }
759     }
760 
761     @Override
762     public ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
763         if (handlerType == null) {
764             throw new NullPointerException("handlerType");
765         }
766 
767         AbstractChannelHandlerContext ctx = head.next;
768         for (;;) {
769             if (ctx == null) {
770                 return null;
771             }
772             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
773                 return ctx;
774             }
775             ctx = ctx.next;
776         }
777     }
778 
779     @Override
780     public List<String> names() {
781         List<String> list = new ArrayList<String>();
782         AbstractChannelHandlerContext ctx = head.next;
783         for (;;) {
784             if (ctx == null) {
785                 return list;
786             }
787             list.add(ctx.name());
788             ctx = ctx.next;
789         }
790     }
791 
792     @Override
793     public Map<String, ChannelHandler> toMap() {
794         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
795         AbstractChannelHandlerContext ctx = head.next;
796         for (;;) {
797             if (ctx == tail) {
798                 return map;
799             }
800             map.put(ctx.name(), ctx.handler());
801             ctx = ctx.next;
802         }
803     }
804 
805     @Override
806     public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
807         return toMap().entrySet().iterator();
808     }
809 
810     /**
811      * Returns the {@link String} representation of this pipeline.
812      */
813     @Override
814     public String toString() {
815         StringBuilder buf = new StringBuilder()
816             .append(StringUtil.simpleClassName(this))
817             .append('{');
818         AbstractChannelHandlerContext ctx = head.next;
819         for (;;) {
820             if (ctx == tail) {
821                 break;
822             }
823 
824             buf.append('(')
825                .append(ctx.name())
826                .append(" = ")
827                .append(ctx.handler().getClass().getName())
828                .append(')');
829 
830             ctx = ctx.next;
831             if (ctx == tail) {
832                 break;
833             }
834 
835             buf.append(", ");
836         }
837         buf.append('}');
838         return buf.toString();
839     }
840 
841     @Override
842     public ChannelPipeline fireChannelRegistered() {
843         head.fireChannelRegistered();
844         return this;
845     }
846 
847     @Override
848     public ChannelPipeline fireChannelUnregistered() {
849         head.fireChannelUnregistered();
850 
851         // Remove all handlers sequentially if channel is closed and unregistered.
852         if (!channel.isOpen()) {
853             destroy();
854         }
855         return this;
856     }
857 
858     /**
859      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
860      * handlerRemoved().
861      *
862      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext)})
863      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext)}) so that
864      * the handlers are removed after all events are handled.
865      *
866      * See: https://github.com/netty/netty/issues/3156
867      */
868     private void destroy() {
869         destroyUp(head.next);
870     }
871 
872     private void destroyUp(AbstractChannelHandlerContext ctx) {
873         final Thread currentThread = Thread.currentThread();
874         final AbstractChannelHandlerContext tail = this.tail;
875         for (;;) {
876             if (ctx == tail) {
877                 destroyDown(currentThread, tail.prev);
878                 break;
879             }
880 
881             final EventExecutor executor = ctx.executor();
882             if (!executor.inEventLoop(currentThread)) {
883                 final AbstractChannelHandlerContext finalCtx = ctx;
884                 executor.unwrap().execute(new OneTimeTask() {
885                     @Override
886                     public void run() {
887                         destroyUp(finalCtx);
888                     }
889                 });
890                 break;
891             }
892 
893             ctx = ctx.next;
894         }
895     }
896 
897     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx) {
898         // We have reached at tail; now traverse backwards.
899         final AbstractChannelHandlerContext head = this.head;
900         for (;;) {
901             if (ctx == head) {
902                 break;
903             }
904 
905             final EventExecutor executor = ctx.executor();
906             if (executor.inEventLoop(currentThread)) {
907                 synchronized (this) {
908                     remove0(ctx);
909                 }
910             } else {
911                 final AbstractChannelHandlerContext finalCtx = ctx;
912                 executor.unwrap().execute(new OneTimeTask() {
913                     @Override
914                     public void run() {
915                         destroyDown(Thread.currentThread(), finalCtx);
916                     }
917                 });
918                 break;
919             }
920 
921             ctx = ctx.prev;
922         }
923     }
924 
925     @Override
926     public ChannelPipeline fireChannelActive() {
927         head.fireChannelActive();
928 
929         if (channel.config().isAutoRead()) {
930             channel.read();
931         }
932 
933         return this;
934     }
935 
936     @Override
937     public ChannelPipeline fireChannelInactive() {
938         head.fireChannelInactive();
939         return this;
940     }
941 
942     @Override
943     public ChannelPipeline fireExceptionCaught(Throwable cause) {
944         head.fireExceptionCaught(cause);
945         return this;
946     }
947 
948     @Override
949     public ChannelPipeline fireUserEventTriggered(Object event) {
950         head.fireUserEventTriggered(event);
951         return this;
952     }
953 
954     @Override
955     public ChannelPipeline fireChannelRead(Object msg) {
956         head.fireChannelRead(msg);
957         return this;
958     }
959 
960     @Override
961     public ChannelPipeline fireChannelReadComplete() {
962         head.fireChannelReadComplete();
963         if (channel.config().isAutoRead()) {
964             read();
965         }
966         return this;
967     }
968 
969     @Override
970     public ChannelPipeline fireChannelWritabilityChanged() {
971         head.fireChannelWritabilityChanged();
972         return this;
973     }
974 
975     @Override
976     public ChannelFuture bind(SocketAddress localAddress) {
977         return tail.bind(localAddress);
978     }
979 
980     @Override
981     public ChannelFuture connect(SocketAddress remoteAddress) {
982         return tail.connect(remoteAddress);
983     }
984 
985     @Override
986     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
987         return tail.connect(remoteAddress, localAddress);
988     }
989 
990     @Override
991     public ChannelFuture disconnect() {
992         return tail.disconnect();
993     }
994 
995     @Override
996     public ChannelFuture close() {
997         return tail.close();
998     }
999 
1000     @Override
1001     public ChannelFuture deregister() {
1002         return tail.deregister();
1003     }
1004 
1005     @Override
1006     public ChannelPipeline flush() {
1007         tail.flush();
1008         return this;
1009     }
1010 
1011     @Override
1012     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
1013         return tail.bind(localAddress, promise);
1014     }
1015 
1016     @Override
1017     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
1018         return tail.connect(remoteAddress, promise);
1019     }
1020 
1021     @Override
1022     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1023         return tail.connect(remoteAddress, localAddress, promise);
1024     }
1025 
1026     @Override
1027     public ChannelFuture disconnect(ChannelPromise promise) {
1028         return tail.disconnect(promise);
1029     }
1030 
1031     @Override
1032     public ChannelFuture close(ChannelPromise promise) {
1033         return tail.close(promise);
1034     }
1035 
1036     @Override
1037     public ChannelFuture deregister(ChannelPromise promise) {
1038         return tail.deregister(promise);
1039     }
1040 
1041     @Override
1042     public ChannelPipeline read() {
1043         tail.read();
1044         return this;
1045     }
1046 
1047     @Override
1048     public ChannelFuture write(Object msg) {
1049         return tail.write(msg);
1050     }
1051 
1052     @Override
1053     public ChannelFuture write(Object msg, ChannelPromise promise) {
1054         return tail.write(msg, promise);
1055     }
1056 
1057     @Override
1058     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1059         return tail.writeAndFlush(msg, promise);
1060     }
1061 
1062     @Override
1063     public ChannelFuture writeAndFlush(Object msg) {
1064         return tail.writeAndFlush(msg);
1065     }
1066 
1067     private String filterName(String name, ChannelHandler handler) {
1068         if (name == null) {
1069             return generateName(handler);
1070         }
1071 
1072         if (!name2ctx.containsKey(name)) {
1073             return name;
1074         }
1075 
1076         throw new IllegalArgumentException("Duplicate handler name: " + name);
1077     }
1078 
1079     private AbstractChannelHandlerContext getContextOrDie(String name) {
1080         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1081         if (ctx == null) {
1082             throw new NoSuchElementException(name);
1083         } else {
1084             return ctx;
1085         }
1086     }
1087 
1088     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1089         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1090         if (ctx == null) {
1091             throw new NoSuchElementException(handler.getClass().getName());
1092         } else {
1093             return ctx;
1094         }
1095     }
1096 
1097     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1098         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1099         if (ctx == null) {
1100             throw new NoSuchElementException(handlerType.getName());
1101         } else {
1102             return ctx;
1103         }
1104     }
1105 
1106     static final class TailContext extends AbstractChannelHandlerContext implements ChannelHandler {
1107         private static final int SKIP_FLAGS = skipFlags0(TailContext.class);
1108         private static final String TAIL_NAME = generateName0(TailContext.class);
1109 
1110         TailContext(DefaultChannelPipeline pipeline) {
1111             super(pipeline, null, TAIL_NAME, SKIP_FLAGS);
1112         }
1113 
1114         @Override
1115         public ChannelHandler handler() {
1116             return this;
1117         }
1118 
1119         @Override
1120         public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1121 
1122         @Override
1123         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1124 
1125         @Override
1126         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1127 
1128         @Override
1129         public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1130 
1131         @Override
1132         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1133 
1134         @Override
1135         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { }
1136 
1137         @Override
1138         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1139             logger.warn(
1140                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1141                             "It usually means the last handler in the pipeline did not handle the exception.", cause);
1142         }
1143 
1144         @Override
1145         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1146             try {
1147                 logger.debug(
1148                         "Discarded inbound message {} that reached at the tail of the pipeline. " +
1149                                 "Please check your pipeline configuration.", msg);
1150             } finally {
1151                 ReferenceCountUtil.release(msg);
1152             }
1153         }
1154 
1155         @Override
1156         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1157 
1158         @Skip
1159         @Override
1160         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1161 
1162         @Skip
1163         @Override
1164         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1165 
1166         @Skip
1167         @Override
1168         public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1169                 throws Exception {
1170             ctx.bind(localAddress, promise);
1171         }
1172 
1173         @Skip
1174         @Override
1175         public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
1176                             SocketAddress localAddress, ChannelPromise promise) throws Exception {
1177             ctx.connect(remoteAddress, localAddress, promise);
1178         }
1179 
1180         @Skip
1181         @Override
1182         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1183             ctx.disconnect(promise);
1184         }
1185 
1186         @Skip
1187         @Override
1188         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1189             ctx.close(promise);
1190         }
1191 
1192         @Skip
1193         @Override
1194         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1195             ctx.deregister(promise);
1196         }
1197 
1198         @Skip
1199         @Override
1200         public void read(ChannelHandlerContext ctx) throws Exception {
1201             ctx.read();
1202         }
1203 
1204         @Skip
1205         @Override
1206         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1207             ctx.write(msg, promise);
1208         }
1209 
1210         @Skip
1211         @Override
1212         public void flush(ChannelHandlerContext ctx) throws Exception {
1213             ctx.flush();
1214         }
1215     }
1216 
1217     static final class HeadContext extends AbstractChannelHandlerContext implements ChannelHandler {
1218         private static final int SKIP_FLAGS = skipFlags0(HeadContext.class);
1219         private static final String HEAD_NAME = generateName0(HeadContext.class);
1220 
1221         private final Unsafe unsafe;
1222 
1223         HeadContext(DefaultChannelPipeline pipeline) {
1224             super(pipeline, null, HEAD_NAME, SKIP_FLAGS);
1225             unsafe = pipeline.channel().unsafe();
1226         }
1227 
1228         @Override
1229         public ChannelHandler handler() {
1230             return this;
1231         }
1232 
1233         @Override
1234         public void bind(
1235                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1236                 throws Exception {
1237             unsafe.bind(localAddress, promise);
1238         }
1239 
1240         @Override
1241         public void connect(
1242                 ChannelHandlerContext ctx,
1243                 SocketAddress remoteAddress, SocketAddress localAddress,
1244                 ChannelPromise promise) throws Exception {
1245             unsafe.connect(remoteAddress, localAddress, promise);
1246         }
1247 
1248         @Override
1249         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1250             unsafe.disconnect(promise);
1251         }
1252 
1253         @Override
1254         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1255             unsafe.close(promise);
1256         }
1257 
1258         @Override
1259         public void deregister(ChannelHandlerContext ctx, final ChannelPromise promise) throws Exception {
1260             assert !((PausableEventExecutor) ctx.channel().eventLoop()).isAcceptingNewTasks();
1261 
1262             // submit deregistration task
1263             ctx.channel().eventLoop().unwrap().execute(new OneTimeTask() {
1264                 @Override
1265                 public void run() {
1266                     unsafe.deregister(promise);
1267                 }
1268             });
1269         }
1270 
1271         @Override
1272         public void read(ChannelHandlerContext ctx) {
1273             unsafe.beginRead();
1274         }
1275 
1276         @Override
1277         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1278             unsafe.write(msg, promise);
1279         }
1280 
1281         @Override
1282         public void flush(ChannelHandlerContext ctx) throws Exception {
1283             unsafe.flush();
1284         }
1285 
1286         @Skip
1287         @Override
1288         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1289 
1290         @Skip
1291         @Override
1292         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1293 
1294         @Skip
1295         @Override
1296         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1297             ctx.fireExceptionCaught(cause);
1298         }
1299 
1300         @Skip
1301         @Override
1302         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1303             ctx.fireChannelRegistered();
1304         }
1305 
1306         @Skip
1307         @Override
1308         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
1309             ctx.fireChannelUnregistered();
1310         }
1311 
1312         @Skip
1313         @Override
1314         public void channelActive(ChannelHandlerContext ctx) throws Exception {
1315             ctx.fireChannelActive();
1316         }
1317 
1318         @Skip
1319         @Override
1320         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1321             ctx.fireChannelInactive();
1322         }
1323 
1324         @Skip
1325         @Override
1326         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1327             ctx.fireChannelRead(msg);
1328         }
1329 
1330         @Skip
1331         @Override
1332         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1333             ctx.fireChannelReadComplete();
1334         }
1335 
1336         @Skip
1337         @Override
1338         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1339             ctx.fireUserEventTriggered(evt);
1340         }
1341 
1342         @Skip
1343         @Override
1344         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1345             ctx.fireChannelWritabilityChanged();
1346         }
1347     }
1348 }