View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.ReferenceCountUtil;
22  import io.netty.util.ResourceLeakHint;
23  import io.netty.util.concurrent.AbstractEventExecutor;
24  import io.netty.util.concurrent.EventExecutor;
25  import io.netty.util.concurrent.OrderedEventExecutor;
26  import io.netty.util.internal.ObjectPool;
27  import io.netty.util.internal.ObjectPool.Handle;
28  import io.netty.util.internal.ObjectPool.ObjectCreator;
29  import io.netty.util.internal.PromiseNotificationUtil;
30  import io.netty.util.internal.ThrowableUtil;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.StringUtil;
33  import io.netty.util.internal.SystemPropertyUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  import io.netty.util.internal.logging.InternalLoggerFactory;
36  
37  import java.net.SocketAddress;
38  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39  
40  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59  import static io.netty.channel.ChannelHandlerMask.mask;
60  
61  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62  
63      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
64      volatile AbstractChannelHandlerContext next;
65      volatile AbstractChannelHandlerContext prev;
66  
67      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
68              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
69  
70      /**
71       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
72       */
73      private static final int ADD_PENDING = 1;
74      /**
75       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
76       */
77      private static final int ADD_COMPLETE = 2;
78      /**
79       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
80       */
81      private static final int REMOVE_COMPLETE = 3;
82      /**
83       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
84       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
85       */
86      private static final int INIT = 0;
87  
88      private final DefaultChannelPipeline pipeline;
89      private final String name;
90      private final boolean ordered;
91      private final int executionMask;
92  
93      // Will be set to null if no child executor should be used, otherwise it will be set to the
94      // child executor.
95      final EventExecutor executor;
96      private ChannelFuture succeededFuture;
97  
98      // Lazily instantiated tasks used to trigger events to a handler with different executor.
99      // There is no need to make this volatile as at worse it will just create a few more instances then needed.
100     private Tasks invokeTasks;
101 
102     private volatile int handlerState = INIT;
103 
104     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
105                                   String name, Class<? extends ChannelHandler> handlerClass) {
106         this.name = ObjectUtil.checkNotNull(name, "name");
107         this.pipeline = pipeline;
108         this.executor = executor;
109         this.executionMask = mask(handlerClass);
110         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
111         ordered = executor == null || executor instanceof OrderedEventExecutor;
112     }
113 
114     @Override
115     public Channel channel() {
116         return pipeline.channel();
117     }
118 
119     @Override
120     public ChannelPipeline pipeline() {
121         return pipeline;
122     }
123 
124     @Override
125     public ByteBufAllocator alloc() {
126         return channel().config().getAllocator();
127     }
128 
129     @Override
130     public EventExecutor executor() {
131         if (executor == null) {
132             return channel().eventLoop();
133         } else {
134             return executor;
135         }
136     }
137 
138     @Override
139     public String name() {
140         return name;
141     }
142 
143     @Override
144     public ChannelHandlerContext fireChannelRegistered() {
145         invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
146         return this;
147     }
148 
149     static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
150         EventExecutor executor = next.executor();
151         if (executor.inEventLoop()) {
152             next.invokeChannelRegistered();
153         } else {
154             executor.execute(new Runnable() {
155                 @Override
156                 public void run() {
157                     next.invokeChannelRegistered();
158                 }
159             });
160         }
161     }
162 
163     private void invokeChannelRegistered() {
164         if (invokeHandler()) {
165             try {
166                 ((ChannelInboundHandler) handler()).channelRegistered(this);
167             } catch (Throwable t) {
168                 invokeExceptionCaught(t);
169             }
170         } else {
171             fireChannelRegistered();
172         }
173     }
174 
175     @Override
176     public ChannelHandlerContext fireChannelUnregistered() {
177         invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
178         return this;
179     }
180 
181     static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
182         EventExecutor executor = next.executor();
183         if (executor.inEventLoop()) {
184             next.invokeChannelUnregistered();
185         } else {
186             executor.execute(new Runnable() {
187                 @Override
188                 public void run() {
189                     next.invokeChannelUnregistered();
190                 }
191             });
192         }
193     }
194 
195     private void invokeChannelUnregistered() {
196         if (invokeHandler()) {
197             try {
198                 ((ChannelInboundHandler) handler()).channelUnregistered(this);
199             } catch (Throwable t) {
200                 invokeExceptionCaught(t);
201             }
202         } else {
203             fireChannelUnregistered();
204         }
205     }
206 
207     @Override
208     public ChannelHandlerContext fireChannelActive() {
209         invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
210         return this;
211     }
212 
213     static void invokeChannelActive(final AbstractChannelHandlerContext next) {
214         EventExecutor executor = next.executor();
215         if (executor.inEventLoop()) {
216             next.invokeChannelActive();
217         } else {
218             executor.execute(new Runnable() {
219                 @Override
220                 public void run() {
221                     next.invokeChannelActive();
222                 }
223             });
224         }
225     }
226 
227     private void invokeChannelActive() {
228         if (invokeHandler()) {
229             try {
230                 ((ChannelInboundHandler) handler()).channelActive(this);
231             } catch (Throwable t) {
232                 invokeExceptionCaught(t);
233             }
234         } else {
235             fireChannelActive();
236         }
237     }
238 
239     @Override
240     public ChannelHandlerContext fireChannelInactive() {
241         invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
242         return this;
243     }
244 
245     static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
246         EventExecutor executor = next.executor();
247         if (executor.inEventLoop()) {
248             next.invokeChannelInactive();
249         } else {
250             executor.execute(new Runnable() {
251                 @Override
252                 public void run() {
253                     next.invokeChannelInactive();
254                 }
255             });
256         }
257     }
258 
259     private void invokeChannelInactive() {
260         if (invokeHandler()) {
261             try {
262                 ((ChannelInboundHandler) handler()).channelInactive(this);
263             } catch (Throwable t) {
264                 invokeExceptionCaught(t);
265             }
266         } else {
267             fireChannelInactive();
268         }
269     }
270 
271     @Override
272     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
273         invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
274         return this;
275     }
276 
277     static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
278         ObjectUtil.checkNotNull(cause, "cause");
279         EventExecutor executor = next.executor();
280         if (executor.inEventLoop()) {
281             next.invokeExceptionCaught(cause);
282         } else {
283             try {
284                 executor.execute(new Runnable() {
285                     @Override
286                     public void run() {
287                         next.invokeExceptionCaught(cause);
288                     }
289                 });
290             } catch (Throwable t) {
291                 if (logger.isWarnEnabled()) {
292                     logger.warn("Failed to submit an exceptionCaught() event.", t);
293                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
294                 }
295             }
296         }
297     }
298 
299     private void invokeExceptionCaught(final Throwable cause) {
300         if (invokeHandler()) {
301             try {
302                 handler().exceptionCaught(this, cause);
303             } catch (Throwable error) {
304                 if (logger.isDebugEnabled()) {
305                     logger.debug(
306                         "An exception {}" +
307                         "was thrown by a user handler's exceptionCaught() " +
308                         "method while handling the following exception:",
309                         ThrowableUtil.stackTraceToString(error), cause);
310                 } else if (logger.isWarnEnabled()) {
311                     logger.warn(
312                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
313                         "was thrown by a user handler's exceptionCaught() " +
314                         "method while handling the following exception:", error, cause);
315                 }
316             }
317         } else {
318             fireExceptionCaught(cause);
319         }
320     }
321 
322     @Override
323     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
324         invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
325         return this;
326     }
327 
328     static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
329         ObjectUtil.checkNotNull(event, "event");
330         EventExecutor executor = next.executor();
331         if (executor.inEventLoop()) {
332             next.invokeUserEventTriggered(event);
333         } else {
334             executor.execute(new Runnable() {
335                 @Override
336                 public void run() {
337                     next.invokeUserEventTriggered(event);
338                 }
339             });
340         }
341     }
342 
343     private void invokeUserEventTriggered(Object event) {
344         if (invokeHandler()) {
345             try {
346                 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
347             } catch (Throwable t) {
348                 invokeExceptionCaught(t);
349             }
350         } else {
351             fireUserEventTriggered(event);
352         }
353     }
354 
355     @Override
356     public ChannelHandlerContext fireChannelRead(final Object msg) {
357         invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
358         return this;
359     }
360 
361     static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
362         final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
363         EventExecutor executor = next.executor();
364         if (executor.inEventLoop()) {
365             next.invokeChannelRead(m);
366         } else {
367             executor.execute(new Runnable() {
368                 @Override
369                 public void run() {
370                     next.invokeChannelRead(m);
371                 }
372             });
373         }
374     }
375 
376     private void invokeChannelRead(Object msg) {
377         if (invokeHandler()) {
378             try {
379                 ((ChannelInboundHandler) handler()).channelRead(this, msg);
380             } catch (Throwable t) {
381                 invokeExceptionCaught(t);
382             }
383         } else {
384             fireChannelRead(msg);
385         }
386     }
387 
388     @Override
389     public ChannelHandlerContext fireChannelReadComplete() {
390         invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
391         return this;
392     }
393 
394     static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
395         EventExecutor executor = next.executor();
396         if (executor.inEventLoop()) {
397             next.invokeChannelReadComplete();
398         } else {
399             Tasks tasks = next.invokeTasks;
400             if (tasks == null) {
401                 next.invokeTasks = tasks = new Tasks(next);
402             }
403             executor.execute(tasks.invokeChannelReadCompleteTask);
404         }
405     }
406 
407     private void invokeChannelReadComplete() {
408         if (invokeHandler()) {
409             try {
410                 ((ChannelInboundHandler) handler()).channelReadComplete(this);
411             } catch (Throwable t) {
412                 invokeExceptionCaught(t);
413             }
414         } else {
415             fireChannelReadComplete();
416         }
417     }
418 
419     @Override
420     public ChannelHandlerContext fireChannelWritabilityChanged() {
421         invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
422         return this;
423     }
424 
425     static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
426         EventExecutor executor = next.executor();
427         if (executor.inEventLoop()) {
428             next.invokeChannelWritabilityChanged();
429         } else {
430             Tasks tasks = next.invokeTasks;
431             if (tasks == null) {
432                 next.invokeTasks = tasks = new Tasks(next);
433             }
434             executor.execute(tasks.invokeChannelWritableStateChangedTask);
435         }
436     }
437 
438     private void invokeChannelWritabilityChanged() {
439         if (invokeHandler()) {
440             try {
441                 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
442             } catch (Throwable t) {
443                 invokeExceptionCaught(t);
444             }
445         } else {
446             fireChannelWritabilityChanged();
447         }
448     }
449 
450     @Override
451     public ChannelFuture bind(SocketAddress localAddress) {
452         return bind(localAddress, newPromise());
453     }
454 
455     @Override
456     public ChannelFuture connect(SocketAddress remoteAddress) {
457         return connect(remoteAddress, newPromise());
458     }
459 
460     @Override
461     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
462         return connect(remoteAddress, localAddress, newPromise());
463     }
464 
465     @Override
466     public ChannelFuture disconnect() {
467         return disconnect(newPromise());
468     }
469 
470     @Override
471     public ChannelFuture close() {
472         return close(newPromise());
473     }
474 
475     @Override
476     public ChannelFuture deregister() {
477         return deregister(newPromise());
478     }
479 
480     @Override
481     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
482         ObjectUtil.checkNotNull(localAddress, "localAddress");
483         if (isNotValidPromise(promise, false)) {
484             // cancelled
485             return promise;
486         }
487 
488         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
489         EventExecutor executor = next.executor();
490         if (executor.inEventLoop()) {
491             next.invokeBind(localAddress, promise);
492         } else {
493             safeExecute(executor, new Runnable() {
494                 @Override
495                 public void run() {
496                     next.invokeBind(localAddress, promise);
497                 }
498             }, promise, null, false);
499         }
500         return promise;
501     }
502 
503     private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
504         if (invokeHandler()) {
505             try {
506                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
507             } catch (Throwable t) {
508                 notifyOutboundHandlerException(t, promise);
509             }
510         } else {
511             bind(localAddress, promise);
512         }
513     }
514 
515     @Override
516     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
517         return connect(remoteAddress, null, promise);
518     }
519 
520     @Override
521     public ChannelFuture connect(
522             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
523         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
524 
525         if (isNotValidPromise(promise, false)) {
526             // cancelled
527             return promise;
528         }
529 
530         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
531         EventExecutor executor = next.executor();
532         if (executor.inEventLoop()) {
533             next.invokeConnect(remoteAddress, localAddress, promise);
534         } else {
535             safeExecute(executor, new Runnable() {
536                 @Override
537                 public void run() {
538                     next.invokeConnect(remoteAddress, localAddress, promise);
539                 }
540             }, promise, null, false);
541         }
542         return promise;
543     }
544 
545     private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
546         if (invokeHandler()) {
547             try {
548                 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
549             } catch (Throwable t) {
550                 notifyOutboundHandlerException(t, promise);
551             }
552         } else {
553             connect(remoteAddress, localAddress, promise);
554         }
555     }
556 
557     @Override
558     public ChannelFuture disconnect(final ChannelPromise promise) {
559         if (!channel().metadata().hasDisconnect()) {
560             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
561             // So far, UDP/IP is the only transport that has such behavior.
562             return close(promise);
563         }
564         if (isNotValidPromise(promise, false)) {
565             // cancelled
566             return promise;
567         }
568 
569         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
570         EventExecutor executor = next.executor();
571         if (executor.inEventLoop()) {
572             next.invokeDisconnect(promise);
573         } else {
574             safeExecute(executor, new Runnable() {
575                 @Override
576                 public void run() {
577                     next.invokeDisconnect(promise);
578                 }
579             }, promise, null, false);
580         }
581         return promise;
582     }
583 
584     private void invokeDisconnect(ChannelPromise promise) {
585         if (invokeHandler()) {
586             try {
587                 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
588             } catch (Throwable t) {
589                 notifyOutboundHandlerException(t, promise);
590             }
591         } else {
592             disconnect(promise);
593         }
594     }
595 
596     @Override
597     public ChannelFuture close(final ChannelPromise promise) {
598         if (isNotValidPromise(promise, false)) {
599             // cancelled
600             return promise;
601         }
602 
603         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
604         EventExecutor executor = next.executor();
605         if (executor.inEventLoop()) {
606             next.invokeClose(promise);
607         } else {
608             safeExecute(executor, new Runnable() {
609                 @Override
610                 public void run() {
611                     next.invokeClose(promise);
612                 }
613             }, promise, null, false);
614         }
615 
616         return promise;
617     }
618 
619     private void invokeClose(ChannelPromise promise) {
620         if (invokeHandler()) {
621             try {
622                 ((ChannelOutboundHandler) handler()).close(this, promise);
623             } catch (Throwable t) {
624                 notifyOutboundHandlerException(t, promise);
625             }
626         } else {
627             close(promise);
628         }
629     }
630 
631     @Override
632     public ChannelFuture deregister(final ChannelPromise promise) {
633         if (isNotValidPromise(promise, false)) {
634             // cancelled
635             return promise;
636         }
637 
638         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
639         EventExecutor executor = next.executor();
640         if (executor.inEventLoop()) {
641             next.invokeDeregister(promise);
642         } else {
643             safeExecute(executor, new Runnable() {
644                 @Override
645                 public void run() {
646                     next.invokeDeregister(promise);
647                 }
648             }, promise, null, false);
649         }
650 
651         return promise;
652     }
653 
654     private void invokeDeregister(ChannelPromise promise) {
655         if (invokeHandler()) {
656             try {
657                 ((ChannelOutboundHandler) handler()).deregister(this, promise);
658             } catch (Throwable t) {
659                 notifyOutboundHandlerException(t, promise);
660             }
661         } else {
662             deregister(promise);
663         }
664     }
665 
666     @Override
667     public ChannelHandlerContext read() {
668         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
669         EventExecutor executor = next.executor();
670         if (executor.inEventLoop()) {
671             next.invokeRead();
672         } else {
673             Tasks tasks = next.invokeTasks;
674             if (tasks == null) {
675                 next.invokeTasks = tasks = new Tasks(next);
676             }
677             executor.execute(tasks.invokeReadTask);
678         }
679 
680         return this;
681     }
682 
683     private void invokeRead() {
684         if (invokeHandler()) {
685             try {
686                 ((ChannelOutboundHandler) handler()).read(this);
687             } catch (Throwable t) {
688                 invokeExceptionCaught(t);
689             }
690         } else {
691             read();
692         }
693     }
694 
695     @Override
696     public ChannelFuture write(Object msg) {
697         return write(msg, newPromise());
698     }
699 
700     @Override
701     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
702         write(msg, false, promise);
703 
704         return promise;
705     }
706 
707     void invokeWrite(Object msg, ChannelPromise promise) {
708         if (invokeHandler()) {
709             invokeWrite0(msg, promise);
710         } else {
711             write(msg, promise);
712         }
713     }
714 
715     private void invokeWrite0(Object msg, ChannelPromise promise) {
716         try {
717             ((ChannelOutboundHandler) handler()).write(this, msg, promise);
718         } catch (Throwable t) {
719             notifyOutboundHandlerException(t, promise);
720         }
721     }
722 
723     @Override
724     public ChannelHandlerContext flush() {
725         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
726         EventExecutor executor = next.executor();
727         if (executor.inEventLoop()) {
728             next.invokeFlush();
729         } else {
730             Tasks tasks = next.invokeTasks;
731             if (tasks == null) {
732                 next.invokeTasks = tasks = new Tasks(next);
733             }
734             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
735         }
736 
737         return this;
738     }
739 
740     private void invokeFlush() {
741         if (invokeHandler()) {
742             invokeFlush0();
743         } else {
744             flush();
745         }
746     }
747 
748     private void invokeFlush0() {
749         try {
750             ((ChannelOutboundHandler) handler()).flush(this);
751         } catch (Throwable t) {
752             invokeExceptionCaught(t);
753         }
754     }
755 
756     @Override
757     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
758         write(msg, true, promise);
759         return promise;
760     }
761 
762     void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
763         if (invokeHandler()) {
764             invokeWrite0(msg, promise);
765             invokeFlush0();
766         } else {
767             writeAndFlush(msg, promise);
768         }
769     }
770 
771     private void write(Object msg, boolean flush, ChannelPromise promise) {
772         ObjectUtil.checkNotNull(msg, "msg");
773         try {
774             if (isNotValidPromise(promise, true)) {
775                 ReferenceCountUtil.release(msg);
776                 // cancelled
777                 return;
778             }
779         } catch (RuntimeException e) {
780             ReferenceCountUtil.release(msg);
781             throw e;
782         }
783 
784         final AbstractChannelHandlerContext next = findContextOutbound(flush ?
785                 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
786         final Object m = pipeline.touch(msg, next);
787         EventExecutor executor = next.executor();
788         if (executor.inEventLoop()) {
789             if (flush) {
790                 next.invokeWriteAndFlush(m, promise);
791             } else {
792                 next.invokeWrite(m, promise);
793             }
794         } else {
795             final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
796             if (!safeExecute(executor, task, promise, m, !flush)) {
797                 // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
798                 // and put it back in the Recycler for re-use later.
799                 //
800                 // See https://github.com/netty/netty/issues/8343.
801                 task.cancel();
802             }
803         }
804     }
805 
806     @Override
807     public ChannelFuture writeAndFlush(Object msg) {
808         return writeAndFlush(msg, newPromise());
809     }
810 
811     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
812         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
813         // false.
814         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
815     }
816 
817     @Override
818     public ChannelPromise newPromise() {
819         return new DefaultChannelPromise(channel(), executor());
820     }
821 
822     @Override
823     public ChannelProgressivePromise newProgressivePromise() {
824         return new DefaultChannelProgressivePromise(channel(), executor());
825     }
826 
827     @Override
828     public ChannelFuture newSucceededFuture() {
829         ChannelFuture succeededFuture = this.succeededFuture;
830         if (succeededFuture == null) {
831             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
832         }
833         return succeededFuture;
834     }
835 
836     @Override
837     public ChannelFuture newFailedFuture(Throwable cause) {
838         return new FailedChannelFuture(channel(), executor(), cause);
839     }
840 
841     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
842         ObjectUtil.checkNotNull(promise, "promise");
843 
844         if (promise.isDone()) {
845             // Check if the promise was cancelled and if so signal that the processing of the operation
846             // should not be performed.
847             //
848             // See https://github.com/netty/netty/issues/2349
849             if (promise.isCancelled()) {
850                 return true;
851             }
852             throw new IllegalArgumentException("promise already done: " + promise);
853         }
854 
855         if (promise.channel() != channel()) {
856             throw new IllegalArgumentException(String.format(
857                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
858         }
859 
860         if (promise.getClass() == DefaultChannelPromise.class) {
861             return false;
862         }
863 
864         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
865             throw new IllegalArgumentException(
866                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
867         }
868 
869         if (promise instanceof AbstractChannel.CloseFuture) {
870             throw new IllegalArgumentException(
871                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
872         }
873         return false;
874     }
875 
876     private AbstractChannelHandlerContext findContextInbound(int mask) {
877         AbstractChannelHandlerContext ctx = this;
878         EventExecutor currentExecutor = executor();
879         do {
880             ctx = ctx.next;
881         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
882         return ctx;
883     }
884 
885     private AbstractChannelHandlerContext findContextOutbound(int mask) {
886         AbstractChannelHandlerContext ctx = this;
887         EventExecutor currentExecutor = executor();
888         do {
889             ctx = ctx.prev;
890         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
891         return ctx;
892     }
893 
894     private static boolean skipContext(
895             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
896         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
897         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
898                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
899                 // everything to preserve ordering.
900                 //
901                 // See https://github.com/netty/netty/issues/10067
902                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
903     }
904 
905     @Override
906     public ChannelPromise voidPromise() {
907         return channel().voidPromise();
908     }
909 
910     final void setRemoved() {
911         handlerState = REMOVE_COMPLETE;
912     }
913 
914     final boolean setAddComplete() {
915         for (;;) {
916             int oldState = handlerState;
917             if (oldState == REMOVE_COMPLETE) {
918                 return false;
919             }
920             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
921             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
922             // exposing ordering guarantees.
923             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
924                 return true;
925             }
926         }
927     }
928 
929     final void setAddPending() {
930         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
931         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
932     }
933 
934     final void callHandlerAdded() throws Exception {
935         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
936         // any pipeline events ctx.handler() will miss them because the state will not allow it.
937         if (setAddComplete()) {
938             handler().handlerAdded(this);
939         }
940     }
941 
942     final void callHandlerRemoved() throws Exception {
943         try {
944             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
945             if (handlerState == ADD_COMPLETE) {
946                 handler().handlerRemoved(this);
947             }
948         } finally {
949             // Mark the handler as removed in any case.
950             setRemoved();
951         }
952     }
953 
954     /**
955      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
956      * yet. If not return {@code false} and if called or could not detect return {@code true}.
957      *
958      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
959      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
960      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
961      */
962     private boolean invokeHandler() {
963         // Store in local variable to reduce volatile reads.
964         int handlerState = this.handlerState;
965         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
966     }
967 
968     @Override
969     public boolean isRemoved() {
970         return handlerState == REMOVE_COMPLETE;
971     }
972 
973     @Override
974     public <T> Attribute<T> attr(AttributeKey<T> key) {
975         return channel().attr(key);
976     }
977 
978     @Override
979     public <T> boolean hasAttr(AttributeKey<T> key) {
980         return channel().hasAttr(key);
981     }
982 
983     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
984             ChannelPromise promise, Object msg, boolean lazy) {
985         try {
986             if (lazy && executor instanceof AbstractEventExecutor) {
987                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
988             } else {
989                 executor.execute(runnable);
990             }
991             return true;
992         } catch (Throwable cause) {
993             try {
994                 if (msg != null) {
995                     ReferenceCountUtil.release(msg);
996                 }
997             } finally {
998                 promise.setFailure(cause);
999             }
1000             return false;
1001         }
1002     }
1003 
1004     @Override
1005     public String toHintString() {
1006         return '\'' + name + "' will handle the message from this point.";
1007     }
1008 
1009     @Override
1010     public String toString() {
1011         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1012     }
1013 
1014     static final class WriteTask implements Runnable {
1015         private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1016             @Override
1017             public WriteTask newObject(Handle<WriteTask> handle) {
1018                 return new WriteTask(handle);
1019             }
1020         });
1021 
1022         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1023                 Object msg, ChannelPromise promise, boolean flush) {
1024             WriteTask task = RECYCLER.get();
1025             init(task, ctx, msg, promise, flush);
1026             return task;
1027         }
1028 
1029         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1030                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1031 
1032         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1033         private static final int WRITE_TASK_OVERHEAD =
1034                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1035 
1036         private final Handle<WriteTask> handle;
1037         private AbstractChannelHandlerContext ctx;
1038         private Object msg;
1039         private ChannelPromise promise;
1040         private int size; // sign bit controls flush
1041 
1042         @SuppressWarnings("unchecked")
1043         private WriteTask(Handle<? extends WriteTask> handle) {
1044             this.handle = (Handle<WriteTask>) handle;
1045         }
1046 
1047         protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1048                                    Object msg, ChannelPromise promise, boolean flush) {
1049             task.ctx = ctx;
1050             task.msg = msg;
1051             task.promise = promise;
1052 
1053             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1054                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1055                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1056             } else {
1057                 task.size = 0;
1058             }
1059             if (flush) {
1060                 task.size |= Integer.MIN_VALUE;
1061             }
1062         }
1063 
1064         @Override
1065         public void run() {
1066             try {
1067                 decrementPendingOutboundBytes();
1068                 if (size >= 0) {
1069                     ctx.invokeWrite(msg, promise);
1070                 } else {
1071                     ctx.invokeWriteAndFlush(msg, promise);
1072                 }
1073             } finally {
1074                 recycle();
1075             }
1076         }
1077 
1078         void cancel() {
1079             try {
1080                 decrementPendingOutboundBytes();
1081             } finally {
1082                 recycle();
1083             }
1084         }
1085 
1086         private void decrementPendingOutboundBytes() {
1087             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1088                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1089             }
1090         }
1091 
1092         private void recycle() {
1093             // Set to null so the GC can collect them directly
1094             ctx = null;
1095             msg = null;
1096             promise = null;
1097             handle.recycle(this);
1098         }
1099     }
1100 
1101     private static final class Tasks {
1102         private final AbstractChannelHandlerContext next;
1103         private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1104             @Override
1105             public void run() {
1106                 next.invokeChannelReadComplete();
1107             }
1108         };
1109         private final Runnable invokeReadTask = new Runnable() {
1110             @Override
1111             public void run() {
1112                 next.invokeRead();
1113             }
1114         };
1115         private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1116             @Override
1117             public void run() {
1118                 next.invokeChannelWritabilityChanged();
1119             }
1120         };
1121         private final Runnable invokeFlushTask = new Runnable() {
1122             @Override
1123             public void run() {
1124                 next.invokeFlush();
1125             }
1126         };
1127 
1128         Tasks(AbstractChannelHandlerContext next) {
1129             this.next = next;
1130         }
1131     }
1132 }