View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.DefaultAttributeMap;
22  import io.netty.util.Recycler;
23  import io.netty.util.ReferenceCountUtil;
24  import io.netty.util.ResourceLeakHint;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.concurrent.OrderedEventExecutor;
27  import io.netty.util.internal.PromiseNotificationUtil;
28  import io.netty.util.internal.ThrowableUtil;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.StringUtil;
31  import io.netty.util.internal.SystemPropertyUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  
38  abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
39          implements ChannelHandlerContext, ResourceLeakHint {
40  
41      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
42      volatile AbstractChannelHandlerContext next;
43      volatile AbstractChannelHandlerContext prev;
44  
45      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
46              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
47  
48      /**
49       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
50       */
51      private static final int ADD_PENDING = 1;
52      /**
53       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
54       */
55      private static final int ADD_COMPLETE = 2;
56      /**
57       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
58       */
59      private static final int REMOVE_COMPLETE = 3;
60      /**
61       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
62       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
63       */
64      private static final int INIT = 0;
65  
66      private final boolean inbound;
67      private final boolean outbound;
68      private final DefaultChannelPipeline pipeline;
69      private final String name;
70      private final boolean ordered;
71  
72      // Will be set to null if no child executor should be used, otherwise it will be set to the
73      // child executor.
74      final EventExecutor executor;
75      private ChannelFuture succeededFuture;
76  
77      // Lazily instantiated tasks used to trigger events to a handler with different executor.
78      // There is no need to make this volatile as at worse it will just create a few more instances then needed.
79      private Tasks invokeTasks;
80  
81      private volatile int handlerState = INIT;
82  
83      AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
84                                    boolean inbound, boolean outbound) {
85          this.name = ObjectUtil.checkNotNull(name, "name");
86          this.pipeline = pipeline;
87          this.executor = executor;
88          this.inbound = inbound;
89          this.outbound = outbound;
90          // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
91          ordered = executor == null || executor instanceof OrderedEventExecutor;
92      }
93  
94      @Override
95      public Channel channel() {
96          return pipeline.channel();
97      }
98  
99      @Override
100     public ChannelPipeline pipeline() {
101         return pipeline;
102     }
103 
104     @Override
105     public ByteBufAllocator alloc() {
106         return channel().config().getAllocator();
107     }
108 
109     @Override
110     public EventExecutor executor() {
111         if (executor == null) {
112             return channel().eventLoop();
113         } else {
114             return executor;
115         }
116     }
117 
118     @Override
119     public String name() {
120         return name;
121     }
122 
123     @Override
124     public ChannelHandlerContext fireChannelRegistered() {
125         invokeChannelRegistered(findContextInbound());
126         return this;
127     }
128 
129     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
130         EventExecutor executor = next.executor();
131         if (executor.inEventLoop()) {
132             next.invokeChannelRegistered();
133         } else {
134             executor.execute(new Runnable() {
135                 @Override
136                 public void run() {
137                     next.invokeChannelRegistered();
138                 }
139             });
140         }
141     }
142 
143     private void invokeChannelRegistered() {
144         if (invokeHandler()) {
145             try {
146                 ((ChannelInboundHandler) handler()).channelRegistered(this);
147             } catch (Throwable t) {
148                 notifyHandlerException(t);
149             }
150         } else {
151             fireChannelRegistered();
152         }
153     }
154 
155     @Override
156     public ChannelHandlerContext fireChannelUnregistered() {
157         invokeChannelUnregistered(findContextInbound());
158         return this;
159     }
160 
161     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
162         EventExecutor executor = next.executor();
163         if (executor.inEventLoop()) {
164             next.invokeChannelUnregistered();
165         } else {
166             executor.execute(new Runnable() {
167                 @Override
168                 public void run() {
169                     next.invokeChannelUnregistered();
170                 }
171             });
172         }
173     }
174 
175     private void invokeChannelUnregistered() {
176         if (invokeHandler()) {
177             try {
178                 ((ChannelInboundHandler) handler()).channelUnregistered(this);
179             } catch (Throwable t) {
180                 notifyHandlerException(t);
181             }
182         } else {
183             fireChannelUnregistered();
184         }
185     }
186 
187     @Override
188     public ChannelHandlerContext fireChannelActive() {
189         invokeChannelActive(findContextInbound());
190         return this;
191     }
192 
193     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
194         EventExecutor executor = next.executor();
195         if (executor.inEventLoop()) {
196             next.invokeChannelActive();
197         } else {
198             executor.execute(new Runnable() {
199                 @Override
200                 public void run() {
201                     next.invokeChannelActive();
202                 }
203             });
204         }
205     }
206 
207     private void invokeChannelActive() {
208         if (invokeHandler()) {
209             try {
210                 ((ChannelInboundHandler) handler()).channelActive(this);
211             } catch (Throwable t) {
212                 notifyHandlerException(t);
213             }
214         } else {
215             fireChannelActive();
216         }
217     }
218 
219     @Override
220     public ChannelHandlerContext fireChannelInactive() {
221         invokeChannelInactive(findContextInbound());
222         return this;
223     }
224 
225     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
226         EventExecutor executor = next.executor();
227         if (executor.inEventLoop()) {
228             next.invokeChannelInactive();
229         } else {
230             executor.execute(new Runnable() {
231                 @Override
232                 public void run() {
233                     next.invokeChannelInactive();
234                 }
235             });
236         }
237     }
238 
239     private void invokeChannelInactive() {
240         if (invokeHandler()) {
241             try {
242                 ((ChannelInboundHandler) handler()).channelInactive(this);
243             } catch (Throwable t) {
244                 notifyHandlerException(t);
245             }
246         } else {
247             fireChannelInactive();
248         }
249     }
250 
251     @Override
252     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
253         invokeExceptionCaught(next, cause);
254         return this;
255     }
256 
257     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
258         ObjectUtil.checkNotNull(cause, "cause");
259         EventExecutor executor = next.executor();
260         if (executor.inEventLoop()) {
261             next.invokeExceptionCaught(cause);
262         } else {
263             try {
264                 executor.execute(new Runnable() {
265                     @Override
266                     public void run() {
267                         next.invokeExceptionCaught(cause);
268                     }
269                 });
270             } catch (Throwable t) {
271                 if (logger.isWarnEnabled()) {
272                     logger.warn("Failed to submit an exceptionCaught() event.", t);
273                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
274                 }
275             }
276         }
277     }
278 
279     private void invokeExceptionCaught(final Throwable cause) {
280         if (invokeHandler()) {
281             try {
282                 handler().exceptionCaught(this, cause);
283             } catch (Throwable error) {
284                 if (logger.isDebugEnabled()) {
285                     logger.debug(
286                         "An exception {}" +
287                         "was thrown by a user handler's exceptionCaught() " +
288                         "method while handling the following exception:",
289                         ThrowableUtil.stackTraceToString(error), cause);
290                 } else if (logger.isWarnEnabled()) {
291                     logger.warn(
292                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
293                         "was thrown by a user handler's exceptionCaught() " +
294                         "method while handling the following exception:", error, cause);
295                 }
296             }
297         } else {
298             fireExceptionCaught(cause);
299         }
300     }
301 
302     @Override
303     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
304         invokeUserEventTriggered(findContextInbound(), event);
305         return this;
306     }
307 
308     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
309         ObjectUtil.checkNotNull(event, "event");
310         EventExecutor executor = next.executor();
311         if (executor.inEventLoop()) {
312             next.invokeUserEventTriggered(event);
313         } else {
314             executor.execute(new Runnable() {
315                 @Override
316                 public void run() {
317                     next.invokeUserEventTriggered(event);
318                 }
319             });
320         }
321     }
322 
323     private void invokeUserEventTriggered(Object event) {
324         if (invokeHandler()) {
325             try {
326                 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
327             } catch (Throwable t) {
328                 notifyHandlerException(t);
329             }
330         } else {
331             fireUserEventTriggered(event);
332         }
333     }
334 
335     @Override
336     public ChannelHandlerContext fireChannelRead(final Object msg) {
337         invokeChannelRead(findContextInbound(), msg);
338         return this;
339     }
340 
341     static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
342         final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
343         EventExecutor executor = next.executor();
344         if (executor.inEventLoop()) {
345             next.invokeChannelRead(m);
346         } else {
347             executor.execute(new Runnable() {
348                 @Override
349                 public void run() {
350                     next.invokeChannelRead(m);
351                 }
352             });
353         }
354     }
355 
356     private void invokeChannelRead(Object msg) {
357         if (invokeHandler()) {
358             try {
359                 ((ChannelInboundHandler) handler()).channelRead(this, msg);
360             } catch (Throwable t) {
361                 notifyHandlerException(t);
362             }
363         } else {
364             fireChannelRead(msg);
365         }
366     }
367 
368     @Override
369     public ChannelHandlerContext fireChannelReadComplete() {
370         invokeChannelReadComplete(findContextInbound());
371         return this;
372     }
373 
374     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
375         EventExecutor executor = next.executor();
376         if (executor.inEventLoop()) {
377             next.invokeChannelReadComplete();
378         } else {
379             Tasks tasks = next.invokeTasks;
380             if (tasks == null) {
381                 next.invokeTasks = tasks = new Tasks(next);
382             }
383             executor.execute(tasks.invokeChannelReadCompleteTask);
384         }
385     }
386 
387     private void invokeChannelReadComplete() {
388         if (invokeHandler()) {
389             try {
390                 ((ChannelInboundHandler) handler()).channelReadComplete(this);
391             } catch (Throwable t) {
392                 notifyHandlerException(t);
393             }
394         } else {
395             fireChannelReadComplete();
396         }
397     }
398 
399     @Override
400     public ChannelHandlerContext fireChannelWritabilityChanged() {
401         invokeChannelWritabilityChanged(findContextInbound());
402         return this;
403     }
404 
405     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
406         EventExecutor executor = next.executor();
407         if (executor.inEventLoop()) {
408             next.invokeChannelWritabilityChanged();
409         } else {
410             Tasks tasks = next.invokeTasks;
411             if (tasks == null) {
412                 next.invokeTasks = tasks = new Tasks(next);
413             }
414             executor.execute(tasks.invokeChannelWritableStateChangedTask);
415         }
416     }
417 
418     private void invokeChannelWritabilityChanged() {
419         if (invokeHandler()) {
420             try {
421                 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
422             } catch (Throwable t) {
423                 notifyHandlerException(t);
424             }
425         } else {
426             fireChannelWritabilityChanged();
427         }
428     }
429 
430     @Override
431     public ChannelFuture bind(SocketAddress localAddress) {
432         return bind(localAddress, newPromise());
433     }
434 
435     @Override
436     public ChannelFuture connect(SocketAddress remoteAddress) {
437         return connect(remoteAddress, newPromise());
438     }
439 
440     @Override
441     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
442         return connect(remoteAddress, localAddress, newPromise());
443     }
444 
445     @Override
446     public ChannelFuture disconnect() {
447         return disconnect(newPromise());
448     }
449 
450     @Override
451     public ChannelFuture close() {
452         return close(newPromise());
453     }
454 
455     @Override
456     public ChannelFuture deregister() {
457         return deregister(newPromise());
458     }
459 
460     @Override
461     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
462         if (localAddress == null) {
463             throw new NullPointerException("localAddress");
464         }
465         if (isNotValidPromise(promise, false)) {
466             // cancelled
467             return promise;
468         }
469 
470         final AbstractChannelHandlerContext next = findContextOutbound();
471         EventExecutor executor = next.executor();
472         if (executor.inEventLoop()) {
473             next.invokeBind(localAddress, promise);
474         } else {
475             safeExecute(executor, new Runnable() {
476                 @Override
477                 public void run() {
478                     next.invokeBind(localAddress, promise);
479                 }
480             }, promise, null);
481         }
482         return promise;
483     }
484 
485     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
486         if (invokeHandler()) {
487             try {
488                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
489             } catch (Throwable t) {
490                 notifyOutboundHandlerException(t, promise);
491             }
492         } else {
493             bind(localAddress, promise);
494         }
495     }
496 
497     @Override
498     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
499         return connect(remoteAddress, null, promise);
500     }
501 
502     @Override
503     public ChannelFuture connect(
504             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
505 
506         if (remoteAddress == null) {
507             throw new NullPointerException("remoteAddress");
508         }
509         if (isNotValidPromise(promise, false)) {
510             // cancelled
511             return promise;
512         }
513 
514         final AbstractChannelHandlerContext next = findContextOutbound();
515         EventExecutor executor = next.executor();
516         if (executor.inEventLoop()) {
517             next.invokeConnect(remoteAddress, localAddress, promise);
518         } else {
519             safeExecute(executor, new Runnable() {
520                 @Override
521                 public void run() {
522                     next.invokeConnect(remoteAddress, localAddress, promise);
523                 }
524             }, promise, null);
525         }
526         return promise;
527     }
528 
529     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
530         if (invokeHandler()) {
531             try {
532                 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
533             } catch (Throwable t) {
534                 notifyOutboundHandlerException(t, promise);
535             }
536         } else {
537             connect(remoteAddress, localAddress, promise);
538         }
539     }
540 
541     @Override
542     public ChannelFuture disconnect(final ChannelPromise promise) {
543         if (isNotValidPromise(promise, false)) {
544             // cancelled
545             return promise;
546         }
547 
548         final AbstractChannelHandlerContext next = findContextOutbound();
549         EventExecutor executor = next.executor();
550         if (executor.inEventLoop()) {
551             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
552             // So far, UDP/IP is the only transport that has such behavior.
553             if (!channel().metadata().hasDisconnect()) {
554                 next.invokeClose(promise);
555             } else {
556                 next.invokeDisconnect(promise);
557             }
558         } else {
559             safeExecute(executor, new Runnable() {
560                 @Override
561                 public void run() {
562                     if (!channel().metadata().hasDisconnect()) {
563                         next.invokeClose(promise);
564                     } else {
565                         next.invokeDisconnect(promise);
566                     }
567                 }
568             }, promise, null);
569         }
570         return promise;
571     }
572 
573     private void invokeDisconnect(ChannelPromise promise) {
574         if (invokeHandler()) {
575             try {
576                 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
577             } catch (Throwable t) {
578                 notifyOutboundHandlerException(t, promise);
579             }
580         } else {
581             disconnect(promise);
582         }
583     }
584 
585     @Override
586     public ChannelFuture close(final ChannelPromise promise) {
587         if (isNotValidPromise(promise, false)) {
588             // cancelled
589             return promise;
590         }
591 
592         final AbstractChannelHandlerContext next = findContextOutbound();
593         EventExecutor executor = next.executor();
594         if (executor.inEventLoop()) {
595             next.invokeClose(promise);
596         } else {
597             safeExecute(executor, new Runnable() {
598                 @Override
599                 public void run() {
600                     next.invokeClose(promise);
601                 }
602             }, promise, null);
603         }
604 
605         return promise;
606     }
607 
608     private void invokeClose(ChannelPromise promise) {
609         if (invokeHandler()) {
610             try {
611                 ((ChannelOutboundHandler) handler()).close(this, promise);
612             } catch (Throwable t) {
613                 notifyOutboundHandlerException(t, promise);
614             }
615         } else {
616             close(promise);
617         }
618     }
619 
620     @Override
621     public ChannelFuture deregister(final ChannelPromise promise) {
622         if (isNotValidPromise(promise, false)) {
623             // cancelled
624             return promise;
625         }
626 
627         final AbstractChannelHandlerContext next = findContextOutbound();
628         EventExecutor executor = next.executor();
629         if (executor.inEventLoop()) {
630             next.invokeDeregister(promise);
631         } else {
632             safeExecute(executor, new Runnable() {
633                 @Override
634                 public void run() {
635                     next.invokeDeregister(promise);
636                 }
637             }, promise, null);
638         }
639 
640         return promise;
641     }
642 
643     private void invokeDeregister(ChannelPromise promise) {
644         if (invokeHandler()) {
645             try {
646                 ((ChannelOutboundHandler) handler()).deregister(this, promise);
647             } catch (Throwable t) {
648                 notifyOutboundHandlerException(t, promise);
649             }
650         } else {
651             deregister(promise);
652         }
653     }
654 
655     @Override
656     public ChannelHandlerContext read() {
657         final AbstractChannelHandlerContext next = findContextOutbound();
658         EventExecutor executor = next.executor();
659         if (executor.inEventLoop()) {
660             next.invokeRead();
661         } else {
662             Tasks tasks = next.invokeTasks;
663             if (tasks == null) {
664                 next.invokeTasks = tasks = new Tasks(next);
665             }
666             executor.execute(tasks.invokeReadTask);
667         }
668 
669         return this;
670     }
671 
672     private void invokeRead() {
673         if (invokeHandler()) {
674             try {
675                 ((ChannelOutboundHandler) handler()).read(this);
676             } catch (Throwable t) {
677                 notifyHandlerException(t);
678             }
679         } else {
680             read();
681         }
682     }
683 
684     @Override
685     public ChannelFuture write(Object msg) {
686         return write(msg, newPromise());
687     }
688 
689     @Override
690     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
691         write(msg, false, promise);
692 
693         return promise;
694     }
695 
696     private void invokeWrite(Object msg, ChannelPromise promise) {
697         if (invokeHandler()) {
698             invokeWrite0(msg, promise);
699         } else {
700             write(msg, promise);
701         }
702     }
703 
704     private void invokeWrite0(Object msg, ChannelPromise promise) {
705         try {
706             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
707         } catch (Throwable t) {
708             notifyOutboundHandlerException(t, promise);
709         }
710     }
711 
712     @Override
713     public ChannelHandlerContext flush() {
714         final AbstractChannelHandlerContext next = findContextOutbound();
715         EventExecutor executor = next.executor();
716         if (executor.inEventLoop()) {
717             next.invokeFlush();
718         } else {
719             Tasks tasks = next.invokeTasks;
720             if (tasks == null) {
721                 next.invokeTasks = tasks = new Tasks(next);
722             }
723             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null);
724         }
725 
726         return this;
727     }
728 
729     private void invokeFlush() {
730         if (invokeHandler()) {
731             invokeFlush0();
732         } else {
733             flush();
734         }
735     }
736 
737     private void invokeFlush0() {
738         try {
739             ((ChannelOutboundHandler) handler()).flush(this);
740         } catch (Throwable t) {
741             notifyHandlerException(t);
742         }
743     }
744 
745     @Override
746     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
747         write(msg, true, promise);
748         return promise;
749     }
750 
751     private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
752         if (invokeHandler()) {
753             invokeWrite0(msg, promise);
754             invokeFlush0();
755         } else {
756             writeAndFlush(msg, promise);
757         }
758     }
759 
760     private void write(Object msg, boolean flush, ChannelPromise promise) {
761         ObjectUtil.checkNotNull(msg, "msg");
762         try {
763             if (isNotValidPromise(promise, true)) {
764                 ReferenceCountUtil.release(msg);
765                 // cancelled
766                 return;
767             }
768         } catch (RuntimeException e) {
769             ReferenceCountUtil.release(msg);
770             throw e;
771         }
772 
773         AbstractChannelHandlerContext next = findContextOutbound();
774         final Object m = pipeline.touch(msg, next);
775         EventExecutor executor = next.executor();
776         if (executor.inEventLoop()) {
777             if (flush) {
778                 next.invokeWriteAndFlush(m, promise);
779             } else {
780                 next.invokeWrite(m, promise);
781             }
782         } else {
783             final AbstractWriteTask task;
784             if (flush) {
785                 task = WriteAndFlushTask.newInstance(next, m, promise);
786             }  else {
787                 task = WriteTask.newInstance(next, m, promise);
788             }
789             if (!safeExecute(executor, task, promise, m)) {
790                 // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
791                 // and put it back in the Recycler for re-use later.
792                 //
793                 // See https://github.com/netty/netty/issues/8343.
794                 task.cancel();
795             }
796         }
797     }
798 
799     @Override
800     public ChannelFuture writeAndFlush(Object msg) {
801         return writeAndFlush(msg, newPromise());
802     }
803 
804     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
805         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
806         // false.
807         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
808     }
809 
810     private void notifyHandlerException(Throwable cause) {
811         if (inExceptionCaught(cause)) {
812             if (logger.isWarnEnabled()) {
813                 logger.warn(
814                         "An exception was thrown by a user handler " +
815                                 "while handling an exceptionCaught event", cause);
816             }
817             return;
818         }
819 
820         invokeExceptionCaught(cause);
821     }
822 
823     private static boolean inExceptionCaught(Throwable cause) {
824         do {
825             StackTraceElement[] trace = cause.getStackTrace();
826             if (trace != null) {
827                 for (StackTraceElement t : trace) {
828                     if (t == null) {
829                         break;
830                     }
831                     if ("exceptionCaught".equals(t.getMethodName())) {
832                         return true;
833                     }
834                 }
835             }
836 
837             cause = cause.getCause();
838         } while (cause != null);
839 
840         return false;
841     }
842 
843     @Override
844     public ChannelPromise newPromise() {
845         return new DefaultChannelPromise(channel(), executor());
846     }
847 
848     @Override
849     public ChannelProgressivePromise newProgressivePromise() {
850         return new DefaultChannelProgressivePromise(channel(), executor());
851     }
852 
853     @Override
854     public ChannelFuture newSucceededFuture() {
855         ChannelFuture succeededFuture = this.succeededFuture;
856         if (succeededFuture == null) {
857             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
858         }
859         return succeededFuture;
860     }
861 
862     @Override
863     public ChannelFuture newFailedFuture(Throwable cause) {
864         return new FailedChannelFuture(channel(), executor(), cause);
865     }
866 
867     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
868         if (promise == null) {
869             throw new NullPointerException("promise");
870         }
871 
872         if (promise.isDone()) {
873             // Check if the promise was cancelled and if so signal that the processing of the operation
874             // should not be performed.
875             //
876             // See https://github.com/netty/netty/issues/2349
877             if (promise.isCancelled()) {
878                 return true;
879             }
880             throw new IllegalArgumentException("promise already done: " + promise);
881         }
882 
883         if (promise.channel() != channel()) {
884             throw new IllegalArgumentException(String.format(
885                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
886         }
887 
888         if (promise.getClass() == DefaultChannelPromise.class) {
889             return false;
890         }
891 
892         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
893             throw new IllegalArgumentException(
894                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
895         }
896 
897         if (promise instanceof AbstractChannel.CloseFuture) {
898             throw new IllegalArgumentException(
899                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
900         }
901         return false;
902     }
903 
904     private AbstractChannelHandlerContext findContextInbound() {
905         AbstractChannelHandlerContext ctx = this;
906         do {
907             ctx = ctx.next;
908         } while (!ctx.inbound);
909         return ctx;
910     }
911 
912     private AbstractChannelHandlerContext findContextOutbound() {
913         AbstractChannelHandlerContext ctx = this;
914         do {
915             ctx = ctx.prev;
916         } while (!ctx.outbound);
917         return ctx;
918     }
919 
920     @Override
921     public ChannelPromise voidPromise() {
922         return channel().voidPromise();
923     }
924 
925     final void setRemoved() {
926         handlerState = REMOVE_COMPLETE;
927     }
928 
929     final boolean setAddComplete() {
930         for (;;) {
931             int oldState = handlerState;
932             if (oldState == REMOVE_COMPLETE) {
933                 return false;
934             }
935             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
936             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
937             // exposing ordering guarantees.
938             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
939                 return true;
940             }
941         }
942     }
943 
944     final void setAddPending() {
945         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
946         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
947     }
948 
949     final void callHandlerAdded() throws Exception {
950         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
951         // any pipeline events ctx.handler() will miss them because the state will not allow it.
952         if (setAddComplete()) {
953             handler().handlerAdded(this);
954         }
955     }
956 
957     final void callHandlerRemoved() throws Exception {
958         try {
959             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
960             if (handlerState == ADD_COMPLETE) {
961                 handler().handlerRemoved(this);
962             }
963         } finally {
964             // Mark the handler as removed in any case.
965             setRemoved();
966         }
967     }
968 
969     /**
970      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
971      * yet. If not return {@code false} and if called or could not detect return {@code true}.
972      *
973      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
974      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
975      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
976      */
977     private boolean invokeHandler() {
978         // Store in local variable to reduce volatile reads.
979         int handlerState = this.handlerState;
980         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
981     }
982 
983     @Override
984     public boolean isRemoved() {
985         return handlerState == REMOVE_COMPLETE;
986     }
987 
988     @Override
989     public <T> Attribute<T> attr(AttributeKey<T> key) {
990         return channel().attr(key);
991     }
992 
993     @Override
994     public <T> boolean hasAttr(AttributeKey<T> key) {
995         return channel().hasAttr(key);
996     }
997 
998     private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
999         try {
1000             executor.execute(runnable);
1001             return true;
1002         } catch (Throwable cause) {
1003             try {
1004                 promise.setFailure(cause);
1005             } finally {
1006                 if (msg != null) {
1007                     ReferenceCountUtil.release(msg);
1008                 }
1009             }
1010             return false;
1011         }
1012     }
1013 
1014     @Override
1015     public String toHintString() {
1016         return '\'' + name + "' will handle the message from this point.";
1017     }
1018 
1019     @Override
1020     public String toString() {
1021         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1022     }
1023 
1024     abstract static class AbstractWriteTask implements Runnable {
1025 
1026         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1027                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1028 
1029         // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
1030         private static final int WRITE_TASK_OVERHEAD =
1031                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
1032 
1033         private final Recycler.Handle<AbstractWriteTask> handle;
1034         private AbstractChannelHandlerContext ctx;
1035         private Object msg;
1036         private ChannelPromise promise;
1037         private int size;
1038 
1039         @SuppressWarnings("unchecked")
1040         private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
1041             this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
1042         }
1043 
1044         protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
1045                                    Object msg, ChannelPromise promise) {
1046             task.ctx = ctx;
1047             task.msg = msg;
1048             task.promise = promise;
1049 
1050             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1051                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1052                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1053             } else {
1054                 task.size = 0;
1055             }
1056         }
1057 
1058         @Override
1059         public final void run() {
1060             try {
1061                 decrementPendingOutboundBytes();
1062                 write(ctx, msg, promise);
1063             } finally {
1064                 recycle();
1065             }
1066         }
1067 
1068         void cancel() {
1069             try {
1070                 decrementPendingOutboundBytes();
1071             } finally {
1072                 recycle();
1073             }
1074         }
1075 
1076         private void decrementPendingOutboundBytes() {
1077             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1078                 ctx.pipeline.decrementPendingOutboundBytes(size);
1079             }
1080         }
1081 
1082         private void recycle() {
1083             // Set to null so the GC can collect them directly
1084             ctx = null;
1085             msg = null;
1086             promise = null;
1087             handle.recycle(this);
1088         }
1089 
1090         protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1091             ctx.invokeWrite(msg, promise);
1092         }
1093     }
1094 
1095     static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1096 
1097         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1098             @Override
1099             protected WriteTask newObject(Handle<WriteTask> handle) {
1100                 return new WriteTask(handle);
1101             }
1102         };
1103 
1104         static WriteTask newInstance(
1105                 AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1106             WriteTask task = RECYCLER.get();
1107             init(task, ctx, msg, promise);
1108             return task;
1109         }
1110 
1111         private WriteTask(Recycler.Handle<WriteTask> handle) {
1112             super(handle);
1113         }
1114     }
1115 
1116     static final class WriteAndFlushTask extends AbstractWriteTask {
1117 
1118         private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
1119             @Override
1120             protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
1121                 return new WriteAndFlushTask(handle);
1122             }
1123         };
1124 
1125         static WriteAndFlushTask newInstance(
1126                 AbstractChannelHandlerContext ctx, Object msg,  ChannelPromise promise) {
1127             WriteAndFlushTask task = RECYCLER.get();
1128             init(task, ctx, msg, promise);
1129             return task;
1130         }
1131 
1132         private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
1133             super(handle);
1134         }
1135 
1136         @Override
1137         public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1138             super.write(ctx, msg, promise);
1139             ctx.invokeFlush();
1140         }
1141     }
1142 
1143     private static final class Tasks {
1144         private final AbstractChannelHandlerContext next;
1145         private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1146             @Override
1147             public void run() {
1148                 next.invokeChannelReadComplete();
1149             }
1150         };
1151         private final Runnable invokeReadTask = new Runnable() {
1152             @Override
1153             public void run() {
1154                 next.invokeRead();
1155             }
1156         };
1157         private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1158             @Override
1159             public void run() {
1160                 next.invokeChannelWritabilityChanged();
1161             }
1162         };
1163         private final Runnable invokeFlushTask = new Runnable() {
1164             @Override
1165             public void run() {
1166                 next.invokeFlush();
1167             }
1168         };
1169 
1170         Tasks(AbstractChannelHandlerContext next) {
1171             this.next = next;
1172         }
1173     }
1174 }