View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.Recycler;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.ResourceLeakHint;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.OrderedEventExecutor;
26  import io.netty.util.internal.PromiseNotificationUtil;
27  import io.netty.util.internal.ThrowableUtil;
28  import io.netty.util.internal.ObjectUtil;
29  import io.netty.util.internal.StringUtil;
30  import io.netty.util.internal.SystemPropertyUtil;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.net.SocketAddress;
35  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
36  
37  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
38  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
39  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
40  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
47  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
48  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
49  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
50  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
51  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
52  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
53  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
54  import static io.netty.channel.ChannelHandlerMask.mask;
55  
56  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
57  
58      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    // References to other contexts of the same type. Presumably the neighbouring contexts in the
    // pipeline's linked list; the code that maintains them is not visible here -- TODO confirm.
59      volatile AbstractChannelHandlerContext next;
60      volatile AbstractChannelHandlerContext prev;
61  
    // Atomic updater bound by name to the volatile int field "handlerState" declared below.
62      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
63              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
64  
65      /**
66       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
67       */
68      private static final int ADD_PENDING = 1;
69      /**
70       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
71       */
72      private static final int ADD_COMPLETE = 2;
73      /**
74       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
75       */
76      private static final int REMOVE_COMPLETE = 3;
77      /**
78       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
79       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
80       */
81      private static final int INIT = 0;
82  
83      private final DefaultChannelPipeline pipeline;
84      private final String name;
85      private final boolean ordered;
86      private final int executionMask;
87  
88      // Will be set to null if no child executor should be used, otherwise it will be set to the
89      // child executor.
90      final EventExecutor executor;
    // Lazily created and cached by newSucceededFuture().
91      private ChannelFuture succeededFuture;
92  
93      // Lazily instantiated tasks used to trigger events to a handler with a different executor.
94      // There is no need to make this volatile as at worst it will just create a few more instances than needed.
95      private Tasks invokeTasks;
96  
    // One of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE; starts as INIT.
97      private volatile int handlerState = INIT;
98  
99      AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
100                                   String name, Class<? extends ChannelHandler> handlerClass) {
101         this.name = ObjectUtil.checkNotNull(name, "name");
102         this.pipeline = pipeline;
103         this.executor = executor;
104         this.executionMask = mask(handlerClass);
105         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
106         ordered = executor == null || executor instanceof OrderedEventExecutor;
107     }
108 
109     @Override
110     public Channel channel() {
111         return pipeline.channel();
112     }
113 
114     @Override
115     public ChannelPipeline pipeline() {
116         return pipeline;
117     }
118 
119     @Override
120     public ByteBufAllocator alloc() {
121         return channel().config().getAllocator();
122     }
123 
124     @Override
125     public EventExecutor executor() {
126         if (executor == null) {
127             return channel().eventLoop();
128         } else {
129             return executor;
130         }
131     }
132 
133     @Override
134     public String name() {
135         return name;
136     }
137 
138     @Override
139     public ChannelHandlerContext fireChannelRegistered() {
140         invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
141         return this;
142     }
143 
144     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
145         EventExecutor executor = next.executor();
146         if (executor.inEventLoop()) {
147             next.invokeChannelRegistered();
148         } else {
149             executor.execute(new Runnable() {
150                 @Override
151                 public void run() {
152                     next.invokeChannelRegistered();
153                 }
154             });
155         }
156     }
157 
158     private void invokeChannelRegistered() {
159         if (invokeHandler()) {
160             try {
161                 ((ChannelInboundHandler) handler()).channelRegistered(this);
162             } catch (Throwable t) {
163                 notifyHandlerException(t);
164             }
165         } else {
166             fireChannelRegistered();
167         }
168     }
169 
170     @Override
171     public ChannelHandlerContext fireChannelUnregistered() {
172         invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
173         return this;
174     }
175 
176     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
177         EventExecutor executor = next.executor();
178         if (executor.inEventLoop()) {
179             next.invokeChannelUnregistered();
180         } else {
181             executor.execute(new Runnable() {
182                 @Override
183                 public void run() {
184                     next.invokeChannelUnregistered();
185                 }
186             });
187         }
188     }
189 
190     private void invokeChannelUnregistered() {
191         if (invokeHandler()) {
192             try {
193                 ((ChannelInboundHandler) handler()).channelUnregistered(this);
194             } catch (Throwable t) {
195                 notifyHandlerException(t);
196             }
197         } else {
198             fireChannelUnregistered();
199         }
200     }
201 
202     @Override
203     public ChannelHandlerContext fireChannelActive() {
204         invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
205         return this;
206     }
207 
208     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
209         EventExecutor executor = next.executor();
210         if (executor.inEventLoop()) {
211             next.invokeChannelActive();
212         } else {
213             executor.execute(new Runnable() {
214                 @Override
215                 public void run() {
216                     next.invokeChannelActive();
217                 }
218             });
219         }
220     }
221 
222     private void invokeChannelActive() {
223         if (invokeHandler()) {
224             try {
225                 ((ChannelInboundHandler) handler()).channelActive(this);
226             } catch (Throwable t) {
227                 notifyHandlerException(t);
228             }
229         } else {
230             fireChannelActive();
231         }
232     }
233 
234     @Override
235     public ChannelHandlerContext fireChannelInactive() {
236         invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
237         return this;
238     }
239 
240     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
241         EventExecutor executor = next.executor();
242         if (executor.inEventLoop()) {
243             next.invokeChannelInactive();
244         } else {
245             executor.execute(new Runnable() {
246                 @Override
247                 public void run() {
248                     next.invokeChannelInactive();
249                 }
250             });
251         }
252     }
253 
254     private void invokeChannelInactive() {
255         if (invokeHandler()) {
256             try {
257                 ((ChannelInboundHandler) handler()).channelInactive(this);
258             } catch (Throwable t) {
259                 notifyHandlerException(t);
260             }
261         } else {
262             fireChannelInactive();
263         }
264     }
265 
266     @Override
267     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
268         invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
269         return this;
270     }
271 
272     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
273         ObjectUtil.checkNotNull(cause, "cause");
274         EventExecutor executor = next.executor();
275         if (executor.inEventLoop()) {
276             next.invokeExceptionCaught(cause);
277         } else {
278             try {
279                 executor.execute(new Runnable() {
280                     @Override
281                     public void run() {
282                         next.invokeExceptionCaught(cause);
283                     }
284                 });
285             } catch (Throwable t) {
286                 if (logger.isWarnEnabled()) {
287                     logger.warn("Failed to submit an exceptionCaught() event.", t);
288                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
289                 }
290             }
291         }
292     }
293 
294     private void invokeExceptionCaught(final Throwable cause) {
295         if (invokeHandler()) {
296             try {
297                 handler().exceptionCaught(this, cause);
298             } catch (Throwable error) {
299                 if (logger.isDebugEnabled()) {
300                     logger.debug(
301                         "An exception {}" +
302                         "was thrown by a user handler's exceptionCaught() " +
303                         "method while handling the following exception:",
304                         ThrowableUtil.stackTraceToString(error), cause);
305                 } else if (logger.isWarnEnabled()) {
306                     logger.warn(
307                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
308                         "was thrown by a user handler's exceptionCaught() " +
309                         "method while handling the following exception:", error, cause);
310                 }
311             }
312         } else {
313             fireExceptionCaught(cause);
314         }
315     }
316 
317     @Override
318     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
319         invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
320         return this;
321     }
322 
323     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
324         ObjectUtil.checkNotNull(event, "event");
325         EventExecutor executor = next.executor();
326         if (executor.inEventLoop()) {
327             next.invokeUserEventTriggered(event);
328         } else {
329             executor.execute(new Runnable() {
330                 @Override
331                 public void run() {
332                     next.invokeUserEventTriggered(event);
333                 }
334             });
335         }
336     }
337 
338     private void invokeUserEventTriggered(Object event) {
339         if (invokeHandler()) {
340             try {
341                 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
342             } catch (Throwable t) {
343                 notifyHandlerException(t);
344             }
345         } else {
346             fireUserEventTriggered(event);
347         }
348     }
349 
350     @Override
351     public ChannelHandlerContext fireChannelRead(final Object msg) {
352         invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
353         return this;
354     }
355 
356     static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
357         final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
358         EventExecutor executor = next.executor();
359         if (executor.inEventLoop()) {
360             next.invokeChannelRead(m);
361         } else {
362             executor.execute(new Runnable() {
363                 @Override
364                 public void run() {
365                     next.invokeChannelRead(m);
366                 }
367             });
368         }
369     }
370 
371     private void invokeChannelRead(Object msg) {
372         if (invokeHandler()) {
373             try {
374                 ((ChannelInboundHandler) handler()).channelRead(this, msg);
375             } catch (Throwable t) {
376                 notifyHandlerException(t);
377             }
378         } else {
379             fireChannelRead(msg);
380         }
381     }
382 
383     @Override
384     public ChannelHandlerContext fireChannelReadComplete() {
385         invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
386         return this;
387     }
388 
389     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
390         EventExecutor executor = next.executor();
391         if (executor.inEventLoop()) {
392             next.invokeChannelReadComplete();
393         } else {
394             Tasks tasks = next.invokeTasks;
395             if (tasks == null) {
396                 next.invokeTasks = tasks = new Tasks(next);
397             }
398             executor.execute(tasks.invokeChannelReadCompleteTask);
399         }
400     }
401 
402     private void invokeChannelReadComplete() {
403         if (invokeHandler()) {
404             try {
405                 ((ChannelInboundHandler) handler()).channelReadComplete(this);
406             } catch (Throwable t) {
407                 notifyHandlerException(t);
408             }
409         } else {
410             fireChannelReadComplete();
411         }
412     }
413 
414     @Override
415     public ChannelHandlerContext fireChannelWritabilityChanged() {
416         invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
417         return this;
418     }
419 
420     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
421         EventExecutor executor = next.executor();
422         if (executor.inEventLoop()) {
423             next.invokeChannelWritabilityChanged();
424         } else {
425             Tasks tasks = next.invokeTasks;
426             if (tasks == null) {
427                 next.invokeTasks = tasks = new Tasks(next);
428             }
429             executor.execute(tasks.invokeChannelWritableStateChangedTask);
430         }
431     }
432 
433     private void invokeChannelWritabilityChanged() {
434         if (invokeHandler()) {
435             try {
436                 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
437             } catch (Throwable t) {
438                 notifyHandlerException(t);
439             }
440         } else {
441             fireChannelWritabilityChanged();
442         }
443     }
444 
445     @Override
446     public ChannelFuture bind(SocketAddress localAddress) {
447         return bind(localAddress, newPromise());
448     }
449 
450     @Override
451     public ChannelFuture connect(SocketAddress remoteAddress) {
452         return connect(remoteAddress, newPromise());
453     }
454 
455     @Override
456     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
457         return connect(remoteAddress, localAddress, newPromise());
458     }
459 
460     @Override
461     public ChannelFuture disconnect() {
462         return disconnect(newPromise());
463     }
464 
465     @Override
466     public ChannelFuture close() {
467         return close(newPromise());
468     }
469 
470     @Override
471     public ChannelFuture deregister() {
472         return deregister(newPromise());
473     }
474 
475     @Override
476     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
477         if (localAddress == null) {
478             throw new NullPointerException("localAddress");
479         }
480         if (isNotValidPromise(promise, false)) {
481             // cancelled
482             return promise;
483         }
484 
485         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
486         EventExecutor executor = next.executor();
487         if (executor.inEventLoop()) {
488             next.invokeBind(localAddress, promise);
489         } else {
490             safeExecute(executor, new Runnable() {
491                 @Override
492                 public void run() {
493                     next.invokeBind(localAddress, promise);
494                 }
495             }, promise, null);
496         }
497         return promise;
498     }
499 
500     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
501         if (invokeHandler()) {
502             try {
503                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
504             } catch (Throwable t) {
505                 notifyOutboundHandlerException(t, promise);
506             }
507         } else {
508             bind(localAddress, promise);
509         }
510     }
511 
512     @Override
513     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
514         return connect(remoteAddress, null, promise);
515     }
516 
517     @Override
518     public ChannelFuture connect(
519             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
520 
521         if (remoteAddress == null) {
522             throw new NullPointerException("remoteAddress");
523         }
524         if (isNotValidPromise(promise, false)) {
525             // cancelled
526             return promise;
527         }
528 
529         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
530         EventExecutor executor = next.executor();
531         if (executor.inEventLoop()) {
532             next.invokeConnect(remoteAddress, localAddress, promise);
533         } else {
534             safeExecute(executor, new Runnable() {
535                 @Override
536                 public void run() {
537                     next.invokeConnect(remoteAddress, localAddress, promise);
538                 }
539             }, promise, null);
540         }
541         return promise;
542     }
543 
544     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
545         if (invokeHandler()) {
546             try {
547                 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
548             } catch (Throwable t) {
549                 notifyOutboundHandlerException(t, promise);
550             }
551         } else {
552             connect(remoteAddress, localAddress, promise);
553         }
554     }
555 
556     @Override
557     public ChannelFuture disconnect(final ChannelPromise promise) {
558         if (!channel().metadata().hasDisconnect()) {
559             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
560             // So far, UDP/IP is the only transport that has such behavior.
561             return close(promise);
562         }
563         if (isNotValidPromise(promise, false)) {
564             // cancelled
565             return promise;
566         }
567 
568         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
569         EventExecutor executor = next.executor();
570         if (executor.inEventLoop()) {
571             next.invokeDisconnect(promise);
572         } else {
573             safeExecute(executor, new Runnable() {
574                 @Override
575                 public void run() {
576                     next.invokeDisconnect(promise);
577                 }
578             }, promise, null);
579         }
580         return promise;
581     }
582 
583     private void invokeDisconnect(ChannelPromise promise) {
584         if (invokeHandler()) {
585             try {
586                 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
587             } catch (Throwable t) {
588                 notifyOutboundHandlerException(t, promise);
589             }
590         } else {
591             disconnect(promise);
592         }
593     }
594 
595     @Override
596     public ChannelFuture close(final ChannelPromise promise) {
597         if (isNotValidPromise(promise, false)) {
598             // cancelled
599             return promise;
600         }
601 
602         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
603         EventExecutor executor = next.executor();
604         if (executor.inEventLoop()) {
605             next.invokeClose(promise);
606         } else {
607             safeExecute(executor, new Runnable() {
608                 @Override
609                 public void run() {
610                     next.invokeClose(promise);
611                 }
612             }, promise, null);
613         }
614 
615         return promise;
616     }
617 
618     private void invokeClose(ChannelPromise promise) {
619         if (invokeHandler()) {
620             try {
621                 ((ChannelOutboundHandler) handler()).close(this, promise);
622             } catch (Throwable t) {
623                 notifyOutboundHandlerException(t, promise);
624             }
625         } else {
626             close(promise);
627         }
628     }
629 
630     @Override
631     public ChannelFuture deregister(final ChannelPromise promise) {
632         if (isNotValidPromise(promise, false)) {
633             // cancelled
634             return promise;
635         }
636 
637         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
638         EventExecutor executor = next.executor();
639         if (executor.inEventLoop()) {
640             next.invokeDeregister(promise);
641         } else {
642             safeExecute(executor, new Runnable() {
643                 @Override
644                 public void run() {
645                     next.invokeDeregister(promise);
646                 }
647             }, promise, null);
648         }
649 
650         return promise;
651     }
652 
653     private void invokeDeregister(ChannelPromise promise) {
654         if (invokeHandler()) {
655             try {
656                 ((ChannelOutboundHandler) handler()).deregister(this, promise);
657             } catch (Throwable t) {
658                 notifyOutboundHandlerException(t, promise);
659             }
660         } else {
661             deregister(promise);
662         }
663     }
664 
665     @Override
666     public ChannelHandlerContext read() {
667         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
668         EventExecutor executor = next.executor();
669         if (executor.inEventLoop()) {
670             next.invokeRead();
671         } else {
672             Tasks tasks = next.invokeTasks;
673             if (tasks == null) {
674                 next.invokeTasks = tasks = new Tasks(next);
675             }
676             executor.execute(tasks.invokeReadTask);
677         }
678 
679         return this;
680     }
681 
682     private void invokeRead() {
683         if (invokeHandler()) {
684             try {
685                 ((ChannelOutboundHandler) handler()).read(this);
686             } catch (Throwable t) {
687                 notifyHandlerException(t);
688             }
689         } else {
690             read();
691         }
692     }
693 
694     @Override
695     public ChannelFuture write(Object msg) {
696         return write(msg, newPromise());
697     }
698 
699     @Override
700     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
701         write(msg, false, promise);
702 
703         return promise;
704     }
705 
706     private void invokeWrite(Object msg, ChannelPromise promise) {
707         if (invokeHandler()) {
708             invokeWrite0(msg, promise);
709         } else {
710             write(msg, promise);
711         }
712     }
713 
714     private void invokeWrite0(Object msg, ChannelPromise promise) {
715         try {
716             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
717         } catch (Throwable t) {
718             notifyOutboundHandlerException(t, promise);
719         }
720     }
721 
722     @Override
723     public ChannelHandlerContext flush() {
724         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
725         EventExecutor executor = next.executor();
726         if (executor.inEventLoop()) {
727             next.invokeFlush();
728         } else {
729             Tasks tasks = next.invokeTasks;
730             if (tasks == null) {
731                 next.invokeTasks = tasks = new Tasks(next);
732             }
733             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null);
734         }
735 
736         return this;
737     }
738 
739     private void invokeFlush() {
740         if (invokeHandler()) {
741             invokeFlush0();
742         } else {
743             flush();
744         }
745     }
746 
747     private void invokeFlush0() {
748         try {
749             ((ChannelOutboundHandler) handler()).flush(this);
750         } catch (Throwable t) {
751             notifyHandlerException(t);
752         }
753     }
754 
755     @Override
756     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
757         write(msg, true, promise);
758         return promise;
759     }
760 
761     private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
762         if (invokeHandler()) {
763             invokeWrite0(msg, promise);
764             invokeFlush0();
765         } else {
766             writeAndFlush(msg, promise);
767         }
768     }
769 
770     private void write(Object msg, boolean flush, ChannelPromise promise) {
771         ObjectUtil.checkNotNull(msg, "msg");
772         try {
773             if (isNotValidPromise(promise, true)) {
774                 ReferenceCountUtil.release(msg);
775                 // cancelled
776                 return;
777             }
778         } catch (RuntimeException e) {
779             ReferenceCountUtil.release(msg);
780             throw e;
781         }
782 
783         final AbstractChannelHandlerContext next = findContextOutbound(flush ?
784                 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
785         final Object m = pipeline.touch(msg, next);
786         EventExecutor executor = next.executor();
787         if (executor.inEventLoop()) {
788             if (flush) {
789                 next.invokeWriteAndFlush(m, promise);
790             } else {
791                 next.invokeWrite(m, promise);
792             }
793         } else {
794             final AbstractWriteTask task;
795             if (flush) {
796                 task = WriteAndFlushTask.newInstance(next, m, promise);
797             }  else {
798                 task = WriteTask.newInstance(next, m, promise);
799             }
800             if (!safeExecute(executor, task, promise, m)) {
801                 // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
802                 // and put it back in the Recycler for re-use later.
803                 //
804                 // See https://github.com/netty/netty/issues/8343.
805                 task.cancel();
806             }
807         }
808     }
809 
810     @Override
811     public ChannelFuture writeAndFlush(Object msg) {
812         return writeAndFlush(msg, newPromise());
813     }
814 
815     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
816         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
817         // false.
818         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
819     }
820 
821     private void notifyHandlerException(Throwable cause) {
822         if (inExceptionCaught(cause)) {
823             if (logger.isWarnEnabled()) {
824                 logger.warn(
825                         "An exception was thrown by a user handler " +
826                                 "while handling an exceptionCaught event", cause);
827             }
828             return;
829         }
830 
831         invokeExceptionCaught(cause);
832     }
833 
834     private static boolean inExceptionCaught(Throwable cause) {
835         do {
836             StackTraceElement[] trace = cause.getStackTrace();
837             if (trace != null) {
838                 for (StackTraceElement t : trace) {
839                     if (t == null) {
840                         break;
841                     }
842                     if ("exceptionCaught".equals(t.getMethodName())) {
843                         return true;
844                     }
845                 }
846             }
847 
848             cause = cause.getCause();
849         } while (cause != null);
850 
851         return false;
852     }
853 
854     @Override
855     public ChannelPromise newPromise() {
856         return new DefaultChannelPromise(channel(), executor());
857     }
858 
859     @Override
860     public ChannelProgressivePromise newProgressivePromise() {
861         return new DefaultChannelProgressivePromise(channel(), executor());
862     }
863 
864     @Override
865     public ChannelFuture newSucceededFuture() {
866         ChannelFuture succeededFuture = this.succeededFuture;
867         if (succeededFuture == null) {
868             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
869         }
870         return succeededFuture;
871     }
872 
873     @Override
874     public ChannelFuture newFailedFuture(Throwable cause) {
875         return new FailedChannelFuture(channel(), executor(), cause);
876     }
877 
878     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
879         if (promise == null) {
880             throw new NullPointerException("promise");
881         }
882 
883         if (promise.isDone()) {
884             // Check if the promise was cancelled and if so signal that the processing of the operation
885             // should not be performed.
886             //
887             // See https://github.com/netty/netty/issues/2349
888             if (promise.isCancelled()) {
889                 return true;
890             }
891             throw new IllegalArgumentException("promise already done: " + promise);
892         }
893 
894         if (promise.channel() != channel()) {
895             throw new IllegalArgumentException(String.format(
896                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
897         }
898 
899         if (promise.getClass() == DefaultChannelPromise.class) {
900             return false;
901         }
902 
903         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
904             throw new IllegalArgumentException(
905                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
906         }
907 
908         if (promise instanceof AbstractChannel.CloseFuture) {
909             throw new IllegalArgumentException(
910                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
911         }
912         return false;
913     }
914 
915     private AbstractChannelHandlerContext findContextInbound(int mask) {
916         AbstractChannelHandlerContext ctx = this;
917         do {
918             ctx = ctx.next;
919         } while ((ctx.executionMask & mask) == 0);
920         return ctx;
921     }
922 
923     private AbstractChannelHandlerContext findContextOutbound(int mask) {
924         AbstractChannelHandlerContext ctx = this;
925         do {
926             ctx = ctx.prev;
927         } while ((ctx.executionMask & mask) == 0);
928         return ctx;
929     }
930 
931     @Override
932     public ChannelPromise voidPromise() {
933         return channel().voidPromise();
934     }
935 
936     final void setRemoved() {
937         handlerState = REMOVE_COMPLETE;
938     }
939 
940     final boolean setAddComplete() {
941         for (;;) {
942             int oldState = handlerState;
943             if (oldState == REMOVE_COMPLETE) {
944                 return false;
945             }
946             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
947             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
948             // exposing ordering guarantees.
949             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
950                 return true;
951             }
952         }
953     }
954 
955     final void setAddPending() {
956         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
957         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
958     }
959 
960     final void callHandlerAdded() throws Exception {
961         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
962         // any pipeline events ctx.handler() will miss them because the state will not allow it.
963         if (setAddComplete()) {
964             handler().handlerAdded(this);
965         }
966     }
967 
968     final void callHandlerRemoved() throws Exception {
969         try {
970             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
971             if (handlerState == ADD_COMPLETE) {
972                 handler().handlerRemoved(this);
973             }
974         } finally {
975             // Mark the handler as removed in any case.
976             setRemoved();
977         }
978     }
979 
980     /**
981      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
982      * yet. If not return {@code false} and if called or could not detect return {@code true}.
983      *
984      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
985      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
986      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
987      */
988     private boolean invokeHandler() {
989         // Store in local variable to reduce volatile reads.
990         int handlerState = this.handlerState;
991         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
992     }
993 
994     @Override
995     public boolean isRemoved() {
996         return handlerState == REMOVE_COMPLETE;
997     }
998 
999     @Override
1000     public <T> Attribute<T> attr(AttributeKey<T> key) {
1001         return channel().attr(key);
1002     }
1003 
1004     @Override
1005     public <T> boolean hasAttr(AttributeKey<T> key) {
1006         return channel().hasAttr(key);
1007     }
1008 
1009     private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
1010         try {
1011             executor.execute(runnable);
1012             return true;
1013         } catch (Throwable cause) {
1014             try {
1015                 promise.setFailure(cause);
1016             } finally {
1017                 if (msg != null) {
1018                     ReferenceCountUtil.release(msg);
1019                 }
1020             }
1021             return false;
1022         }
1023     }
1024 
1025     @Override
1026     public String toHintString() {
1027         return '\'' + name + "' will handle the message from this point.";
1028     }
1029 
1030     @Override
1031     public String toString() {
1032         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1033     }
1034 
1035     abstract static class AbstractWriteTask implements Runnable {
1036 
1037         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1038                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1039 
1040         // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
1041         private static final int WRITE_TASK_OVERHEAD =
1042                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
1043 
1044         private final Recycler.Handle<AbstractWriteTask> handle;
1045         private AbstractChannelHandlerContext ctx;
1046         private Object msg;
1047         private ChannelPromise promise;
1048         private int size;
1049 
1050         @SuppressWarnings("unchecked")
1051         private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
1052             this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
1053         }
1054 
1055         protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
1056                                    Object msg, ChannelPromise promise) {
1057             task.ctx = ctx;
1058             task.msg = msg;
1059             task.promise = promise;
1060 
1061             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1062                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1063                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1064             } else {
1065                 task.size = 0;
1066             }
1067         }
1068 
1069         @Override
1070         public final void run() {
1071             try {
1072                 decrementPendingOutboundBytes();
1073                 write(ctx, msg, promise);
1074             } finally {
1075                 recycle();
1076             }
1077         }
1078 
1079         void cancel() {
1080             try {
1081                 decrementPendingOutboundBytes();
1082             } finally {
1083                 recycle();
1084             }
1085         }
1086 
1087         private void decrementPendingOutboundBytes() {
1088             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1089                 ctx.pipeline.decrementPendingOutboundBytes(size);
1090             }
1091         }
1092 
1093         private void recycle() {
1094             // Set to null so the GC can collect them directly
1095             ctx = null;
1096             msg = null;
1097             promise = null;
1098             handle.recycle(this);
1099         }
1100 
1101         protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1102             ctx.invokeWrite(msg, promise);
1103         }
1104     }
1105 
1106     static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1107 
1108         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1109             @Override
1110             protected WriteTask newObject(Handle<WriteTask> handle) {
1111                 return new WriteTask(handle);
1112             }
1113         };
1114 
1115         static WriteTask newInstance(
1116                 AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1117             WriteTask task = RECYCLER.get();
1118             init(task, ctx, msg, promise);
1119             return task;
1120         }
1121 
1122         private WriteTask(Recycler.Handle<WriteTask> handle) {
1123             super(handle);
1124         }
1125     }
1126 
1127     static final class WriteAndFlushTask extends AbstractWriteTask {
1128 
1129         private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
1130             @Override
1131             protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
1132                 return new WriteAndFlushTask(handle);
1133             }
1134         };
1135 
1136         static WriteAndFlushTask newInstance(
1137                 AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
1138             WriteAndFlushTask task = RECYCLER.get();
1139             init(task, ctx, msg, promise);
1140             return task;
1141         }
1142 
1143         private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
1144             super(handle);
1145         }
1146 
1147         @Override
1148         public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1149             super.write(ctx, msg, promise);
1150             ctx.invokeFlush();
1151         }
1152     }
1153 
1154     private static final class Tasks {
1155         private final AbstractChannelHandlerContext next;
1156         private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1157             @Override
1158             public void run() {
1159                 next.invokeChannelReadComplete();
1160             }
1161         };
1162         private final Runnable invokeReadTask = new Runnable() {
1163             @Override
1164             public void run() {
1165                 next.invokeRead();
1166             }
1167         };
1168         private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1169             @Override
1170             public void run() {
1171                 next.invokeChannelWritabilityChanged();
1172             }
1173         };
1174         private final Runnable invokeFlushTask = new Runnable() {
1175             @Override
1176             public void run() {
1177                 next.invokeFlush();
1178             }
1179         };
1180 
1181         Tasks(AbstractChannelHandlerContext next) {
1182             this.next = next;
1183         }
1184     }
1185 }