View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.ResourceLeakHint;
23  import io.netty.util.concurrent.AbstractEventExecutor;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.OrderedEventExecutor;
26  import io.netty.util.internal.ObjectPool;
27  import io.netty.util.internal.ObjectPool.Handle;
28  import io.netty.util.internal.ObjectPool.ObjectCreator;
29  import io.netty.util.internal.PromiseNotificationUtil;
30  import io.netty.util.internal.ThrowableUtil;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.StringUtil;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.net.SocketAddress;
38  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39  
40  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59  import static io.netty.channel.ChannelHandlerMask.mask;
60  
61  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62  
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
64      volatile AbstractChannelHandlerContext next;
65      volatile AbstractChannelHandlerContext prev;
66  
67      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
68              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
69  
70      /**
71       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
72       */
73      private static final int ADD_PENDING = 1;
74      /**
75       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
76       */
77      private static final int ADD_COMPLETE = 2;
78      /**
79       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
80       */
81      private static final int REMOVE_COMPLETE = 3;
82      /**
83       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
84       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
85       */
86      private static final int INIT = 0;
87  
88      private final DefaultChannelPipeline pipeline;
89      private final String name;
90      private final boolean ordered;
91      private final int executionMask;
92  
93      // Will be set to null if no child executor should be used, otherwise it will be set to the
94      // child executor.
95      final EventExecutor childExecutor;
96      // Cache the concrete value for the executor() method. This method is in the hot-path,
97      // and it's a profitable optimisation to avoid as many dependent-loads as possible.
98      // It does not need to be volatile, because it's always the same value for a given context,
99      // within the lifetime of its registration with an event loop, and deregistering will clear it.
100     EventExecutor contextExecutor;
101     private ChannelFuture succeededFuture;
102 
103     // Lazily instantiated tasks used to trigger events to a handler with different executor.
104     // There is no need to make this volatile as at worse it will just create a few more instances then needed.
105     private Tasks invokeTasks;
106 
107     private volatile int handlerState = INIT;
108 
109     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
110                                   String name, Class<? extends ChannelHandler> handlerClass) {
111         this.name = ObjectUtil.checkNotNull(name, "name");
112         this.pipeline = pipeline;
113         childExecutor = executor;
114         executionMask = mask(handlerClass);
115         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
116         ordered = executor == null || executor instanceof OrderedEventExecutor;
117     }
118 
119     @Override
120     public Channel channel() {
121         return pipeline.channel();
122     }
123 
124     @Override
125     public ChannelPipeline pipeline() {
126         return pipeline;
127     }
128 
129     @Override
130     public ByteBufAllocator alloc() {
131         return channel().config().getAllocator();
132     }
133 
134     @Override
135     public EventExecutor executor() {
136         EventExecutor ex = contextExecutor;
137         if (ex == null) {
138             contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
139         }
140         return ex;
141     }
142 
143     @Override
144     public String name() {
145         return name;
146     }
147 
148     @Override
149     public ChannelHandlerContext fireChannelRegistered() {
150         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
151         if (next.executor().inEventLoop()) {
152             if (next.invokeHandler()) {
153                 try {
154                     // DON'T CHANGE
155                     // Duplex handlers implements both out/in interfaces causing a scalability issue
156                     // see https://bugs.openjdk.org/browse/JDK-8180450
157                     final ChannelHandler handler = next.handler();
158                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
159                     if (handler == headContext) {
160                         headContext.channelRegistered(next);
161                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
162                         ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
163                     } else {
164                         ((ChannelInboundHandler) handler).channelRegistered(next);
165                     }
166                 } catch (Throwable t) {
167                     next.invokeExceptionCaught(t);
168                 }
169             } else {
170                 next.fireChannelRegistered();
171             }
172         } else {
173             next.executor().execute(this::fireChannelRegistered);
174         }
175         return this;
176     }
177 
178     @Override
179     public ChannelHandlerContext fireChannelUnregistered() {
180         final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
181         if (next.executor().inEventLoop()) {
182             if (next.invokeHandler()) {
183                 try {
184                     // DON'T CHANGE
185                     // Duplex handlers implements both out/in interfaces causing a scalability issue
186                     // see https://bugs.openjdk.org/browse/JDK-8180450
187                     final ChannelHandler handler = next.handler();
188                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
189                     if (handler == headContext) {
190                         headContext.channelUnregistered(next);
191                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
192                         ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
193                     } else {
194                         ((ChannelInboundHandler) handler).channelUnregistered(next);
195                     }
196                 } catch (Throwable t) {
197                     next.invokeExceptionCaught(t);
198                 }
199             } else {
200                 next.fireChannelUnregistered();
201             }
202         } else {
203             next.executor().execute(this::fireChannelUnregistered);
204         }
205         return this;
206     }
207 
208     @Override
209     public ChannelHandlerContext fireChannelActive() {
210         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
211         if (next.executor().inEventLoop()) {
212             if (next.invokeHandler()) {
213                 try {
214                     // DON'T CHANGE
215                     // Duplex handlers implements both out/in interfaces causing a scalability issue
216                     // see https://bugs.openjdk.org/browse/JDK-8180450
217                     final ChannelHandler handler = next.handler();
218                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
219                     if (handler == headContext) {
220                         headContext.channelActive(next);
221                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
222                         ((ChannelInboundHandlerAdapter) handler).channelActive(next);
223                     } else {
224                         ((ChannelInboundHandler) handler).channelActive(next);
225                     }
226                 } catch (Throwable t) {
227                     next.invokeExceptionCaught(t);
228                 }
229             } else {
230                 next.fireChannelActive();
231             }
232         } else {
233             next.executor().execute(this::fireChannelActive);
234         }
235         return this;
236     }
237 
238     @Override
239     public ChannelHandlerContext fireChannelInactive() {
240         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
241         if (next.executor().inEventLoop()) {
242             if (next.invokeHandler()) {
243                 try {
244                     // DON'T CHANGE
245                     // Duplex handlers implements both out/in interfaces causing a scalability issue
246                     // see https://bugs.openjdk.org/browse/JDK-8180450
247                     final ChannelHandler handler = next.handler();
248                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
249                     if (handler == headContext) {
250                         headContext.channelInactive(next);
251                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
252                         ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
253                     } else {
254                         ((ChannelInboundHandler) handler).channelInactive(next);
255                     }
256                 } catch (Throwable t) {
257                     next.invokeExceptionCaught(t);
258                 }
259             } else {
260                 next.fireChannelInactive();
261             }
262         } else {
263             next.executor().execute(this::fireChannelInactive);
264         }
265         return this;
266     }
267 
268     @Override
269     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
270         AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
271         ObjectUtil.checkNotNull(cause, "cause");
272         if (next.executor().inEventLoop()) {
273             next.invokeExceptionCaught(cause);
274         } else {
275             try {
276                 next.executor().execute(() -> next.invokeExceptionCaught(cause));
277             } catch (Throwable t) {
278                 if (logger.isWarnEnabled()) {
279                     logger.warn("Failed to submit an exceptionCaught() event.", t);
280                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
281                 }
282             }
283         }
284         return this;
285     }
286 
287     @SuppressWarnings("deprecation")
288     private void invokeExceptionCaught(final Throwable cause) {
289         if (invokeHandler()) {
290             try {
291                 handler().exceptionCaught(this, cause);
292             } catch (Throwable error) {
293                 if (logger.isDebugEnabled()) {
294                     logger.debug(
295                         "An exception {}" +
296                         "was thrown by a user handler's exceptionCaught() " +
297                         "method while handling the following exception:",
298                         ThrowableUtil.stackTraceToString(error), cause);
299                 } else if (logger.isWarnEnabled()) {
300                     logger.warn(
301                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
302                         "was thrown by a user handler's exceptionCaught() " +
303                         "method while handling the following exception:", error, cause);
304                 }
305             }
306         } else {
307             fireExceptionCaught(cause);
308         }
309     }
310 
311     @Override
312     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
313         ObjectUtil.checkNotNull(event, "event");
314         AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
315         if (next.executor().inEventLoop()) {
316             if (next.invokeHandler()) {
317                 try {
318                     // DON'T CHANGE
319                     // Duplex handlers implements both out/in interfaces causing a scalability issue
320                     // see https://bugs.openjdk.org/browse/JDK-8180450
321                     final ChannelHandler handler = next.handler();
322                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
323                     if (handler == headContext) {
324                         headContext.userEventTriggered(next, event);
325                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
326                         ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
327                     } else {
328                         ((ChannelInboundHandler) handler).userEventTriggered(next, event);
329                     }
330                 } catch (Throwable t) {
331                     next.invokeExceptionCaught(t);
332                 }
333             } else {
334                 next.fireUserEventTriggered(event);
335             }
336         } else {
337             next.executor().execute(() -> fireUserEventTriggered(event));
338         }
339         return this;
340     }
341 
342     @Override
343     public ChannelHandlerContext fireChannelRead(final Object msg) {
344         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
345         if (next.executor().inEventLoop()) {
346             final Object m = pipeline.touch(msg, next);
347             if (next.invokeHandler()) {
348                 try {
349                     // DON'T CHANGE
350                     // Duplex handlers implements both out/in interfaces causing a scalability issue
351                     // see https://bugs.openjdk.org/browse/JDK-8180450
352                     final ChannelHandler handler = next.handler();
353                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
354                     if (handler == headContext) {
355                         headContext.channelRead(next, m);
356                     } else if (handler instanceof ChannelDuplexHandler) {
357                         ((ChannelDuplexHandler) handler).channelRead(next, m);
358                     } else {
359                         ((ChannelInboundHandler) handler).channelRead(next, m);
360                     }
361                 } catch (Throwable t) {
362                     next.invokeExceptionCaught(t);
363                 }
364             } else {
365                 next.fireChannelRead(m);
366             }
367         } else {
368             next.executor().execute(() -> fireChannelRead(msg));
369         }
370         return this;
371     }
372 
373     @Override
374     public ChannelHandlerContext fireChannelReadComplete() {
375         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
376         if (next.executor().inEventLoop()) {
377             if (next.invokeHandler()) {
378                 try {
379                     // DON'T CHANGE
380                     // Duplex handlers implements both out/in interfaces causing a scalability issue
381                     // see https://bugs.openjdk.org/browse/JDK-8180450
382                     final ChannelHandler handler = next.handler();
383                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
384                     if (handler == headContext) {
385                         headContext.channelReadComplete(next);
386                     } else if (handler instanceof ChannelDuplexHandler) {
387                         ((ChannelDuplexHandler) handler).channelReadComplete(next);
388                     } else {
389                         ((ChannelInboundHandler) handler).channelReadComplete(next);
390                     }
391                 } catch (Throwable t) {
392                     next.invokeExceptionCaught(t);
393                 }
394             } else {
395                 next.fireChannelReadComplete();
396             }
397         } else {
398             next.executor().execute(getInvokeTasks().invokeChannelReadCompleteTask);
399         }
400         return this;
401     }
402 
403     @Override
404     public ChannelHandlerContext fireChannelWritabilityChanged() {
405         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
406         if (next.executor().inEventLoop()) {
407             if (next.invokeHandler()) {
408                 try {
409                     // DON'T CHANGE
410                     // Duplex handlers implements both out/in interfaces causing a scalability issue
411                     // see https://bugs.openjdk.org/browse/JDK-8180450
412                     final ChannelHandler handler = next.handler();
413                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
414                     if (handler == headContext) {
415                         headContext.channelWritabilityChanged(next);
416                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
417                         ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
418                     } else {
419                         ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
420                     }
421                 } catch (Throwable t) {
422                     next.invokeExceptionCaught(t);
423                 }
424             } else {
425                 next.fireChannelWritabilityChanged();
426             }
427         } else {
428             next.executor().execute(getInvokeTasks().invokeChannelWritableStateChangedTask);
429         }
430         return this;
431     }
432 
433     @Override
434     public ChannelFuture bind(SocketAddress localAddress) {
435         return bind(localAddress, newPromise());
436     }
437 
438     @Override
439     public ChannelFuture connect(SocketAddress remoteAddress) {
440         return connect(remoteAddress, newPromise());
441     }
442 
443     @Override
444     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
445         return connect(remoteAddress, localAddress, newPromise());
446     }
447 
448     @Override
449     public ChannelFuture disconnect() {
450         return disconnect(newPromise());
451     }
452 
453     @Override
454     public ChannelFuture close() {
455         return close(newPromise());
456     }
457 
458     @Override
459     public ChannelFuture deregister() {
460         return deregister(newPromise());
461     }
462 
463     @Override
464     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
465         ObjectUtil.checkNotNull(localAddress, "localAddress");
466         if (isNotValidPromise(promise, false)) {
467             // cancelled
468             return promise;
469         }
470 
471         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
472         EventExecutor executor = next.executor();
473         if (executor.inEventLoop()) {
474             next.invokeBind(localAddress, promise);
475         } else {
476             safeExecute(executor, new Runnable() {
477                 @Override
478                 public void run() {
479                     next.invokeBind(localAddress, promise);
480                 }
481             }, promise, null, false);
482         }
483         return promise;
484     }
485 
486     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
487         if (invokeHandler()) {
488             try {
489                 // DON'T CHANGE
490                 // Duplex handlers implements both out/in interfaces causing a scalability issue
491                 // see https://bugs.openjdk.org/browse/JDK-8180450
492                 final ChannelHandler handler = handler();
493                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
494                 if (handler == headContext) {
495                     headContext.bind(this, localAddress, promise);
496                 } else if (handler instanceof ChannelDuplexHandler) {
497                     ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
498                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
499                     ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
500                 } else {
501                     ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
502                 }
503             } catch (Throwable t) {
504                 notifyOutboundHandlerException(t, promise);
505             }
506         } else {
507             bind(localAddress, promise);
508         }
509     }
510 
511     @Override
512     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
513         return connect(remoteAddress, null, promise);
514     }
515 
516     @Override
517     public ChannelFuture connect(
518             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
519         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
520 
521         if (isNotValidPromise(promise, false)) {
522             // cancelled
523             return promise;
524         }
525 
526         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
527         EventExecutor executor = next.executor();
528         if (executor.inEventLoop()) {
529             next.invokeConnect(remoteAddress, localAddress, promise);
530         } else {
531             safeExecute(executor, new Runnable() {
532                 @Override
533                 public void run() {
534                     next.invokeConnect(remoteAddress, localAddress, promise);
535                 }
536             }, promise, null, false);
537         }
538         return promise;
539     }
540 
541     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
542         if (invokeHandler()) {
543             try {
544                 // DON'T CHANGE
545                 // Duplex handlers implements both out/in interfaces causing a scalability issue
546                 // see https://bugs.openjdk.org/browse/JDK-8180450
547                 final ChannelHandler handler = handler();
548                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
549                 if (handler == headContext) {
550                     headContext.connect(this, remoteAddress, localAddress, promise);
551                 } else if (handler instanceof ChannelDuplexHandler) {
552                     ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
553                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
554                     ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
555                 } else {
556                     ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
557                 }
558             } catch (Throwable t) {
559                 notifyOutboundHandlerException(t, promise);
560             }
561         } else {
562             connect(remoteAddress, localAddress, promise);
563         }
564     }
565 
566     @Override
567     public ChannelFuture disconnect(final ChannelPromise promise) {
568         if (!channel().metadata().hasDisconnect()) {
569             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
570             // So far, UDP/IP is the only transport that has such behavior.
571             return close(promise);
572         }
573         if (isNotValidPromise(promise, false)) {
574             // cancelled
575             return promise;
576         }
577 
578         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
579         EventExecutor executor = next.executor();
580         if (executor.inEventLoop()) {
581             next.invokeDisconnect(promise);
582         } else {
583             safeExecute(executor, new Runnable() {
584                 @Override
585                 public void run() {
586                     next.invokeDisconnect(promise);
587                 }
588             }, promise, null, false);
589         }
590         return promise;
591     }
592 
593     private void invokeDisconnect(ChannelPromise promise) {
594         if (invokeHandler()) {
595             try {
596                 // DON'T CHANGE
597                 // Duplex handlers implements both out/in interfaces causing a scalability issue
598                 // see https://bugs.openjdk.org/browse/JDK-8180450
599                 final ChannelHandler handler = handler();
600                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
601                 if (handler == headContext) {
602                     headContext.disconnect(this, promise);
603                 } else if (handler instanceof ChannelDuplexHandler) {
604                     ((ChannelDuplexHandler) handler).disconnect(this, promise);
605                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
606                     ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
607                 } else {
608                     ((ChannelOutboundHandler) handler).disconnect(this, promise);
609                 }
610             } catch (Throwable t) {
611                 notifyOutboundHandlerException(t, promise);
612             }
613         } else {
614             disconnect(promise);
615         }
616     }
617 
618     @Override
619     public ChannelFuture close(final ChannelPromise promise) {
620         if (isNotValidPromise(promise, false)) {
621             // cancelled
622             return promise;
623         }
624 
625         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
626         EventExecutor executor = next.executor();
627         if (executor.inEventLoop()) {
628             next.invokeClose(promise);
629         } else {
630             safeExecute(executor, new Runnable() {
631                 @Override
632                 public void run() {
633                     next.invokeClose(promise);
634                 }
635             }, promise, null, false);
636         }
637 
638         return promise;
639     }
640 
641     private void invokeClose(ChannelPromise promise) {
642         if (invokeHandler()) {
643             try {
644                 // DON'T CHANGE
645                 // Duplex handlers implements both out/in interfaces causing a scalability issue
646                 // see https://bugs.openjdk.org/browse/JDK-8180450
647                 final ChannelHandler handler = handler();
648                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
649                 if (handler == headContext) {
650                     headContext.close(this, promise);
651                 } else if (handler instanceof ChannelDuplexHandler) {
652                     ((ChannelDuplexHandler) handler).close(this, promise);
653                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
654                     ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
655                 } else {
656                     ((ChannelOutboundHandler) handler).close(this, promise);
657                 }
658             } catch (Throwable t) {
659                 notifyOutboundHandlerException(t, promise);
660             }
661         } else {
662             close(promise);
663         }
664     }
665 
666     @Override
667     public ChannelFuture deregister(final ChannelPromise promise) {
668         if (isNotValidPromise(promise, false)) {
669             // cancelled
670             return promise;
671         }
672 
673         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
674         EventExecutor executor = next.executor();
675         if (executor.inEventLoop()) {
676             next.invokeDeregister(promise);
677         } else {
678             safeExecute(executor, new Runnable() {
679                 @Override
680                 public void run() {
681                     next.invokeDeregister(promise);
682                 }
683             }, promise, null, false);
684         }
685 
686         return promise;
687     }
688 
689     private void invokeDeregister(ChannelPromise promise) {
690         if (invokeHandler()) {
691             try {
692                 // DON'T CHANGE
693                 // Duplex handlers implements both out/in interfaces causing a scalability issue
694                 // see https://bugs.openjdk.org/browse/JDK-8180450
695                 final ChannelHandler handler = handler();
696                 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
697                 if (handler == headContext) {
698                     headContext.deregister(this, promise);
699                 } else if (handler instanceof ChannelDuplexHandler) {
700                     ((ChannelDuplexHandler) handler).deregister(this, promise);
701                 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
702                     ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
703                 } else {
704                     ((ChannelOutboundHandler) handler).deregister(this, promise);
705                 }
706             } catch (Throwable t) {
707                 notifyOutboundHandlerException(t, promise);
708             }
709         } else {
710             deregister(promise);
711         }
712     }
713 
714     @Override
715     public ChannelHandlerContext read() {
716         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
717         if (next.executor().inEventLoop()) {
718             if (next.invokeHandler()) {
719                 try {
720                     // DON'T CHANGE
721                     // Duplex handlers implements both out/in interfaces causing a scalability issue
722                     // see https://bugs.openjdk.org/browse/JDK-8180450
723                     final ChannelHandler handler = next.handler();
724                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
725                     if (handler == headContext) {
726                         headContext.read(next);
727                     } else if (handler instanceof ChannelDuplexHandler) {
728                         ((ChannelDuplexHandler) handler).read(next);
729                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
730                         ((ChannelOutboundHandlerAdapter) handler).read(next);
731                     } else {
732                         ((ChannelOutboundHandler) handler).read(next);
733                     }
734                 } catch (Throwable t) {
735                     invokeExceptionCaught(t);
736                 }
737             } else {
738                 next.read();
739             }
740         } else {
741             next.executor().execute(getInvokeTasks().invokeReadTask);
742         }
743         return this;
744     }
745 
746     @Override
747     public ChannelFuture write(Object msg) {
748         ChannelPromise promise = newPromise();
749         write(msg, false, promise);
750         return promise;
751     }
752 
753     @Override
754     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
755         write(msg, false, promise);
756         return promise;
757     }
758 
759     @Override
760     public ChannelHandlerContext flush() {
761         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
762         EventExecutor executor = next.executor();
763         if (executor.inEventLoop()) {
764             next.invokeFlush();
765         } else {
766             Tasks tasks = next.invokeTasks;
767             if (tasks == null) {
768                 next.invokeTasks = tasks = new Tasks(next);
769             }
770             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
771         }
772 
773         return this;
774     }
775 
776     private void invokeFlush() {
777         if (invokeHandler()) {
778             invokeFlush0();
779         } else {
780             flush();
781         }
782     }
783 
784     private void invokeFlush0() {
785         try {
786             // DON'T CHANGE
787             // Duplex handlers implements both out/in interfaces causing a scalability issue
788             // see https://bugs.openjdk.org/browse/JDK-8180450
789             final ChannelHandler handler = handler();
790             final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
791             if (handler == headContext) {
792                 headContext.flush(this);
793             } else if (handler instanceof ChannelDuplexHandler) {
794                 ((ChannelDuplexHandler) handler).flush(this);
795             } else if (handler instanceof ChannelOutboundHandlerAdapter) {
796                 ((ChannelOutboundHandlerAdapter) handler).flush(this);
797             } else {
798                 ((ChannelOutboundHandler) handler).flush(this);
799             }
800         } catch (Throwable t) {
801             invokeExceptionCaught(t);
802         }
803     }
804 
805     @Override
806     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
807         write(msg, true, promise);
808         return promise;
809     }
810 
811     void write(Object msg, boolean flush, ChannelPromise promise) {
812         if (validateWrite(msg, promise)) {
813             final AbstractChannelHandlerContext next = findContextOutbound(flush ?
814                     MASK_WRITE | MASK_FLUSH : MASK_WRITE);
815             final Object m = pipeline.touch(msg, next);
816             EventExecutor executor = next.executor();
817             if (executor.inEventLoop()) {
818                 if (next.invokeHandler()) {
819                     try {
820                         // DON'T CHANGE
821                         // Duplex handlers implements both out/in interfaces causing a scalability issue
822                         // see https://bugs.openjdk.org/browse/JDK-8180450
823                         final ChannelHandler handler = next.handler();
824                         final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
825                         if (handler == headContext) {
826                             headContext.write(next, msg, promise);
827                         } else if (handler instanceof ChannelDuplexHandler) {
828                             ((ChannelDuplexHandler) handler).write(next, msg, promise);
829                         } else if (handler instanceof ChannelOutboundHandlerAdapter) {
830                             ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
831                         } else {
832                             ((ChannelOutboundHandler) handler).write(next, msg, promise);
833                         }
834                     } catch (Throwable t) {
835                         notifyOutboundHandlerException(t, promise);
836                     }
837                     if (flush) {
838                         next.invokeFlush0();
839                     }
840                 } else {
841                     next.write(msg, flush, promise);
842                 }
843             } else {
844                 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
845                 if (!safeExecute(executor, task, promise, m, !flush)) {
846                     // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
847                     // and put it back in the Recycler for re-use later.
848                     //
849                     // See https://github.com/netty/netty/issues/8343.
850                     task.cancel();
851                 }
852             }
853         }
854     }
855 
856     private boolean validateWrite(Object msg, ChannelPromise promise) {
857         ObjectUtil.checkNotNull(msg, "msg");
858         try {
859             if (isNotValidPromise(promise, true)) {
860                 ReferenceCountUtil.release(msg);
861                 return false; // cancelled
862             }
863         } catch (RuntimeException e) {
864             ReferenceCountUtil.release(msg);
865             throw e;
866         }
867         return true;
868     }
869 
870     @Override
871     public ChannelFuture writeAndFlush(Object msg) {
872         return writeAndFlush(msg, newPromise());
873     }
874 
875     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
876         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
877         // false.
878         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
879     }
880 
881     @Override
882     public ChannelPromise newPromise() {
883         return new DefaultChannelPromise(channel(), executor());
884     }
885 
886     @Override
887     public ChannelProgressivePromise newProgressivePromise() {
888         return new DefaultChannelProgressivePromise(channel(), executor());
889     }
890 
891     @Override
892     public ChannelFuture newSucceededFuture() {
893         ChannelFuture succeededFuture = this.succeededFuture;
894         if (succeededFuture == null) {
895             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
896         }
897         return succeededFuture;
898     }
899 
900     @Override
901     public ChannelFuture newFailedFuture(Throwable cause) {
902         return new FailedChannelFuture(channel(), executor(), cause);
903     }
904 
905     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
906         ObjectUtil.checkNotNull(promise, "promise");
907 
908         if (promise.isDone()) {
909             // Check if the promise was cancelled and if so signal that the processing of the operation
910             // should not be performed.
911             //
912             // See https://github.com/netty/netty/issues/2349
913             if (promise.isCancelled()) {
914                 return true;
915             }
916             throw new IllegalArgumentException("promise already done: " + promise);
917         }
918 
919         if (promise.channel() != channel()) {
920             throw new IllegalArgumentException(String.format(
921                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
922         }
923 
924         if (promise.getClass() == DefaultChannelPromise.class) {
925             return false;
926         }
927 
928         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
929             throw new IllegalArgumentException(
930                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
931         }
932 
933         if (promise instanceof AbstractChannel.CloseFuture) {
934             throw new IllegalArgumentException(
935                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
936         }
937         return false;
938     }
939 
940     private AbstractChannelHandlerContext findContextInbound(int mask) {
941         AbstractChannelHandlerContext ctx = this;
942         EventExecutor currentExecutor = executor();
943         do {
944             ctx = ctx.next;
945         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
946         return ctx;
947     }
948 
949     private AbstractChannelHandlerContext findContextOutbound(int mask) {
950         AbstractChannelHandlerContext ctx = this;
951         EventExecutor currentExecutor = executor();
952         do {
953             ctx = ctx.prev;
954         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
955         return ctx;
956     }
957 
958     private static boolean skipContext(
959             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
960         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
961         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
962                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
963                 // everything to preserve ordering.
964                 //
965                 // See https://github.com/netty/netty/issues/10067
966                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
967     }
968 
969     @Override
970     public ChannelPromise voidPromise() {
971         return channel().voidPromise();
972     }
973 
974     final void setRemoved() {
975         handlerState = REMOVE_COMPLETE;
976     }
977 
978     final boolean setAddComplete() {
979         for (;;) {
980             int oldState = handlerState;
981             if (oldState == REMOVE_COMPLETE) {
982                 return false;
983             }
984             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
985             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
986             // exposing ordering guarantees.
987             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
988                 return true;
989             }
990         }
991     }
992 
993     final void setAddPending() {
994         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
995         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
996     }
997 
998     final void callHandlerAdded() throws Exception {
999         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
1000         // any pipeline events ctx.handler() will miss them because the state will not allow it.
1001         if (setAddComplete()) {
1002             handler().handlerAdded(this);
1003         }
1004     }
1005 
1006     final void callHandlerRemoved() throws Exception {
1007         try {
1008             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
1009             if (handlerState == ADD_COMPLETE) {
1010                 handler().handlerRemoved(this);
1011             }
1012         } finally {
1013             // Mark the handler as removed in any case.
1014             setRemoved();
1015         }
1016     }
1017 
1018     /**
1019      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
1020      * yet. If not return {@code false} and if called or could not detect return {@code true}.
1021      *
1022      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
1023      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
1024      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
1025      */
1026     boolean invokeHandler() {
1027         // Store in local variable to reduce volatile reads.
1028         int handlerState = this.handlerState;
1029         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1030     }
1031 
1032     @Override
1033     public boolean isRemoved() {
1034         return handlerState == REMOVE_COMPLETE;
1035     }
1036 
1037     @Override
1038     public <T> Attribute<T> attr(AttributeKey<T> key) {
1039         return channel().attr(key);
1040     }
1041 
1042     @Override
1043     public <T> boolean hasAttr(AttributeKey<T> key) {
1044         return channel().hasAttr(key);
1045     }
1046 
1047     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1048             ChannelPromise promise, Object msg, boolean lazy) {
1049         try {
1050             if (lazy && executor instanceof AbstractEventExecutor) {
1051                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1052             } else {
1053                 executor.execute(runnable);
1054             }
1055             return true;
1056         } catch (Throwable cause) {
1057             try {
1058                 if (msg != null) {
1059                     ReferenceCountUtil.release(msg);
1060                 }
1061             } finally {
1062                 promise.setFailure(cause);
1063             }
1064             return false;
1065         }
1066     }
1067 
1068     @Override
1069     public String toHintString() {
1070         return '\'' + name + "' will handle the message from this point.";
1071     }
1072 
1073     @Override
1074     public String toString() {
1075         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1076     }
1077 
1078     Tasks getInvokeTasks() {
1079         Tasks tasks = invokeTasks;
1080         if (tasks == null) {
1081             invokeTasks = tasks = new Tasks(this);
1082         }
1083         return tasks;
1084     }
1085 
1086     static final class WriteTask implements Runnable {
1087         private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1088             @Override
1089             public WriteTask newObject(Handle<WriteTask> handle) {
1090                 return new WriteTask(handle);
1091             }
1092         });
1093 
1094         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1095                 Object msg, ChannelPromise promise, boolean flush) {
1096             WriteTask task = RECYCLER.get();
1097             init(task, ctx, msg, promise, flush);
1098             return task;
1099         }
1100 
1101         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1102                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1103 
1104         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1105         private static final int WRITE_TASK_OVERHEAD =
1106                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1107 
1108         private final Handle<WriteTask> handle;
1109         private AbstractChannelHandlerContext ctx;
1110         private Object msg;
1111         private ChannelPromise promise;
1112         private int size; // sign bit controls flush
1113 
1114         private WriteTask(Handle<WriteTask> handle) {
1115             this.handle = handle;
1116         }
1117 
1118         static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1119                                    Object msg, ChannelPromise promise, boolean flush) {
1120             task.ctx = ctx;
1121             task.msg = msg;
1122             task.promise = promise;
1123 
1124             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1125                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1126                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1127             } else {
1128                 task.size = 0;
1129             }
1130             if (flush) {
1131                 task.size |= Integer.MIN_VALUE;
1132             }
1133         }
1134 
1135         @Override
1136         public void run() {
1137             try {
1138                 decrementPendingOutboundBytes();
1139                 ctx.write(msg, size < 0, promise);
1140             } finally {
1141                 recycle();
1142             }
1143         }
1144 
1145         void cancel() {
1146             try {
1147                 decrementPendingOutboundBytes();
1148             } finally {
1149                 recycle();
1150             }
1151         }
1152 
1153         private void decrementPendingOutboundBytes() {
1154             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1155                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1156             }
1157         }
1158 
1159         private void recycle() {
1160             // Set to null so the GC can collect them directly
1161             ctx = null;
1162             msg = null;
1163             promise = null;
1164             handle.recycle(this);
1165         }
1166     }
1167 
1168     static final class Tasks {
1169         final Runnable invokeChannelReadCompleteTask;
1170         private final Runnable invokeReadTask;
1171         private final Runnable invokeChannelWritableStateChangedTask;
1172         private final Runnable invokeFlushTask;
1173 
1174         Tasks(AbstractChannelHandlerContext ctx) {
1175             invokeChannelReadCompleteTask = ctx::fireChannelReadComplete;
1176             invokeReadTask = ctx::read;
1177             invokeChannelWritableStateChangedTask = ctx::fireChannelWritabilityChanged;
1178             invokeFlushTask = ctx::invokeFlush;
1179         }
1180     }
1181 }