View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.ResourceLeakHint;
23  import io.netty.util.concurrent.AbstractEventExecutor;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.OrderedEventExecutor;
26  import io.netty.util.internal.ObjectPool;
27  import io.netty.util.internal.ObjectPool.Handle;
28  import io.netty.util.internal.ObjectPool.ObjectCreator;
29  import io.netty.util.internal.PromiseNotificationUtil;
30  import io.netty.util.internal.ThrowableUtil;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.StringUtil;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.net.SocketAddress;
38  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39  
40  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59  import static io.netty.channel.ChannelHandlerMask.mask;
60  
61  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62  
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
64      volatile AbstractChannelHandlerContext next;
65      volatile AbstractChannelHandlerContext prev;
66  
67      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
68              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
69  
70      /**
71       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
72       */
73      private static final int ADD_PENDING = 1;
74      /**
75       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
76       */
77      private static final int ADD_COMPLETE = 2;
78      /**
79       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
80       */
81      private static final int REMOVE_COMPLETE = 3;
82      /**
83       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
84       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
85       */
86      private static final int INIT = 0;
87  
88      private final DefaultChannelPipeline pipeline;
89      private final String name;
90      private final boolean ordered;
91      private final int executionMask;
92  
93      // Will be set to null if no child executor should be used, otherwise it will be set to the
94      // child executor.
95      final EventExecutor executor;
96      private ChannelFuture succeededFuture;
97  
98      // Lazily instantiated tasks used to trigger events to a handler with different executor.
99      // There is no need to make this volatile as at worse it will just create a few more instances then needed.
100     private Tasks invokeTasks;
101 
102     private volatile int handlerState = INIT;
103 
104     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
105                                   String name, Class<? extends ChannelHandler> handlerClass) {
106         this.name = ObjectUtil.checkNotNull(name, "name");
107         this.pipeline = pipeline;
108         this.executor = executor;
109         this.executionMask = mask(handlerClass);
110         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
111         ordered = executor == null || executor instanceof OrderedEventExecutor;
112     }
113 
114     @Override
115     public Channel channel() {
116         return pipeline.channel();
117     }
118 
119     @Override
120     public ChannelPipeline pipeline() {
121         return pipeline;
122     }
123 
124     @Override
125     public ByteBufAllocator alloc() {
126         return channel().config().getAllocator();
127     }
128 
129     @Override
130     public EventExecutor executor() {
131         if (executor == null) {
132             return channel().eventLoop();
133         } else {
134             return executor;
135         }
136     }
137 
138     @Override
139     public String name() {
140         return name;
141     }
142 
143     @Override
144     public ChannelHandlerContext fireChannelRegistered() {
145         invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
146         return this;
147     }
148 
149     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
150         EventExecutor executor = next.executor();
151         if (executor.inEventLoop()) {
152             next.invokeChannelRegistered();
153         } else {
154             executor.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     next.invokeChannelRegistered();
158                 }
159             });
160         }
161     }
162 
163     private void invokeChannelRegistered() {
164         if (invokeHandler()) {
165             try {
166                 // DON'T CHANGE
167                 // Duplex handlers implements both out/in interfaces causing a scalability issue
168                 // see https://bugs.openjdk.org/browse/JDK-8180450
169                 final ChannelHandler handler = handler();
170                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
171                 if (handler == headContext) {
172                     headContext.channelRegistered(this);
173                 } else if (handler instanceof ChannelInboundHandlerAdapter) {
174                     ((ChannelInboundHandlerAdapter) handler).channelRegistered(this);
175                 } else {
176                     ((ChannelInboundHandler) handler).channelRegistered(this);
177                 }
178             } catch (Throwable t) {
179                 invokeExceptionCaught(t);
180             }
181         } else {
182             fireChannelRegistered();
183         }
184     }
185 
186     @Override
187     public ChannelHandlerContext fireChannelUnregistered() {
188         invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
189         return this;
190     }
191 
192     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
193         EventExecutor executor = next.executor();
194         if (executor.inEventLoop()) {
195             next.invokeChannelUnregistered();
196         } else {
197             executor.execute(new Runnable() {
198                 @Override
199                 public void run() {
200                     next.invokeChannelUnregistered();
201                 }
202             });
203         }
204     }
205 
206     private void invokeChannelUnregistered() {
207         if (invokeHandler()) {
208             try {
209                 // DON'T CHANGE
210                 // Duplex handlers implements both out/in interfaces causing a scalability issue
211                 // see https://bugs.openjdk.org/browse/JDK-8180450
212                 final ChannelHandler handler = handler();
213                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
214                 if (handler == headContext) {
215                     headContext.channelUnregistered(this);
216                 } else if (handler instanceof ChannelInboundHandlerAdapter) {
217                     ((ChannelInboundHandlerAdapter) handler).channelUnregistered(this);
218                 } else {
219                     ((ChannelInboundHandler) handler).channelUnregistered(this);
220                 }
221             } catch (Throwable t) {
222                 invokeExceptionCaught(t);
223             }
224         } else {
225             fireChannelUnregistered();
226         }
227     }
228 
229     @Override
230     public ChannelHandlerContext fireChannelActive() {
231         invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
232         return this;
233     }
234 
235     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
236         EventExecutor executor = next.executor();
237         if (executor.inEventLoop()) {
238             next.invokeChannelActive();
239         } else {
240             executor.execute(new Runnable() {
241                 @Override
242                 public void run() {
243                     next.invokeChannelActive();
244                 }
245             });
246         }
247     }
248 
249     private void invokeChannelActive() {
250         if (invokeHandler()) {
251             try {
252                 // DON'T CHANGE
253                 // Duplex handlers implements both out/in interfaces causing a scalability issue
254                 // see https://bugs.openjdk.org/browse/JDK-8180450
255                 final ChannelHandler handler = handler();
256                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
257                 if (handler == headContext) {
258                     headContext.channelActive(this);
259                 } else if (handler instanceof ChannelInboundHandlerAdapter) {
260                     ((ChannelInboundHandlerAdapter) handler).channelActive(this);
261                 } else {
262                     ((ChannelInboundHandler) handler).channelActive(this);
263                 }
264             } catch (Throwable t) {
265                 invokeExceptionCaught(t);
266             }
267         } else {
268             fireChannelActive();
269         }
270     }
271 
272     @Override
273     public ChannelHandlerContext fireChannelInactive() {
274         invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
275         return this;
276     }
277 
278     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
279         EventExecutor executor = next.executor();
280         if (executor.inEventLoop()) {
281             next.invokeChannelInactive();
282         } else {
283             executor.execute(new Runnable() {
284                 @Override
285                 public void run() {
286                     next.invokeChannelInactive();
287                 }
288             });
289         }
290     }
291 
292     private void invokeChannelInactive() {
293         if (invokeHandler()) {
294             try {
295                 // DON'T CHANGE
296                 // Duplex handlers implements both out/in interfaces causing a scalability issue
297                 // see https://bugs.openjdk.org/browse/JDK-8180450
298                 final ChannelHandler handler = handler();
299                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
300                 if (handler == headContext) {
301                     headContext.channelInactive(this);
302                 } else if (handler instanceof ChannelInboundHandlerAdapter) {
303                     ((ChannelInboundHandlerAdapter) handler).channelInactive(this);
304                 } else {
305                     ((ChannelInboundHandler) handler).channelInactive(this);
306                 }
307             } catch (Throwable t) {
308                 invokeExceptionCaught(t);
309             }
310         } else {
311             fireChannelInactive();
312         }
313     }
314 
315     @Override
316     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
317         invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
318         return this;
319     }
320 
321     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
322         ObjectUtil.checkNotNull(cause, "cause");
323         EventExecutor executor = next.executor();
324         if (executor.inEventLoop()) {
325             next.invokeExceptionCaught(cause);
326         } else {
327             try {
328                 executor.execute(new Runnable() {
329                     @Override
330                     public void run() {
331                         next.invokeExceptionCaught(cause);
332                     }
333                 });
334             } catch (Throwable t) {
335                 if (logger.isWarnEnabled()) {
336                     logger.warn("Failed to submit an exceptionCaught() event.", t);
337                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
338                 }
339             }
340         }
341     }
342 
343     private void invokeExceptionCaught(final Throwable cause) {
344         if (invokeHandler()) {
345             try {
346                 handler().exceptionCaught(this, cause);
347             } catch (Throwable error) {
348                 if (logger.isDebugEnabled()) {
349                     logger.debug(
350                         "An exception {}" +
351                         "was thrown by a user handler's exceptionCaught() " +
352                         "method while handling the following exception:",
353                         ThrowableUtil.stackTraceToString(error), cause);
354                 } else if (logger.isWarnEnabled()) {
355                     logger.warn(
356                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
357                         "was thrown by a user handler's exceptionCaught() " +
358                         "method while handling the following exception:", error, cause);
359                 }
360             }
361         } else {
362             fireExceptionCaught(cause);
363         }
364     }
365 
366     @Override
367     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
368         invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
369         return this;
370     }
371 
372     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
373         ObjectUtil.checkNotNull(event, "event");
374         EventExecutor executor = next.executor();
375         if (executor.inEventLoop()) {
376             next.invokeUserEventTriggered(event);
377         } else {
378             executor.execute(new Runnable() {
379                 @Override
380                 public void run() {
381                     next.invokeUserEventTriggered(event);
382                 }
383             });
384         }
385     }
386 
387     private void invokeUserEventTriggered(Object event) {
388         if (invokeHandler()) {
389             try {
390                 // DON'T CHANGE
391                 // Duplex handlers implements both out/in interfaces causing a scalability issue
392                 // see https://bugs.openjdk.org/browse/JDK-8180450
393                 final ChannelHandler handler = handler();
394                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
395                 if (handler == headContext) {
396                     headContext.userEventTriggered(this, event);
397                 } else if (handler instanceof ChannelInboundHandlerAdapter) {
398                     ((ChannelInboundHandlerAdapter) handler).userEventTriggered(this, event);
399                 } else {
400                     ((ChannelInboundHandler) handler).userEventTriggered(this, event);
401                 }
402             } catch (Throwable t) {
403                 invokeExceptionCaught(t);
404             }
405         } else {
406             fireUserEventTriggered(event);
407         }
408     }
409 
410     @Override
411     public ChannelHandlerContext fireChannelRead(final Object msg) {
412         invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
413         return this;
414     }
415 
416     static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
417         final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
418         EventExecutor executor = next.executor();
419         if (executor.inEventLoop()) {
420             next.invokeChannelRead(m);
421         } else {
422             executor.execute(new Runnable() {
423                 @Override
424                 public void run() {
425                     next.invokeChannelRead(m);
426                 }
427             });
428         }
429     }
430 
431     private void invokeChannelRead(Object msg) {
432         if (invokeHandler()) {
433             try {
434                 // DON'T CHANGE
435                 // Duplex handlers implements both out/in interfaces causing a scalability issue
436                 // see https://bugs.openjdk.org/browse/JDK-8180450
437                 final ChannelHandler handler = handler();
438                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
439                 if (handler == headContext) {
440                     headContext.channelRead(this, msg);
441                 } else if (handler instanceof ChannelDuplexHandler) {
442                     ((ChannelDuplexHandler) handler).channelRead(this, msg);
443                 } else {
444                     ((ChannelInboundHandler) handler).channelRead(this, msg);
445                 }
446             } catch (Throwable t) {
447                 invokeExceptionCaught(t);
448             }
449         } else {
450             fireChannelRead(msg);
451         }
452     }
453 
454     @Override
455     public ChannelHandlerContext fireChannelReadComplete() {
456         invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
457         return this;
458     }
459 
460     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
461         EventExecutor executor = next.executor();
462         if (executor.inEventLoop()) {
463             next.invokeChannelReadComplete();
464         } else {
465             Tasks tasks = next.invokeTasks;
466             if (tasks == null) {
467                 next.invokeTasks = tasks = new Tasks(next);
468             }
469             executor.execute(tasks.invokeChannelReadCompleteTask);
470         }
471     }
472 
473     private void invokeChannelReadComplete() {
474         if (invokeHandler()) {
475             try {
476                 // DON'T CHANGE
477                 // Duplex handlers implements both out/in interfaces causing a scalability issue
478                 // see https://bugs.openjdk.org/browse/JDK-8180450
479                 final ChannelHandler handler = handler();
480                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
481                 if (handler == headContext) {
482                     headContext.channelReadComplete(this);
483                 } else if (handler instanceof ChannelDuplexHandler) {
484                     ((ChannelDuplexHandler) handler).channelReadComplete(this);
485                 } else {
486                     ((ChannelInboundHandler) handler).channelReadComplete(this);
487                 }
488             } catch (Throwable t) {
489                 invokeExceptionCaught(t);
490             }
491         } else {
492             fireChannelReadComplete();
493         }
494     }
495 
496     @Override
497     public ChannelHandlerContext fireChannelWritabilityChanged() {
498         invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
499         return this;
500     }
501 
502     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
503         EventExecutor executor = next.executor();
504         if (executor.inEventLoop()) {
505             next.invokeChannelWritabilityChanged();
506         } else {
507             Tasks tasks = next.invokeTasks;
508             if (tasks == null) {
509                 next.invokeTasks = tasks = new Tasks(next);
510             }
511             executor.execute(tasks.invokeChannelWritableStateChangedTask);
512         }
513     }
514 
515     private void invokeChannelWritabilityChanged() {
516         if (invokeHandler()) {
517             try {
518                 // DON'T CHANGE
519                 // Duplex handlers implements both out/in interfaces causing a scalability issue
520                 // see https://bugs.openjdk.org/browse/JDK-8180450
521                 final ChannelHandler handler = handler();
522                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
523                 if (handler == headContext) {
524                     headContext.channelWritabilityChanged(this);
525                 } else if (handler instanceof ChannelInboundHandlerAdapter) {
526                     ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(this);
527                 } else {
528                     ((ChannelInboundHandler) handler).channelWritabilityChanged(this);
529                 }
530             } catch (Throwable t) {
531                 invokeExceptionCaught(t);
532             }
533         } else {
534             fireChannelWritabilityChanged();
535         }
536     }
537 
538     @Override
539     public ChannelFuture bind(SocketAddress localAddress) {
540         return bind(localAddress, newPromise());
541     }
542 
543     @Override
544     public ChannelFuture connect(SocketAddress remoteAddress) {
545         return connect(remoteAddress, newPromise());
546     }
547 
548     @Override
549     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
550         return connect(remoteAddress, localAddress, newPromise());
551     }
552 
553     @Override
554     public ChannelFuture disconnect() {
555         return disconnect(newPromise());
556     }
557 
558     @Override
559     public ChannelFuture close() {
560         return close(newPromise());
561     }
562 
563     @Override
564     public ChannelFuture deregister() {
565         return deregister(newPromise());
566     }
567 
568     @Override
569     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
570         ObjectUtil.checkNotNull(localAddress, "localAddress");
571         if (isNotValidPromise(promise, false)) {
572             // cancelled
573             return promise;
574         }
575 
576         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
577         EventExecutor executor = next.executor();
578         if (executor.inEventLoop()) {
579             next.invokeBind(localAddress, promise);
580         } else {
581             safeExecute(executor, new Runnable() {
582                 @Override
583                 public void run() {
584                     next.invokeBind(localAddress, promise);
585                 }
586             }, promise, null, false);
587         }
588         return promise;
589     }
590 
591     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
592         if (invokeHandler()) {
593             try {
594                 // DON'T CHANGE
595                 // Duplex handlers implements both out/in interfaces causing a scalability issue
596                 // see https://bugs.openjdk.org/browse/JDK-8180450
597                 final ChannelHandler handler = handler();
598                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
599                 if (handler == headContext) {
600                     headContext.bind(this, localAddress, promise);
601                 } else if (handler instanceof ChannelDuplexHandler) {
602                     ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
603                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
604                     ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
605                 } else {
606                     ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
607                 }
608             } catch (Throwable t) {
609                 notifyOutboundHandlerException(t, promise);
610             }
611         } else {
612             bind(localAddress, promise);
613         }
614     }
615 
616     @Override
617     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
618         return connect(remoteAddress, null, promise);
619     }
620 
621     @Override
622     public ChannelFuture connect(
623             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
624         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
625 
626         if (isNotValidPromise(promise, false)) {
627             // cancelled
628             return promise;
629         }
630 
631         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
632         EventExecutor executor = next.executor();
633         if (executor.inEventLoop()) {
634             next.invokeConnect(remoteAddress, localAddress, promise);
635         } else {
636             safeExecute(executor, new Runnable() {
637                 @Override
638                 public void run() {
639                     next.invokeConnect(remoteAddress, localAddress, promise);
640                 }
641             }, promise, null, false);
642         }
643         return promise;
644     }
645 
646     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
647         if (invokeHandler()) {
648             try {
649                 // DON'T CHANGE
650                 // Duplex handlers implements both out/in interfaces causing a scalability issue
651                 // see https://bugs.openjdk.org/browse/JDK-8180450
652                 final ChannelHandler handler = handler();
653                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
654                 if (handler == headContext) {
655                     headContext.connect(this, remoteAddress, localAddress, promise);
656                 } else if (handler instanceof ChannelDuplexHandler) {
657                     ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
658                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
659                     ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
660                 } else {
661                     ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
662                 }
663             } catch (Throwable t) {
664                 notifyOutboundHandlerException(t, promise);
665             }
666         } else {
667             connect(remoteAddress, localAddress, promise);
668         }
669     }
670 
671     @Override
672     public ChannelFuture disconnect(final ChannelPromise promise) {
673         if (!channel().metadata().hasDisconnect()) {
674             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
675             // So far, UDP/IP is the only transport that has such behavior.
676             return close(promise);
677         }
678         if (isNotValidPromise(promise, false)) {
679             // cancelled
680             return promise;
681         }
682 
683         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
684         EventExecutor executor = next.executor();
685         if (executor.inEventLoop()) {
686             next.invokeDisconnect(promise);
687         } else {
688             safeExecute(executor, new Runnable() {
689                 @Override
690                 public void run() {
691                     next.invokeDisconnect(promise);
692                 }
693             }, promise, null, false);
694         }
695         return promise;
696     }
697 
698     private void invokeDisconnect(ChannelPromise promise) {
699         if (invokeHandler()) {
700             try {
701                 // DON'T CHANGE
702                 // Duplex handlers implements both out/in interfaces causing a scalability issue
703                 // see https://bugs.openjdk.org/browse/JDK-8180450
704                 final ChannelHandler handler = handler();
705                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
706                 if (handler == headContext) {
707                     headContext.disconnect(this, promise);
708                 } else if (handler instanceof ChannelDuplexHandler) {
709                     ((ChannelDuplexHandler) handler).disconnect(this, promise);
710                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
711                     ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
712                 } else {
713                     ((ChannelOutboundHandler) handler).disconnect(this, promise);
714                 }
715             } catch (Throwable t) {
716                 notifyOutboundHandlerException(t, promise);
717             }
718         } else {
719             disconnect(promise);
720         }
721     }
722 
723     @Override
724     public ChannelFuture close(final ChannelPromise promise) {
725         if (isNotValidPromise(promise, false)) {
726             // cancelled
727             return promise;
728         }
729 
730         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
731         EventExecutor executor = next.executor();
732         if (executor.inEventLoop()) {
733             next.invokeClose(promise);
734         } else {
735             safeExecute(executor, new Runnable() {
736                 @Override
737                 public void run() {
738                     next.invokeClose(promise);
739                 }
740             }, promise, null, false);
741         }
742 
743         return promise;
744     }
745 
746     private void invokeClose(ChannelPromise promise) {
747         if (invokeHandler()) {
748             try {
749                 // DON'T CHANGE
750                 // Duplex handlers implements both out/in interfaces causing a scalability issue
751                 // see https://bugs.openjdk.org/browse/JDK-8180450
752                 final ChannelHandler handler = handler();
753                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
754                 if (handler == headContext) {
755                     headContext.close(this, promise);
756                 } else if (handler instanceof ChannelDuplexHandler) {
757                     ((ChannelDuplexHandler) handler).close(this, promise);
758                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
759                     ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
760                 } else {
761                     ((ChannelOutboundHandler) handler).close(this, promise);
762                 }
763             } catch (Throwable t) {
764                 notifyOutboundHandlerException(t, promise);
765             }
766         } else {
767             close(promise);
768         }
769     }
770 
771     @Override
772     public ChannelFuture deregister(final ChannelPromise promise) {
773         if (isNotValidPromise(promise, false)) {
774             // cancelled
775             return promise;
776         }
777 
778         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
779         EventExecutor executor = next.executor();
780         if (executor.inEventLoop()) {
781             next.invokeDeregister(promise);
782         } else {
783             safeExecute(executor, new Runnable() {
784                 @Override
785                 public void run() {
786                     next.invokeDeregister(promise);
787                 }
788             }, promise, null, false);
789         }
790 
791         return promise;
792     }
793 
794     private void invokeDeregister(ChannelPromise promise) {
795         if (invokeHandler()) {
796             try {
797                 // DON'T CHANGE
798                 // Duplex handlers implements both out/in interfaces causing a scalability issue
799                 // see https://bugs.openjdk.org/browse/JDK-8180450
800                 final ChannelHandler handler = handler();
801                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
802                 if (handler == headContext) {
803                     headContext.deregister(this, promise);
804                 } else if (handler instanceof ChannelDuplexHandler) {
805                     ((ChannelDuplexHandler) handler).deregister(this, promise);
806                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
807                     ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
808                 } else {
809                     ((ChannelOutboundHandler) handler).deregister(this, promise);
810                 }
811             } catch (Throwable t) {
812                 notifyOutboundHandlerException(t, promise);
813             }
814         } else {
815             deregister(promise);
816         }
817     }
818 
819     @Override
820     public ChannelHandlerContext read() {
821         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
822         EventExecutor executor = next.executor();
823         if (executor.inEventLoop()) {
824             next.invokeRead();
825         } else {
826             Tasks tasks = next.invokeTasks;
827             if (tasks == null) {
828                 next.invokeTasks = tasks = new Tasks(next);
829             }
830             executor.execute(tasks.invokeReadTask);
831         }
832 
833         return this;
834     }
835 
836     private void invokeRead() {
837         if (invokeHandler()) {
838             try {
839                 // DON'T CHANGE
840                 // Duplex handlers implements both out/in interfaces causing a scalability issue
841                 // see https://bugs.openjdk.org/browse/JDK-8180450
842                 final ChannelHandler handler = handler();
843                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
844                 if (handler == headContext) {
845                     headContext.read(this);
846                 } else if (handler instanceof ChannelDuplexHandler) {
847                     ((ChannelDuplexHandler) handler).read(this);
848                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
849                     ((ChannelOutboundHandlerAdapter) handler).read(this);
850                 } else {
851                     ((ChannelOutboundHandler) handler).read(this);
852                 }
853             } catch (Throwable t) {
854                 invokeExceptionCaught(t);
855             }
856         } else {
857             read();
858         }
859     }
860 
861     @Override
862     public ChannelFuture write(Object msg) {
863         return write(msg, newPromise());
864     }
865 
866     @Override
867     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
868         write(msg, false, promise);
869 
870         return promise;
871     }
872 
873     void invokeWrite(Object msg, ChannelPromise promise) {
874         if (invokeHandler()) {
875             invokeWrite0(msg, promise);
876         } else {
877             write(msg, promise);
878         }
879     }
880 
881     private void invokeWrite0(Object msg, ChannelPromise promise) {
882         try {
883             // DON'T CHANGE
884             // Duplex handlers implements both out/in interfaces causing a scalability issue
885             // see https://bugs.openjdk.org/browse/JDK-8180450
886             final ChannelHandler handler = handler();
887             final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
888             if (handler == headContext) {
889                 headContext.write(this, msg, promise);
890             } else if (handler instanceof ChannelDuplexHandler) {
891                 ((ChannelDuplexHandler) handler).write(this, msg, promise);
892             } else if (handler instanceof ChannelOutboundHandlerAdapter) {
893                 ((ChannelOutboundHandlerAdapter) handler).write(this, msg, promise);
894             } else {
895                 ((ChannelOutboundHandler) handler).write(this, msg, promise);
896             }
897         } catch (Throwable t) {
898             notifyOutboundHandlerException(t, promise);
899         }
900     }
901 
902     @Override
903     public ChannelHandlerContext flush() {
904         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
905         EventExecutor executor = next.executor();
906         if (executor.inEventLoop()) {
907             next.invokeFlush();
908         } else {
909             Tasks tasks = next.invokeTasks;
910             if (tasks == null) {
911                 next.invokeTasks = tasks = new Tasks(next);
912             }
913             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
914         }
915 
916         return this;
917     }
918 
919     private void invokeFlush() {
920         if (invokeHandler()) {
921             invokeFlush0();
922         } else {
923             flush();
924         }
925     }
926 
927     private void invokeFlush0() {
928         try {
929             // DON'T CHANGE
930             // Duplex handlers implements both out/in interfaces causing a scalability issue
931             // see https://bugs.openjdk.org/browse/JDK-8180450
932             final ChannelHandler handler = handler();
933             final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
934             if (handler == headContext) {
935                 headContext.flush(this);
936             } else if (handler instanceof ChannelDuplexHandler) {
937                 ((ChannelDuplexHandler) handler).flush(this);
938             } else if (handler instanceof ChannelOutboundHandlerAdapter) {
939                 ((ChannelOutboundHandlerAdapter) handler).flush(this);
940             } else {
941                 ((ChannelOutboundHandler) handler).flush(this);
942             }
943         } catch (Throwable t) {
944             invokeExceptionCaught(t);
945         }
946     }
947 
948     @Override
949     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
950         write(msg, true, promise);
951         return promise;
952     }
953 
954     void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
955         if (invokeHandler()) {
956             invokeWrite0(msg, promise);
957             invokeFlush0();
958         } else {
959             writeAndFlush(msg, promise);
960         }
961     }
962 
963     private void write(Object msg, boolean flush, ChannelPromise promise) {
964         ObjectUtil.checkNotNull(msg, "msg");
965         try {
966             if (isNotValidPromise(promise, true)) {
967                 ReferenceCountUtil.release(msg);
968                 // cancelled
969                 return;
970             }
971         } catch (RuntimeException e) {
972             ReferenceCountUtil.release(msg);
973             throw e;
974         }
975 
976         final AbstractChannelHandlerContext next = findContextOutbound(flush ?
977                 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
978         final Object m = pipeline.touch(msg, next);
979         EventExecutor executor = next.executor();
980         if (executor.inEventLoop()) {
981             if (flush) {
982                 next.invokeWriteAndFlush(m, promise);
983             } else {
984                 next.invokeWrite(m, promise);
985             }
986         } else {
987             final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
988             if (!safeExecute(executor, task, promise, m, !flush)) {
989                 // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
990                 // and put it back in the Recycler for re-use later.
991                 //
992                 // See https://github.com/netty/netty/issues/8343.
993                 task.cancel();
994             }
995         }
996     }
997 
998     @Override
999     public ChannelFuture writeAndFlush(Object msg) {
1000         return writeAndFlush(msg, newPromise());
1001     }
1002 
1003     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
1004         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
1005         // false.
1006         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
1007     }
1008 
1009     @Override
1010     public ChannelPromise newPromise() {
1011         return new DefaultChannelPromise(channel(), executor());
1012     }
1013 
1014     @Override
1015     public ChannelProgressivePromise newProgressivePromise() {
1016         return new DefaultChannelProgressivePromise(channel(), executor());
1017     }
1018 
1019     @Override
1020     public ChannelFuture newSucceededFuture() {
1021         ChannelFuture succeededFuture = this.succeededFuture;
1022         if (succeededFuture == null) {
1023             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
1024         }
1025         return succeededFuture;
1026     }
1027 
1028     @Override
1029     public ChannelFuture newFailedFuture(Throwable cause) {
1030         return new FailedChannelFuture(channel(), executor(), cause);
1031     }
1032 
1033     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
1034         ObjectUtil.checkNotNull(promise, "promise");
1035 
1036         if (promise.isDone()) {
1037             // Check if the promise was cancelled and if so signal that the processing of the operation
1038             // should not be performed.
1039             //
1040             // See https://github.com/netty/netty/issues/2349
1041             if (promise.isCancelled()) {
1042                 return true;
1043             }
1044             throw new IllegalArgumentException("promise already done: " + promise);
1045         }
1046 
1047         if (promise.channel() != channel()) {
1048             throw new IllegalArgumentException(String.format(
1049                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
1050         }
1051 
1052         if (promise.getClass() == DefaultChannelPromise.class) {
1053             return false;
1054         }
1055 
1056         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
1057             throw new IllegalArgumentException(
1058                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
1059         }
1060 
1061         if (promise instanceof AbstractChannel.CloseFuture) {
1062             throw new IllegalArgumentException(
1063                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
1064         }
1065         return false;
1066     }
1067 
1068     private AbstractChannelHandlerContext findContextInbound(int mask) {
1069         AbstractChannelHandlerContext ctx = this;
1070         EventExecutor currentExecutor = executor();
1071         do {
1072             ctx = ctx.next;
1073         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
1074         return ctx;
1075     }
1076 
1077     private AbstractChannelHandlerContext findContextOutbound(int mask) {
1078         AbstractChannelHandlerContext ctx = this;
1079         EventExecutor currentExecutor = executor();
1080         do {
1081             ctx = ctx.prev;
1082         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
1083         return ctx;
1084     }
1085 
1086     private static boolean skipContext(
1087             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
1088         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
1089         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
1090                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
1091                 // everything to preserve ordering.
1092                 //
1093                 // See https://github.com/netty/netty/issues/10067
1094                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
1095     }
1096 
1097     @Override
1098     public ChannelPromise voidPromise() {
1099         return channel().voidPromise();
1100     }
1101 
1102     final void setRemoved() {
1103         handlerState = REMOVE_COMPLETE;
1104     }
1105 
1106     final boolean setAddComplete() {
1107         for (;;) {
1108             int oldState = handlerState;
1109             if (oldState == REMOVE_COMPLETE) {
1110                 return false;
1111             }
1112             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
1113             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
1114             // exposing ordering guarantees.
1115             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
1116                 return true;
1117             }
1118         }
1119     }
1120 
1121     final void setAddPending() {
1122         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
1123         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
1124     }
1125 
1126     final void callHandlerAdded() throws Exception {
1127         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
1128         // any pipeline events ctx.handler() will miss them because the state will not allow it.
1129         if (setAddComplete()) {
1130             handler().handlerAdded(this);
1131         }
1132     }
1133 
1134     final void callHandlerRemoved() throws Exception {
1135         try {
1136             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
1137             if (handlerState == ADD_COMPLETE) {
1138                 handler().handlerRemoved(this);
1139             }
1140         } finally {
1141             // Mark the handler as removed in any case.
1142             setRemoved();
1143         }
1144     }
1145 
1146     /**
1147      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
1148      * yet. If not return {@code false} and if called or could not detect return {@code true}.
1149      *
1150      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
1151      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
1152      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
1153      */
1154     private boolean invokeHandler() {
1155         // Store in local variable to reduce volatile reads.
1156         int handlerState = this.handlerState;
1157         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1158     }
1159 
1160     @Override
1161     public boolean isRemoved() {
1162         return handlerState == REMOVE_COMPLETE;
1163     }
1164 
1165     @Override
1166     public <T> Attribute<T> attr(AttributeKey<T> key) {
1167         return channel().attr(key);
1168     }
1169 
1170     @Override
1171     public <T> boolean hasAttr(AttributeKey<T> key) {
1172         return channel().hasAttr(key);
1173     }
1174 
1175     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1176             ChannelPromise promise, Object msg, boolean lazy) {
1177         try {
1178             if (lazy && executor instanceof AbstractEventExecutor) {
1179                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1180             } else {
1181                 executor.execute(runnable);
1182             }
1183             return true;
1184         } catch (Throwable cause) {
1185             try {
1186                 if (msg != null) {
1187                     ReferenceCountUtil.release(msg);
1188                 }
1189             } finally {
1190                 promise.setFailure(cause);
1191             }
1192             return false;
1193         }
1194     }
1195 
1196     @Override
1197     public String toHintString() {
1198         return '\'' + name + "' will handle the message from this point.";
1199     }
1200 
1201     @Override
1202     public String toString() {
1203         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1204     }
1205 
1206     static final class WriteTask implements Runnable {
1207         private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1208             @Override
1209             public WriteTask newObject(Handle<WriteTask> handle) {
1210                 return new WriteTask(handle);
1211             }
1212         });
1213 
1214         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1215                 Object msg, ChannelPromise promise, boolean flush) {
1216             WriteTask task = RECYCLER.get();
1217             init(task, ctx, msg, promise, flush);
1218             return task;
1219         }
1220 
1221         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1222                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1223 
1224         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1225         private static final int WRITE_TASK_OVERHEAD =
1226                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1227 
1228         private final Handle<WriteTask> handle;
1229         private AbstractChannelHandlerContext ctx;
1230         private Object msg;
1231         private ChannelPromise promise;
1232         private int size; // sign bit controls flush
1233 
1234         @SuppressWarnings("unchecked")
1235         private WriteTask(Handle<? extends WriteTask> handle) {
1236             this.handle = (Handle<WriteTask>) handle;
1237         }
1238 
1239         protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1240                                    Object msg, ChannelPromise promise, boolean flush) {
1241             task.ctx = ctx;
1242             task.msg = msg;
1243             task.promise = promise;
1244 
1245             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1246                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1247                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1248             } else {
1249                 task.size = 0;
1250             }
1251             if (flush) {
1252                 task.size |= Integer.MIN_VALUE;
1253             }
1254         }
1255 
1256         @Override
1257         public void run() {
1258             try {
1259                 decrementPendingOutboundBytes();
1260                 if (size >= 0) {
1261                     ctx.invokeWrite(msg, promise);
1262                 } else {
1263                     ctx.invokeWriteAndFlush(msg, promise);
1264                 }
1265             } finally {
1266                 recycle();
1267             }
1268         }
1269 
1270         void cancel() {
1271             try {
1272                 decrementPendingOutboundBytes();
1273             } finally {
1274                 recycle();
1275             }
1276         }
1277 
1278         private void decrementPendingOutboundBytes() {
1279             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1280                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1281             }
1282         }
1283 
1284         private void recycle() {
1285             // Set to null so the GC can collect them directly
1286             ctx = null;
1287             msg = null;
1288             promise = null;
1289             handle.recycle(this);
1290         }
1291     }
1292 
1293     private static final class Tasks {
1294         private final AbstractChannelHandlerContext next;
1295         private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1296             @Override
1297             public void run() {
1298                 next.invokeChannelReadComplete();
1299             }
1300         };
1301         private final Runnable invokeReadTask = new Runnable() {
1302             @Override
1303             public void run() {
1304                 next.invokeRead();
1305             }
1306         };
1307         private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1308             @Override
1309             public void run() {
1310                 next.invokeChannelWritabilityChanged();
1311             }
1312         };
1313         private final Runnable invokeFlushTask = new Runnable() {
1314             @Override
1315             public void run() {
1316                 next.invokeFlush();
1317             }
1318         };
1319 
1320         Tasks(AbstractChannelHandlerContext next) {
1321             this.next = next;
1322         }
1323     }
1324 }