View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.ResourceLeakHint;
23  import io.netty.util.concurrent.AbstractEventExecutor;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.OrderedEventExecutor;
26  import io.netty.util.internal.ObjectPool;
27  import io.netty.util.internal.ObjectPool.Handle;
28  import io.netty.util.internal.ObjectPool.ObjectCreator;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PromiseNotificationUtil;
31  import io.netty.util.internal.StringUtil;
32  import io.netty.util.internal.SystemPropertyUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.net.SocketAddress;
37  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38  
39  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
40  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
47  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
48  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
49  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
50  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
51  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
52  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
53  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
54  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
55  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
56  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
57  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
58  import static io.netty.channel.ChannelHandlerMask.mask;
59  
60  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
61  
62      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
63      volatile AbstractChannelHandlerContext next;
64      volatile AbstractChannelHandlerContext prev;
65  
66      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
67              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
68  
69      /**
70       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
71       */
72      private static final int ADD_PENDING = 1;
73      /**
74       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
75       */
76      private static final int ADD_COMPLETE = 2;
77      /**
78       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
79       */
80      private static final int REMOVE_COMPLETE = 3;
81      /**
82       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
83       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
84       */
85      private static final int INIT = 0;
86  
87      private final DefaultChannelPipeline pipeline;
88      private final String name;
89      private final boolean ordered;
90      private final int executionMask;
91  
92      // Will be set to null if no child executor should be used, otherwise it will be set to the
93      // child executor.
94      final EventExecutor childExecutor;
95      // Cache the concrete value for the executor() method. This method is in the hot-path,
96      // and it's a profitable optimisation to avoid as many dependent-loads as possible.
97      // It does not need to be volatile, because it's always the same value for a given context,
98      // within the lifetime of its registration with an event loop, and deregistering will clear it.
99      EventExecutor contextExecutor;
100     private ChannelFuture succeededFuture;
101 
102     // Lazily instantiated tasks used to trigger events to a handler with different executor.
103     // There is no need to make this volatile as at worse it will just create a few more instances then needed.
104     private Tasks invokeTasks;
105 
106     private volatile int handlerState = INIT;
107 
108     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
109                                   String name, Class<? extends ChannelHandler> handlerClass) {
110         this.name = ObjectUtil.checkNotNull(name, "name");
111         this.pipeline = pipeline;
112         childExecutor = executor;
113         executionMask = mask(handlerClass);
114         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
115         ordered = executor == null || executor instanceof OrderedEventExecutor;
116     }
117 
118     @Override
119     public Channel channel() {
120         return pipeline.channel();
121     }
122 
123     @Override
124     public ChannelPipeline pipeline() {
125         return pipeline;
126     }
127 
128     @Override
129     public ByteBufAllocator alloc() {
130         return channel().config().getAllocator();
131     }
132 
133     @Override
134     public EventExecutor executor() {
135         EventExecutor ex = contextExecutor;
136         if (ex == null) {
137             contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
138         }
139         return ex;
140     }
141 
142     @Override
143     public String name() {
144         return name;
145     }
146 
147     @Override
148     public ChannelHandlerContext fireChannelRegistered() {
149         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
150         if (next.executor().inEventLoop()) {
151             if (next.invokeHandler()) {
152                 try {
153                     // DON'T CHANGE
154                     // Duplex handlers implements both out/in interfaces causing a scalability issue
155                     // see https://bugs.openjdk.org/browse/JDK-8180450
156                     final ChannelHandler handler = next.handler();
157                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
158                     if (handler == headContext) {
159                         headContext.channelRegistered(next);
160                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
161                         ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
162                     } else {
163                         ((ChannelInboundHandler) handler).channelRegistered(next);
164                     }
165                 } catch (Throwable t) {
166                     next.invokeExceptionCaught(t);
167                 }
168             } else {
169                 next.fireChannelRegistered();
170             }
171         } else {
172             next.executor().execute(this::fireChannelRegistered);
173         }
174         return this;
175     }
176 
177     @Override
178     public ChannelHandlerContext fireChannelUnregistered() {
179         final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
180         if (next.executor().inEventLoop()) {
181             if (next.invokeHandler()) {
182                 try {
183                     // DON'T CHANGE
184                     // Duplex handlers implements both out/in interfaces causing a scalability issue
185                     // see https://bugs.openjdk.org/browse/JDK-8180450
186                     final ChannelHandler handler = next.handler();
187                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
188                     if (handler == headContext) {
189                         headContext.channelUnregistered(next);
190                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
191                         ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
192                     } else {
193                         ((ChannelInboundHandler) handler).channelUnregistered(next);
194                     }
195                 } catch (Throwable t) {
196                     next.invokeExceptionCaught(t);
197                 }
198             } else {
199                 next.fireChannelUnregistered();
200             }
201         } else {
202             next.executor().execute(this::fireChannelUnregistered);
203         }
204         return this;
205     }
206 
207     @Override
208     public ChannelHandlerContext fireChannelActive() {
209         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
210         if (next.executor().inEventLoop()) {
211             if (next.invokeHandler()) {
212                 try {
213                     // DON'T CHANGE
214                     // Duplex handlers implements both out/in interfaces causing a scalability issue
215                     // see https://bugs.openjdk.org/browse/JDK-8180450
216                     final ChannelHandler handler = next.handler();
217                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
218                     if (handler == headContext) {
219                         headContext.channelActive(next);
220                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
221                         ((ChannelInboundHandlerAdapter) handler).channelActive(next);
222                     } else {
223                         ((ChannelInboundHandler) handler).channelActive(next);
224                     }
225                 } catch (Throwable t) {
226                     next.invokeExceptionCaught(t);
227                 }
228             } else {
229                 next.fireChannelActive();
230             }
231         } else {
232             next.executor().execute(this::fireChannelActive);
233         }
234         return this;
235     }
236 
237     @Override
238     public ChannelHandlerContext fireChannelInactive() {
239         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
240         if (next.executor().inEventLoop()) {
241             if (next.invokeHandler()) {
242                 try {
243                     // DON'T CHANGE
244                     // Duplex handlers implements both out/in interfaces causing a scalability issue
245                     // see https://bugs.openjdk.org/browse/JDK-8180450
246                     final ChannelHandler handler = next.handler();
247                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
248                     if (handler == headContext) {
249                         headContext.channelInactive(next);
250                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
251                         ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
252                     } else {
253                         ((ChannelInboundHandler) handler).channelInactive(next);
254                     }
255                 } catch (Throwable t) {
256                     next.invokeExceptionCaught(t);
257                 }
258             } else {
259                 next.fireChannelInactive();
260             }
261         } else {
262             next.executor().execute(this::fireChannelInactive);
263         }
264         return this;
265     }
266 
267     @Override
268     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
269         AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
270         ObjectUtil.checkNotNull(cause, "cause");
271         if (next.executor().inEventLoop()) {
272             next.invokeExceptionCaught(cause);
273         } else {
274             try {
275                 next.executor().execute(() -> next.invokeExceptionCaught(cause));
276             } catch (Throwable t) {
277                 if (logger.isWarnEnabled()) {
278                     logger.warn("Failed to submit an exceptionCaught() event.", t);
279                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
280                 }
281             }
282         }
283         return this;
284     }
285 
286     @SuppressWarnings("deprecation")
287     private void invokeExceptionCaught(final Throwable cause) {
288         if (invokeHandler()) {
289             try {
290                 handler().exceptionCaught(this, cause);
291             } catch (Throwable error) {
292                 if (logger.isDebugEnabled()) {
293                     logger.debug(
294                         "An exception " +
295                         "was thrown by a user handler's exceptionCaught() " +
296                         "method while handling the following exception:", cause);
297                 } else if (logger.isWarnEnabled()) {
298                     logger.warn(
299                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
300                         "was thrown by a user handler's exceptionCaught() " +
301                         "method while handling the following exception:", error, cause);
302                 }
303             }
304         } else {
305             fireExceptionCaught(cause);
306         }
307     }
308 
309     @Override
310     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
311         ObjectUtil.checkNotNull(event, "event");
312         AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
313         if (next.executor().inEventLoop()) {
314             if (next.invokeHandler()) {
315                 try {
316                     // DON'T CHANGE
317                     // Duplex handlers implements both out/in interfaces causing a scalability issue
318                     // see https://bugs.openjdk.org/browse/JDK-8180450
319                     final ChannelHandler handler = next.handler();
320                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
321                     if (handler == headContext) {
322                         headContext.userEventTriggered(next, event);
323                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
324                         ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
325                     } else {
326                         ((ChannelInboundHandler) handler).userEventTriggered(next, event);
327                     }
328                 } catch (Throwable t) {
329                     next.invokeExceptionCaught(t);
330                 }
331             } else {
332                 next.fireUserEventTriggered(event);
333             }
334         } else {
335             next.executor().execute(() -> fireUserEventTriggered(event));
336         }
337         return this;
338     }
339 
340     @Override
341     public ChannelHandlerContext fireChannelRead(final Object msg) {
342         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
343         if (next.executor().inEventLoop()) {
344             final Object m = pipeline.touch(msg, next);
345             if (next.invokeHandler()) {
346                 try {
347                     // DON'T CHANGE
348                     // Duplex handlers implements both out/in interfaces causing a scalability issue
349                     // see https://bugs.openjdk.org/browse/JDK-8180450
350                     final ChannelHandler handler = next.handler();
351                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
352                     if (handler == headContext) {
353                         headContext.channelRead(next, m);
354                     } else if (handler instanceof ChannelDuplexHandler) {
355                         ((ChannelDuplexHandler) handler).channelRead(next, m);
356                     } else {
357                         ((ChannelInboundHandler) handler).channelRead(next, m);
358                     }
359                 } catch (Throwable t) {
360                     next.invokeExceptionCaught(t);
361                 }
362             } else {
363                 next.fireChannelRead(m);
364             }
365         } else {
366             next.executor().execute(() -> fireChannelRead(msg));
367         }
368         return this;
369     }
370 
371     @Override
372     public ChannelHandlerContext fireChannelReadComplete() {
373         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
374         if (next.executor().inEventLoop()) {
375             if (next.invokeHandler()) {
376                 try {
377                     // DON'T CHANGE
378                     // Duplex handlers implements both out/in interfaces causing a scalability issue
379                     // see https://bugs.openjdk.org/browse/JDK-8180450
380                     final ChannelHandler handler = next.handler();
381                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
382                     if (handler == headContext) {
383                         headContext.channelReadComplete(next);
384                     } else if (handler instanceof ChannelDuplexHandler) {
385                         ((ChannelDuplexHandler) handler).channelReadComplete(next);
386                     } else {
387                         ((ChannelInboundHandler) handler).channelReadComplete(next);
388                     }
389                 } catch (Throwable t) {
390                     next.invokeExceptionCaught(t);
391                 }
392             } else {
393                 next.fireChannelReadComplete();
394             }
395         } else {
396             next.executor().execute(getInvokeTasks().invokeChannelReadCompleteTask);
397         }
398         return this;
399     }
400 
401     @Override
402     public ChannelHandlerContext fireChannelWritabilityChanged() {
403         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
404         if (next.executor().inEventLoop()) {
405             if (next.invokeHandler()) {
406                 try {
407                     // DON'T CHANGE
408                     // Duplex handlers implements both out/in interfaces causing a scalability issue
409                     // see https://bugs.openjdk.org/browse/JDK-8180450
410                     final ChannelHandler handler = next.handler();
411                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
412                     if (handler == headContext) {
413                         headContext.channelWritabilityChanged(next);
414                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
415                         ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
416                     } else {
417                         ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
418                     }
419                 } catch (Throwable t) {
420                     next.invokeExceptionCaught(t);
421                 }
422             } else {
423                 next.fireChannelWritabilityChanged();
424             }
425         } else {
426             next.executor().execute(getInvokeTasks().invokeChannelWritableStateChangedTask);
427         }
428         return this;
429     }
430 
431     @Override
432     public ChannelFuture bind(SocketAddress localAddress) {
433         return bind(localAddress, newPromise());
434     }
435 
436     @Override
437     public ChannelFuture connect(SocketAddress remoteAddress) {
438         return connect(remoteAddress, newPromise());
439     }
440 
441     @Override
442     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
443         return connect(remoteAddress, localAddress, newPromise());
444     }
445 
446     @Override
447     public ChannelFuture disconnect() {
448         return disconnect(newPromise());
449     }
450 
451     @Override
452     public ChannelFuture close() {
453         return close(newPromise());
454     }
455 
456     @Override
457     public ChannelFuture deregister() {
458         return deregister(newPromise());
459     }
460 
461     @Override
462     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
463         ObjectUtil.checkNotNull(localAddress, "localAddress");
464         if (isNotValidPromise(promise, false)) {
465             // cancelled
466             return promise;
467         }
468 
469         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
470         EventExecutor executor = next.executor();
471         if (executor.inEventLoop()) {
472             next.invokeBind(localAddress, promise);
473         } else {
474             safeExecute(executor, new Runnable() {
475                 @Override
476                 public void run() {
477                     next.invokeBind(localAddress, promise);
478                 }
479             }, promise, null, false);
480         }
481         return promise;
482     }
483 
484     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
485         if (invokeHandler()) {
486             try {
487                 // DON'T CHANGE
488                 // Duplex handlers implements both out/in interfaces causing a scalability issue
489                 // see https://bugs.openjdk.org/browse/JDK-8180450
490                 final ChannelHandler handler = handler();
491                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
492                 if (handler == headContext) {
493                     headContext.bind(this, localAddress, promise);
494                 } else if (handler instanceof ChannelDuplexHandler) {
495                     ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
496                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
497                     ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
498                 } else {
499                     ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
500                 }
501             } catch (Throwable t) {
502                 notifyOutboundHandlerException(t, promise);
503             }
504         } else {
505             bind(localAddress, promise);
506         }
507     }
508 
509     @Override
510     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
511         return connect(remoteAddress, null, promise);
512     }
513 
514     @Override
515     public ChannelFuture connect(
516             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
517         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
518 
519         if (isNotValidPromise(promise, false)) {
520             // cancelled
521             return promise;
522         }
523 
524         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
525         EventExecutor executor = next.executor();
526         if (executor.inEventLoop()) {
527             next.invokeConnect(remoteAddress, localAddress, promise);
528         } else {
529             safeExecute(executor, new Runnable() {
530                 @Override
531                 public void run() {
532                     next.invokeConnect(remoteAddress, localAddress, promise);
533                 }
534             }, promise, null, false);
535         }
536         return promise;
537     }
538 
539     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
540         if (invokeHandler()) {
541             try {
542                 // DON'T CHANGE
543                 // Duplex handlers implements both out/in interfaces causing a scalability issue
544                 // see https://bugs.openjdk.org/browse/JDK-8180450
545                 final ChannelHandler handler = handler();
546                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
547                 if (handler == headContext) {
548                     headContext.connect(this, remoteAddress, localAddress, promise);
549                 } else if (handler instanceof ChannelDuplexHandler) {
550                     ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
551                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
552                     ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
553                 } else {
554                     ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
555                 }
556             } catch (Throwable t) {
557                 notifyOutboundHandlerException(t, promise);
558             }
559         } else {
560             connect(remoteAddress, localAddress, promise);
561         }
562     }
563 
564     @Override
565     public ChannelFuture disconnect(final ChannelPromise promise) {
566         if (!channel().metadata().hasDisconnect()) {
567             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
568             // So far, UDP/IP is the only transport that has such behavior.
569             return close(promise);
570         }
571         if (isNotValidPromise(promise, false)) {
572             // cancelled
573             return promise;
574         }
575 
576         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
577         EventExecutor executor = next.executor();
578         if (executor.inEventLoop()) {
579             next.invokeDisconnect(promise);
580         } else {
581             safeExecute(executor, new Runnable() {
582                 @Override
583                 public void run() {
584                     next.invokeDisconnect(promise);
585                 }
586             }, promise, null, false);
587         }
588         return promise;
589     }
590 
591     private void invokeDisconnect(ChannelPromise promise) {
592         if (invokeHandler()) {
593             try {
594                 // DON'T CHANGE
595                 // Duplex handlers implements both out/in interfaces causing a scalability issue
596                 // see https://bugs.openjdk.org/browse/JDK-8180450
597                 final ChannelHandler handler = handler();
598                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
599                 if (handler == headContext) {
600                     headContext.disconnect(this, promise);
601                 } else if (handler instanceof ChannelDuplexHandler) {
602                     ((ChannelDuplexHandler) handler).disconnect(this, promise);
603                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
604                     ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
605                 } else {
606                     ((ChannelOutboundHandler) handler).disconnect(this, promise);
607                 }
608             } catch (Throwable t) {
609                 notifyOutboundHandlerException(t, promise);
610             }
611         } else {
612             disconnect(promise);
613         }
614     }
615 
616     @Override
617     public ChannelFuture close(final ChannelPromise promise) {
618         if (isNotValidPromise(promise, false)) {
619             // cancelled
620             return promise;
621         }
622 
623         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
624         EventExecutor executor = next.executor();
625         if (executor.inEventLoop()) {
626             next.invokeClose(promise);
627         } else {
628             safeExecute(executor, new Runnable() {
629                 @Override
630                 public void run() {
631                     next.invokeClose(promise);
632                 }
633             }, promise, null, false);
634         }
635 
636         return promise;
637     }
638 
639     private void invokeClose(ChannelPromise promise) {
640         if (invokeHandler()) {
641             try {
642                 // DON'T CHANGE
643                 // Duplex handlers implements both out/in interfaces causing a scalability issue
644                 // see https://bugs.openjdk.org/browse/JDK-8180450
645                 final ChannelHandler handler = handler();
646                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
647                 if (handler == headContext) {
648                     headContext.close(this, promise);
649                 } else if (handler instanceof ChannelDuplexHandler) {
650                     ((ChannelDuplexHandler) handler).close(this, promise);
651                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
652                     ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
653                 } else {
654                     ((ChannelOutboundHandler) handler).close(this, promise);
655                 }
656             } catch (Throwable t) {
657                 notifyOutboundHandlerException(t, promise);
658             }
659         } else {
660             close(promise);
661         }
662     }
663 
664     @Override
665     public ChannelFuture deregister(final ChannelPromise promise) {
666         if (isNotValidPromise(promise, false)) {
667             // cancelled
668             return promise;
669         }
670 
671         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
672         EventExecutor executor = next.executor();
673         if (executor.inEventLoop()) {
674             next.invokeDeregister(promise);
675         } else {
676             safeExecute(executor, new Runnable() {
677                 @Override
678                 public void run() {
679                     next.invokeDeregister(promise);
680                 }
681             }, promise, null, false);
682         }
683 
684         return promise;
685     }
686 
687     private void invokeDeregister(ChannelPromise promise) {
688         if (invokeHandler()) {
689             try {
690                 // DON'T CHANGE
691                 // Duplex handlers implements both out/in interfaces causing a scalability issue
692                 // see https://bugs.openjdk.org/browse/JDK-8180450
693                 final ChannelHandler handler = handler();
694                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
695                 if (handler == headContext) {
696                     headContext.deregister(this, promise);
697                 } else if (handler instanceof ChannelDuplexHandler) {
698                     ((ChannelDuplexHandler) handler).deregister(this, promise);
699                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
700                     ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
701                 } else {
702                     ((ChannelOutboundHandler) handler).deregister(this, promise);
703                 }
704             } catch (Throwable t) {
705                 notifyOutboundHandlerException(t, promise);
706             }
707         } else {
708             deregister(promise);
709         }
710     }
711 
712     @Override
713     public ChannelHandlerContext read() {
714         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
715         if (next.executor().inEventLoop()) {
716             if (next.invokeHandler()) {
717                 try {
718                     // DON'T CHANGE
719                     // Duplex handlers implements both out/in interfaces causing a scalability issue
720                     // see https://bugs.openjdk.org/browse/JDK-8180450
721                     final ChannelHandler handler = next.handler();
722                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
723                     if (handler == headContext) {
724                         headContext.read(next);
725                     } else if (handler instanceof ChannelDuplexHandler) {
726                         ((ChannelDuplexHandler) handler).read(next);
727                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
728                         ((ChannelOutboundHandlerAdapter) handler).read(next);
729                     } else {
730                         ((ChannelOutboundHandler) handler).read(next);
731                     }
732                 } catch (Throwable t) {
733                     invokeExceptionCaught(t);
734                 }
735             } else {
736                 next.read();
737             }
738         } else {
739             next.executor().execute(getInvokeTasks().invokeReadTask);
740         }
741         return this;
742     }
743 
744     @Override
745     public ChannelFuture write(Object msg) {
746         ChannelPromise promise = newPromise();
747         write(msg, false, promise);
748         return promise;
749     }
750 
751     @Override
752     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
753         write(msg, false, promise);
754         return promise;
755     }
756 
757     @Override
758     public ChannelHandlerContext flush() {
759         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
760         EventExecutor executor = next.executor();
761         if (executor.inEventLoop()) {
762             next.invokeFlush();
763         } else {
764             Tasks tasks = next.invokeTasks;
765             if (tasks == null) {
766                 next.invokeTasks = tasks = new Tasks(next);
767             }
768             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
769         }
770 
771         return this;
772     }
773 
774     private void invokeFlush() {
775         if (invokeHandler()) {
776             invokeFlush0();
777         } else {
778             flush();
779         }
780     }
781 
782     private void invokeFlush0() {
783         try {
784             // DON'T CHANGE
785             // Duplex handlers implements both out/in interfaces causing a scalability issue
786             // see https://bugs.openjdk.org/browse/JDK-8180450
787             final ChannelHandler handler = handler();
788             final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
789             if (handler == headContext) {
790                 headContext.flush(this);
791             } else if (handler instanceof ChannelDuplexHandler) {
792                 ((ChannelDuplexHandler) handler).flush(this);
793             } else if (handler instanceof ChannelOutboundHandlerAdapter) {
794                 ((ChannelOutboundHandlerAdapter) handler).flush(this);
795             } else {
796                 ((ChannelOutboundHandler) handler).flush(this);
797             }
798         } catch (Throwable t) {
799             invokeExceptionCaught(t);
800         }
801     }
802 
803     @Override
804     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
805         write(msg, true, promise);
806         return promise;
807     }
808 
809     void write(Object msg, boolean flush, ChannelPromise promise) {
810         if (validateWrite(msg, promise)) {
811             final AbstractChannelHandlerContext next = findContextOutbound(flush ?
812                     MASK_WRITE | MASK_FLUSH : MASK_WRITE);
813             final Object m = pipeline.touch(msg, next);
814             EventExecutor executor = next.executor();
815             if (executor.inEventLoop()) {
816                 if (next.invokeHandler()) {
817                     try {
818                         // DON'T CHANGE
819                         // Duplex handlers implements both out/in interfaces causing a scalability issue
820                         // see https://bugs.openjdk.org/browse/JDK-8180450
821                         final ChannelHandler handler = next.handler();
822                         final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
823                         if (handler == headContext) {
824                             headContext.write(next, msg, promise);
825                         } else if (handler instanceof ChannelDuplexHandler) {
826                             ((ChannelDuplexHandler) handler).write(next, msg, promise);
827                         } else if (handler instanceof ChannelOutboundHandlerAdapter) {
828                             ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
829                         } else {
830                             ((ChannelOutboundHandler) handler).write(next, msg, promise);
831                         }
832                     } catch (Throwable t) {
833                         notifyOutboundHandlerException(t, promise);
834                     }
835                     if (flush) {
836                         next.invokeFlush0();
837                     }
838                 } else {
839                     next.write(msg, flush, promise);
840                 }
841             } else {
842                 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
843                 if (!safeExecute(executor, task, promise, m, !flush)) {
844                     // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
845                     // and put it back in the Recycler for re-use later.
846                     //
847                     // See https://github.com/netty/netty/issues/8343.
848                     task.cancel();
849                 }
850             }
851         }
852     }
853 
854     private boolean validateWrite(Object msg, ChannelPromise promise) {
855         ObjectUtil.checkNotNull(msg, "msg");
856         try {
857             if (isNotValidPromise(promise, true)) {
858                 ReferenceCountUtil.release(msg);
859                 return false; // cancelled
860             }
861         } catch (RuntimeException e) {
862             ReferenceCountUtil.release(msg);
863             throw e;
864         }
865         return true;
866     }
867 
868     @Override
869     public ChannelFuture writeAndFlush(Object msg) {
870         return writeAndFlush(msg, newPromise());
871     }
872 
873     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
874         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
875         // false.
876         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
877     }
878 
879     @Override
880     public ChannelPromise newPromise() {
881         return new DefaultChannelPromise(channel(), executor());
882     }
883 
884     @Override
885     public ChannelProgressivePromise newProgressivePromise() {
886         return new DefaultChannelProgressivePromise(channel(), executor());
887     }
888 
889     @Override
890     public ChannelFuture newSucceededFuture() {
891         ChannelFuture succeededFuture = this.succeededFuture;
892         if (succeededFuture == null) {
893             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
894         }
895         return succeededFuture;
896     }
897 
898     @Override
899     public ChannelFuture newFailedFuture(Throwable cause) {
900         return new FailedChannelFuture(channel(), executor(), cause);
901     }
902 
903     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
904         ObjectUtil.checkNotNull(promise, "promise");
905 
906         if (promise.isDone()) {
907             // Check if the promise was cancelled and if so signal that the processing of the operation
908             // should not be performed.
909             //
910             // See https://github.com/netty/netty/issues/2349
911             if (promise.isCancelled()) {
912                 return true;
913             }
914             throw new IllegalArgumentException("promise already done: " + promise);
915         }
916 
917         if (promise.channel() != channel()) {
918             throw new IllegalArgumentException(String.format(
919                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
920         }
921 
922         if (promise.getClass() == DefaultChannelPromise.class) {
923             return false;
924         }
925 
926         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
927             throw new IllegalArgumentException(
928                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
929         }
930 
931         if (promise instanceof AbstractChannel.CloseFuture) {
932             throw new IllegalArgumentException(
933                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
934         }
935         return false;
936     }
937 
938     private AbstractChannelHandlerContext findContextInbound(int mask) {
939         AbstractChannelHandlerContext ctx = this;
940         EventExecutor currentExecutor = executor();
941         do {
942             ctx = ctx.next;
943         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
944         return ctx;
945     }
946 
947     private AbstractChannelHandlerContext findContextOutbound(int mask) {
948         AbstractChannelHandlerContext ctx = this;
949         EventExecutor currentExecutor = executor();
950         do {
951             ctx = ctx.prev;
952         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
953         return ctx;
954     }
955 
956     private static boolean skipContext(
957             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
958         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
959         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
960                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
961                 // everything to preserve ordering.
962                 //
963                 // See https://github.com/netty/netty/issues/10067
964                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
965     }
966 
967     @Override
968     public ChannelPromise voidPromise() {
969         return channel().voidPromise();
970     }
971 
972     final void setRemoved() {
973         handlerState = REMOVE_COMPLETE;
974     }
975 
976     final boolean setAddComplete() {
977         for (;;) {
978             int oldState = handlerState;
979             if (oldState == REMOVE_COMPLETE) {
980                 return false;
981             }
982             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
983             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
984             // exposing ordering guarantees.
985             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
986                 return true;
987             }
988         }
989     }
990 
991     final void setAddPending() {
992         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
993         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
994     }
995 
996     final void callHandlerAdded() throws Exception {
997         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
998         // any pipeline events ctx.handler() will miss them because the state will not allow it.
999         if (setAddComplete()) {
1000             handler().handlerAdded(this);
1001         }
1002     }
1003 
1004     final void callHandlerRemoved() throws Exception {
1005         try {
1006             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
1007             if (handlerState == ADD_COMPLETE) {
1008                 handler().handlerRemoved(this);
1009             }
1010         } finally {
1011             // Mark the handler as removed in any case.
1012             setRemoved();
1013         }
1014     }
1015 
1016     /**
1017      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
1018      * yet. If not return {@code false} and if called or could not detect return {@code true}.
1019      *
1020      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
1021      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
1022      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
1023      */
1024     boolean invokeHandler() {
1025         // Store in local variable to reduce volatile reads.
1026         int handlerState = this.handlerState;
1027         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1028     }
1029 
1030     @Override
1031     public boolean isRemoved() {
1032         return handlerState == REMOVE_COMPLETE;
1033     }
1034 
1035     @Override
1036     public <T> Attribute<T> attr(AttributeKey<T> key) {
1037         return channel().attr(key);
1038     }
1039 
1040     @Override
1041     public <T> boolean hasAttr(AttributeKey<T> key) {
1042         return channel().hasAttr(key);
1043     }
1044 
1045     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1046             ChannelPromise promise, Object msg, boolean lazy) {
1047         try {
1048             if (lazy && executor instanceof AbstractEventExecutor) {
1049                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1050             } else {
1051                 executor.execute(runnable);
1052             }
1053             return true;
1054         } catch (Throwable cause) {
1055             try {
1056                 if (msg != null) {
1057                     ReferenceCountUtil.release(msg);
1058                 }
1059             } finally {
1060                 promise.setFailure(cause);
1061             }
1062             return false;
1063         }
1064     }
1065 
1066     @Override
1067     public String toHintString() {
1068         return '\'' + name + "' will handle the message from this point.";
1069     }
1070 
1071     @Override
1072     public String toString() {
1073         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1074     }
1075 
1076     Tasks getInvokeTasks() {
1077         Tasks tasks = invokeTasks;
1078         if (tasks == null) {
1079             invokeTasks = tasks = new Tasks(this);
1080         }
1081         return tasks;
1082     }
1083 
1084     static final class WriteTask implements Runnable {
1085         private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1086             @Override
1087             public WriteTask newObject(Handle<WriteTask> handle) {
1088                 return new WriteTask(handle);
1089             }
1090         });
1091 
1092         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1093                 Object msg, ChannelPromise promise, boolean flush) {
1094             WriteTask task = RECYCLER.get();
1095             init(task, ctx, msg, promise, flush);
1096             return task;
1097         }
1098 
1099         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1100                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1101 
1102         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1103         private static final int WRITE_TASK_OVERHEAD =
1104                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1105 
1106         private final Handle<WriteTask> handle;
1107         private AbstractChannelHandlerContext ctx;
1108         private Object msg;
1109         private ChannelPromise promise;
1110         private int size; // sign bit controls flush
1111 
1112         private WriteTask(Handle<WriteTask> handle) {
1113             this.handle = handle;
1114         }
1115 
1116         static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1117                                    Object msg, ChannelPromise promise, boolean flush) {
1118             task.ctx = ctx;
1119             task.msg = msg;
1120             task.promise = promise;
1121 
1122             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1123                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1124                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1125             } else {
1126                 task.size = 0;
1127             }
1128             if (flush) {
1129                 task.size |= Integer.MIN_VALUE;
1130             }
1131         }
1132 
1133         @Override
1134         public void run() {
1135             try {
1136                 decrementPendingOutboundBytes();
1137                 ctx.write(msg, size < 0, promise);
1138             } finally {
1139                 recycle();
1140             }
1141         }
1142 
1143         void cancel() {
1144             try {
1145                 decrementPendingOutboundBytes();
1146             } finally {
1147                 recycle();
1148             }
1149         }
1150 
1151         private void decrementPendingOutboundBytes() {
1152             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1153                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1154             }
1155         }
1156 
1157         private void recycle() {
1158             // Set to null so the GC can collect them directly
1159             ctx = null;
1160             msg = null;
1161             promise = null;
1162             handle.recycle(this);
1163         }
1164     }
1165 
1166     static final class Tasks {
1167         final Runnable invokeChannelReadCompleteTask;
1168         private final Runnable invokeReadTask;
1169         private final Runnable invokeChannelWritableStateChangedTask;
1170         private final Runnable invokeFlushTask;
1171 
1172         Tasks(AbstractChannelHandlerContext ctx) {
1173             invokeChannelReadCompleteTask = ctx::fireChannelReadComplete;
1174             invokeReadTask = ctx::read;
1175             invokeChannelWritableStateChangedTask = ctx::fireChannelWritabilityChanged;
1176             invokeFlushTask = ctx::invokeFlush;
1177         }
1178     }
1179 }