View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.util.Resource;
19  import io.netty5.util.ResourceLeakDetector;
20  import io.netty5.util.concurrent.EventExecutor;
21  import io.netty5.util.concurrent.FastThreadLocal;
22  import io.netty5.util.concurrent.Future;
23  import io.netty5.util.concurrent.Promise;
24  import io.netty5.util.internal.StringUtil;
25  import io.netty5.util.internal.logging.InternalLogger;
26  import io.netty5.util.internal.logging.InternalLoggerFactory;
27  
28  import java.net.SocketAddress;
29  import java.util.ArrayList;
30  import java.util.Collections;
31  import java.util.Iterator;
32  import java.util.LinkedHashMap;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.NoSuchElementException;
36  import java.util.WeakHashMap;
37  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
38  import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
39  import java.util.function.IntSupplier;
40  import java.util.function.Predicate;
41  
42  import static java.util.Objects.requireNonNull;
43  import static io.netty5.channel.DefaultChannelHandlerContext.safeExecute;
44  
45  /**
46   * The default {@link ChannelPipeline} implementation.  It is usually created
47   * by a {@link Channel} implementation when the {@link Channel} is created.
48   */
49  public abstract class DefaultChannelPipeline implements ChannelPipeline {
50  
    // Logger used by callHandlerAdded0() to report a failure while removing a handler.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
    // Names given to the built-in head and tail contexts created in the constructor.
    private static final String HEAD_NAME = generateName0(HeadHandler.class);
    private static final String TAIL_NAME = generateName0(TailHandler.class);

    // Handler instances used for the head and tail contexts of every pipeline (static, so shared).
    private static final ChannelHandler HEAD_HANDLER = new HeadHandler();
    private static final ChannelHandler TAIL_HANDLER = new TailHandler();

    // Per-thread cache of the default name generated for a handler class; read and filled by generateName().
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<>() {
                @Override
                protected Map<Class<?>, String> initialValue() {
                    return new WeakHashMap<>();
                }
            };

    // Used by estimatorHandle() to install the lazily created handle with a compare-and-set.
    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
    // Sentinel contexts: the constructor links them to each other but does not add them to 'handlers'.
    private final DefaultChannelHandlerContext head;
    private final DefaultChannelHandlerContext tail;

    // The channel this pipeline was created for.
    private final Channel channel;
    // Pre-built succeeded future, created once in the constructor and returned by newSucceededFuture().
    private final Future<Void> succeededFuture;
    // Captured once from ResourceLeakDetector.isEnabled(); gates the Resource.touch() call in touch().
    private final boolean touch = ResourceLeakDetector.isEnabled();
    // Contexts added through the add*/replace methods, in pipeline order.
    // In the code visible here, accesses happen while synchronized on this list.
    private final List<DefaultChannelHandlerContext> handlers = new ArrayList<>(4);

    // Lazily initialised by estimatorHandle(); written through ESTIMATOR.
    private volatile MessageSizeEstimator.Handle estimatorHandle;

    // Field updater for 'pendingOutboundBytes' (its use is not visible in this part of the file).
    private static final AtomicLongFieldUpdater<DefaultChannelPipeline> TOTAL_PENDING_OUTBOUND_BYTES_UPDATER =
            AtomicLongFieldUpdater.newUpdater(DefaultChannelPipeline.class, "pendingOutboundBytes");

    // Target field of TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.
    private volatile long pendingOutboundBytes;
83  
84      protected DefaultChannelPipeline(Channel channel) {
85          this.channel = requireNonNull(channel, "channel");
86          succeededFuture = channel.executor().newSucceededFuture(null);
87  
88          tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER);
89          head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER);
90  
91          head.next = tail;
92          tail.prev = head;
93          head.setAddComplete();
94          tail.setAddComplete();
95      }
96  
97      final MessageSizeEstimator.Handle estimatorHandle() {
98          MessageSizeEstimator.Handle handle = estimatorHandle;
99          if (handle == null) {
100             handle = channel.getOption(ChannelOption.MESSAGE_SIZE_ESTIMATOR).newHandle();
101             if (!ESTIMATOR.compareAndSet(this, null, handle)) {
102                 handle = estimatorHandle;
103             }
104         }
105         return handle;
106     }
107 
108     final Object touch(Object msg, DefaultChannelHandlerContext next) {
109         if (touch) {
110             Resource.touch(msg, next);
111         }
112         return msg;
113     }
114 
115     private DefaultChannelHandlerContext newContext(String name, ChannelHandler handler) {
116         checkMultiplicity(handler);
117         if (name == null) {
118             name = generateName(handler);
119         }
120         return new DefaultChannelHandlerContext(this, name, handler);
121     }
122 
123     private void checkDuplicateName(String name) {
124         if (context(name) != null) {
125             throw new IllegalArgumentException("Duplicate handler name: " + name);
126         }
127     }
128 
129     private static int checkNoSuchElement(int idx, String name) {
130         if (idx == -1) {
131             throw new NoSuchElementException(name);
132         }
133         return idx;
134     }
135 
136     @Override
137     public final Channel channel() {
138         return channel;
139     }
140 
141     @Override
142     public final EventExecutor executor() {
143         return channel().executor();
144     }
145 
146     @Override
147     public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
148         DefaultChannelHandlerContext newCtx = newContext(name, handler);
149         EventExecutor executor = executor();
150         boolean inEventLoop = executor.inEventLoop();
151         synchronized (handlers) {
152             checkDuplicateName(newCtx.name());
153 
154             handlers.add(0, newCtx);
155             if (!inEventLoop) {
156                 try {
157                     executor.execute(() -> addFirst0(newCtx));
158                     return this;
159                 } catch (Throwable cause) {
160                     handlers.remove(0);
161                     throw cause;
162                 }
163             }
164         }
165 
166         addFirst0(newCtx);
167         return this;
168     }
169 
170     private void addFirst0(DefaultChannelHandlerContext newCtx) {
171         DefaultChannelHandlerContext nextCtx = head.next;
172         newCtx.prev = head;
173         newCtx.next = nextCtx;
174         head.next = newCtx;
175         nextCtx.prev = newCtx;
176         callHandlerAdded0(newCtx);
177     }
178 
179     @Override
180     public final ChannelPipeline addLast(String name, ChannelHandler handler) {
181         DefaultChannelHandlerContext newCtx = newContext(name, handler);
182         EventExecutor executor = executor();
183         boolean inEventLoop = executor.inEventLoop();
184         synchronized (handlers) {
185             checkDuplicateName(newCtx.name());
186 
187             handlers.add(newCtx);
188             if (!inEventLoop) {
189                 try {
190                     executor.execute(() -> addLast0(newCtx));
191                     return this;
192                 } catch (Throwable cause) {
193                     handlers.remove(handlers.size() - 1);
194                     throw cause;
195                 }
196             }
197         }
198 
199         addLast0(newCtx);
200         return this;
201     }
202 
203     private void addLast0(DefaultChannelHandlerContext newCtx) {
204         DefaultChannelHandlerContext prev = tail.prev;
205         newCtx.prev = prev;
206         newCtx.next = tail;
207         prev.next = newCtx;
208         tail.prev = newCtx;
209         callHandlerAdded0(newCtx);
210     }
211 
212     @Override
213     public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
214         final DefaultChannelHandlerContext ctx;
215 
216         DefaultChannelHandlerContext newCtx = newContext(name, handler);
217         EventExecutor executor = executor();
218         boolean inEventLoop = executor.inEventLoop();
219         synchronized (handlers) {
220             int i = checkNoSuchElement(findCtxIdx(context -> context.name().equals(baseName)), baseName);
221             checkDuplicateName(newCtx.name());
222 
223             ctx = handlers.get(i);
224             handlers.add(i, newCtx);
225             if (!inEventLoop) {
226                 try {
227                     executor.execute(() -> addBefore0(ctx, newCtx));
228                     return this;
229                 } catch (Throwable cause) {
230                     handlers.remove(i);
231                     throw cause;
232                 }
233             }
234         }
235 
236         addBefore0(ctx, newCtx);
237         return this;
238     }
239 
240     private void addBefore0(DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
241         newCtx.prev = ctx.prev;
242         newCtx.next = ctx;
243         ctx.prev.next = newCtx;
244         ctx.prev = newCtx;
245         callHandlerAdded0(newCtx);
246     }
247 
248     @Override
249     public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
250         final DefaultChannelHandlerContext ctx;
251 
252         if (name == null) {
253             name = generateName(handler);
254         }
255 
256         DefaultChannelHandlerContext newCtx = newContext(name, handler);
257         EventExecutor executor = executor();
258         boolean inEventLoop = executor.inEventLoop();
259         synchronized (handlers) {
260             int i = checkNoSuchElement(findCtxIdx(context -> context.name().equals(baseName)), baseName);
261 
262             checkDuplicateName(newCtx.name());
263 
264             ctx = handlers.get(i);
265             handlers.add(i + 1, newCtx);
266             if (!inEventLoop) {
267                 try {
268                     executor.execute(() -> addAfter0(ctx, newCtx));
269                     return this;
270                 } catch (Throwable cause) {
271                     handlers.remove(i + 1);
272                     throw cause;
273                 }
274             }
275         }
276 
277         addAfter0(ctx, newCtx);
278         return this;
279     }
280 
281     private void addAfter0(DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
282         newCtx.prev = ctx;
283         newCtx.next = ctx.next;
284         ctx.next.prev = newCtx;
285         ctx.next = newCtx;
286         callHandlerAdded0(newCtx);
287     }
288 
289     public final ChannelPipeline addFirst(ChannelHandler handler) {
290         return addFirst(null, handler);
291     }
292 
293     @Override
294     public final ChannelPipeline addFirst(ChannelHandler... handlers) {
295         requireNonNull(handlers, "handlers");
296 
297         for (int i = handlers.length - 1; i >= 0; i--) {
298             ChannelHandler h = handlers[i];
299             if (h != null) {
300                 addFirst(null, h);
301             }
302         }
303 
304         return this;
305     }
306 
307     public final ChannelPipeline addLast(ChannelHandler handler) {
308         return addLast(null, handler);
309     }
310 
311     @Override
312     public final ChannelPipeline addLast(ChannelHandler... handlers) {
313         requireNonNull(handlers, "handlers");
314 
315         for (ChannelHandler h : handlers) {
316             if (h != null) {
317                 addLast(null, h);
318             }
319         }
320 
321         return this;
322     }
323 
324     private String generateName(ChannelHandler handler) {
325         Map<Class<?>, String> cache = nameCaches.get();
326         Class<?> handlerType = handler.getClass();
327         String name = cache.get(handlerType);
328         if (name == null) {
329             name = generateName0(handlerType);
330             cache.put(handlerType, name);
331         }
332 
333         synchronized (handlers) {
334             // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
335             // any name conflicts.  Note that we don't cache the names generated here.
336             if (context(name) != null) {
337                 String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
338                 for (int i = 1;; i ++) {
339                     String newName = baseName + i;
340                     if (context(newName) == null) {
341                         name = newName;
342                         break;
343                     }
344                 }
345             }
346         }
347 
348         return name;
349     }
350 
351     private static String generateName0(Class<?> handlerType) {
352         return StringUtil.simpleClassName(handlerType) + "#0";
353     }
354 
355     private int findCtxIdx(Predicate<DefaultChannelHandlerContext> predicate) {
356         for (int i = 0; i < handlers.size(); i++) {
357             if (predicate.test(handlers.get(i))) {
358                 return i;
359             }
360         }
361         return -1;
362     }
363 
364     @Override
365     public final ChannelPipeline remove(ChannelHandler handler) {
366         final DefaultChannelHandlerContext ctx;
367         EventExecutor executor = executor();
368         boolean inEventLoop = executor.inEventLoop();
369         synchronized (handlers) {
370             int idx = checkNoSuchElement(findCtxIdx(context -> context.handler() == handler), null);
371             ctx = handlers.remove(idx);
372             assert ctx != null;
373 
374             if (!inEventLoop) {
375                 scheduleRemove(idx, ctx);
376                 return this;
377             }
378         }
379 
380         remove0(ctx);
381         return this;
382     }
383 
384     @Override
385     public final ChannelHandler remove(String name) {
386         final DefaultChannelHandlerContext ctx;
387         EventExecutor executor = executor();
388         boolean inEventLoop = executor.inEventLoop();
389         synchronized (handlers) {
390             int idx = checkNoSuchElement(findCtxIdx(context -> context.name().equals(name)), name);
391             ctx = handlers.remove(idx);
392             assert ctx != null;
393 
394             if (!inEventLoop) {
395                 return scheduleRemove(idx, ctx);
396             }
397         }
398 
399         remove0(ctx);
400         return ctx.handler();
401     }
402 
403     @SuppressWarnings("unchecked")
404     @Override
405     public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
406         final DefaultChannelHandlerContext ctx;
407         EventExecutor executor = executor();
408         boolean inEventLoop = executor.inEventLoop();
409         synchronized (handlers) {
410             int idx = checkNoSuchElement(findCtxIdx(
411                     context -> handlerType.isAssignableFrom(context.handler().getClass())), null);
412             ctx = handlers.remove(idx);
413             assert ctx != null;
414 
415             if (!inEventLoop) {
416                 return scheduleRemove(idx, ctx);
417             }
418         }
419 
420         remove0(ctx);
421         return (T) ctx.handler();
422     }
423 
424     @Override
425     public final <T extends ChannelHandler> T removeIfExists(String name) {
426         return removeIfExists(() -> findCtxIdx(context -> name.equals(context.name())));
427     }
428 
429     @Override
430     public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
431         return removeIfExists(() -> findCtxIdx(
432                 context -> handlerType.isAssignableFrom(context.handler().getClass())));
433     }
434 
435     @Override
436     public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
437         return removeIfExists(() -> findCtxIdx(context -> handler == context.handler()));
438     }
439 
440     @SuppressWarnings("unchecked")
441     private <T extends ChannelHandler> T removeIfExists(IntSupplier idxSupplier) {
442         final DefaultChannelHandlerContext ctx;
443         EventExecutor executor = executor();
444         boolean inEventLoop = executor.inEventLoop();
445         synchronized (handlers) {
446             int idx = idxSupplier.getAsInt();
447             if (idx == -1) {
448                 return null;
449             }
450             ctx = handlers.remove(idx);
451             assert ctx != null;
452 
453             if (!inEventLoop) {
454                 return scheduleRemove(idx, ctx);
455             }
456         }
457         remove0(ctx);
458         return (T) ctx.handler();
459     }
460 
461     private void remove0(DefaultChannelHandlerContext ctx) {
462         try {
463             callHandlerRemoved0(ctx);
464         } finally {
465             ctx.remove(true);
466         }
467     }
468 
469     @Override
470     public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
471         replace(ctx -> ctx.handler() == oldHandler, newName, newHandler);
472         return this;
473     }
474 
475     @Override
476     public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
477         return replace(ctx -> ctx.name().equals(oldName), newName, newHandler);
478     }
479 
480     @Override
481     @SuppressWarnings("unchecked")
482     public final <T extends ChannelHandler> T replace(
483             Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
484         return (T) replace(ctx -> oldHandlerType.isAssignableFrom(ctx.handler().getClass()), newName, newHandler);
485     }
486 
487     private ChannelHandler replace(
488             Predicate<DefaultChannelHandlerContext> predicate, String newName, ChannelHandler newHandler) {
489         DefaultChannelHandlerContext oldCtx;
490         DefaultChannelHandlerContext newCtx = newContext(newName, newHandler);
491         EventExecutor executor = executor();
492         boolean inEventLoop = executor.inEventLoop();
493         synchronized (handlers) {
494             int idx = checkNoSuchElement(findCtxIdx(predicate), null);
495             oldCtx = handlers.get(idx);
496             assert oldCtx != head && oldCtx != tail && oldCtx != null;
497 
498             if (!oldCtx.name().equals(newCtx.name())) {
499                 checkDuplicateName(newCtx.name());
500             }
501             DefaultChannelHandlerContext removed = handlers.set(idx, newCtx);
502             assert removed != null;
503 
504             if (!inEventLoop) {
505                 try {
506                     executor.execute(() -> replace0(oldCtx, newCtx));
507                     return oldCtx.handler();
508                 } catch (Throwable cause) {
509                     handlers.set(idx, oldCtx);
510                     throw cause;
511                 }
512             }
513         }
514 
515         replace0(oldCtx, newCtx);
516         return oldCtx.handler();
517     }
518 
519     private void replace0(DefaultChannelHandlerContext oldCtx, DefaultChannelHandlerContext newCtx) {
520         DefaultChannelHandlerContext prev = oldCtx.prev;
521         DefaultChannelHandlerContext next = oldCtx.next;
522         newCtx.prev = prev;
523         newCtx.next = next;
524 
525         // Finish the replacement of oldCtx with newCtx in the linked list.
526         // Note that this doesn't mean events will be sent to the new handler immediately
527         // because we are currently at the event handler thread and no more than one handler methods can be invoked
528         // at the same time (we ensured that in replace().)
529         prev.next = newCtx;
530         next.prev = newCtx;
531 
532         // update the reference to the replacement so forward of buffered content will work correctly
533         oldCtx.prev = newCtx;
534         oldCtx.next = newCtx;
535 
536         try {
537             // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
538             // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and those
539             // event handlers must be called after handlerAdded().
540             callHandlerAdded0(newCtx);
541             callHandlerRemoved0(oldCtx);
542         } finally {
543             oldCtx.remove(false);
544         }
545     }
546 
547     private static void checkMultiplicity(ChannelHandler handler) {
548         if (handler instanceof ChannelHandlerAdapter) {
549             ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
550             if (!h.isSharable() && h.added) {
551                 throw new ChannelPipelineException(
552                         h.getClass().getName() +
553                         " is not a @Sharable handler, so can't be added or removed multiple times.");
554             }
555             h.added = true;
556         }
557     }
558 
559     private void callHandlerAdded0(final DefaultChannelHandlerContext ctx) {
560         try {
561             ctx.callHandlerAdded();
562         } catch (Throwable t) {
563             boolean removed = false;
564             try {
565                 synchronized (handlers) {
566                     handlers.remove(ctx);
567                 }
568 
569                 ctx.callHandlerRemoved();
570 
571                 removed = true;
572             } catch (Throwable t2) {
573                 if (logger.isWarnEnabled()) {
574                     logger.warn("Failed to remove a handler: " + ctx.name(), t2);
575                 }
576             } finally {
577                 ctx.remove(true);
578             }
579 
580             if (removed) {
581                 fireChannelExceptionCaught(new ChannelPipelineException(
582                         ctx.handler().getClass().getName() +
583                         ".handlerAdded() has thrown an exception; removed.", t));
584             } else {
585                 fireChannelExceptionCaught(new ChannelPipelineException(
586                         ctx.handler().getClass().getName() +
587                         ".handlerAdded() has thrown an exception; also failed to remove.", t));
588             }
589         }
590     }
591 
592     private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) {
593         // Notify the complete removal.
594         try {
595             ctx.callHandlerRemoved();
596         } catch (Throwable t) {
597             fireChannelExceptionCaught(new ChannelPipelineException(
598                     ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
599         }
600     }
601 
602     @Override
603     public final ChannelHandler get(String name) {
604         ChannelHandlerContext ctx = context(name);
605         return ctx == null ? null : ctx.handler();
606     }
607 
608     @SuppressWarnings("unchecked")
609     @Override
610     public final <T extends ChannelHandler> T get(Class<T> handlerType) {
611         ChannelHandlerContext ctx = context(handlerType);
612         return ctx == null ? null : (T) ctx.handler();
613     }
614 
615     private DefaultChannelHandlerContext findCtx(Predicate<DefaultChannelHandlerContext> predicate) {
616         for (int i = 0; i < handlers.size(); i++) {
617             DefaultChannelHandlerContext ctx = handlers.get(i);
618             if (predicate.test(ctx)) {
619                 return ctx;
620             }
621         }
622         return null;
623     }
624 
625     @Override
626     public final ChannelHandlerContext context(String name) {
627         requireNonNull(name, "name");
628 
629         synchronized (handlers) {
630             return findCtx(ctx -> ctx.name().equals(name));
631         }
632     }
633 
634     @Override
635     public final ChannelHandlerContext context(ChannelHandler handler) {
636         requireNonNull(handler, "handler");
637 
638         synchronized (handlers) {
639             return findCtx(ctx -> ctx.handler() == handler);
640         }
641     }
642 
643     @Override
644     public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
645         requireNonNull(handlerType, "handlerType");
646 
647         synchronized (handlers) {
648             return findCtx(ctx -> handlerType.isAssignableFrom(ctx.handler().getClass()));
649         }
650     }
651 
652     @Override
653     public final List<String> names() {
654         synchronized (handlers) {
655             List<String> names = new ArrayList<>(handlers.size());
656             for (int i = 0; i < handlers.size(); i++) {
657                 names.add(handlers.get(i).name());
658             }
659             return names;
660         }
661     }
662 
663     /**
664      * Returns the {@link String} representation of this pipeline.
665      */
666     @Override
667     public final String toString() {
668         StringBuilder buf = new StringBuilder()
669             .append(StringUtil.simpleClassName(this))
670             .append('{');
671         synchronized (handlers) {
672             if (!handlers.isEmpty())  {
673                 for (int i = 0; i < handlers.size(); i++) {
674                     DefaultChannelHandlerContext ctx = handlers.get(i);
675 
676                     buf.append('(')
677                             .append(ctx.name())
678                             .append(" = ")
679                             .append(ctx.handler().getClass().getName())
680                             .append("), ");
681                 }
682                 buf.setLength(buf.length() - 2);
683             }
684         }
685         buf.append('}');
686         return buf.toString();
687     }
688 
689     @Override
690     public ChannelHandler removeFirst() {
691         final DefaultChannelHandlerContext ctx;
692         EventExecutor executor = executor();
693         boolean inEventLoop = executor.inEventLoop();
694         synchronized (handlers) {
695             if (handlers.isEmpty()) {
696                 throw new NoSuchElementException();
697             }
698             int idx = 0;
699 
700             ctx = handlers.remove(idx);
701             assert ctx != null;
702 
703             if (!inEventLoop) {
704                 return scheduleRemove(idx, ctx);
705             }
706         }
707         remove0(ctx);
708         return ctx.handler();
709     }
710 
711     @Override
712     public ChannelHandler removeLast() {
713         final DefaultChannelHandlerContext ctx;
714         EventExecutor executor = executor();
715         boolean inEventLoop = executor.inEventLoop();
716         synchronized (handlers) {
717             if (handlers.isEmpty()) {
718                 return null;
719             }
720             int idx = handlers.size() - 1;
721 
722             ctx = handlers.remove(idx);
723             assert ctx != null;
724 
725             if (!inEventLoop) {
726                 return scheduleRemove(idx, ctx);
727             }
728         }
729         remove0(ctx);
730         return ctx.handler();
731     }
732 
733     @SuppressWarnings("unchecked")
734     private <T extends ChannelHandler> T scheduleRemove(int idx, DefaultChannelHandlerContext ctx) {
735         try {
736             ctx.executor().execute(() -> remove0(ctx));
737             return (T) ctx.handler();
738         } catch (Throwable cause) {
739             handlers.add(idx, ctx);
740             throw cause;
741         }
742     }
743 
744     @Override
745     public ChannelHandler first() {
746         ChannelHandlerContext ctx = firstContext();
747         return ctx == null ? null : ctx.handler();
748     }
749 
750     @Override
751     public ChannelHandlerContext firstContext() {
752         synchronized (handlers) {
753             return handlers.isEmpty() ? null : handlers.get(0);
754         }
755     }
756 
757     @Override
758     public ChannelHandler last() {
759         ChannelHandlerContext ctx = lastContext();
760         return ctx == null ? null : ctx.handler();
761     }
762 
763     @Override
764     public ChannelHandlerContext lastContext() {
765         synchronized (handlers) {
766             return handlers.isEmpty() ? null : handlers.get(handlers.size() - 1);
767         }
768     }
769 
770     @Override
771     public Map<String, ChannelHandler> toMap() {
772         Map<String, ChannelHandler> map;
773         synchronized (handlers) {
774             if (handlers.isEmpty()) {
775                 return Collections.emptyMap();
776             }
777             map = new LinkedHashMap<>(handlers.size());
778             for (int i = 0; i < handlers.size(); i++) {
779                 ChannelHandlerContext ctx = handlers.get(i);
780                 map.put(ctx.name(), ctx.handler());
781             }
782             return map;
783         }
784     }
785 
786     @Override
787     public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
788         return toMap().entrySet().iterator();
789     }
790 
791     @Override
792     public final ChannelPipeline fireChannelRegistered() {
793         head.invokeChannelRegistered();
794         return this;
795     }
796 
797     @Override
798     public final ChannelPipeline fireChannelUnregistered() {
799         head.invokeChannelUnregistered();
800         return this;
801     }
802 
803     @Override
804     public final ChannelPipeline fireChannelActive() {
805         head.invokeChannelActive();
806         return this;
807     }
808 
809     @Override
810     public final ChannelPipeline fireChannelInactive() {
811         head.invokeChannelInactive();
812         return this;
813     }
814 
815     @Override
816     public final ChannelPipeline fireChannelShutdown(ChannelShutdownDirection direction) {
817         head.invokeChannelShutdown(direction);
818         return this;
819     }
820 
821     @Override
822     public final ChannelPipeline fireChannelExceptionCaught(Throwable cause) {
823         head.invokeChannelExceptionCaught(cause);
824         return this;
825     }
826 
827     @Override
828     public final ChannelPipeline fireChannelInboundEvent(Object event) {
829         head.invokeChannelInboundEvent(event);
830         return this;
831     }
832 
833     @Override
834     public final ChannelPipeline fireChannelRead(Object msg) {
835         head.invokeChannelRead(msg);
836         return this;
837     }
838 
839     @Override
840     public final ChannelPipeline fireChannelReadComplete() {
841         head.invokeChannelReadComplete();
842         return this;
843     }
844 
845     @Override
846     public final ChannelPipeline fireChannelWritabilityChanged() {
847         head.invokeChannelWritabilityChanged();
848         return this;
849     }
850 
851     @Override
852     public final Future<Void> bind(SocketAddress localAddress) {
853         return tail.bind(localAddress);
854     }
855 
856     @Override
857     public final Future<Void> connect(SocketAddress remoteAddress) {
858         return tail.connect(remoteAddress);
859     }
860 
861     @Override
862     public final Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
863         return tail.connect(remoteAddress, localAddress);
864     }
865 
866     @Override
867     public final Future<Void> disconnect() {
868         return tail.disconnect();
869     }
870 
871     @Override
872     public final Future<Void> close() {
873         return tail.close();
874     }
875 
876     @Override
877     public Future<Void> shutdown(ChannelShutdownDirection direction) {
878         return tail.shutdown(direction);
879     }
880 
881     @Override
882     public final Future<Void> register() {
883         return tail.register();
884     }
885 
886     @Override
887     public final Future<Void> deregister() {
888         return tail.deregister();
889     }
890 
891     @Override
892     public final ChannelPipeline flush() {
893         tail.flush();
894         return this;
895     }
896 
897     @Override
898     public final ChannelPipeline read() {
899         tail.read();
900         return this;
901     }
902 
903     @Override
904     public final Future<Void> write(Object msg) {
905         return tail.write(msg);
906     }
907 
908     @Override
909     public final Future<Void> writeAndFlush(Object msg) {
910         return tail.writeAndFlush(msg);
911     }
912 
913     @Override
914     public final Future<Void> sendOutboundEvent(Object event) {
915         return tail.sendOutboundEvent(event);
916     }
917 
918     @Override
919     public final <V> Promise<V> newPromise() {
920         return ChannelPipeline.super.newPromise();
921     }
922 
923     @Override
924     public final Future<Void> newSucceededFuture() {
925         return succeededFuture;
926     }
927     @Override
928     public final <V> Future<V> newSucceededFuture(V value) {
929         return ChannelPipeline.super.newSucceededFuture(value);
930     }
931 
932     @Override
933     public final <V> Future<V> newFailedFuture(Throwable cause) {
934         return ChannelPipeline.super.newFailedFuture(cause);
935     }
936 
937     /**
938      * Called once a {@link Throwable} hit the end of the {@link ChannelPipeline} without been handled by the user
939      * in {@link ChannelHandler#channelExceptionCaught(ChannelHandlerContext, Throwable)}.
940      */
941     protected void onUnhandledInboundException(Throwable cause) {
942         try {
943             logger.warn(
944                     "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
945                             "It usually means the last handler in the pipeline did not handle the exception.",
946                     cause);
947         } finally {
948             Resource.dispose(cause);
949         }
950     }
951 
    /**
     * Called once the {@link ChannelHandler#channelActive(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}.
     * <p>
     * The default implementation does nothing.
     */
    protected void onUnhandledInboundChannelActive() {
    }
958 
    /**
     * Called once the {@link ChannelHandler#channelInactive(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}.
     * <p>
     * The default implementation does nothing.
     */
    protected void onUnhandledInboundChannelInactive() {
    }
965 
    /**
     * Called once the {@link ChannelHandler#channelShutdown(ChannelHandlerContext, ChannelShutdownDirection)} event hit
     * the end of the {@link ChannelPipeline}.
     * <p>
     * The default implementation does nothing.
     *
     * @param direction the {@link ChannelShutdownDirection} that was carried by the event.
     */
    protected void onUnhandledInboundChannelShutdown(ChannelShutdownDirection direction) {
    }
972 
973     /**
974      * Called once a message hit the end of the {@link ChannelPipeline} without been handled by the user
975      * in {@link ChannelHandler#channelRead(ChannelHandlerContext, Object)}. This method is responsible
976      * to call {@link Resource#dispose(Object)} on the given msg at some point.
977      */
978     protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
979         try {
980             logger.debug(
981                     "Discarded inbound message {} that reached at the tail of the pipeline. " +
982                             "Please check your pipeline configuration. Discarded message pipeline : {}. Channel : {}.",
983                     msg, ctx.pipeline().names(), ctx.channel());
984         } finally {
985             Resource.dispose(msg);
986         }
987     }
988 
    /**
     * Called once the {@link ChannelHandler#channelReadComplete(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}.
     * <p>
     * The default implementation does nothing.
     */
    protected void onUnhandledInboundChannelReadComplete() {
    }
995 
996     /**
997      * Called once an user event hit the end of the {@link ChannelPipeline} without been handled by the user
998      * in {@link ChannelHandler#channelInboundEvent(ChannelHandlerContext, Object)}. This method is responsible
999      * to call {@link Resource#dispose(Object)} on the given event at some point.
1000      */
1001     protected void onUnhandledChannelInboundEvent(Object evt) {
1002         // This may not be a configuration error and so don't log anything.
1003         // The event may be superfluous for the current pipeline configuration.
1004         Resource.dispose(evt);
1005     }
1006 
    /**
     * Called once the {@link ChannelHandler#channelWritabilityChanged(ChannelHandlerContext)} event hit
     * the end of the {@link ChannelPipeline}.
     * <p>
     * The default implementation does nothing.
     */
    protected void onUnhandledChannelWritabilityChanged() {
    }
1013 
    /**
     * Called once the {@link #pendingOutboundBytes()} were changed.
     * <p>
     * It is invoked by {@link #incrementPendingOutboundBytes(long)} and {@link #decrementPendingOutboundBytes(long)}
     * with the value the counter had right after the update. It is not invoked when one of these methods detects
     * a negative counter and fails with an {@link IllegalStateException}.
     *
     * @param pendingOutboundBytes          the new {@link #pendingOutboundBytes()}.
     */
    protected abstract void pendingOutboundBytesUpdated(long pendingOutboundBytes);
1020 
1021     @Override
1022     public final long pendingOutboundBytes() {
1023         return TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.get(this);
1024     }
1025 
1026     /**
1027      * Increments the {@link #pendingOutboundBytes()} of the pipeline by the given delta.
1028      *
1029      * @param delta the number of bytes to add.
1030      */
1031     final void incrementPendingOutboundBytes(long delta) {
1032         assert delta > 0;
1033         long pending = TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.addAndGet(this, delta);
1034         if (pending < 0) {
1035             forceCloseTransport();
1036             throw new IllegalStateException("pendingOutboundBytes overflowed, force closed transport.");
1037         }
1038         pendingOutboundBytesUpdated(pending);
1039     }
1040 
1041     /**
1042      * Decrements the {@link #pendingOutboundBytes()} of the pipeline by the given delta.
1043      *
1044      * @param delta the number of bytes to remove.
1045      */
1046     final void decrementPendingOutboundBytes(long delta) {
1047         assert delta > 0;
1048         long pending = TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.addAndGet(this, -delta);
1049         if (pending < 0) {
1050             forceCloseTransport();
1051             throw new IllegalStateException("pendingOutboundBytes underflowed, force closed transport.");
1052         }
1053         pendingOutboundBytesUpdated(pending);
1054     }
1055 
1056     private static DefaultChannelPipeline defaultChannelPipeline(ChannelHandlerContext ctx) {
1057         return (DefaultChannelPipeline) ctx.pipeline();
1058     }
1059 
1060     // A special catch-all handler that handles both bytes and messages.
1061     private static final class TailHandler implements ChannelHandler {
1062 
1063         @Override
1064         public void channelRegistered(ChannelHandlerContext ctx) {
1065             // Just swallow event
1066         }
1067 
1068         @Override
1069         public void channelUnregistered(ChannelHandlerContext ctx) {
1070             // Just swallow event
1071         }
1072 
1073         @Override
1074         public void channelActive(ChannelHandlerContext ctx) {
1075             defaultChannelPipeline(ctx).onUnhandledInboundChannelActive();
1076         }
1077 
1078         @Override
1079         public void channelInactive(ChannelHandlerContext ctx) {
1080             defaultChannelPipeline(ctx).onUnhandledInboundChannelInactive();
1081         }
1082 
1083         @Override
1084         public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) {
1085             defaultChannelPipeline(ctx).onUnhandledInboundChannelShutdown(direction);
1086         }
1087 
1088         @Override
1089         public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1090             defaultChannelPipeline(ctx).onUnhandledChannelWritabilityChanged();
1091         }
1092 
1093         @Override
1094         public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) {
1095             ((DefaultChannelPipeline) ctx.pipeline()).onUnhandledChannelInboundEvent(evt);
1096         }
1097 
1098         @Override
1099         public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1100             defaultChannelPipeline(ctx).onUnhandledInboundException(cause);
1101         }
1102 
1103         @Override
1104         public void channelRead(ChannelHandlerContext ctx, Object msg) {
1105             defaultChannelPipeline(ctx).onUnhandledInboundMessage(ctx, msg);
1106         }
1107 
1108         @Override
1109         public void channelReadComplete(ChannelHandlerContext ctx) {
1110             defaultChannelPipeline(ctx).onUnhandledInboundChannelReadComplete();
1111         }
1112     }
1113 
1114     private static final class HeadHandler implements ChannelHandler {
1115 
1116         @Override
1117         public Future<Void> bind(
1118                 ChannelHandlerContext ctx, SocketAddress localAddress) {
1119             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1120             EventExecutor executor = pipeline.transportExecutor();
1121             Promise<Void> promise = executor.newPromise();
1122             if (executor.inEventLoop()) {
1123                 pipeline.bindTransport(localAddress, promise);
1124             } else {
1125                 safeExecute(executor, () -> pipeline.bindTransport(localAddress, promise), promise, null);
1126             }
1127             return promise.asFuture();
1128         }
1129 
1130         @Override
1131         public Future<Void> connect(
1132                 ChannelHandlerContext ctx,
1133                 SocketAddress remoteAddress, SocketAddress localAddress) {
1134             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1135             EventExecutor executor = pipeline.transportExecutor();
1136             Promise<Void> promise = executor.newPromise();
1137             if (executor.inEventLoop()) {
1138                 pipeline.connectTransport(remoteAddress, localAddress, promise);
1139             } else {
1140                 safeExecute(executor, () ->
1141                         pipeline.connectTransport(remoteAddress, localAddress, promise), promise, null);
1142             }
1143             return promise.asFuture();
1144         }
1145 
1146         @Override
1147         public Future<Void> disconnect(ChannelHandlerContext ctx) {
1148             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1149             EventExecutor executor = pipeline.transportExecutor();
1150             Promise<Void> promise = executor.newPromise();
1151             if (executor.inEventLoop()) {
1152                 pipeline.disconnectTransport(promise);
1153             } else {
1154                 safeExecute(executor, () -> pipeline.disconnectTransport(promise), promise, null);
1155             }
1156             return promise.asFuture();
1157         }
1158 
1159         @Override
1160         public Future<Void> close(ChannelHandlerContext ctx) {
1161             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1162             EventExecutor executor = pipeline.transportExecutor();
1163             Promise<Void> promise = executor.newPromise();
1164             if (executor.inEventLoop()) {
1165                 pipeline.closeTransport(promise);
1166             } else {
1167                 safeExecute(executor, () -> pipeline.closeTransport(promise), promise, null);
1168             }
1169             return promise.asFuture();
1170         }
1171 
1172         @Override
1173         public Future<Void> shutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) {
1174             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1175             EventExecutor executor = pipeline.transportExecutor();
1176             Promise<Void> promise = executor.newPromise();
1177             if (executor.inEventLoop()) {
1178                 pipeline.shutdownTransport(direction, promise);
1179             } else {
1180                 safeExecute(executor, () -> pipeline.shutdownTransport(direction, promise), promise, null);
1181             }
1182             return promise.asFuture();
1183         }
1184 
1185         @Override
1186         public Future<Void> register(ChannelHandlerContext ctx) {
1187             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1188             EventExecutor executor = pipeline.transportExecutor();
1189             Promise<Void> promise = executor.newPromise();
1190             if (executor.inEventLoop()) {
1191                 pipeline.registerTransport(promise);
1192             } else {
1193                 safeExecute(executor, () -> pipeline.registerTransport(promise), promise, null);
1194             }
1195             return promise.asFuture();
1196         }
1197 
1198         @Override
1199         public Future<Void> deregister(ChannelHandlerContext ctx) {
1200             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1201             EventExecutor executor = pipeline.transportExecutor();
1202             Promise<Void> promise = executor.newPromise();
1203             if (executor.inEventLoop()) {
1204                 pipeline.deregisterTransport(promise);
1205             } else {
1206                 safeExecute(executor, () -> pipeline.deregisterTransport(promise), promise, null);
1207             }
1208             return promise.asFuture();
1209         }
1210 
1211         @Override
1212         public void read(ChannelHandlerContext ctx) {
1213             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1214             EventExecutor executor = pipeline.transportExecutor();
1215             if (executor.inEventLoop()) {
1216                 pipeline.readTransport();
1217             } else {
1218                 safeExecute(executor, pipeline::readTransport, null, null);
1219             }
1220         }
1221 
1222         @Override
1223         public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
1224             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1225             EventExecutor executor = pipeline.transportExecutor();
1226             Promise<Void> promise = executor.newPromise();
1227             if (executor.inEventLoop()) {
1228                 pipeline.writeTransport(msg, promise);
1229             } else {
1230                 safeExecute(executor, () -> pipeline.writeTransport(msg, promise), promise, msg);
1231             }
1232             return promise.asFuture();
1233         }
1234 
1235         @Override
1236         public void flush(ChannelHandlerContext ctx) {
1237             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1238             EventExecutor executor = pipeline.transportExecutor();
1239             if (executor.inEventLoop()) {
1240                 pipeline.flushTransport();
1241             } else {
1242                 safeExecute(executor, pipeline::flushTransport, null, null);
1243             }
1244         }
1245 
1246         @Override
1247         public Future<Void> sendOutboundEvent(ChannelHandlerContext ctx, Object event) {
1248             DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1249             EventExecutor executor = pipeline.transportExecutor();
1250             Promise<Void> promise = executor.newPromise();
1251             if (executor.inEventLoop()) {
1252                 pipeline.sendOutboundEventTransport(event, promise);
1253             } else {
1254                 safeExecute(executor, () -> pipeline.sendOutboundEventTransport(event, promise), promise, event);
1255             }
1256             return promise.asFuture();
1257         }
1258     }
1259 
1260     final void forceCloseTransport() {
1261         EventExecutor executor = transportExecutor();
1262         Promise<Void> promise = executor.newPromise();
1263         if (executor.inEventLoop()) {
1264             closeTransport(promise);
1265         } else {
1266             safeExecute(executor, () -> closeTransport(promise), promise, null);
1267         }
1268     }
1269 
    /**
     * Returns the {@link EventExecutor} that is used for all the abstract transport operations.
     * <p>
     * The pipeline also uses this executor to create the {@link Promise}s it passes to those operations.
     *
     * @return the executor on which the {@code ...Transport(...)} methods are called.
     */
    protected abstract EventExecutor transportExecutor();
1276 
    /**
     * Register the transport and notify the {@link Promise} once the operation was completed. This method is
     * guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param promise the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void registerTransport(Promise<Void> promise);
1282 
    /**
     * Bind the {@link SocketAddress} to the transport and notify the {@link Promise} once the operation was completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param localAddress the {@link SocketAddress} to bind to.
     * @param promise      the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void bindTransport(SocketAddress localAddress, Promise<Void> promise);
1288 
    /**
     * Connect the transport with the given remote {@link SocketAddress}. If a specific local {@link SocketAddress}
     * should be used it needs to be given as argument. Otherwise just pass {@code null} to it.
     * <p>
     * The {@link Promise} will get notified once the operation was completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param remoteAddress the {@link SocketAddress} to connect to.
     * @param localAddress  the local {@link SocketAddress} to use, or {@code null} if no specific one is required.
     * @param promise       the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void connectTransport(
            SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise);
1298 
    /**
     * Disconnect the transport and notify the {@link Promise} once the operation was completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param promise the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void disconnectTransport(Promise<Void> promise);
1304 
    /**
     * Close the transport and notify the {@link Promise} once the operation was completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param promise the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void closeTransport(Promise<Void> promise);
1310 
    /**
     * Shutdown the given direction of the transport and notify the {@link Promise} once the operation was completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param direction the {@link ChannelShutdownDirection} to shut down.
     * @param promise   the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise);
1316 
    /**
     * Deregister the transport from the {@link EventLoop} and notify the {@link Promise} once the operation was
     * completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param promise the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void deregisterTransport(Promise<Void> promise);
1323 
    /**
     * Schedules a read operation on the transport that fills the inbound buffer of the first {@link ChannelHandler}
     * in the {@link ChannelPipeline}. If there's already a pending read operation, this method does nothing.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     * <p>
     * No {@link Promise} is passed to this method, so its outcome is not reported back through one.
     */
    protected abstract void readTransport();
1330 
    /**
     * Schedules a write operation on the transport. The given {@link Promise} will be notified once the write was
     * either successful or failed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param msg     the message to write.
     * @param promise the {@link Promise} to notify once the write succeeded or failed.
     */
    protected abstract void writeTransport(Object msg, Promise<Void> promise);
1337 
    /**
     * Flush out all (previously) scheduled write operations, scheduled via {@link #writeTransport(Object, Promise)}.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     * <p>
     * No {@link Promise} is passed to this method, so its outcome is not reported back through one.
     */
    protected abstract void flushTransport();
1343 
    /**
     * Send a custom outbound event on the transport and notify the {@link Promise} once the operation was completed.
     * This method is guaranteed to be called from the {@link #transportExecutor()}.
     *
     * @param event   the custom outbound event to send.
     * @param promise the {@link Promise} to notify once the operation was completed.
     */
    protected abstract void sendOutboundEventTransport(Object event, Promise<Void> promise);
1349 }