View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.Recycler;
23  import io.netty.util.ReferenceCountUtil;
24  import io.netty.util.ResourceLeakHint;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.concurrent.OrderedEventExecutor;
27  import io.netty.util.internal.PromiseNotificationUtil;
28  import io.netty.util.internal.ThrowableUtil;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.StringUtil;
31  import io.netty.util.internal.SystemPropertyUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  
38  abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
39          implements ChannelHandlerContext, ResourceLeakHint {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
        // Links to the neighbouring contexts. findContextInbound() follows {@code next}.
        // NOTE(review): presumably both links are maintained by DefaultChannelPipeline -- confirm.
42      volatile AbstractChannelHandlerContext next;
43      volatile AbstractChannelHandlerContext prev;
44  
        // Atomic updater for the volatile {@code handlerState} field declared below.
45      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
46              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
47  
48      /**
49       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
50       */
51      private static final int ADD_PENDING = 1;
52      /**
53       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
54       */
55      private static final int ADD_COMPLETE = 2;
56      /**
57       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
58       */
59      private static final int REMOVE_COMPLETE = 3;
60      /**
61       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
62       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
63       */
64      private static final int INIT = 0;
65  
        // Flags supplied to the constructor.
        // NOTE(review): presumably they mark which event direction the handler takes part in -- confirm.
66      private final boolean inbound;
67      private final boolean outbound;
68      private final DefaultChannelPipeline pipeline;
69      private final String name;
        // True when no child executor is set or the child executor is an OrderedEventExecutor (see constructor).
70      private final boolean ordered;
71  
72      // Will be set to null if no child executor should be used, otherwise it will be set to the
73      // child executor.
74      final EventExecutor executor;
        // Lazily created by newSucceededFuture() and reused afterwards.
75      private ChannelFuture succeededFuture;
76  
77      // Lazily instantiated tasks used to trigger events to a handler with different executor.
78      // There is no need to make this volatile as at worst it will just create a few more instances than needed.
79      private Runnable invokeChannelReadCompleteTask;
80      private Runnable invokeReadTask;
81      private Runnable invokeChannelWritableStateChangedTask;
82      private Runnable invokeFlushTask;
83  
        // One of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE; starts as INIT.
84      private volatile int handlerState = INIT;
85  
/**
 * Creates a context that belongs to the given pipeline.
 *
 * @param pipeline the owning pipeline
 * @param executor the child executor for this context, or {@code null} to use the channel's event loop
 * @param name     the context name, must not be {@code null}
 * @param inbound  value stored in the {@code inbound} flag
 * @param outbound value stored in the {@code outbound} flag
 * @throws NullPointerException if {@code name} is {@code null}
 */
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // It's ordered if it's driven by the EventLoop or the given Executor is an instance of OrderedEventExecutor.
    if (executor == null) {
        this.ordered = true;
    } else {
        this.ordered = executor instanceof OrderedEventExecutor;
    }
}
96  
97      @Override
98      public Channel channel() {
99          return pipeline.channel();
100     }
101 
102     @Override
103     public ChannelPipeline pipeline() {
104         return pipeline;
105     }
106 
107     @Override
108     public ByteBufAllocator alloc() {
109         return channel().config().getAllocator();
110     }
111 
112     @Override
113     public EventExecutor executor() {
114         if (executor == null) {
115             return channel().eventLoop();
116         } else {
117             return executor;
118         }
119     }
120 
121     @Override
122     public String name() {
123         return name;
124     }
125 
126     @Override
127     public ChannelHandlerContext fireChannelRegistered() {
128         invokeChannelRegistered(findContextInbound());
129         return this;
130     }
131 
132     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
133         EventExecutor executor = next.executor();
134         if (executor.inEventLoop()) {
135             next.invokeChannelRegistered();
136         } else {
137             executor.execute(new Runnable() {
138                 @Override
139                 public void run() {
140                     next.invokeChannelRegistered();
141                 }
142             });
143         }
144     }
145 
146     private void invokeChannelRegistered() {
147         if (invokeHandler()) {
148             try {
149                 ((ChannelInboundHandler) handler()).channelRegistered(this);
150             } catch (Throwable t) {
151                 notifyHandlerException(t);
152             }
153         } else {
154             fireChannelRegistered();
155         }
156     }
157 
158     @Override
159     public ChannelHandlerContext fireChannelUnregistered() {
160         invokeChannelUnregistered(findContextInbound());
161         return this;
162     }
163 
164     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
165         EventExecutor executor = next.executor();
166         if (executor.inEventLoop()) {
167             next.invokeChannelUnregistered();
168         } else {
169             executor.execute(new Runnable() {
170                 @Override
171                 public void run() {
172                     next.invokeChannelUnregistered();
173                 }
174             });
175         }
176     }
177 
178     private void invokeChannelUnregistered() {
179         if (invokeHandler()) {
180             try {
181                 ((ChannelInboundHandler) handler()).channelUnregistered(this);
182             } catch (Throwable t) {
183                 notifyHandlerException(t);
184             }
185         } else {
186             fireChannelUnregistered();
187         }
188     }
189 
190     @Override
191     public ChannelHandlerContext fireChannelActive() {
192         invokeChannelActive(findContextInbound());
193         return this;
194     }
195 
196     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
197         EventExecutor executor = next.executor();
198         if (executor.inEventLoop()) {
199             next.invokeChannelActive();
200         } else {
201             executor.execute(new Runnable() {
202                 @Override
203                 public void run() {
204                     next.invokeChannelActive();
205                 }
206             });
207         }
208     }
209 
210     private void invokeChannelActive() {
211         if (invokeHandler()) {
212             try {
213                 ((ChannelInboundHandler) handler()).channelActive(this);
214             } catch (Throwable t) {
215                 notifyHandlerException(t);
216             }
217         } else {
218             fireChannelActive();
219         }
220     }
221 
222     @Override
223     public ChannelHandlerContext fireChannelInactive() {
224         invokeChannelInactive(findContextInbound());
225         return this;
226     }
227 
228     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
229         EventExecutor executor = next.executor();
230         if (executor.inEventLoop()) {
231             next.invokeChannelInactive();
232         } else {
233             executor.execute(new Runnable() {
234                 @Override
235                 public void run() {
236                     next.invokeChannelInactive();
237                 }
238             });
239         }
240     }
241 
242     private void invokeChannelInactive() {
243         if (invokeHandler()) {
244             try {
245                 ((ChannelInboundHandler) handler()).channelInactive(this);
246             } catch (Throwable t) {
247                 notifyHandlerException(t);
248             }
249         } else {
250             fireChannelInactive();
251         }
252     }
253 
254     @Override
255     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
256         invokeExceptionCaught(next, cause);
257         return this;
258     }
259 
260     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
261         ObjectUtil.checkNotNull(cause, "cause");
262         EventExecutor executor = next.executor();
263         if (executor.inEventLoop()) {
264             next.invokeExceptionCaught(cause);
265         } else {
266             try {
267                 executor.execute(new Runnable() {
268                     @Override
269                     public void run() {
270                         next.invokeExceptionCaught(cause);
271                     }
272                 });
273             } catch (Throwable t) {
274                 if (logger.isWarnEnabled()) {
275                     logger.warn("Failed to submit an exceptionCaught() event.", t);
276                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
277                 }
278             }
279         }
280     }
281 
282     private void invokeExceptionCaught(final Throwable cause) {
283         if (invokeHandler()) {
284             try {
285                 handler().exceptionCaught(this, cause);
286             } catch (Throwable error) {
287                 if (logger.isDebugEnabled()) {
288                     logger.debug(
289                         "An exception {}" +
290                         "was thrown by a user handler's exceptionCaught() " +
291                         "method while handling the following exception:",
292                         ThrowableUtil.stackTraceToString(error), cause);
293                 } else if (logger.isWarnEnabled()) {
294                     logger.warn(
295                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
296                         "was thrown by a user handler's exceptionCaught() " +
297                         "method while handling the following exception:", error, cause);
298                 }
299             }
300         } else {
301             fireExceptionCaught(cause);
302         }
303     }
304 
305     @Override
306     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
307         invokeUserEventTriggered(findContextInbound(), event);
308         return this;
309     }
310 
311     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
312         ObjectUtil.checkNotNull(event, "event");
313         EventExecutor executor = next.executor();
314         if (executor.inEventLoop()) {
315             next.invokeUserEventTriggered(event);
316         } else {
317             executor.execute(new Runnable() {
318                 @Override
319                 public void run() {
320                     next.invokeUserEventTriggered(event);
321                 }
322             });
323         }
324     }
325 
326     private void invokeUserEventTriggered(Object event) {
327         if (invokeHandler()) {
328             try {
329                 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
330             } catch (Throwable t) {
331                 notifyHandlerException(t);
332             }
333         } else {
334             fireUserEventTriggered(event);
335         }
336     }
337 
338     @Override
339     public ChannelHandlerContext fireChannelRead(final Object msg) {
340         invokeChannelRead(findContextInbound(), msg);
341         return this;
342     }
343 
344     static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
345         final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
346         EventExecutor executor = next.executor();
347         if (executor.inEventLoop()) {
348             next.invokeChannelRead(m);
349         } else {
350             executor.execute(new Runnable() {
351                 @Override
352                 public void run() {
353                     next.invokeChannelRead(m);
354                 }
355             });
356         }
357     }
358 
359     private void invokeChannelRead(Object msg) {
360         if (invokeHandler()) {
361             try {
362                 ((ChannelInboundHandler) handler()).channelRead(this, msg);
363             } catch (Throwable t) {
364                 notifyHandlerException(t);
365             }
366         } else {
367             fireChannelRead(msg);
368         }
369     }
370 
371     @Override
372     public ChannelHandlerContext fireChannelReadComplete() {
373         invokeChannelReadComplete(findContextInbound());
374         return this;
375     }
376 
377     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
378         EventExecutor executor = next.executor();
379         if (executor.inEventLoop()) {
380             next.invokeChannelReadComplete();
381         } else {
382             Runnable task = next.invokeChannelReadCompleteTask;
383             if (task == null) {
384                 next.invokeChannelReadCompleteTask = task = new Runnable() {
385                     @Override
386                     public void run() {
387                         next.invokeChannelReadComplete();
388                     }
389                 };
390             }
391             executor.execute(task);
392         }
393     }
394 
395     private void invokeChannelReadComplete() {
396         if (invokeHandler()) {
397             try {
398                 ((ChannelInboundHandler) handler()).channelReadComplete(this);
399             } catch (Throwable t) {
400                 notifyHandlerException(t);
401             }
402         } else {
403             fireChannelReadComplete();
404         }
405     }
406 
407     @Override
408     public ChannelHandlerContext fireChannelWritabilityChanged() {
409         invokeChannelWritabilityChanged(findContextInbound());
410         return this;
411     }
412 
413     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
414         EventExecutor executor = next.executor();
415         if (executor.inEventLoop()) {
416             next.invokeChannelWritabilityChanged();
417         } else {
418             Runnable task = next.invokeChannelWritableStateChangedTask;
419             if (task == null) {
420                 next.invokeChannelWritableStateChangedTask = task = new Runnable() {
421                     @Override
422                     public void run() {
423                         next.invokeChannelWritabilityChanged();
424                     }
425                 };
426             }
427             executor.execute(task);
428         }
429     }
430 
431     private void invokeChannelWritabilityChanged() {
432         if (invokeHandler()) {
433             try {
434                 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
435             } catch (Throwable t) {
436                 notifyHandlerException(t);
437             }
438         } else {
439             fireChannelWritabilityChanged();
440         }
441     }
442 
443     @Override
444     public ChannelFuture bind(SocketAddress localAddress) {
445         return bind(localAddress, newPromise());
446     }
447 
448     @Override
449     public ChannelFuture connect(SocketAddress remoteAddress) {
450         return connect(remoteAddress, newPromise());
451     }
452 
453     @Override
454     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
455         return connect(remoteAddress, localAddress, newPromise());
456     }
457 
458     @Override
459     public ChannelFuture disconnect() {
460         return disconnect(newPromise());
461     }
462 
463     @Override
464     public ChannelFuture close() {
465         return close(newPromise());
466     }
467 
468     @Override
469     public ChannelFuture deregister() {
470         return deregister(newPromise());
471     }
472 
473     @Override
474     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
475         if (localAddress == null) {
476             throw new NullPointerException("localAddress");
477         }
478         if (isNotValidPromise(promise, false)) {
479             // cancelled
480             return promise;
481         }
482 
483         final AbstractChannelHandlerContext next = findContextOutbound();
484         EventExecutor executor = next.executor();
485         if (executor.inEventLoop()) {
486             next.invokeBind(localAddress, promise);
487         } else {
488             safeExecute(executor, new Runnable() {
489                 @Override
490                 public void run() {
491                     next.invokeBind(localAddress, promise);
492                 }
493             }, promise, null);
494         }
495         return promise;
496     }
497 
498     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
499         if (invokeHandler()) {
500             try {
501                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
502             } catch (Throwable t) {
503                 notifyOutboundHandlerException(t, promise);
504             }
505         } else {
506             bind(localAddress, promise);
507         }
508     }
509 
510     @Override
511     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
512         return connect(remoteAddress, null, promise);
513     }
514 
515     @Override
516     public ChannelFuture connect(
517             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
518 
519         if (remoteAddress == null) {
520             throw new NullPointerException("remoteAddress");
521         }
522         if (isNotValidPromise(promise, false)) {
523             // cancelled
524             return promise;
525         }
526 
527         final AbstractChannelHandlerContext next = findContextOutbound();
528         EventExecutor executor = next.executor();
529         if (executor.inEventLoop()) {
530             next.invokeConnect(remoteAddress, localAddress, promise);
531         } else {
532             safeExecute(executor, new Runnable() {
533                 @Override
534                 public void run() {
535                     next.invokeConnect(remoteAddress, localAddress, promise);
536                 }
537             }, promise, null);
538         }
539         return promise;
540     }
541 
542     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
543         if (invokeHandler()) {
544             try {
545                 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
546             } catch (Throwable t) {
547                 notifyOutboundHandlerException(t, promise);
548             }
549         } else {
550             connect(remoteAddress, localAddress, promise);
551         }
552     }
553 
554     @Override
555     public ChannelFuture disconnect(final ChannelPromise promise) {
556         if (isNotValidPromise(promise, false)) {
557             // cancelled
558             return promise;
559         }
560 
561         final AbstractChannelHandlerContext next = findContextOutbound();
562         EventExecutor executor = next.executor();
563         if (executor.inEventLoop()) {
564             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
565             // So far, UDP/IP is the only transport that has such behavior.
566             if (!channel().metadata().hasDisconnect()) {
567                 next.invokeClose(promise);
568             } else {
569                 next.invokeDisconnect(promise);
570             }
571         } else {
572             safeExecute(executor, new Runnable() {
573                 @Override
574                 public void run() {
575                     if (!channel().metadata().hasDisconnect()) {
576                         next.invokeClose(promise);
577                     } else {
578                         next.invokeDisconnect(promise);
579                     }
580                 }
581             }, promise, null);
582         }
583         return promise;
584     }
585 
586     private void invokeDisconnect(ChannelPromise promise) {
587         if (invokeHandler()) {
588             try {
589                 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
590             } catch (Throwable t) {
591                 notifyOutboundHandlerException(t, promise);
592             }
593         } else {
594             disconnect(promise);
595         }
596     }
597 
598     @Override
599     public ChannelFuture close(final ChannelPromise promise) {
600         if (isNotValidPromise(promise, false)) {
601             // cancelled
602             return promise;
603         }
604 
605         final AbstractChannelHandlerContext next = findContextOutbound();
606         EventExecutor executor = next.executor();
607         if (executor.inEventLoop()) {
608             next.invokeClose(promise);
609         } else {
610             safeExecute(executor, new Runnable() {
611                 @Override
612                 public void run() {
613                     next.invokeClose(promise);
614                 }
615             }, promise, null);
616         }
617 
618         return promise;
619     }
620 
621     private void invokeClose(ChannelPromise promise) {
622         if (invokeHandler()) {
623             try {
624                 ((ChannelOutboundHandler) handler()).close(this, promise);
625             } catch (Throwable t) {
626                 notifyOutboundHandlerException(t, promise);
627             }
628         } else {
629             close(promise);
630         }
631     }
632 
633     @Override
634     public ChannelFuture deregister(final ChannelPromise promise) {
635         if (isNotValidPromise(promise, false)) {
636             // cancelled
637             return promise;
638         }
639 
640         final AbstractChannelHandlerContext next = findContextOutbound();
641         EventExecutor executor = next.executor();
642         if (executor.inEventLoop()) {
643             next.invokeDeregister(promise);
644         } else {
645             safeExecute(executor, new Runnable() {
646                 @Override
647                 public void run() {
648                     next.invokeDeregister(promise);
649                 }
650             }, promise, null);
651         }
652 
653         return promise;
654     }
655 
656     private void invokeDeregister(ChannelPromise promise) {
657         if (invokeHandler()) {
658             try {
659                 ((ChannelOutboundHandler) handler()).deregister(this, promise);
660             } catch (Throwable t) {
661                 notifyOutboundHandlerException(t, promise);
662             }
663         } else {
664             deregister(promise);
665         }
666     }
667 
668     @Override
669     public ChannelHandlerContext read() {
670         final AbstractChannelHandlerContext next = findContextOutbound();
671         EventExecutor executor = next.executor();
672         if (executor.inEventLoop()) {
673             next.invokeRead();
674         } else {
675             Runnable task = next.invokeReadTask;
676             if (task == null) {
677                 next.invokeReadTask = task = new Runnable() {
678                     @Override
679                     public void run() {
680                         next.invokeRead();
681                     }
682                 };
683             }
684             executor.execute(task);
685         }
686 
687         return this;
688     }
689 
690     private void invokeRead() {
691         if (invokeHandler()) {
692             try {
693                 ((ChannelOutboundHandler) handler()).read(this);
694             } catch (Throwable t) {
695                 notifyHandlerException(t);
696             }
697         } else {
698             read();
699         }
700     }
701 
702     @Override
703     public ChannelFuture write(Object msg) {
704         return write(msg, newPromise());
705     }
706 
707     @Override
708     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
709         if (msg == null) {
710             throw new NullPointerException("msg");
711         }
712 
713         try {
714             if (isNotValidPromise(promise, true)) {
715                 ReferenceCountUtil.release(msg);
716                 // cancelled
717                 return promise;
718             }
719         } catch (RuntimeException e) {
720             ReferenceCountUtil.release(msg);
721             throw e;
722         }
723         write(msg, false, promise);
724 
725         return promise;
726     }
727 
728     private void invokeWrite(Object msg, ChannelPromise promise) {
729         if (invokeHandler()) {
730             invokeWrite0(msg, promise);
731         } else {
732             write(msg, promise);
733         }
734     }
735 
736     private void invokeWrite0(Object msg, ChannelPromise promise) {
737         try {
738             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
739         } catch (Throwable t) {
740             notifyOutboundHandlerException(t, promise);
741         }
742     }
743 
744     @Override
745     public ChannelHandlerContext flush() {
746         final AbstractChannelHandlerContext next = findContextOutbound();
747         EventExecutor executor = next.executor();
748         if (executor.inEventLoop()) {
749             next.invokeFlush();
750         } else {
751             Runnable task = next.invokeFlushTask;
752             if (task == null) {
753                 next.invokeFlushTask = task = new Runnable() {
754                     @Override
755                     public void run() {
756                         next.invokeFlush();
757                     }
758                 };
759             }
760             safeExecute(executor, task, channel().voidPromise(), null);
761         }
762 
763         return this;
764     }
765 
766     private void invokeFlush() {
767         if (invokeHandler()) {
768             invokeFlush0();
769         } else {
770             flush();
771         }
772     }
773 
774     private void invokeFlush0() {
775         try {
776             ((ChannelOutboundHandler) handler()).flush(this);
777         } catch (Throwable t) {
778             notifyHandlerException(t);
779         }
780     }
781 
782     @Override
783     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
784         if (msg == null) {
785             throw new NullPointerException("msg");
786         }
787 
788         if (isNotValidPromise(promise, true)) {
789             ReferenceCountUtil.release(msg);
790             // cancelled
791             return promise;
792         }
793 
794         write(msg, true, promise);
795 
796         return promise;
797     }
798 
799     private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
800         if (invokeHandler()) {
801             invokeWrite0(msg, promise);
802             invokeFlush0();
803         } else {
804             writeAndFlush(msg, promise);
805         }
806     }
807 
808     private void write(Object msg, boolean flush, ChannelPromise promise) {
809         AbstractChannelHandlerContext next = findContextOutbound();
810         final Object m = pipeline.touch(msg, next);
811         EventExecutor executor = next.executor();
812         if (executor.inEventLoop()) {
813             if (flush) {
814                 next.invokeWriteAndFlush(m, promise);
815             } else {
816                 next.invokeWrite(m, promise);
817             }
818         } else {
819             AbstractWriteTask task;
820             if (flush) {
821                 task = WriteAndFlushTask.newInstance(next, m, promise);
822             }  else {
823                 task = WriteTask.newInstance(next, m, promise);
824             }
825             safeExecute(executor, task, promise, m);
826         }
827     }
828 
829     @Override
830     public ChannelFuture writeAndFlush(Object msg) {
831         return writeAndFlush(msg, newPromise());
832     }
833 
834     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
835         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
836         // false.
837         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
838     }
839 
840     private void notifyHandlerException(Throwable cause) {
841         if (inExceptionCaught(cause)) {
842             if (logger.isWarnEnabled()) {
843                 logger.warn(
844                         "An exception was thrown by a user handler " +
845                                 "while handling an exceptionCaught event", cause);
846             }
847             return;
848         }
849 
850         invokeExceptionCaught(cause);
851     }
852 
853     private static boolean inExceptionCaught(Throwable cause) {
854         do {
855             StackTraceElement[] trace = cause.getStackTrace();
856             if (trace != null) {
857                 for (StackTraceElement t : trace) {
858                     if (t == null) {
859                         break;
860                     }
861                     if ("exceptionCaught".equals(t.getMethodName())) {
862                         return true;
863                     }
864                 }
865             }
866 
867             cause = cause.getCause();
868         } while (cause != null);
869 
870         return false;
871     }
872 
873     @Override
874     public ChannelPromise newPromise() {
875         return new DefaultChannelPromise(channel(), executor());
876     }
877 
878     @Override
879     public ChannelProgressivePromise newProgressivePromise() {
880         return new DefaultChannelProgressivePromise(channel(), executor());
881     }
882 
883     @Override
884     public ChannelFuture newSucceededFuture() {
885         ChannelFuture succeededFuture = this.succeededFuture;
886         if (succeededFuture == null) {
887             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
888         }
889         return succeededFuture;
890     }
891 
892     @Override
893     public ChannelFuture newFailedFuture(Throwable cause) {
894         return new FailedChannelFuture(channel(), executor(), cause);
895     }
896 
897     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
898         if (promise == null) {
899             throw new NullPointerException("promise");
900         }
901 
902         if (promise.isDone()) {
903             // Check if the promise was cancelled and if so signal that the processing of the operation
904             // should not be performed.
905             //
906             // See https://github.com/netty/netty/issues/2349
907             if (promise.isCancelled()) {
908                 return true;
909             }
910             throw new IllegalArgumentException("promise already done: " + promise);
911         }
912 
913         if (promise.channel() != channel()) {
914             throw new IllegalArgumentException(String.format(
915                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
916         }
917 
918         if (promise.getClass() == DefaultChannelPromise.class) {
919             return false;
920         }
921 
922         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
923             throw new IllegalArgumentException(
924                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
925         }
926 
927         if (promise instanceof AbstractChannel.CloseFuture) {
928             throw new IllegalArgumentException(
929                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
930         }
931         return false;
932     }
933 
934     private AbstractChannelHandlerContext findContextInbound() {
935         AbstractChannelHandlerContext ctx = this;
936         do {
937             ctx = ctx.next;
938         } while (!ctx.inbound);
939         return ctx;
940     }
941 
942     private AbstractChannelHandlerContext findContextOutbound() {
943         AbstractChannelHandlerContext ctx = this;
944         do {
945             ctx = ctx.prev;
946         } while (!ctx.outbound);
947         return ctx;
948     }
949 
950     @Override
951     public ChannelPromise voidPromise() {
952         return channel().voidPromise();
953     }
954 
955     final void setRemoved() {
956         handlerState = REMOVE_COMPLETE;
957     }
958 
959     final void setAddComplete() {
960         for (;;) {
961             int oldState = handlerState;
962             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
963             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
964             // exposing ordering guarantees.
965             if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
966                 return;
967             }
968         }
969     }
970 
971     final void setAddPending() {
972         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
973         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
974     }
975 
976     /**
977      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
978      * yet. If not return {@code false} and if called or could not detect return {@code true}.
979      *
980      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
981      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
982      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
983      */
984     private boolean invokeHandler() {
985         // Store in local variable to reduce volatile reads.
986         int handlerState = this.handlerState;
987         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
988     }
989 
990     @Override
991     public boolean isRemoved() {
992         return handlerState == REMOVE_COMPLETE;
993     }
994 
995     @Override
996     public <T> Attribute<T> attr(AttributeKey<T> key) {
997         return channel().attr(key);
998     }
999 
1000     @Override
1001     public <T> boolean hasAttr(AttributeKey<T> key) {
1002         return channel().hasAttr(key);
1003     }
1004 
1005     private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
1006         try {
1007             executor.execute(runnable);
1008         } catch (Throwable cause) {
1009             try {
1010                 promise.setFailure(cause);
1011             } finally {
1012                 if (msg != null) {
1013                     ReferenceCountUtil.release(msg);
1014                 }
1015             }
1016         }
1017     }
1018 
1019     @Override
1020     public String toHintString() {
1021         return '\'' + name + "' will handle the message from this point.";
1022     }
1023 
1024     @Override
1025     public String toString() {
1026         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1027     }
1028 
1029     abstract static class AbstractWriteTask implements Runnable {
1030 
1031         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1032                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1033 
1034         // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
1035         private static final int WRITE_TASK_OVERHEAD =
1036                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
1037 
1038         private final Recycler.Handle<AbstractWriteTask> handle;
1039         private AbstractChannelHandlerContext ctx;
1040         private Object msg;
1041         private ChannelPromise promise;
1042         private int size;
1043 
1044         @SuppressWarnings("unchecked")
1045         private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
1046             this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
1047         }
1048 
1049         protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
1050                                    Object msg, ChannelPromise promise) {
1051             task.ctx = ctx;
1052             task.msg = msg;
1053             task.promise = promise;
1054 
1055             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1056                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1057                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1058             } else {
1059                 task.size = 0;
1060             }
1061         }
1062 
1063         @Override
1064         public final void run() {
1065             try {
1066                 // Check for null as it may be set to null if the channel is closed already
1067                 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1068                     ctx.pipeline.decrementPendingOutboundBytes(size);
1069                 }
1070                 write(ctx, msg, promise);
1071             } finally {
1072                 // Set to null so the GC can collect them directly
1073                 ctx = null;
1074                 msg = null;
1075                 promise = null;
1076                 handle.recycle(this);
1077             }
1078         }
1079 
1080         protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1081             ctx.invokeWrite(msg, promise);
1082         }
1083     }
1084 
1085     static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1086 
1087         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1088             @Override
1089             protected WriteTask newObject(Handle<WriteTask> handle) {
1090                 return new WriteTask(handle);
1091             }
1092         };
1093 
1094         private static WriteTask newInstance(
1095                 AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1096             WriteTask task = RECYCLER.get();
1097             init(task, ctx, msg, promise);
1098             return task;
1099         }
1100 
1101         private WriteTask(Recycler.Handle<WriteTask> handle) {
1102             super(handle);
1103         }
1104     }
1105 
1106     static final class WriteAndFlushTask extends AbstractWriteTask {
1107 
1108         private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
1109             @Override
1110             protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
1111                 return new WriteAndFlushTask(handle);
1112             }
1113         };
1114 
1115         private static WriteAndFlushTask newInstance(
1116                 AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
1117             WriteAndFlushTask task = RECYCLER.get();
1118             init(task, ctx, msg, promise);
1119             return task;
1120         }
1121 
1122         private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
1123             super(handle);
1124         }
1125 
1126         @Override
1127         public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1128             super.write(ctx, msg, promise);
1129             ctx.invokeFlush();
1130         }
1131     }
1132 }