View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.DefaultAttributeMap;
20  import io.netty.util.Recycler;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.concurrent.EventExecutor;
23  import io.netty.util.concurrent.OrderedEventExecutor;
24  import io.netty.util.internal.PromiseNotificationUtil;
25  import io.netty.util.internal.ThrowableUtil;
26  import io.netty.util.internal.ObjectUtil;
27  import io.netty.util.internal.StringUtil;
28  import io.netty.util.internal.SystemPropertyUtil;
29  
30  import java.net.SocketAddress;
31  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32  
33  import static io.netty.channel.DefaultChannelPipeline.*;
34  
35  abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
36  
    // Neighbouring contexts. findContextInbound() walks the "next" links and
    // findContextOutbound() walks the "prev" links.
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    // Field updater bound by name to the volatile "handlerState" field declared below.
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
     */
    private static final int ADD_PENDING = 1;
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     */
    private static final int ADD_COMPLETE = 2;
    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int REMOVE_COMPLETE = 3;
    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int INIT = 0;

    // Flags consulted by findContextInbound() / findContextOutbound() to skip contexts
    // that do not take part in the respective direction.
    private final boolean inbound;
    private final boolean outbound;
    // Owning pipeline; channel() is resolved through it.
    private final DefaultChannelPipeline pipeline;
    // Context name; the constructor rejects null.
    private final String name;
    // True when no child executor is set or the child executor is an OrderedEventExecutor.
    private final boolean ordered;

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor executor;
    // Lazily created by newSucceededFuture() and reused afterwards.
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with different executor.
    // There is no need to make this volatile as at worst it will just create a few more instances than needed.
    private Runnable invokeChannelReadCompleteTask;
    private Runnable invokeReadTask;
    private Runnable invokeChannelWritableStateChangedTask;
    private Runnable invokeFlushTask;

    // One of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE.
    private volatile int handlerState = INIT;
80  
81      AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
82                                    boolean inbound, boolean outbound) {
83          this.name = ObjectUtil.checkNotNull(name, "name");
84          this.pipeline = pipeline;
85          this.executor = executor;
86          this.inbound = inbound;
87          this.outbound = outbound;
88          // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
89          ordered = executor == null || executor instanceof OrderedEventExecutor;
90      }
91  
    /**
     * Returns the {@link Channel} of the pipeline this context belongs to.
     */
    @Override
    public Channel channel() {
        return pipeline.channel();
    }
96  
    /**
     * Returns the pipeline that was passed to this context's constructor.
     */
    @Override
    public ChannelPipeline pipeline() {
        return pipeline;
    }
101 
    /**
     * Returns the allocator obtained from the channel's configuration
     * ({@code channel().config().getAllocator()}).
     */
    @Override
    public ByteBufAllocator alloc() {
        return channel().config().getAllocator();
    }
106 
107     @Override
108     public EventExecutor executor() {
109         if (executor == null) {
110             return channel().eventLoop();
111         } else {
112             return executor;
113         }
114     }
115 
    /**
     * Returns the name given to this context at construction (validated there as non-null).
     */
    @Override
    public String name() {
        return name;
    }
120 
    /**
     * Passes a {@code channelRegistered} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }
126 
127     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
128         EventExecutor executor = next.executor();
129         if (executor.inEventLoop()) {
130             next.invokeChannelRegistered();
131         } else {
132             executor.execute(new Runnable() {
133                 @Override
134                 public void run() {
135                     next.invokeChannelRegistered();
136                 }
137             });
138         }
139     }
140 
141     private void invokeChannelRegistered() {
142         if (invokeHandler()) {
143             try {
144                 ((ChannelInboundHandler) handler()).channelRegistered(this);
145             } catch (Throwable t) {
146                 notifyHandlerException(t);
147             }
148         } else {
149             fireChannelRegistered();
150         }
151     }
152 
    /**
     * Passes a {@code channelUnregistered} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelUnregistered() {
        invokeChannelUnregistered(findContextInbound());
        return this;
    }
158 
159     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
160         EventExecutor executor = next.executor();
161         if (executor.inEventLoop()) {
162             next.invokeChannelUnregistered();
163         } else {
164             executor.execute(new Runnable() {
165                 @Override
166                 public void run() {
167                     next.invokeChannelUnregistered();
168                 }
169             });
170         }
171     }
172 
173     private void invokeChannelUnregistered() {
174         if (invokeHandler()) {
175             try {
176                 ((ChannelInboundHandler) handler()).channelUnregistered(this);
177             } catch (Throwable t) {
178                 notifyHandlerException(t);
179             }
180         } else {
181             fireChannelUnregistered();
182         }
183     }
184 
    /**
     * Passes a {@code channelActive} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelActive() {
        invokeChannelActive(findContextInbound());
        return this;
    }
190 
191     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
192         EventExecutor executor = next.executor();
193         if (executor.inEventLoop()) {
194             next.invokeChannelActive();
195         } else {
196             executor.execute(new Runnable() {
197                 @Override
198                 public void run() {
199                     next.invokeChannelActive();
200                 }
201             });
202         }
203     }
204 
205     private void invokeChannelActive() {
206         if (invokeHandler()) {
207             try {
208                 ((ChannelInboundHandler) handler()).channelActive(this);
209             } catch (Throwable t) {
210                 notifyHandlerException(t);
211             }
212         } else {
213             fireChannelActive();
214         }
215     }
216 
    /**
     * Passes a {@code channelInactive} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelInactive() {
        invokeChannelInactive(findContextInbound());
        return this;
    }
222 
223     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
224         EventExecutor executor = next.executor();
225         if (executor.inEventLoop()) {
226             next.invokeChannelInactive();
227         } else {
228             executor.execute(new Runnable() {
229                 @Override
230                 public void run() {
231                     next.invokeChannelInactive();
232                 }
233             });
234         }
235     }
236 
237     private void invokeChannelInactive() {
238         if (invokeHandler()) {
239             try {
240                 ((ChannelInboundHandler) handler()).channelInactive(this);
241             } catch (Throwable t) {
242                 notifyHandlerException(t);
243             }
244         } else {
245             fireChannelInactive();
246         }
247     }
248 
    /**
     * Passes an {@code exceptionCaught} event to the context stored in {@code next}.
     * Unlike the other {@code fire*} methods in this class, the target is taken straight from
     * the {@code next} link rather than from {@code findContextInbound()}.
     *
     * @param cause the exception to propagate; a {@code null} value is rejected by
     *              {@code invokeExceptionCaught}
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(next, cause);
        return this;
    }
254 
255     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
256         ObjectUtil.checkNotNull(cause, "cause");
257         EventExecutor executor = next.executor();
258         if (executor.inEventLoop()) {
259             next.invokeExceptionCaught(cause);
260         } else {
261             try {
262                 executor.execute(new Runnable() {
263                     @Override
264                     public void run() {
265                         next.invokeExceptionCaught(cause);
266                     }
267                 });
268             } catch (Throwable t) {
269                 if (logger.isWarnEnabled()) {
270                     logger.warn("Failed to submit an exceptionCaught() event.", t);
271                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
272                 }
273             }
274         }
275     }
276 
277     private void invokeExceptionCaught(final Throwable cause) {
278         if (invokeHandler()) {
279             try {
280                 handler().exceptionCaught(this, cause);
281             } catch (Throwable error) {
282                 if (logger.isDebugEnabled()) {
283                     logger.debug(
284                         "An exception {}" +
285                         "was thrown by a user handler's exceptionCaught() " +
286                         "method while handling the following exception:",
287                         ThrowableUtil.stackTraceToString(error), cause);
288                 } else if (logger.isWarnEnabled()) {
289                     logger.warn(
290                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
291                         "was thrown by a user handler's exceptionCaught() " +
292                         "method while handling the following exception:", error, cause);
293                 }
294             }
295         } else {
296             fireExceptionCaught(cause);
297         }
298     }
299 
    /**
     * Passes a {@code userEventTriggered} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @param event the user event; a {@code null} value is rejected by
     *              {@code invokeUserEventTriggered}
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireUserEventTriggered(final Object event) {
        invokeUserEventTriggered(findContextInbound(), event);
        return this;
    }
305 
306     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
307         ObjectUtil.checkNotNull(event, "event");
308         EventExecutor executor = next.executor();
309         if (executor.inEventLoop()) {
310             next.invokeUserEventTriggered(event);
311         } else {
312             executor.execute(new Runnable() {
313                 @Override
314                 public void run() {
315                     next.invokeUserEventTriggered(event);
316                 }
317             });
318         }
319     }
320 
321     private void invokeUserEventTriggered(Object event) {
322         if (invokeHandler()) {
323             try {
324                 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
325             } catch (Throwable t) {
326                 notifyHandlerException(t);
327             }
328         } else {
329             fireUserEventTriggered(event);
330         }
331     }
332 
    /**
     * Passes a {@code channelRead} event carrying {@code msg} to the next inbound context found
     * by {@code findContextInbound()}.
     *
     * @param msg the received message; a {@code null} value is rejected by
     *            {@code invokeChannelRead}
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
338     static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) {
339         ObjectUtil.checkNotNull(msg, "msg");
340         EventExecutor executor = next.executor();
341         if (executor.inEventLoop()) {
342             next.invokeChannelRead(msg);
343         } else {
344             executor.execute(new Runnable() {
345                 @Override
346                 public void run() {
347                     next.invokeChannelRead(msg);
348                 }
349             });
350         }
351     }
352 
353     private void invokeChannelRead(Object msg) {
354         if (invokeHandler()) {
355             try {
356                 ((ChannelInboundHandler) handler()).channelRead(this, msg);
357             } catch (Throwable t) {
358                 notifyHandlerException(t);
359             }
360         } else {
361             fireChannelRead(msg);
362         }
363     }
364 
    /**
     * Passes a {@code channelReadComplete} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelReadComplete() {
        invokeChannelReadComplete(findContextInbound());
        return this;
    }
370 
371     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
372         EventExecutor executor = next.executor();
373         if (executor.inEventLoop()) {
374             next.invokeChannelReadComplete();
375         } else {
376             Runnable task = next.invokeChannelReadCompleteTask;
377             if (task == null) {
378                 next.invokeChannelReadCompleteTask = task = new Runnable() {
379                     @Override
380                     public void run() {
381                         next.invokeChannelReadComplete();
382                     }
383                 };
384             }
385             executor.execute(task);
386         }
387     }
388 
389     private void invokeChannelReadComplete() {
390         if (invokeHandler()) {
391             try {
392                 ((ChannelInboundHandler) handler()).channelReadComplete(this);
393             } catch (Throwable t) {
394                 notifyHandlerException(t);
395             }
396         } else {
397             fireChannelReadComplete();
398         }
399     }
400 
    /**
     * Passes a {@code channelWritabilityChanged} event to the next inbound context found by
     * {@code findContextInbound()}.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelWritabilityChanged() {
        invokeChannelWritabilityChanged(findContextInbound());
        return this;
    }
406 
407     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
408         EventExecutor executor = next.executor();
409         if (executor.inEventLoop()) {
410             next.invokeChannelWritabilityChanged();
411         } else {
412             Runnable task = next.invokeChannelWritableStateChangedTask;
413             if (task == null) {
414                 next.invokeChannelWritableStateChangedTask = task = new Runnable() {
415                     @Override
416                     public void run() {
417                         next.invokeChannelWritabilityChanged();
418                     }
419                 };
420             }
421             executor.execute(task);
422         }
423     }
424 
425     private void invokeChannelWritabilityChanged() {
426         if (invokeHandler()) {
427             try {
428                 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
429             } catch (Throwable t) {
430                 notifyHandlerException(t);
431             }
432         } else {
433             fireChannelWritabilityChanged();
434         }
435     }
436 
    /**
     * Shorthand for {@link #bind(SocketAddress, ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @param localAddress the address to bind to
     * @return the future for the bind operation
     */
    @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return bind(localAddress, newPromise());
    }
441 
    /**
     * Shorthand for {@link #connect(SocketAddress, ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @param remoteAddress the address to connect to
     * @return the future for the connect operation
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return connect(remoteAddress, newPromise());
    }
446 
    /**
     * Shorthand for {@link #connect(SocketAddress, SocketAddress, ChannelPromise)} with a promise
     * obtained from {@link #newPromise()}.
     *
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to use
     * @return the future for the connect operation
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return connect(remoteAddress, localAddress, newPromise());
    }
451 
    /**
     * Shorthand for {@link #disconnect(ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @return the future for the disconnect operation
     */
    @Override
    public ChannelFuture disconnect() {
        return disconnect(newPromise());
    }
456 
    /**
     * Shorthand for {@link #close(ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @return the future for the close operation
     */
    @Override
    public ChannelFuture close() {
        return close(newPromise());
    }
461 
    /**
     * Shorthand for {@link #deregister(ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @return the future for the deregister operation
     */
    @Override
    public ChannelFuture deregister() {
        return deregister(newPromise());
    }
466 
467     @Override
468     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
469         if (localAddress == null) {
470             throw new NullPointerException("localAddress");
471         }
472         if (isNotValidPromise(promise, false)) {
473             // cancelled
474             return promise;
475         }
476 
477         final AbstractChannelHandlerContext next = findContextOutbound();
478         EventExecutor executor = next.executor();
479         if (executor.inEventLoop()) {
480             next.invokeBind(localAddress, promise);
481         } else {
482             safeExecute(executor, new Runnable() {
483                 @Override
484                 public void run() {
485                     next.invokeBind(localAddress, promise);
486                 }
487             }, promise, null);
488         }
489         return promise;
490     }
491 
492     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
493         if (invokeHandler()) {
494             try {
495                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
496             } catch (Throwable t) {
497                 notifyOutboundHandlerException(t, promise);
498             }
499         } else {
500             bind(localAddress, promise);
501         }
502     }
503 
    /**
     * Shorthand for {@link #connect(SocketAddress, SocketAddress, ChannelPromise)} with a
     * {@code null} local address.
     *
     * @param remoteAddress the address to connect to
     * @param promise       the promise to notify
     * @return the future for the connect operation
     */
    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return connect(remoteAddress, null, promise);
    }
508 
509     @Override
510     public ChannelFuture connect(
511             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
512 
513         if (remoteAddress == null) {
514             throw new NullPointerException("remoteAddress");
515         }
516         if (isNotValidPromise(promise, false)) {
517             // cancelled
518             return promise;
519         }
520 
521         final AbstractChannelHandlerContext next = findContextOutbound();
522         EventExecutor executor = next.executor();
523         if (executor.inEventLoop()) {
524             next.invokeConnect(remoteAddress, localAddress, promise);
525         } else {
526             safeExecute(executor, new Runnable() {
527                 @Override
528                 public void run() {
529                     next.invokeConnect(remoteAddress, localAddress, promise);
530                 }
531             }, promise, null);
532         }
533         return promise;
534     }
535 
536     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
537         if (invokeHandler()) {
538             try {
539                 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
540             } catch (Throwable t) {
541                 notifyOutboundHandlerException(t, promise);
542             }
543         } else {
544             connect(remoteAddress, localAddress, promise);
545         }
546     }
547 
548     @Override
549     public ChannelFuture disconnect(final ChannelPromise promise) {
550         if (isNotValidPromise(promise, false)) {
551             // cancelled
552             return promise;
553         }
554 
555         final AbstractChannelHandlerContext next = findContextOutbound();
556         EventExecutor executor = next.executor();
557         if (executor.inEventLoop()) {
558             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
559             // So far, UDP/IP is the only transport that has such behavior.
560             if (!channel().metadata().hasDisconnect()) {
561                 next.invokeClose(promise);
562             } else {
563                 next.invokeDisconnect(promise);
564             }
565         } else {
566             safeExecute(executor, new Runnable() {
567                 @Override
568                 public void run() {
569                     if (!channel().metadata().hasDisconnect()) {
570                         next.invokeClose(promise);
571                     } else {
572                         next.invokeDisconnect(promise);
573                     }
574                 }
575             }, promise, null);
576         }
577         return promise;
578     }
579 
580     private void invokeDisconnect(ChannelPromise promise) {
581         if (invokeHandler()) {
582             try {
583                 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
584             } catch (Throwable t) {
585                 notifyOutboundHandlerException(t, promise);
586             }
587         } else {
588             disconnect(promise);
589         }
590     }
591 
592     @Override
593     public ChannelFuture close(final ChannelPromise promise) {
594         if (isNotValidPromise(promise, false)) {
595             // cancelled
596             return promise;
597         }
598 
599         final AbstractChannelHandlerContext next = findContextOutbound();
600         EventExecutor executor = next.executor();
601         if (executor.inEventLoop()) {
602             next.invokeClose(promise);
603         } else {
604             safeExecute(executor, new Runnable() {
605                 @Override
606                 public void run() {
607                     next.invokeClose(promise);
608                 }
609             }, promise, null);
610         }
611 
612         return promise;
613     }
614 
615     private void invokeClose(ChannelPromise promise) {
616         if (invokeHandler()) {
617             try {
618                 ((ChannelOutboundHandler) handler()).close(this, promise);
619             } catch (Throwable t) {
620                 notifyOutboundHandlerException(t, promise);
621             }
622         } else {
623             close(promise);
624         }
625     }
626 
627     @Override
628     public ChannelFuture deregister(final ChannelPromise promise) {
629         if (isNotValidPromise(promise, false)) {
630             // cancelled
631             return promise;
632         }
633 
634         final AbstractChannelHandlerContext next = findContextOutbound();
635         EventExecutor executor = next.executor();
636         if (executor.inEventLoop()) {
637             next.invokeDeregister(promise);
638         } else {
639             safeExecute(executor, new Runnable() {
640                 @Override
641                 public void run() {
642                     next.invokeDeregister(promise);
643                 }
644             }, promise, null);
645         }
646 
647         return promise;
648     }
649 
650     private void invokeDeregister(ChannelPromise promise) {
651         if (invokeHandler()) {
652             try {
653                 ((ChannelOutboundHandler) handler()).deregister(this, promise);
654             } catch (Throwable t) {
655                 notifyOutboundHandlerException(t, promise);
656             }
657         } else {
658             deregister(promise);
659         }
660     }
661 
662     @Override
663     public ChannelHandlerContext read() {
664         final AbstractChannelHandlerContext next = findContextOutbound();
665         EventExecutor executor = next.executor();
666         if (executor.inEventLoop()) {
667             next.invokeRead();
668         } else {
669             Runnable task = next.invokeReadTask;
670             if (task == null) {
671                 next.invokeReadTask = task = new Runnable() {
672                     @Override
673                     public void run() {
674                         next.invokeRead();
675                     }
676                 };
677             }
678             executor.execute(task);
679         }
680 
681         return this;
682     }
683 
684     private void invokeRead() {
685         if (invokeHandler()) {
686             try {
687                 ((ChannelOutboundHandler) handler()).read(this);
688             } catch (Throwable t) {
689                 notifyHandlerException(t);
690             }
691         } else {
692             read();
693         }
694     }
695 
    /**
     * Shorthand for {@link #write(Object, ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @param msg the message to write
     * @return the future for the write operation
     */
    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }
700 
    /**
     * Writes {@code msg} (without flushing) through the outbound contexts before this one.
     *
     * <p>{@code msg} is released via {@link ReferenceCountUtil#release(Object)} both when the
     * promise was already cancelled and when promise validation throws a
     * {@link RuntimeException}, so the message is not left unreleased on those paths.
     *
     * @param msg     the message to write; must not be {@code null}
     * @param promise the promise to notify; void promises are accepted
     * @return {@code promise}
     * @throws NullPointerException     if {@code msg} or {@code promise} is {@code null}
     * @throws IllegalArgumentException if {@code promise} is rejected by {@code isNotValidPromise}
     */
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (isNotValidPromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            // Validation failed: release the message before propagating the failure.
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }
721 
722     private void invokeWrite(Object msg, ChannelPromise promise) {
723         if (invokeHandler()) {
724             invokeWrite0(msg, promise);
725         } else {
726             write(msg, promise);
727         }
728     }
729 
    /**
     * Calls {@code write} on this context's outbound handler without consulting
     * {@code invokeHandler()}. Anything thrown by the handler is reported through
     * {@code notifyOutboundHandlerException}.
     *
     * @param msg     the message to write
     * @param promise the promise for the operation
     */
    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
737 
738     @Override
739     public ChannelHandlerContext flush() {
740         final AbstractChannelHandlerContext next = findContextOutbound();
741         EventExecutor executor = next.executor();
742         if (executor.inEventLoop()) {
743             next.invokeFlush();
744         } else {
745             Runnable task = next.invokeFlushTask;
746             if (task == null) {
747                 next.invokeFlushTask = task = new Runnable() {
748                     @Override
749                     public void run() {
750                         next.invokeFlush();
751                     }
752                 };
753             }
754             safeExecute(executor, task, channel().voidPromise(), null);
755         }
756 
757         return this;
758     }
759 
760     private void invokeFlush() {
761         if (invokeHandler()) {
762             invokeFlush0();
763         } else {
764             flush();
765         }
766     }
767 
    /**
     * Calls {@code flush} on this context's outbound handler without consulting
     * {@code invokeHandler()}. Anything thrown by the handler is handed to
     * {@code notifyHandlerException}.
     */
    private void invokeFlush0() {
        try {
            ((ChannelOutboundHandler) handler()).flush(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    }
775 
776     @Override
777     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
778         if (msg == null) {
779             throw new NullPointerException("msg");
780         }
781 
782         if (isNotValidPromise(promise, true)) {
783             ReferenceCountUtil.release(msg);
784             // cancelled
785             return promise;
786         }
787 
788         write(msg, true, promise);
789 
790         return promise;
791     }
792 
793     private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
794         if (invokeHandler()) {
795             invokeWrite0(msg, promise);
796             invokeFlush0();
797         } else {
798             writeAndFlush(msg, promise);
799         }
800     }
801 
802     private void write(Object msg, boolean flush, ChannelPromise promise) {
803         AbstractChannelHandlerContext next = findContextOutbound();
804         EventExecutor executor = next.executor();
805         if (executor.inEventLoop()) {
806             if (flush) {
807                 next.invokeWriteAndFlush(msg, promise);
808             } else {
809                 next.invokeWrite(msg, promise);
810             }
811         } else {
812             AbstractWriteTask task;
813             if (flush) {
814                 task = WriteAndFlushTask.newInstance(next, msg, promise);
815             }  else {
816                 task = WriteTask.newInstance(next, msg, promise);
817             }
818             safeExecute(executor, task, promise, msg);
819         }
820     }
821 
    /**
     * Shorthand for {@link #writeAndFlush(Object, ChannelPromise)} with a promise obtained from
     * {@link #newPromise()}.
     *
     * @param msg the message to write
     * @return the future for the write operation
     */
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return writeAndFlush(msg, newPromise());
    }
826 
    /**
     * Tries to fail {@code promise} with {@code cause} through
     * {@link PromiseNotificationUtil#tryFailure}.
     *
     * @param cause   the exception thrown by an outbound handler
     * @param promise the promise of the failed operation
     */
    private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
        // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
        // false.
        PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
    }
832 
833     private void notifyHandlerException(Throwable cause) {
834         if (inExceptionCaught(cause)) {
835             if (logger.isWarnEnabled()) {
836                 logger.warn(
837                         "An exception was thrown by a user handler " +
838                                 "while handling an exceptionCaught event", cause);
839             }
840             return;
841         }
842 
843         invokeExceptionCaught(cause);
844     }
845 
846     private static boolean inExceptionCaught(Throwable cause) {
847         do {
848             StackTraceElement[] trace = cause.getStackTrace();
849             if (trace != null) {
850                 for (StackTraceElement t : trace) {
851                     if (t == null) {
852                         break;
853                     }
854                     if ("exceptionCaught".equals(t.getMethodName())) {
855                         return true;
856                     }
857                 }
858             }
859 
860             cause = cause.getCause();
861         } while (cause != null);
862 
863         return false;
864     }
865 
    /**
     * Creates a new {@link DefaultChannelPromise} for this context's channel and executor.
     */
    @Override
    public ChannelPromise newPromise() {
        return new DefaultChannelPromise(channel(), executor());
    }
870 
    /**
     * Creates a new {@link DefaultChannelProgressivePromise} for this context's channel and
     * executor.
     */
    @Override
    public ChannelProgressivePromise newProgressivePromise() {
        return new DefaultChannelProgressivePromise(channel(), executor());
    }
875 
876     @Override
877     public ChannelFuture newSucceededFuture() {
878         ChannelFuture succeededFuture = this.succeededFuture;
879         if (succeededFuture == null) {
880             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
881         }
882         return succeededFuture;
883     }
884 
    /**
     * Creates a new {@link FailedChannelFuture} for this context's channel and executor.
     *
     * @param cause the failure to report through the future
     */
    @Override
    public ChannelFuture newFailedFuture(Throwable cause) {
        return new FailedChannelFuture(channel(), executor(), cause);
    }
889 
890     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
891         if (promise == null) {
892             throw new NullPointerException("promise");
893         }
894 
895         if (promise.isDone()) {
896             // Check if the promise was cancelled and if so signal that the processing of the operation
897             // should not be performed.
898             //
899             // See https://github.com/netty/netty/issues/2349
900             if (promise.isCancelled()) {
901                 return true;
902             }
903             throw new IllegalArgumentException("promise already done: " + promise);
904         }
905 
906         if (promise.channel() != channel()) {
907             throw new IllegalArgumentException(String.format(
908                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
909         }
910 
911         if (promise.getClass() == DefaultChannelPromise.class) {
912             return false;
913         }
914 
915         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
916             throw new IllegalArgumentException(
917                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
918         }
919 
920         if (promise instanceof AbstractChannel.CloseFuture) {
921             throw new IllegalArgumentException(
922                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
923         }
924         return false;
925     }
926 
927     private AbstractChannelHandlerContext findContextInbound() {
928         AbstractChannelHandlerContext ctx = this;
929         do {
930             ctx = ctx.next;
931         } while (!ctx.inbound);
932         return ctx;
933     }
934 
935     private AbstractChannelHandlerContext findContextOutbound() {
936         AbstractChannelHandlerContext ctx = this;
937         do {
938             ctx = ctx.prev;
939         } while (!ctx.outbound);
940         return ctx;
941     }
942 
943     @Override
944     public ChannelPromise voidPromise() {
945         return channel().voidPromise();
946     }
947 
948     final void setRemoved() {
949         handlerState = REMOVE_COMPLETE;
950     }
951 
952     final void setAddComplete() {
953         for (;;) {
954             int oldState = handlerState;
955             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
956             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
957             // exposing ordering guarantees.
958             if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
959                 return;
960             }
961         }
962     }
963 
964     final void setAddPending() {
965         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
966         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
967     }
968 
969     /**
970      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
971      * yet. If not return {@code false} and if called or could not detect return {@code true}.
972      *
973      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
974      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
975      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
976      */
977     private boolean invokeHandler() {
978         // Store in local variable to reduce volatile reads.
979         int handlerState = this.handlerState;
980         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
981     }
982 
983     @Override
984     public boolean isRemoved() {
985         return handlerState == REMOVE_COMPLETE;
986     }
987 
988     private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
989         try {
990             executor.execute(runnable);
991         } catch (Throwable cause) {
992             try {
993                 promise.setFailure(cause);
994             } finally {
995                 if (msg != null) {
996                     ReferenceCountUtil.release(msg);
997                 }
998             }
999         }
1000     }
1001 
1002     abstract static class AbstractWriteTask implements Runnable {
1003 
1004         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1005                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1006 
1007         // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
1008         private static final int WRITE_TASK_OVERHEAD =
1009                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
1010 
1011         private final Recycler.Handle handle;
1012         private AbstractChannelHandlerContext ctx;
1013         private Object msg;
1014         private ChannelPromise promise;
1015         private int size;
1016 
1017         private AbstractWriteTask(Recycler.Handle handle) {
1018             this.handle = handle;
1019         }
1020 
1021         protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
1022                                    Object msg, ChannelPromise promise) {
1023             task.ctx = ctx;
1024             task.msg = msg;
1025             task.promise = promise;
1026 
1027             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1028                 ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
1029 
1030                 // Check for null as it may be set to null if the channel is closed already
1031                 if (buffer != null) {
1032                     task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1033                     buffer.incrementPendingOutboundBytes(task.size);
1034                 } else {
1035                     task.size = 0;
1036                 }
1037             } else {
1038                 task.size = 0;
1039             }
1040         }
1041 
1042         @Override
1043         public final void run() {
1044             try {
1045                 ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
1046                 // Check for null as it may be set to null if the channel is closed already
1047                 if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
1048                     buffer.decrementPendingOutboundBytes(size);
1049                 }
1050                 write(ctx, msg, promise);
1051             } finally {
1052                 // Set to null so the GC can collect them directly
1053                 ctx = null;
1054                 msg = null;
1055                 promise = null;
1056                 recycle(handle);
1057             }
1058         }
1059 
1060         protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1061             ctx.invokeWrite(msg, promise);
1062         }
1063 
1064         protected abstract void recycle(Recycler.Handle handle);
1065     }
1066 
1067     static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1068 
1069         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1070             @Override
1071             protected WriteTask newObject(Handle handle) {
1072                 return new WriteTask(handle);
1073             }
1074         };
1075 
1076         private static WriteTask newInstance(
1077                 AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1078             WriteTask task = RECYCLER.get();
1079             init(task, ctx, msg, promise);
1080             return task;
1081         }
1082 
1083         private WriteTask(Recycler.Handle handle) {
1084             super(handle);
1085         }
1086 
1087         @Override
1088         protected void recycle(Recycler.Handle handle) {
1089             RECYCLER.recycle(this, handle);
1090         }
1091     }
1092 
1093     static final class WriteAndFlushTask extends AbstractWriteTask {
1094 
1095         private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
1096             @Override
1097             protected WriteAndFlushTask newObject(Handle handle) {
1098                 return new WriteAndFlushTask(handle);
1099             }
1100         };
1101 
1102         private static WriteAndFlushTask newInstance(
1103                 AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
1104             WriteAndFlushTask task = RECYCLER.get();
1105             init(task, ctx, msg, promise);
1106             return task;
1107         }
1108 
1109         private WriteAndFlushTask(Recycler.Handle handle) {
1110             super(handle);
1111         }
1112 
1113         @Override
1114         public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1115             super.write(ctx, msg, promise);
1116             ctx.invokeFlush();
1117         }
1118 
1119         @Override
1120         protected void recycle(Recycler.Handle handle) {
1121             RECYCLER.recycle(this, handle);
1122         }
1123     }
1124 }