View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.EventExecutorGroup;
22  import io.netty.util.concurrent.FastThreadLocal;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.StringUtil;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.net.SocketAddress;
29  import java.util.ArrayList;
30  import java.util.IdentityHashMap;
31  import java.util.Iterator;
32  import java.util.LinkedHashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NoSuchElementException;
36  import java.util.WeakHashMap;
37  import java.util.concurrent.RejectedExecutionException;
38  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
39  
40  /**
41   * The default {@link ChannelPipeline} implementation.  It is usually created
42   * by a {@link Channel} implementation when the {@link Channel} is created.
43   */
44  public class DefaultChannelPipeline implements ChannelPipeline {
45  
46      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
47  
48      private static final String HEAD_NAME = generateName0(HeadContext.class);
49      private static final String TAIL_NAME = generateName0(TailContext.class);
50  
51      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
52              new FastThreadLocal<Map<Class<?>, String>>() {
53          @Override
54          protected Map<Class<?>, String> initialValue() throws Exception {
55              return new WeakHashMap<Class<?>, String>();
56          }
57      };
58  
59      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
60              AtomicReferenceFieldUpdater.newUpdater(
61                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
62      final AbstractChannelHandlerContext head;
63      final AbstractChannelHandlerContext tail;
64  
65      private final Channel channel;
66      private Map<EventExecutorGroup, EventExecutor> childExecutors;
67      private volatile MessageSizeEstimator.Handle estimatorHandle;
68      private boolean firstRegistration = true;
69  
70      /**
71       * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()}, which in turn
72       * processes all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)} calls.
73       *
74       * We only keep the head because it is expected that the list is used infrequently and its size is small.
75       * Thus a full iteration to do an insertion is assumed to be a good compromise, saving memory and tail management
76       * complexity.
77       */
78      private PendingHandlerCallback pendingHandlerCallbackHead;
79  
80      /**
81       * Set to {@code true} once the {@link AbstractChannel} is registered. Once set to {@code true} the value will never
82       * change.
83       */
84      private boolean registered;
85  
/**
 * Creates a pipeline for the given channel that initially contains only the head and tail
 * contexts, linked directly to each other.
 *
 * @param channel the channel this pipeline belongs to; validated with
 *                {@link ObjectUtil#checkNotNull(Object, String)}
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");

    // Create both sentinels first, then wire them into an empty doubly-linked list.
    tail = new TailContext(this);
    head = new HeadContext(this);

    tail.prev = head;
    head.next = tail;
}
95  
96      final MessageSizeEstimator.Handle estimatorHandle() {
97          MessageSizeEstimator.Handle handle = estimatorHandle;
98          if (handle == null) {
99              handle = channel.config().getMessageSizeEstimator().newHandle();
100             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
101                 handle = estimatorHandle;
102             }
103         }
104         return handle;
105     }
106 
107     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
108         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
109     }
110 
111     private EventExecutor childExecutor(EventExecutorGroup group) {
112         if (group == null) {
113             return null;
114         }
115         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
116         if (pinEventExecutor != null && !pinEventExecutor) {
117             return group.next();
118         }
119         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
120         if (childExecutors == null) {
121             // Use size of 4 as most people only use one extra EventExecutor.
122             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
123         }
124         // Pin one of the child executors once and remember it so that the same child executor
125         // is used to fire events for the same channel.
126         EventExecutor childExecutor = childExecutors.get(group);
127         if (childExecutor == null) {
128             childExecutor = group.next();
129             childExecutors.put(group, childExecutor);
130         }
131         return childExecutor;
132     }
133     @Override
134     public final Channel channel() {
135         return channel;
136     }
137 
138     @Override
139     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
140         return addFirst(null, name, handler);
141     }
142 
143     @Override
144     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
145         final AbstractChannelHandlerContext newCtx;
146         synchronized (this) {
147             checkMultiplicity(handler);
148             name = filterName(name, handler);
149 
150             newCtx = newContext(group, name, handler);
151 
152             addFirst0(newCtx);
153 
154             // If the registered is false it means that the channel was not registered on an eventloop yet.
155             // In this case we add the context to the pipeline and add a task that will call
156             // ChannelHandler.handlerAdded(...) once the channel is registered.
157             if (!registered) {
158                 newCtx.setAddPending();
159                 callHandlerCallbackLater(newCtx, true);
160                 return this;
161             }
162 
163             EventExecutor executor = newCtx.executor();
164             if (!executor.inEventLoop()) {
165                 newCtx.setAddPending();
166                 executor.execute(new Runnable() {
167                     @Override
168                     public void run() {
169                         callHandlerAdded0(newCtx);
170                     }
171                 });
172                 return this;
173             }
174         }
175         callHandlerAdded0(newCtx);
176         return this;
177     }
178 
179     private void addFirst0(AbstractChannelHandlerContext newCtx) {
180         AbstractChannelHandlerContext nextCtx = head.next;
181         newCtx.prev = head;
182         newCtx.next = nextCtx;
183         head.next = newCtx;
184         nextCtx.prev = newCtx;
185     }
186 
187     @Override
188     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
189         return addLast(null, name, handler);
190     }
191 
192     @Override
193     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
194         final AbstractChannelHandlerContext newCtx;
195         synchronized (this) {
196             checkMultiplicity(handler);
197 
198             newCtx = newContext(group, filterName(name, handler), handler);
199 
200             addLast0(newCtx);
201 
202             // If the registered is false it means that the channel was not registered on an eventloop yet.
203             // In this case we add the context to the pipeline and add a task that will call
204             // ChannelHandler.handlerAdded(...) once the channel is registered.
205             if (!registered) {
206                 newCtx.setAddPending();
207                 callHandlerCallbackLater(newCtx, true);
208                 return this;
209             }
210 
211             EventExecutor executor = newCtx.executor();
212             if (!executor.inEventLoop()) {
213                 newCtx.setAddPending();
214                 executor.execute(new Runnable() {
215                     @Override
216                     public void run() {
217                         callHandlerAdded0(newCtx);
218                     }
219                 });
220                 return this;
221             }
222         }
223         callHandlerAdded0(newCtx);
224         return this;
225     }
226 
227     private void addLast0(AbstractChannelHandlerContext newCtx) {
228         AbstractChannelHandlerContext prev = tail.prev;
229         newCtx.prev = prev;
230         newCtx.next = tail;
231         prev.next = newCtx;
232         tail.prev = newCtx;
233     }
234 
235     @Override
236     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
237         return addBefore(null, baseName, name, handler);
238     }
239 
240     @Override
241     public final ChannelPipeline addBefore(
242             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
243         final AbstractChannelHandlerContext newCtx;
244         final AbstractChannelHandlerContext ctx;
245         synchronized (this) {
246             checkMultiplicity(handler);
247             name = filterName(name, handler);
248             ctx = getContextOrDie(baseName);
249 
250             newCtx = newContext(group, name, handler);
251 
252             addBefore0(ctx, newCtx);
253 
254             // If the registered is false it means that the channel was not registered on an eventloop yet.
255             // In this case we add the context to the pipeline and add a task that will call
256             // ChannelHandler.handlerAdded(...) once the channel is registered.
257             if (!registered) {
258                 newCtx.setAddPending();
259                 callHandlerCallbackLater(newCtx, true);
260                 return this;
261             }
262 
263             EventExecutor executor = newCtx.executor();
264             if (!executor.inEventLoop()) {
265                 newCtx.setAddPending();
266                 executor.execute(new Runnable() {
267                     @Override
268                     public void run() {
269                         callHandlerAdded0(newCtx);
270                     }
271                 });
272                 return this;
273             }
274         }
275         callHandlerAdded0(newCtx);
276         return this;
277     }
278 
279     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
280         newCtx.prev = ctx.prev;
281         newCtx.next = ctx;
282         ctx.prev.next = newCtx;
283         ctx.prev = newCtx;
284     }
285 
286     private String filterName(String name, ChannelHandler handler) {
287         if (name == null) {
288             return generateName(handler);
289         }
290         checkDuplicateName(name);
291         return name;
292     }
293 
294     @Override
295     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
296         return addAfter(null, baseName, name, handler);
297     }
298 
299     @Override
300     public final ChannelPipeline addAfter(
301             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
302         final AbstractChannelHandlerContext newCtx;
303         final AbstractChannelHandlerContext ctx;
304 
305         synchronized (this) {
306             checkMultiplicity(handler);
307             name = filterName(name, handler);
308             ctx = getContextOrDie(baseName);
309 
310             newCtx = newContext(group, name, handler);
311 
312             addAfter0(ctx, newCtx);
313 
314             // If the registered is false it means that the channel was not registered on an eventloop yet.
315             // In this case we remove the context from the pipeline and add a task that will call
316             // ChannelHandler.handlerRemoved(...) once the channel is registered.
317             if (!registered) {
318                 newCtx.setAddPending();
319                 callHandlerCallbackLater(newCtx, true);
320                 return this;
321             }
322             EventExecutor executor = newCtx.executor();
323             if (!executor.inEventLoop()) {
324                 newCtx.setAddPending();
325                 executor.execute(new Runnable() {
326                     @Override
327                     public void run() {
328                         callHandlerAdded0(newCtx);
329                     }
330                 });
331                 return this;
332             }
333         }
334         callHandlerAdded0(newCtx);
335         return this;
336     }
337 
338     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
339         newCtx.prev = ctx;
340         newCtx.next = ctx.next;
341         ctx.next.prev = newCtx;
342         ctx.next = newCtx;
343     }
344 
345     @Override
346     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
347         return addFirst(null, handlers);
348     }
349 
350     @Override
351     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
352         if (handlers == null) {
353             throw new NullPointerException("handlers");
354         }
355         if (handlers.length == 0 || handlers[0] == null) {
356             return this;
357         }
358 
359         int size;
360         for (size = 1; size < handlers.length; size ++) {
361             if (handlers[size] == null) {
362                 break;
363             }
364         }
365 
366         for (int i = size - 1; i >= 0; i --) {
367             ChannelHandler h = handlers[i];
368             addFirst(executor, null, h);
369         }
370 
371         return this;
372     }
373 
374     @Override
375     public final ChannelPipeline addLast(ChannelHandler... handlers) {
376         return addLast(null, handlers);
377     }
378 
379     @Override
380     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
381         if (handlers == null) {
382             throw new NullPointerException("handlers");
383         }
384 
385         for (ChannelHandler h: handlers) {
386             if (h == null) {
387                 break;
388             }
389             addLast(executor, null, h);
390         }
391 
392         return this;
393     }
394 
395     private String generateName(ChannelHandler handler) {
396         Map<Class<?>, String> cache = nameCaches.get();
397         Class<?> handlerType = handler.getClass();
398         String name = cache.get(handlerType);
399         if (name == null) {
400             name = generateName0(handlerType);
401             cache.put(handlerType, name);
402         }
403 
404         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
405         // any name conflicts.  Note that we don't cache the names generated here.
406         if (context0(name) != null) {
407             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
408             for (int i = 1;; i ++) {
409                 String newName = baseName + i;
410                 if (context0(newName) == null) {
411                     name = newName;
412                     break;
413                 }
414             }
415         }
416         return name;
417     }
418 
419     private static String generateName0(Class<?> handlerType) {
420         return StringUtil.simpleClassName(handlerType) + "#0";
421     }
422 
423     @Override
424     public final ChannelPipeline remove(ChannelHandler handler) {
425         remove(getContextOrDie(handler));
426         return this;
427     }
428 
429     @Override
430     public final ChannelHandler remove(String name) {
431         return remove(getContextOrDie(name)).handler();
432     }
433 
434     @SuppressWarnings("unchecked")
435     @Override
436     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
437         return (T) remove(getContextOrDie(handlerType)).handler();
438     }
439 
440     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
441         assert ctx != head && ctx != tail;
442 
443         synchronized (this) {
444             remove0(ctx);
445 
446             // If the registered is false it means that the channel was not registered on an eventloop yet.
447             // In this case we remove the context from the pipeline and add a task that will call
448             // ChannelHandler.handlerRemoved(...) once the channel is registered.
449             if (!registered) {
450                 callHandlerCallbackLater(ctx, false);
451                 return ctx;
452             }
453 
454             EventExecutor executor = ctx.executor();
455             if (!executor.inEventLoop()) {
456                 executor.execute(new Runnable() {
457                     @Override
458                     public void run() {
459                         callHandlerRemoved0(ctx);
460                     }
461                 });
462                 return ctx;
463             }
464         }
465         callHandlerRemoved0(ctx);
466         return ctx;
467     }
468 
469     private static void remove0(AbstractChannelHandlerContext ctx) {
470         AbstractChannelHandlerContext prev = ctx.prev;
471         AbstractChannelHandlerContext next = ctx.next;
472         prev.next = next;
473         next.prev = prev;
474     }
475 
476     @Override
477     public final ChannelHandler removeFirst() {
478         if (head.next == tail) {
479             throw new NoSuchElementException();
480         }
481         return remove(head.next).handler();
482     }
483 
484     @Override
485     public final ChannelHandler removeLast() {
486         if (head.next == tail) {
487             throw new NoSuchElementException();
488         }
489         return remove(tail.prev).handler();
490     }
491 
492     @Override
493     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
494         replace(getContextOrDie(oldHandler), newName, newHandler);
495         return this;
496     }
497 
498     @Override
499     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
500         return replace(getContextOrDie(oldName), newName, newHandler);
501     }
502 
503     @Override
504     @SuppressWarnings("unchecked")
505     public final <T extends ChannelHandler> T replace(
506             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
507         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
508     }
509 
510     private ChannelHandler replace(
511             final AbstractChannelHandlerContext ctx, final String newName, ChannelHandler newHandler) {
512         assert ctx != head && ctx != tail;
513 
514         final AbstractChannelHandlerContext newCtx;
515         synchronized (this) {
516             checkMultiplicity(newHandler);
517             boolean sameName = ctx.name().equals(newName);
518             if (!sameName) {
519                 checkDuplicateName(newName);
520             }
521 
522             newCtx = newContext(ctx.executor, newName, newHandler);
523 
524             replace0(ctx, newCtx);
525 
526             // If the registered is false it means that the channel was not registered on an eventloop yet.
527             // In this case we replace the context in the pipeline
528             // and add a task that will call ChannelHandler.handlerAdded(...) and
529             // ChannelHandler.handlerRemoved(...) once the channel is registered.
530             if (!registered) {
531                 callHandlerCallbackLater(newCtx, true);
532                 callHandlerCallbackLater(ctx, false);
533                 return ctx.handler();
534             }
535             EventExecutor executor = ctx.executor();
536             if (!executor.inEventLoop()) {
537                 executor.execute(new Runnable() {
538                     @Override
539                     public void run() {
540                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
541                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
542                         // those event handlers must be called after handlerAdded().
543                         callHandlerAdded0(newCtx);
544                         callHandlerRemoved0(ctx);
545                     }
546                 });
547                 return ctx.handler();
548             }
549         }
550         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
551         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
552         // event handlers must be called after handlerAdded().
553         callHandlerAdded0(newCtx);
554         callHandlerRemoved0(ctx);
555         return ctx.handler();
556     }
557 
558     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
559         AbstractChannelHandlerContext prev = oldCtx.prev;
560         AbstractChannelHandlerContext next = oldCtx.next;
561         newCtx.prev = prev;
562         newCtx.next = next;
563 
564         // Finish the replacement of oldCtx with newCtx in the linked list.
565         // Note that this doesn't mean events will be sent to the new handler immediately
566         // because we are currently at the event handler thread and no more than one handler methods can be invoked
567         // at the same time (we ensured that in replace().)
568         prev.next = newCtx;
569         next.prev = newCtx;
570 
571         // update the reference to the replacement so forward of buffered content will work correctly
572         oldCtx.prev = newCtx;
573         oldCtx.next = newCtx;
574     }
575 
576     private static void checkMultiplicity(ChannelHandler handler) {
577         if (handler instanceof ChannelHandlerAdapter) {
578             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
579             if (!h.isSharable() && h.added) {
580                 throw new ChannelPipelineException(
581                         h.getClass().getName() +
582                         " is not a @Sharable handler, so can't be added or removed multiple times.");
583             }
584             h.added = true;
585         }
586     }
587 
588     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
589         try {
590             ctx.handler().handlerAdded(ctx);
591             ctx.setAddComplete();
592         } catch (Throwable t) {
593             boolean removed = false;
594             try {
595                 remove0(ctx);
596                 try {
597                     ctx.handler().handlerRemoved(ctx);
598                 } finally {
599                     ctx.setRemoved();
600                 }
601                 removed = true;
602             } catch (Throwable t2) {
603                 if (logger.isWarnEnabled()) {
604                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
605                 }
606             }
607 
608             if (removed) {
609                 fireExceptionCaught(new ChannelPipelineException(
610                         ctx.handler().getClass().getName() +
611                         ".handlerAdded() has thrown an exception; removed.", t));
612             } else {
613                 fireExceptionCaught(new ChannelPipelineException(
614                         ctx.handler().getClass().getName() +
615                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
616             }
617         }
618     }
619 
620     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
621         // Notify the complete removal.
622         try {
623             try {
624                 ctx.handler().handlerRemoved(ctx);
625             } finally {
626                 ctx.setRemoved();
627             }
628         } catch (Throwable t) {
629             fireExceptionCaught(new ChannelPipelineException(
630                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
631         }
632     }
633 
634     final void invokeHandlerAddedIfNeeded() {
635         assert channel.eventLoop().inEventLoop();
636         if (firstRegistration) {
637             firstRegistration = false;
638             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
639             // that were added before the registration was done.
640             callHandlerAddedForAllHandlers();
641         }
642     }
643 
644     @Override
645     public final ChannelHandler first() {
646         ChannelHandlerContext first = firstContext();
647         if (first == null) {
648             return null;
649         }
650         return first.handler();
651     }
652 
653     @Override
654     public final ChannelHandlerContext firstContext() {
655         AbstractChannelHandlerContext first = head.next;
656         if (first == tail) {
657             return null;
658         }
659         return head.next;
660     }
661 
662     @Override
663     public final ChannelHandler last() {
664         AbstractChannelHandlerContext last = tail.prev;
665         if (last == head) {
666             return null;
667         }
668         return last.handler();
669     }
670 
671     @Override
672     public final ChannelHandlerContext lastContext() {
673         AbstractChannelHandlerContext last = tail.prev;
674         if (last == head) {
675             return null;
676         }
677         return last;
678     }
679 
680     @Override
681     public final ChannelHandler get(String name) {
682         ChannelHandlerContext ctx = context(name);
683         if (ctx == null) {
684             return null;
685         } else {
686             return ctx.handler();
687         }
688     }
689 
690     @SuppressWarnings("unchecked")
691     @Override
692     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
693         ChannelHandlerContext ctx = context(handlerType);
694         if (ctx == null) {
695             return null;
696         } else {
697             return (T) ctx.handler();
698         }
699     }
700 
701     @Override
702     public final ChannelHandlerContext context(String name) {
703         if (name == null) {
704             throw new NullPointerException("name");
705         }
706 
707         return context0(name);
708     }
709 
710     @Override
711     public final ChannelHandlerContext context(ChannelHandler handler) {
712         if (handler == null) {
713             throw new NullPointerException("handler");
714         }
715 
716         AbstractChannelHandlerContext ctx = head.next;
717         for (;;) {
718 
719             if (ctx == null) {
720                 return null;
721             }
722 
723             if (ctx.handler() == handler) {
724                 return ctx;
725             }
726 
727             ctx = ctx.next;
728         }
729     }
730 
731     @Override
732     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
733         if (handlerType == null) {
734             throw new NullPointerException("handlerType");
735         }
736 
737         AbstractChannelHandlerContext ctx = head.next;
738         for (;;) {
739             if (ctx == null) {
740                 return null;
741             }
742             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
743                 return ctx;
744             }
745             ctx = ctx.next;
746         }
747     }
748 
749     @Override
750     public final List<String> names() {
751         List<String> list = new ArrayList<String>();
752         AbstractChannelHandlerContext ctx = head.next;
753         for (;;) {
754             if (ctx == null) {
755                 return list;
756             }
757             list.add(ctx.name());
758             ctx = ctx.next;
759         }
760     }
761 
762     @Override
763     public final Map<String, ChannelHandler> toMap() {
764         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
765         AbstractChannelHandlerContext ctx = head.next;
766         for (;;) {
767             if (ctx == tail) {
768                 return map;
769             }
770             map.put(ctx.name(), ctx.handler());
771             ctx = ctx.next;
772         }
773     }
774 
775     @Override
776     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
777         return toMap().entrySet().iterator();
778     }
779 
780     /**
781      * Returns the {@link String} representation of this pipeline.
782      */
783     @Override
784     public final String toString() {
785         StringBuilder buf = new StringBuilder()
786             .append(StringUtil.simpleClassName(this))
787             .append('{');
788         AbstractChannelHandlerContext ctx = head.next;
789         for (;;) {
790             if (ctx == tail) {
791                 break;
792             }
793 
794             buf.append('(')
795                .append(ctx.name())
796                .append(" = ")
797                .append(ctx.handler().getClass().getName())
798                .append(')');
799 
800             ctx = ctx.next;
801             if (ctx == tail) {
802                 break;
803             }
804 
805             buf.append(", ");
806         }
807         buf.append('}');
808         return buf.toString();
809     }
810 
811     @Override
812     public final ChannelPipeline fireChannelRegistered() {
813         AbstractChannelHandlerContext.invokeChannelRegistered(head);
814         return this;
815     }
816 
817     @Override
818     public final ChannelPipeline fireChannelUnregistered() {
819         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
820         return this;
821     }
822 
823     /**
824      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
825      * handlerRemoved().
826      *
827      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
828      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
829      * the handlers are removed after all events are handled.
830      *
831      * See: https://github.com/netty/netty/issues/3156
832      */
833     private synchronized void destroy() {
834         destroyUp(head.next, false);
835     }
836 
837     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
838         final Thread currentThread = Thread.currentThread();
839         final AbstractChannelHandlerContext tail = this.tail;
840         for (;;) {
841             if (ctx == tail) {
842                 destroyDown(currentThread, tail.prev, inEventLoop);
843                 break;
844             }
845 
846             final EventExecutor executor = ctx.executor();
847             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
848                 final AbstractChannelHandlerContext finalCtx = ctx;
849                 executor.execute(new Runnable() {
850                     @Override
851                     public void run() {
852                         destroyUp(finalCtx, true);
853                     }
854                 });
855                 break;
856             }
857 
858             ctx = ctx.next;
859             inEventLoop = false;
860         }
861     }
862 
863     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
864         // We have reached at tail; now traverse backwards.
865         final AbstractChannelHandlerContext head = this.head;
866         for (;;) {
867             if (ctx == head) {
868                 break;
869             }
870 
871             final EventExecutor executor = ctx.executor();
872             if (inEventLoop || executor.inEventLoop(currentThread)) {
873                 synchronized (this) {
874                     remove0(ctx);
875                 }
876                 callHandlerRemoved0(ctx);
877             } else {
878                 final AbstractChannelHandlerContext finalCtx = ctx;
879                 executor.execute(new Runnable() {
880                     @Override
881                     public void run() {
882                         destroyDown(Thread.currentThread(), finalCtx, true);
883                     }
884                 });
885                 break;
886             }
887 
888             ctx = ctx.prev;
889             inEventLoop = false;
890         }
891     }
892 
893     @Override
894     public final ChannelPipeline fireChannelActive() {
895         AbstractChannelHandlerContext.invokeChannelActive(head);
896         return this;
897     }
898 
899     @Override
900     public final ChannelPipeline fireChannelInactive() {
901         AbstractChannelHandlerContext.invokeChannelInactive(head);
902         return this;
903     }
904 
905     @Override
906     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
907         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
908         return this;
909     }
910 
911     @Override
912     public final ChannelPipeline fireUserEventTriggered(Object event) {
913         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
914         return this;
915     }
916 
917     @Override
918     public final ChannelPipeline fireChannelRead(Object msg) {
919         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
920         return this;
921     }
922 
923     @Override
924     public final ChannelPipeline fireChannelReadComplete() {
925         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
926         return this;
927     }
928 
929     @Override
930     public final ChannelPipeline fireChannelWritabilityChanged() {
931         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
932         return this;
933     }
934 
935     @Override
936     public final ChannelFuture bind(SocketAddress localAddress) {
937         return tail.bind(localAddress);
938     }
939 
940     @Override
941     public final ChannelFuture connect(SocketAddress remoteAddress) {
942         return tail.connect(remoteAddress);
943     }
944 
945     @Override
946     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
947         return tail.connect(remoteAddress, localAddress);
948     }
949 
950     @Override
951     public final ChannelFuture disconnect() {
952         return tail.disconnect();
953     }
954 
955     @Override
956     public final ChannelFuture close() {
957         return tail.close();
958     }
959 
960     @Override
961     public final ChannelFuture deregister() {
962         return tail.deregister();
963     }
964 
965     @Override
966     public final ChannelPipeline flush() {
967         tail.flush();
968         return this;
969     }
970 
971     @Override
972     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
973         return tail.bind(localAddress, promise);
974     }
975 
976     @Override
977     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
978         return tail.connect(remoteAddress, promise);
979     }
980 
981     @Override
982     public final ChannelFuture connect(
983             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
984         return tail.connect(remoteAddress, localAddress, promise);
985     }
986 
987     @Override
988     public final ChannelFuture disconnect(ChannelPromise promise) {
989         return tail.disconnect(promise);
990     }
991 
992     @Override
993     public final ChannelFuture close(ChannelPromise promise) {
994         return tail.close(promise);
995     }
996 
997     @Override
998     public final ChannelFuture deregister(final ChannelPromise promise) {
999         return tail.deregister(promise);
1000     }
1001 
1002     @Override
1003     public final ChannelPipeline read() {
1004         tail.read();
1005         return this;
1006     }
1007 
1008     @Override
1009     public final ChannelFuture write(Object msg) {
1010         return tail.write(msg);
1011     }
1012 
1013     @Override
1014     public final ChannelFuture write(Object msg, ChannelPromise promise) {
1015         return tail.write(msg, promise);
1016     }
1017 
1018     @Override
1019     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
1020         return tail.writeAndFlush(msg, promise);
1021     }
1022 
1023     @Override
1024     public final ChannelFuture writeAndFlush(Object msg) {
1025         return tail.writeAndFlush(msg);
1026     }
1027 
1028     private void checkDuplicateName(String name) {
1029         if (context0(name) != null) {
1030             throw new IllegalArgumentException("Duplicate handler name: " + name);
1031         }
1032     }
1033 
1034     private AbstractChannelHandlerContext context0(String name) {
1035         AbstractChannelHandlerContext context = head.next;
1036         while (context != tail) {
1037             if (context.name().equals(name)) {
1038                 return context;
1039             }
1040             context = context.next;
1041         }
1042         return null;
1043     }
1044 
1045     private AbstractChannelHandlerContext getContextOrDie(String name) {
1046         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1047         if (ctx == null) {
1048             throw new NoSuchElementException(name);
1049         } else {
1050             return ctx;
1051         }
1052     }
1053 
1054     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1055         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1056         if (ctx == null) {
1057             throw new NoSuchElementException(handler.getClass().getName());
1058         } else {
1059             return ctx;
1060         }
1061     }
1062 
1063     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1064         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1065         if (ctx == null) {
1066             throw new NoSuchElementException(handlerType.getName());
1067         } else {
1068             return ctx;
1069         }
1070     }
1071 
1072     private void callHandlerAddedForAllHandlers() {
1073         final PendingHandlerCallback pendingHandlerCallbackHead;
1074         synchronized (this) {
1075             assert !registered;
1076 
1077             // This Channel itself was registered.
1078             registered = true;
1079 
1080             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1081             // Null out so it can be GC'ed.
1082             this.pendingHandlerCallbackHead = null;
1083         }
1084 
1085         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1086         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1087         // the EventLoop.
1088         PendingHandlerCallback task = pendingHandlerCallbackHead;
1089         while (task != null) {
1090             task.execute();
1091             task = task.next;
1092         }
1093     }
1094 
1095     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1096         assert !registered;
1097 
1098         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1099         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1100         if (pending == null) {
1101             pendingHandlerCallbackHead = task;
1102         } else {
1103             // Find the tail of the linked-list.
1104             while (pending.next != null) {
1105                 pending = pending.next;
1106             }
1107             pending.next = task;
1108         }
1109     }
1110 
1111     /**
1112      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1113      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1114      */
1115     protected void onUnhandledInboundException(Throwable cause) {
1116         try {
1117             logger.warn(
1118                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1119                             "It usually means the last handler in the pipeline did not handle the exception.",
1120                     cause);
1121         } finally {
1122             ReferenceCountUtil.release(cause);
1123         }
1124     }
1125 
1126     /**
1127      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1128      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1129      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1130      */
1131     protected void onUnhandledInboundMessage(Object msg) {
1132         try {
1133             logger.debug(
1134                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1135                             "Please check your pipeline configuration.", msg);
1136         } finally {
1137             ReferenceCountUtil.release(msg);
1138         }
1139     }
1140 
1141     // A special catch-all handler that handles both bytes and messages.
1142     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1143 
1144         TailContext(DefaultChannelPipeline pipeline) {
1145             super(pipeline, null, TAIL_NAME, true, false);
1146             setAddComplete();
1147         }
1148 
1149         @Override
1150         public ChannelHandler handler() {
1151             return this;
1152         }
1153 
1154         @Override
1155         public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
1156 
1157         @Override
1158         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
1159 
1160         @Override
1161         public void channelActive(ChannelHandlerContext ctx) throws Exception { }
1162 
1163         @Override
1164         public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
1165 
1166         @Override
1167         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
1168 
1169         @Override
1170         public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
1171 
1172         @Override
1173         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
1174 
1175         @Override
1176         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1177             // This may not be a configuration error and so don't log anything.
1178             // The event may be superfluous for the current pipeline configuration.
1179             ReferenceCountUtil.release(evt);
1180         }
1181 
1182         @Override
1183         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1184             onUnhandledInboundException(cause);
1185         }
1186 
1187         @Override
1188         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1189             onUnhandledInboundMessage(msg);
1190         }
1191 
1192         @Override
1193         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
1194     }
1195 
1196     final class HeadContext extends AbstractChannelHandlerContext
1197             implements ChannelOutboundHandler, ChannelInboundHandler {
1198 
1199         private final Unsafe unsafe;
1200 
1201         HeadContext(DefaultChannelPipeline pipeline) {
1202             super(pipeline, null, HEAD_NAME, false, true);
1203             unsafe = pipeline.channel().unsafe();
1204             setAddComplete();
1205         }
1206 
1207         @Override
1208         public ChannelHandler handler() {
1209             return this;
1210         }
1211 
1212         @Override
1213         public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
1214             // NOOP
1215         }
1216 
1217         @Override
1218         public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
1219             // NOOP
1220         }
1221 
1222         @Override
1223         public void bind(
1224                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
1225                 throws Exception {
1226             unsafe.bind(localAddress, promise);
1227         }
1228 
1229         @Override
1230         public void connect(
1231                 ChannelHandlerContext ctx,
1232                 SocketAddress remoteAddress, SocketAddress localAddress,
1233                 ChannelPromise promise) throws Exception {
1234             unsafe.connect(remoteAddress, localAddress, promise);
1235         }
1236 
1237         @Override
1238         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1239             unsafe.disconnect(promise);
1240         }
1241 
1242         @Override
1243         public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1244             unsafe.close(promise);
1245         }
1246 
1247         @Override
1248         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1249             unsafe.deregister(promise);
1250         }
1251 
1252         @Override
1253         public void read(ChannelHandlerContext ctx) {
1254             unsafe.beginRead();
1255         }
1256 
1257         @Override
1258         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
1259             unsafe.write(msg, promise);
1260         }
1261 
1262         @Override
1263         public void flush(ChannelHandlerContext ctx) throws Exception {
1264             unsafe.flush();
1265         }
1266 
1267         @Override
1268         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1269             ctx.fireExceptionCaught(cause);
1270         }
1271 
1272         @Override
1273         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
1274             invokeHandlerAddedIfNeeded();
1275             ctx.fireChannelRegistered();
1276         }
1277 
1278         @Override
1279         public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
1280             ctx.fireChannelUnregistered();
1281 
1282             // Remove all handlers sequentially if channel is closed and unregistered.
1283             if (!channel.isOpen()) {
1284                 destroy();
1285             }
1286         }
1287 
1288         @Override
1289         public void channelActive(ChannelHandlerContext ctx) throws Exception {
1290             ctx.fireChannelActive();
1291 
1292             readIfIsAutoRead();
1293         }
1294 
1295         @Override
1296         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1297             ctx.fireChannelInactive();
1298         }
1299 
1300         @Override
1301         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
1302             ctx.fireChannelRead(msg);
1303         }
1304 
1305         @Override
1306         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1307             ctx.fireChannelReadComplete();
1308 
1309             readIfIsAutoRead();
1310         }
1311 
1312         private void readIfIsAutoRead() {
1313             if (channel.config().isAutoRead()) {
1314                 channel.read();
1315             }
1316         }
1317 
1318         @Override
1319         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
1320             ctx.fireUserEventTriggered(evt);
1321         }
1322 
1323         @Override
1324         public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
1325             ctx.fireChannelWritabilityChanged();
1326         }
1327     }
1328 
1329     private abstract static class PendingHandlerCallback implements Runnable {
1330         final AbstractChannelHandlerContext ctx;
1331         PendingHandlerCallback next;
1332 
1333         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1334             this.ctx = ctx;
1335         }
1336 
1337         abstract void execute();
1338     }
1339 
1340     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1341 
1342         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1343             super(ctx);
1344         }
1345 
1346         @Override
1347         public void run() {
1348             callHandlerAdded0(ctx);
1349         }
1350 
1351         @Override
1352         void execute() {
1353             EventExecutor executor = ctx.executor();
1354             if (executor.inEventLoop()) {
1355                 callHandlerAdded0(ctx);
1356             } else {
1357                 try {
1358                     executor.execute(this);
1359                 } catch (RejectedExecutionException e) {
1360                     if (logger.isWarnEnabled()) {
1361                         logger.warn(
1362                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1363                                 executor, ctx.name(), e);
1364                     }
1365                     remove0(ctx);
1366                     ctx.setRemoved();
1367                 }
1368             }
1369         }
1370     }
1371 
1372     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1373 
1374         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1375             super(ctx);
1376         }
1377 
1378         @Override
1379         public void run() {
1380             callHandlerRemoved0(ctx);
1381         }
1382 
1383         @Override
1384         void execute() {
1385             EventExecutor executor = ctx.executor();
1386             if (executor.inEventLoop()) {
1387                 callHandlerRemoved0(ctx);
1388             } else {
1389                 try {
1390                     executor.execute(this);
1391                 } catch (RejectedExecutionException e) {
1392                     if (logger.isWarnEnabled()) {
1393                         logger.warn(
1394                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1395                                         " removing handler {}.", executor, ctx.name(), e);
1396                     }
1397                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1398                     ctx.setRemoved();
1399                 }
1400             }
1401         }
1402     }
1403 }