View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.channel.Channel.Unsafe;
19  import io.netty.util.ReferenceCountUtil;
20  import io.netty.util.ResourceLeakDetector;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.concurrent.EventExecutorGroup;
23  import io.netty.util.concurrent.FastThreadLocal;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.StringUtil;
26  import io.netty.util.internal.logging.InternalLogger;
27  import io.netty.util.internal.logging.InternalLoggerFactory;
28  
29  import java.net.SocketAddress;
30  import java.util.ArrayList;
31  import java.util.IdentityHashMap;
32  import java.util.Iterator;
33  import java.util.LinkedHashMap;
34  import java.util.List;
35  import java.util.Map;
36  import java.util.NoSuchElementException;
37  import java.util.WeakHashMap;
38  import java.util.concurrent.RejectedExecutionException;
39  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
40  
41  /**
42   * The default {@link ChannelPipeline} implementation.  It is usually created
43   * by a {@link Channel} implementation when the {@link Channel} is created.
44   */
45  public class DefaultChannelPipeline implements ChannelPipeline {
46  
47      static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
48  
49      private static final String HEAD_NAME = generateName0(HeadContext.class);
50      private static final String TAIL_NAME = generateName0(TailContext.class);
51  
52      private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
53              new FastThreadLocal<Map<Class<?>, String>>() {
54          @Override
55          protected Map<Class<?>, String> initialValue() {
56              return new WeakHashMap<Class<?>, String>();
57          }
58      };
59  
60      private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
61              AtomicReferenceFieldUpdater.newUpdater(
62                      DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
63      final HeadContext head;
64      final TailContext tail;
65  
66      private final Channel channel;
67      private final ChannelFuture succeededFuture;
68      private final VoidChannelPromise voidPromise;
69      private final boolean touch = ResourceLeakDetector.isEnabled();
70  
71      private Map<EventExecutorGroup, EventExecutor> childExecutors;
72      private volatile MessageSizeEstimator.Handle estimatorHandle;
73      private boolean firstRegistration = true;
74  
75      /**
76       * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
77       * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
78       *
79       * We only keep the head because it is expected that the list is used infrequently and its size is small.
80       * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
81       * complexity.
82       */
83      private PendingHandlerCallback pendingHandlerCallbackHead;
84  
85      /**
86       * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
87       * change.
88       */
89      private boolean registered;
90  
/**
 * Creates a pipeline for the given channel that initially contains only the head and tail
 * contexts, linked to each other.
 *
 * @param channel the owning channel; checked with {@code ObjectUtil.checkNotNull}
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    this.succeededFuture = new SucceededChannelFuture(channel, null);
    this.voidPromise = new VoidChannelPromise(channel, true);

    // The tail context is constructed before the head context.
    this.tail = new TailContext(this);
    this.head = new HeadContext(this);

    // An empty pipeline is simply head <-> tail.
    this.head.next = this.tail;
    this.tail.prev = this.head;
}
102 
103     final MessageSizeEstimator.Handle estimatorHandle() {
104         MessageSizeEstimator.Handle handle = estimatorHandle;
105         if (handle == null) {
106             handle = channel.config().getMessageSizeEstimator().newHandle();
107             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
108                 handle = estimatorHandle;
109             }
110         }
111         return handle;
112     }
113 
114     final Object touch(Object msg, AbstractChannelHandlerContext next) {
115         return touch ? ReferenceCountUtil.touch(msg, next) : msg;
116     }
117 
118     private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
119         return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
120     }
121 
122     private EventExecutor childExecutor(EventExecutorGroup group) {
123         if (group == null) {
124             return null;
125         }
126         Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
127         if (pinEventExecutor != null && !pinEventExecutor) {
128             return group.next();
129         }
130         Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
131         if (childExecutors == null) {
132             // Use size of 4 as most people only use one extra EventExecutor.
133             childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
134         }
135         // Pin one of the child executors once and remember it so that the same child executor
136         // is used to fire events for the same channel.
137         EventExecutor childExecutor = childExecutors.get(group);
138         if (childExecutor == null) {
139             childExecutor = group.next();
140             childExecutors.put(group, childExecutor);
141         }
142         return childExecutor;
143     }
144     @Override
145     public final Channel channel() {
146         return channel;
147     }
148 
149     @Override
150     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
151         return addFirst(null, name, handler);
152     }
153 
154     private enum AddStrategy {
155         ADD_FIRST,
156         ADD_LAST,
157         ADD_BEFORE,
158         ADD_AFTER;
159     }
160 
161     private ChannelPipeline internalAdd(EventExecutorGroup group, String name,
162                                         ChannelHandler handler, String baseName,
163                                         AddStrategy addStrategy) {
164         final AbstractChannelHandlerContext newCtx;
165         synchronized (this) {
166             checkMultiplicity(handler);
167             name = filterName(name, handler);
168 
169             newCtx = newContext(group, name, handler);
170 
171             switch (addStrategy) {
172                 case ADD_FIRST:
173                     addFirst0(newCtx);
174                     break;
175                 case ADD_LAST:
176                     addLast0(newCtx);
177                     break;
178                 case ADD_BEFORE:
179                     addBefore0(getContextOrDie(baseName), newCtx);
180                     break;
181                 case ADD_AFTER:
182                     addAfter0(getContextOrDie(baseName), newCtx);
183                     break;
184                 default:
185                     throw new IllegalArgumentException("unknown add strategy: " + addStrategy);
186             }
187 
188             // If the registered is false it means that the channel was not registered on an eventLoop yet.
189             // In this case we add the context to the pipeline and add a task that will call
190             // ChannelHandler.handlerAdded(...) once the channel is registered.
191             if (!registered) {
192                 newCtx.setAddPending();
193                 callHandlerCallbackLater(newCtx, true);
194                 return this;
195             }
196 
197             EventExecutor executor = newCtx.executor();
198             if (!executor.inEventLoop()) {
199                 callHandlerAddedInEventLoop(newCtx, executor);
200                 return this;
201             }
202         }
203         callHandlerAdded0(newCtx);
204         return this;
205     }
206 
207     @Override
208     public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
209         return internalAdd(group, name, handler, null, AddStrategy.ADD_FIRST);
210     }
211 
212     private void addFirst0(AbstractChannelHandlerContext newCtx) {
213         AbstractChannelHandlerContext nextCtx = head.next;
214         newCtx.prev = head;
215         newCtx.next = nextCtx;
216         head.next = newCtx;
217         nextCtx.prev = newCtx;
218     }
219 
220     @Override
221     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
222         return addLast(null, name, handler);
223     }
224 
225     @Override
226     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
227         return internalAdd(group, name, handler, null, AddStrategy.ADD_LAST);
228     }
229 
230     private void addLast0(AbstractChannelHandlerContext newCtx) {
231         AbstractChannelHandlerContext prev = tail.prev;
232         newCtx.prev = prev;
233         newCtx.next = tail;
234         prev.next = newCtx;
235         tail.prev = newCtx;
236     }
237 
238     @Override
239     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
240         return addBefore(null, baseName, name, handler);
241     }
242 
243     @Override
244     public final ChannelPipeline addBefore(
245             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
246         return internalAdd(group, name, handler, baseName, AddStrategy.ADD_BEFORE);
247     }
248 
249     private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
250         newCtx.prev = ctx.prev;
251         newCtx.next = ctx;
252         ctx.prev.next = newCtx;
253         ctx.prev = newCtx;
254     }
255 
256     private String filterName(String name, ChannelHandler handler) {
257         if (name == null) {
258             return generateName(handler);
259         }
260         checkDuplicateName(name);
261         return name;
262     }
263 
264     @Override
265     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
266         return addAfter(null, baseName, name, handler);
267     }
268 
269     @Override
270     public final ChannelPipeline addAfter(
271             EventExecutorGroup group, String baseName, String name, ChannelHandler handler) {
272         return internalAdd(group, name, handler, baseName, AddStrategy.ADD_AFTER);
273     }
274 
275     private static void addAfter0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
276         newCtx.prev = ctx;
277         newCtx.next = ctx.next;
278         ctx.next.prev = newCtx;
279         ctx.next = newCtx;
280     }
281 
282     public final ChannelPipeline addFirst(ChannelHandler handler) {
283         return addFirst(null, handler);
284     }
285 
286     @Override
287     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
288         return addFirst(null, handlers);
289     }
290 
291     @Override
292     public final ChannelPipeline addFirst(EventExecutorGroup executor, ChannelHandler... handlers) {
293         ObjectUtil.checkNotNull(handlers, "handlers");
294         if (handlers.length == 0 || handlers[0] == null) {
295             return this;
296         }
297 
298         int size;
299         for (size = 1; size < handlers.length; size ++) {
300             if (handlers[size] == null) {
301                 break;
302             }
303         }
304 
305         for (int i = size - 1; i >= 0; i --) {
306             ChannelHandler h = handlers[i];
307             addFirst(executor, null, h);
308         }
309 
310         return this;
311     }
312 
313     public final ChannelPipeline addLast(ChannelHandler handler) {
314         return addLast(null, handler);
315     }
316 
317     @Override
318     public final ChannelPipeline addLast(ChannelHandler... handlers) {
319         return addLast(null, handlers);
320     }
321 
322     @Override
323     public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
324         ObjectUtil.checkNotNull(handlers, "handlers");
325 
326         for (ChannelHandler h: handlers) {
327             if (h == null) {
328                 break;
329             }
330             addLast(executor, null, h);
331         }
332 
333         return this;
334     }
335 
336     private String generateName(ChannelHandler handler) {
337         Map<Class<?>, String> cache = nameCaches.get();
338         Class<?> handlerType = handler.getClass();
339         String name = cache.get(handlerType);
340         if (name == null) {
341             name = generateName0(handlerType);
342             cache.put(handlerType, name);
343         }
344 
345         // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
346         // any name conflicts.  Note that we don't cache the names generated here.
347         if (context0(name) != null) {
348             String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
349             for (int i = 1;; i ++) {
350                 String newName = baseName + i;
351                 if (context0(newName) == null) {
352                     name = newName;
353                     break;
354                 }
355             }
356         }
357         return name;
358     }
359 
360     private static String generateName0(Class<?> handlerType) {
361         return StringUtil.simpleClassName(handlerType) + "#0";
362     }
363 
364     @Override
365     public final ChannelPipeline remove(ChannelHandler handler) {
366         remove(getContextOrDie(handler));
367         return this;
368     }
369 
370     @Override
371     public final ChannelHandler remove(String name) {
372         return remove(getContextOrDie(name)).handler();
373     }
374 
375     @SuppressWarnings("unchecked")
376     @Override
377     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
378         return (T) remove(getContextOrDie(handlerType)).handler();
379     }
380 
381     public final <T extends ChannelHandler> T removeIfExists(String name) {
382         return removeIfExists(context(name));
383     }
384 
385     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
386         return removeIfExists(context(handlerType));
387     }
388 
389     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
390         return removeIfExists(context(handler));
391     }
392 
393     @SuppressWarnings("unchecked")
394     private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
395         if (ctx == null) {
396             return null;
397         }
398         return (T) remove((AbstractChannelHandlerContext) ctx).handler();
399     }
400 
401     private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
402         assert ctx != head && ctx != tail;
403 
404         synchronized (this) {
405             atomicRemoveFromHandlerList(ctx);
406 
407             // If the registered is false it means that the channel was not registered on an eventloop yet.
408             // In this case we remove the context from the pipeline and add a task that will call
409             // ChannelHandler.handlerRemoved(...) once the channel is registered.
410             if (!registered) {
411                 callHandlerCallbackLater(ctx, false);
412                 return ctx;
413             }
414 
415             EventExecutor executor = ctx.executor();
416             if (!executor.inEventLoop()) {
417                 executor.execute(new Runnable() {
418                     @Override
419                     public void run() {
420                         callHandlerRemoved0(ctx);
421                     }
422                 });
423                 return ctx;
424             }
425         }
426         callHandlerRemoved0(ctx);
427         return ctx;
428     }
429 
430     /**
431      * Method is synchronized to make the handler removal from the double linked list atomic.
432      */
433     private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
434         AbstractChannelHandlerContext prev = ctx.prev;
435         AbstractChannelHandlerContext next = ctx.next;
436         prev.next = next;
437         next.prev = prev;
438     }
439 
440     @Override
441     public final ChannelHandler removeFirst() {
442         if (head.next == tail) {
443             throw new NoSuchElementException();
444         }
445         return remove(head.next).handler();
446     }
447 
448     @Override
449     public final ChannelHandler removeLast() {
450         if (head.next == tail) {
451             throw new NoSuchElementException();
452         }
453         return remove(tail.prev).handler();
454     }
455 
456     @Override
457     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
458         replace(getContextOrDie(oldHandler), newName, newHandler);
459         return this;
460     }
461 
462     @Override
463     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
464         return replace(getContextOrDie(oldName), newName, newHandler);
465     }
466 
467     @Override
468     @SuppressWarnings("unchecked")
469     public final <T extends ChannelHandler> T replace(
470             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
471         return (T) replace(getContextOrDie(oldHandlerType), newName, newHandler);
472     }
473 
474     private ChannelHandler replace(
475             final AbstractChannelHandlerContext ctx, String newName, ChannelHandler newHandler) {
476         assert ctx != head && ctx != tail;
477 
478         final AbstractChannelHandlerContext newCtx;
479         synchronized (this) {
480             checkMultiplicity(newHandler);
481             if (newName == null) {
482                 newName = generateName(newHandler);
483             } else {
484                 boolean sameName = ctx.name().equals(newName);
485                 if (!sameName) {
486                     checkDuplicateName(newName);
487                 }
488             }
489 
490             newCtx = newContext(ctx.executor, newName, newHandler);
491 
492             replace0(ctx, newCtx);
493 
494             // If the registered is false it means that the channel was not registered on an eventloop yet.
495             // In this case we replace the context in the pipeline
496             // and add a task that will call ChannelHandler.handlerAdded(...) and
497             // ChannelHandler.handlerRemoved(...) once the channel is registered.
498             if (!registered) {
499                 callHandlerCallbackLater(newCtx, true);
500                 callHandlerCallbackLater(ctx, false);
501                 return ctx.handler();
502             }
503             EventExecutor executor = ctx.executor();
504             if (!executor.inEventLoop()) {
505                 executor.execute(new Runnable() {
506                     @Override
507                     public void run() {
508                         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
509                         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
510                         // those event handlers must be called after handlerAdded().
511                         callHandlerAdded0(newCtx);
512                         callHandlerRemoved0(ctx);
513                     }
514                 });
515                 return ctx.handler();
516             }
517         }
518         // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
519         // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
520         // event handlers must be called after handlerAdded().
521         callHandlerAdded0(newCtx);
522         callHandlerRemoved0(ctx);
523         return ctx.handler();
524     }
525 
526     private static void replace0(AbstractChannelHandlerContext oldCtx, AbstractChannelHandlerContext newCtx) {
527         AbstractChannelHandlerContext prev = oldCtx.prev;
528         AbstractChannelHandlerContext next = oldCtx.next;
529         newCtx.prev = prev;
530         newCtx.next = next;
531 
532         // Finish the replacement of oldCtx with newCtx in the linked list.
533         // Note that this doesn't mean events will be sent to the new handler immediately
534         // because we are currently at the event handler thread and no more than one handler methods can be invoked
535         // at the same time (we ensured that in replace().)
536         prev.next = newCtx;
537         next.prev = newCtx;
538 
539         // update the reference to the replacement so forward of buffered content will work correctly
540         oldCtx.prev = newCtx;
541         oldCtx.next = newCtx;
542     }
543 
544     private static void checkMultiplicity(ChannelHandler handler) {
545         if (handler instanceof ChannelHandlerAdapter) {
546             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
547             if (!h.isSharable() && h.added) {
548                 throw new ChannelPipelineException(
549                         h.getClass().getName() +
550                         " is not a @Sharable handler, so can't be added or removed multiple times.");
551             }
552             h.added = true;
553         }
554     }
555 
556     private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
557         try {
558             ctx.callHandlerAdded();
559         } catch (Throwable t) {
560             boolean removed = false;
561             try {
562                 atomicRemoveFromHandlerList(ctx);
563                 ctx.callHandlerRemoved();
564                 removed = true;
565             } catch (Throwable t2) {
566                 if (logger.isWarnEnabled()) {
567                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
568                 }
569             }
570 
571             if (removed) {
572                 fireExceptionCaught(new ChannelPipelineException(
573                         ctx.handler().getClass().getName() +
574                         ".handlerAdded() has thrown an exception; removed.", t));
575             } else {
576                 fireExceptionCaught(new ChannelPipelineException(
577                         ctx.handler().getClass().getName() +
578                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
579             }
580         }
581     }
582 
583     private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
584         // Notify the complete removal.
585         try {
586             ctx.callHandlerRemoved();
587         } catch (Throwable t) {
588             fireExceptionCaught(new ChannelPipelineException(
589                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
590         }
591     }
592 
593     final void invokeHandlerAddedIfNeeded() {
594         assert channel.eventLoop().inEventLoop();
595         if (firstRegistration) {
596             firstRegistration = false;
597             // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
598             // that were added before the registration was done.
599             callHandlerAddedForAllHandlers();
600         }
601     }
602 
603     @Override
604     public final ChannelHandler first() {
605         ChannelHandlerContext first = firstContext();
606         if (first == null) {
607             return null;
608         }
609         return first.handler();
610     }
611 
612     @Override
613     public final ChannelHandlerContext firstContext() {
614         AbstractChannelHandlerContext first = head.next;
615         if (first == tail) {
616             return null;
617         }
618         return head.next;
619     }
620 
621     @Override
622     public final ChannelHandler last() {
623         AbstractChannelHandlerContext last = tail.prev;
624         if (last == head) {
625             return null;
626         }
627         return last.handler();
628     }
629 
630     @Override
631     public final ChannelHandlerContext lastContext() {
632         AbstractChannelHandlerContext last = tail.prev;
633         if (last == head) {
634             return null;
635         }
636         return last;
637     }
638 
639     @Override
640     public final ChannelHandler get(String name) {
641         ChannelHandlerContext ctx = context(name);
642         if (ctx == null) {
643             return null;
644         } else {
645             return ctx.handler();
646         }
647     }
648 
649     @SuppressWarnings("unchecked")
650     @Override
651     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
652         ChannelHandlerContext ctx = context(handlerType);
653         if (ctx == null) {
654             return null;
655         } else {
656             return (T) ctx.handler();
657         }
658     }
659 
660     @Override
661     public final ChannelHandlerContext context(String name) {
662         return context0(ObjectUtil.checkNotNull(name, "name"));
663     }
664 
665     @Override
666     public final ChannelHandlerContext context(ChannelHandler handler) {
667         ObjectUtil.checkNotNull(handler, "handler");
668 
669         AbstractChannelHandlerContext ctx = head.next;
670         for (;;) {
671 
672             if (ctx == null) {
673                 return null;
674             }
675 
676             if (ctx.handler() == handler) {
677                 return ctx;
678             }
679 
680             ctx = ctx.next;
681         }
682     }
683 
684     @Override
685     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
686         ObjectUtil.checkNotNull(handlerType, "handlerType");
687 
688         AbstractChannelHandlerContext ctx = head.next;
689         for (;;) {
690             if (ctx == null) {
691                 return null;
692             }
693             if (handlerType.isAssignableFrom(ctx.handler().getClass())) {
694                 return ctx;
695             }
696             ctx = ctx.next;
697         }
698     }
699 
700     @Override
701     public final List<String> names() {
702         List<String> list = new ArrayList<String>();
703         AbstractChannelHandlerContext ctx = head.next;
704         for (;;) {
705             if (ctx == null) {
706                 return list;
707             }
708             list.add(ctx.name());
709             ctx = ctx.next;
710         }
711     }
712 
713     @Override
714     public final Map<String, ChannelHandler> toMap() {
715         Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
716         AbstractChannelHandlerContext ctx = head.next;
717         for (;;) {
718             if (ctx == tail) {
719                 return map;
720             }
721             map.put(ctx.name(), ctx.handler());
722             ctx = ctx.next;
723         }
724     }
725 
726     @Override
727     public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
728         return toMap().entrySet().iterator();
729     }
730 
731     /**
732      * Returns the {@link String} representation of this pipeline.
733      */
734     @Override
735     public final String toString() {
736         StringBuilder buf = new StringBuilder()
737             .append(StringUtil.simpleClassName(this))
738             .append('{');
739         AbstractChannelHandlerContext ctx = head.next;
740         for (;;) {
741             if (ctx == tail) {
742                 break;
743             }
744 
745             buf.append('(')
746                .append(ctx.name())
747                .append(" = ")
748                .append(ctx.handler().getClass().getName())
749                .append(')');
750 
751             ctx = ctx.next;
752             if (ctx == tail) {
753                 break;
754             }
755 
756             buf.append(", ");
757         }
758         buf.append('}');
759         return buf.toString();
760     }
761 
762     @Override
763     public final ChannelPipeline fireChannelRegistered() {
764         AbstractChannelHandlerContext.invokeChannelRegistered(head);
765         return this;
766     }
767 
768     @Override
769     public final ChannelPipeline fireChannelUnregistered() {
770         AbstractChannelHandlerContext.invokeChannelUnregistered(head);
771         return this;
772     }
773 
774     /**
775      * Removes all handlers from the pipeline one by one from tail (exclusive) to head (exclusive) to trigger
776      * handlerRemoved().
777      *
778      * Note that we traverse up the pipeline ({@link #destroyUp(AbstractChannelHandlerContext, boolean)})
779      * before traversing down ({@link #destroyDown(Thread, AbstractChannelHandlerContext, boolean)}) so that
780      * the handlers are removed after all events are handled.
781      *
782      * See: https://github.com/netty/netty/issues/3156
783      */
784     private synchronized void destroy() {
785         destroyUp(head.next, false);
786     }
787 
788     private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
789         final Thread currentThread = Thread.currentThread();
790         final AbstractChannelHandlerContext tail = this.tail;
791         for (;;) {
792             if (ctx == tail) {
793                 destroyDown(currentThread, tail.prev, inEventLoop);
794                 break;
795             }
796 
797             final EventExecutor executor = ctx.executor();
798             if (!inEventLoop && !executor.inEventLoop(currentThread)) {
799                 final AbstractChannelHandlerContext finalCtx = ctx;
800                 executor.execute(new Runnable() {
801                     @Override
802                     public void run() {
803                         destroyUp(finalCtx, true);
804                     }
805                 });
806                 break;
807             }
808 
809             ctx = ctx.next;
810             inEventLoop = false;
811         }
812     }
813 
814     private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
815         // We have reached at tail; now traverse backwards.
816         final AbstractChannelHandlerContext head = this.head;
817         for (;;) {
818             if (ctx == head) {
819                 break;
820             }
821 
822             final EventExecutor executor = ctx.executor();
823             if (inEventLoop || executor.inEventLoop(currentThread)) {
824                 atomicRemoveFromHandlerList(ctx);
825                 callHandlerRemoved0(ctx);
826             } else {
827                 final AbstractChannelHandlerContext finalCtx = ctx;
828                 executor.execute(new Runnable() {
829                     @Override
830                     public void run() {
831                         destroyDown(Thread.currentThread(), finalCtx, true);
832                     }
833                 });
834                 break;
835             }
836 
837             ctx = ctx.prev;
838             inEventLoop = false;
839         }
840     }
841 
842     @Override
843     public final ChannelPipeline fireChannelActive() {
844         AbstractChannelHandlerContext.invokeChannelActive(head);
845         return this;
846     }
847 
848     @Override
849     public final ChannelPipeline fireChannelInactive() {
850         AbstractChannelHandlerContext.invokeChannelInactive(head);
851         return this;
852     }
853 
854     @Override
855     public final ChannelPipeline fireExceptionCaught(Throwable cause) {
856         AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
857         return this;
858     }
859 
860     @Override
861     public final ChannelPipeline fireUserEventTriggered(Object event) {
862         AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
863         return this;
864     }
865 
866     @Override
867     public final ChannelPipeline fireChannelRead(Object msg) {
868         AbstractChannelHandlerContext.invokeChannelRead(head, msg);
869         return this;
870     }
871 
872     @Override
873     public final ChannelPipeline fireChannelReadComplete() {
874         AbstractChannelHandlerContext.invokeChannelReadComplete(head);
875         return this;
876     }
877 
878     @Override
879     public final ChannelPipeline fireChannelWritabilityChanged() {
880         AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
881         return this;
882     }
883 
884     @Override
885     public final ChannelFuture bind(SocketAddress localAddress) {
886         return tail.bind(localAddress);
887     }
888 
889     @Override
890     public final ChannelFuture connect(SocketAddress remoteAddress) {
891         return tail.connect(remoteAddress);
892     }
893 
894     @Override
895     public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
896         return tail.connect(remoteAddress, localAddress);
897     }
898 
899     @Override
900     public final ChannelFuture disconnect() {
901         return tail.disconnect();
902     }
903 
904     @Override
905     public final ChannelFuture close() {
906         return tail.close();
907     }
908 
909     @Override
910     public final ChannelFuture deregister() {
911         return tail.deregister();
912     }
913 
914     @Override
915     public final ChannelPipeline flush() {
916         tail.flush();
917         return this;
918     }
919 
920     @Override
921     public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
922         return tail.bind(localAddress, promise);
923     }
924 
925     @Override
926     public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
927         return tail.connect(remoteAddress, promise);
928     }
929 
930     @Override
931     public final ChannelFuture connect(
932             SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
933         return tail.connect(remoteAddress, localAddress, promise);
934     }
935 
936     @Override
937     public final ChannelFuture disconnect(ChannelPromise promise) {
938         return tail.disconnect(promise);
939     }
940 
941     @Override
942     public final ChannelFuture close(ChannelPromise promise) {
943         return tail.close(promise);
944     }
945 
946     @Override
947     public final ChannelFuture deregister(final ChannelPromise promise) {
948         return tail.deregister(promise);
949     }
950 
951     @Override
952     public final ChannelPipeline read() {
953         tail.read();
954         return this;
955     }
956 
957     @Override
958     public final ChannelFuture write(Object msg) {
959         return tail.write(msg);
960     }
961 
962     @Override
963     public final ChannelFuture write(Object msg, ChannelPromise promise) {
964         return tail.write(msg, promise);
965     }
966 
967     @Override
968     public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
969         return tail.writeAndFlush(msg, promise);
970     }
971 
972     @Override
973     public final ChannelFuture writeAndFlush(Object msg) {
974         return tail.writeAndFlush(msg);
975     }
976 
977     @Override
978     public final ChannelPromise newPromise() {
979         return new DefaultChannelPromise(channel);
980     }
981 
982     @Override
983     public final ChannelProgressivePromise newProgressivePromise() {
984         return new DefaultChannelProgressivePromise(channel);
985     }
986 
987     @Override
988     public final ChannelFuture newSucceededFuture() {
989         return succeededFuture;
990     }
991 
992     @Override
993     public final ChannelFuture newFailedFuture(Throwable cause) {
994         return new FailedChannelFuture(channel, null, cause);
995     }
996 
997     @Override
998     public final ChannelPromise voidPromise() {
999         return voidPromise;
1000     }
1001 
1002     private void checkDuplicateName(String name) {
1003         if (context0(name) != null) {
1004             throw new IllegalArgumentException("Duplicate handler name: " + name);
1005         }
1006     }
1007 
1008     private AbstractChannelHandlerContext context0(String name) {
1009         AbstractChannelHandlerContext context = head.next;
1010         while (context != tail) {
1011             if (context.name().equals(name)) {
1012                 return context;
1013             }
1014             context = context.next;
1015         }
1016         return null;
1017     }
1018 
1019     private AbstractChannelHandlerContext getContextOrDie(String name) {
1020         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
1021         if (ctx == null) {
1022             throw new NoSuchElementException(name);
1023         } else {
1024             return ctx;
1025         }
1026     }
1027 
1028     private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
1029         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
1030         if (ctx == null) {
1031             throw new NoSuchElementException(handler.getClass().getName());
1032         } else {
1033             return ctx;
1034         }
1035     }
1036 
1037     private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
1038         AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
1039         if (ctx == null) {
1040             throw new NoSuchElementException(handlerType.getName());
1041         } else {
1042             return ctx;
1043         }
1044     }
1045 
1046     private void callHandlerAddedForAllHandlers() {
1047         final PendingHandlerCallback pendingHandlerCallbackHead;
1048         synchronized (this) {
1049             assert !registered;
1050 
1051             // This Channel itself was registered.
1052             registered = true;
1053 
1054             pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
1055             // Null out so it can be GC'ed.
1056             this.pendingHandlerCallbackHead = null;
1057         }
1058 
1059         // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
1060         // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
1061         // the EventLoop.
1062         PendingHandlerCallback task = pendingHandlerCallbackHead;
1063         while (task != null) {
1064             task.execute();
1065             task = task.next;
1066         }
1067     }
1068 
1069     private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
1070         assert !registered;
1071 
1072         PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
1073         PendingHandlerCallback pending = pendingHandlerCallbackHead;
1074         if (pending == null) {
1075             pendingHandlerCallbackHead = task;
1076         } else {
1077             // Find the tail of the linked-list.
1078             while (pending.next != null) {
1079                 pending = pending.next;
1080             }
1081             pending.next = task;
1082         }
1083     }
1084 
1085     private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
1086         newCtx.setAddPending();
1087         executor.execute(new Runnable() {
1088             @Override
1089             public void run() {
1090                 callHandlerAdded0(newCtx);
1091             }
1092         });
1093     }
1094 
1095     /**
1096      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
1097      * in {@link ChannelHandler#exceptionCaught(ChannelHandlerContext, Throwable)}.
1098      */
1099     protected void onUnhandledInboundException(Throwable cause) {
1100         try {
1101             logger.warn(
1102                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
1103                             "It usually means the last handler in the pipeline did not handle the exception.",
1104                     cause);
1105         } finally {
1106             ReferenceCountUtil.release(cause);
1107         }
1108     }
1109 
1110     /**
1111      * Called once the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)}event hit
1112      * the end of the {@link ChannelPipeline}.
1113      */
1114     protected void onUnhandledInboundChannelActive() {
1115     }
1116 
1117     /**
1118      * Called once the {@link ChannelInboundHandler#channelInactive(ChannelHandlerContext)} event hit
1119      * the end of the {@link ChannelPipeline}.
1120      */
1121     protected void onUnhandledInboundChannelInactive() {
1122     }
1123 
1124     /**
1125      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1126      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1127      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1128      */
1129     protected void onUnhandledInboundMessage(Object msg) {
1130         try {
1131             logger.debug(
1132                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
1133                             "Please check your pipeline configuration.", msg);
1134         } finally {
1135             ReferenceCountUtil.release(msg);
1136         }
1137     }
1138 
1139     /**
1140      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
1141      * in {@link ChannelInboundHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
1142      * to call {@link ReferenceCountUtil#release(Object)} on the given msg at some point.
1143      */
1144     protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1145         onUnhandledInboundMessage(msg);
1146         if (logger.isDebugEnabled()) {
1147             logger.debug("Discarded message pipeline : {}. Channel : {}.",
1148                          ctx.pipeline().names(), ctx.channel());
1149         }
1150     }
1151 
1152     /**
1153      * Called once the {@link ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)} event hit
1154      * the end of the {@link ChannelPipeline}.
1155      */
1156     protected void onUnhandledInboundChannelReadComplete() {
1157     }
1158 
1159     /**
1160      * Called once an user event hit the end of the {@link ChannelPipeline} without been handled by the user
1161      * in {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}. This method is responsible
1162      * to call {@link ReferenceCountUtil#release(Object)} on the given event at some point.
1163      */
1164     protected void onUnhandledInboundUserEventTriggered(Object evt) {
1165         // This may not be a configuration error and so don't log anything.
1166         // The event may be superfluous for the current pipeline configuration.
1167         ReferenceCountUtil.release(evt);
1168     }
1169 
1170     /**
1171      * Called once the {@link ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
1172      * the end of the {@link ChannelPipeline}.
1173      */
1174     protected void onUnhandledChannelWritabilityChanged() {
1175     }
1176 
1177     protected void incrementPendingOutboundBytes(long size) {
1178         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1179         if (buffer != null) {
1180             buffer.incrementPendingOutboundBytes(size);
1181         }
1182     }
1183 
1184     protected void decrementPendingOutboundBytes(long size) {
1185         ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
1186         if (buffer != null) {
1187             buffer.decrementPendingOutboundBytes(size);
1188         }
1189     }
1190 
1191     // A special catch-all handler that handles both bytes and messages.
1192     final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
1193 
1194         TailContext(DefaultChannelPipeline pipeline) {
1195             super(pipeline, null, TAIL_NAME, TailContext.class);
1196             setAddComplete();
1197         }
1198 
1199         @Override
1200         public ChannelHandler handler() {
1201             return this;
1202         }
1203 
1204         @Override
1205         public void channelRegistered(ChannelHandlerContext ctx) { }
1206 
1207         @Override
1208         public void channelUnregistered(ChannelHandlerContext ctx) { }
1209 
1210         @Override
1211         public void channelActive(ChannelHandlerContext ctx) {
1212             onUnhandledInboundChannelActive();
1213         }
1214 
1215         @Override
1216         public void channelInactive(ChannelHandlerContext ctx) {
1217             onUnhandledInboundChannelInactive();
1218         }
1219 
1220         @Override
1221         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1222             onUnhandledChannelWritabilityChanged();
1223         }
1224 
1225         @Override
1226         public void handlerAdded(ChannelHandlerContext ctx) { }
1227 
1228         @Override
1229         public void handlerRemoved(ChannelHandlerContext ctx) { }
1230 
1231         @Override
1232         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1233             onUnhandledInboundUserEventTriggered(evt);
1234         }
1235 
1236         @Override
1237         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1238             onUnhandledInboundException(cause);
1239         }
1240 
1241         @Override
1242         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1243             onUnhandledInboundMessage(ctx, msg);
1244         }
1245 
1246         @Override
1247         public void channelReadComplete(ChannelHandlerContext ctx) {
1248             onUnhandledInboundChannelReadComplete();
1249         }
1250     }
1251 
1252     final class HeadContext extends AbstractChannelHandlerContext
1253             implements ChannelOutboundHandler, ChannelInboundHandler {
1254 
1255         private final Unsafe unsafe;
1256 
1257         HeadContext(DefaultChannelPipeline pipeline) {
1258             super(pipeline, null, HEAD_NAME, HeadContext.class);
1259             unsafe = pipeline.channel().unsafe();
1260             setAddComplete();
1261         }
1262 
1263         @Override
1264         public ChannelHandler handler() {
1265             return this;
1266         }
1267 
1268         @Override
1269         public void handlerAdded(ChannelHandlerContext ctx) {
1270             // NOOP
1271         }
1272 
1273         @Override
1274         public void handlerRemoved(ChannelHandlerContext ctx) {
1275             // NOOP
1276         }
1277 
1278         @Override
1279         public void bind(
1280                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
1281             unsafe.bind(localAddress, promise);
1282         }
1283 
1284         @Override
1285         public void connect(
1286                 ChannelHandlerContext ctx,
1287                 SocketAddress remoteAddress, SocketAddress localAddress,
1288                 ChannelPromise promise) {
1289             unsafe.connect(remoteAddress, localAddress, promise);
1290         }
1291 
1292         @Override
1293         public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
1294             unsafe.disconnect(promise);
1295         }
1296 
1297         @Override
1298         public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
1299             unsafe.close(promise);
1300         }
1301 
1302         @Override
1303         public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
1304             unsafe.deregister(promise);
1305         }
1306 
1307         @Override
1308         public void read(ChannelHandlerContext ctx) {
1309             unsafe.beginRead();
1310         }
1311 
1312         @Override
1313         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1314             unsafe.write(msg, promise);
1315         }
1316 
1317         @Override
1318         public void flush(ChannelHandlerContext ctx) {
1319             unsafe.flush();
1320         }
1321 
1322         @Override
1323         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1324             ctx.fireExceptionCaught(cause);
1325         }
1326 
1327         @Override
1328         public void channelRegistered(ChannelHandlerContext ctx) {
1329             invokeHandlerAddedIfNeeded();
1330             ctx.fireChannelRegistered();
1331         }
1332 
1333         @Override
1334         public void channelUnregistered(ChannelHandlerContext ctx) {
1335             ctx.fireChannelUnregistered();
1336 
1337             // Remove all handlers sequentially if channel is closed and unregistered.
1338             if (!channel.isOpen()) {
1339                 destroy();
1340             }
1341         }
1342 
1343         @Override
1344         public void channelActive(ChannelHandlerContext ctx) {
1345             ctx.fireChannelActive();
1346 
1347             readIfIsAutoRead();
1348         }
1349 
1350         @Override
1351         public void channelInactive(ChannelHandlerContext ctx) {
1352             ctx.fireChannelInactive();
1353         }
1354 
1355         @Override
1356         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1357             ctx.fireChannelRead(msg);
1358         }
1359 
1360         @Override
1361         public void channelReadComplete(ChannelHandlerContext ctx) {
1362             ctx.fireChannelReadComplete();
1363 
1364             readIfIsAutoRead();
1365         }
1366 
1367         private void readIfIsAutoRead() {
1368             if (channel.config().isAutoRead()) {
1369                 channel.read();
1370             }
1371         }
1372 
1373         @Override
1374         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
1375             ctx.fireUserEventTriggered(evt);
1376         }
1377 
1378         @Override
1379         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1380             ctx.fireChannelWritabilityChanged();
1381         }
1382     }
1383 
1384     private abstract static class PendingHandlerCallback implements Runnable {
1385         final AbstractChannelHandlerContext ctx;
1386         PendingHandlerCallback next;
1387 
1388         PendingHandlerCallback(AbstractChannelHandlerContext ctx) {
1389             this.ctx = ctx;
1390         }
1391 
1392         abstract void execute();
1393     }
1394 
1395     private final class PendingHandlerAddedTask extends PendingHandlerCallback {
1396 
1397         PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
1398             super(ctx);
1399         }
1400 
1401         @Override
1402         public void run() {
1403             callHandlerAdded0(ctx);
1404         }
1405 
1406         @Override
1407         void execute() {
1408             EventExecutor executor = ctx.executor();
1409             if (executor.inEventLoop()) {
1410                 callHandlerAdded0(ctx);
1411             } else {
1412                 try {
1413                     executor.execute(this);
1414                 } catch (RejectedExecutionException e) {
1415                     if (logger.isWarnEnabled()) {
1416                         logger.warn(
1417                                 "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
1418                                 executor, ctx.name(), e);
1419                     }
1420                     atomicRemoveFromHandlerList(ctx);
1421                     ctx.setRemoved();
1422                 }
1423             }
1424         }
1425     }
1426 
1427     private final class PendingHandlerRemovedTask extends PendingHandlerCallback {
1428 
1429         PendingHandlerRemovedTask(AbstractChannelHandlerContext ctx) {
1430             super(ctx);
1431         }
1432 
1433         @Override
1434         public void run() {
1435             callHandlerRemoved0(ctx);
1436         }
1437 
1438         @Override
1439         void execute() {
1440             EventExecutor executor = ctx.executor();
1441             if (executor.inEventLoop()) {
1442                 callHandlerRemoved0(ctx);
1443             } else {
1444                 try {
1445                     executor.execute(this);
1446                 } catch (RejectedExecutionException e) {
1447                     if (logger.isWarnEnabled()) {
1448                         logger.warn(
1449                                 "Can't invoke handlerRemoved() as the EventExecutor {} rejected it," +
1450                                         " removing handler {}.", executor, ctx.name(), e);
1451                     }
1452                     // remove0(...) was call before so just call AbstractChannelHandlerContext.setRemoved().
1453                     ctx.setRemoved();
1454                 }
1455             }
1456         }
1457     }
1458 }