View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.Recycler;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.ResourceLeakHint;
24  import io.netty.util.concurrent.AbstractEventExecutor;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.concurrent.OrderedEventExecutor;
27  import io.netty.util.internal.ObjectPool.Handle;
28  import io.netty.util.internal.ObjectUtil;
29  import io.netty.util.internal.PromiseNotificationUtil;
30  import io.netty.util.internal.StringUtil;
31  import io.netty.util.internal.SystemPropertyUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.net.SocketAddress;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  
38  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
39  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
40  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
47  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
48  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
49  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
50  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
51  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
52  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
53  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
54  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
55  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
56  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
57  import static io.netty.channel.ChannelHandlerMask.mask;
58  
59  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
60  
    // Logger shared by all instances of this class.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    // NOTE(review): presumably the links to the neighbouring contexts in the pipeline's linked list;
    // they are maintained outside of this section - confirm against DefaultChannelPipeline.
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    // Atomic updater bound to the volatile "handlerState" field declared below.
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
     */
    private static final int ADD_PENDING = 1;
    /**
     * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
     */
    private static final int ADD_COMPLETE = 2;
    /**
     * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int REMOVE_COMPLETE = 3;
    /**
     * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
     * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
     */
    private static final int INIT = 0;

    // The pipeline handed to the constructor; channel() is resolved through it.
    private final DefaultChannelPipeline pipeline;
    // Name of this context; the constructor rejects null.
    private final String name;
    // True when no executor was given to the constructor or when that executor is an OrderedEventExecutor.
    private final boolean ordered;
    // Result of ChannelHandlerMask.mask(handlerClass), computed once in the constructor.
    private final int executionMask;

    // Will be set to null if no child executor should be used, otherwise it will be set to the
    // child executor.
    final EventExecutor childExecutor;
    // Cache the concrete value for the executor() method. This method is in the hot-path,
    // and it's a profitable optimisation to avoid as many dependent-loads as possible.
    // It does not need to be volatile, because it's always the same value for a given context,
    // within the lifetime of its registration with an event loop, and deregistering will clear it.
    EventExecutor contextExecutor;
    // NOTE(review): not used in this part of the file; presumably a lazily created, cached future - confirm.
    private ChannelFuture succeededFuture;

    // Lazily instantiated tasks used to trigger events to a handler with a different executor.
    // There is no need to make this volatile as at worst it will just create a few more instances than needed.
    private Tasks invokeTasks;

    // Current lifecycle state; holds one of the int constants above and starts out as INIT.
    private volatile int handlerState = INIT;
106 
107     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
108                                   String name, Class<? extends ChannelHandler> handlerClass) {
109         this.name = ObjectUtil.checkNotNull(name, "name");
110         this.pipeline = pipeline;
111         childExecutor = executor;
112         executionMask = mask(handlerClass);
113         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
114         ordered = executor == null || executor instanceof OrderedEventExecutor;
115     }
116 
117     @Override
118     public Channel channel() {
119         return pipeline.channel();
120     }
121 
122     @Override
123     public ChannelPipeline pipeline() {
124         return pipeline;
125     }
126 
127     @Override
128     public ByteBufAllocator alloc() {
129         return channel().config().getAllocator();
130     }
131 
132     @Override
133     public EventExecutor executor() {
134         EventExecutor ex = contextExecutor;
135         if (ex == null) {
136             contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
137         }
138         return ex;
139     }
140 
141     @Override
142     public String name() {
143         return name;
144     }
145 
    /**
     * Propagates a {@code channelRegistered} event to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_REGISTERED)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, this method is re-submitted to that context's executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelRegistered(next);
                    } else if (handler instanceof ChannelInboundHandlerAdapter) {
                        ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
                    } else {
                        ((ChannelInboundHandler) handler).channelRegistered(next);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelRegistered();
            }
        } else {
            // Re-run this whole method (including the lookup of "next") on the target executor.
            next.executor().execute(this::fireChannelRegistered);
        }
        return this;
    }
175 
    /**
     * Propagates a {@code channelUnregistered} event to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_UNREGISTERED)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, this method is re-submitted to that context's executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelUnregistered() {
        final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelUnregistered(next);
                    } else if (handler instanceof ChannelInboundHandlerAdapter) {
                        ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
                    } else {
                        ((ChannelInboundHandler) handler).channelUnregistered(next);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelUnregistered();
            }
        } else {
            // Re-run this whole method (including the lookup of "next") on the target executor.
            next.executor().execute(this::fireChannelUnregistered);
        }
        return this;
    }
205 
    /**
     * Propagates a {@code channelActive} event to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_ACTIVE)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, this method is re-submitted to that context's executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelActive() {
        AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelActive(next);
                    } else if (handler instanceof ChannelInboundHandlerAdapter) {
                        ((ChannelInboundHandlerAdapter) handler).channelActive(next);
                    } else {
                        ((ChannelInboundHandler) handler).channelActive(next);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelActive();
            }
        } else {
            // Re-run this whole method (including the lookup of "next") on the target executor.
            next.executor().execute(this::fireChannelActive);
        }
        return this;
    }
235 
    /**
     * Propagates a {@code channelInactive} event to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_INACTIVE)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, this method is re-submitted to that context's executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelInactive() {
        AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelInactive(next);
                    } else if (handler instanceof ChannelInboundHandlerAdapter) {
                        ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
                    } else {
                        ((ChannelInboundHandler) handler).channelInactive(next);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelInactive();
            }
        } else {
            // Re-run this whole method (including the lookup of "next") on the target executor.
            next.executor().execute(this::fireChannelInactive);
        }
        return this;
    }
265 
266     @Override
267     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
268         AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
269         ObjectUtil.checkNotNull(cause, "cause");
270         if (next.executor().inEventLoop()) {
271             next.invokeExceptionCaught(cause);
272         } else {
273             try {
274                 next.executor().execute(() -> next.invokeExceptionCaught(cause));
275             } catch (Throwable t) {
276                 if (logger.isWarnEnabled()) {
277                     logger.warn("Failed to submit an exceptionCaught() event.", t);
278                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
279                 }
280             }
281         }
282         return this;
283     }
284 
285     @SuppressWarnings("deprecation")
286     private void invokeExceptionCaught(final Throwable cause) {
287         if (invokeHandler()) {
288             try {
289                 handler().exceptionCaught(this, cause);
290             } catch (Throwable error) {
291                 if (logger.isDebugEnabled()) {
292                     logger.debug(
293                         "An exception " +
294                         "was thrown by a user handler's exceptionCaught() " +
295                         "method while handling the following exception:", cause);
296                 } else if (logger.isWarnEnabled()) {
297                     logger.warn(
298                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
299                         "was thrown by a user handler's exceptionCaught() " +
300                         "method while handling the following exception:", error, cause);
301                 }
302             }
303         } else {
304             fireExceptionCaught(cause);
305         }
306     }
307 
    /**
     * Propagates a user event to the context returned by
     * {@code findContextInbound(MASK_USER_EVENT_TRIGGERED)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, this method is re-submitted to that context's executor.
     *
     * @param event the user event to propagate; must not be {@code null}
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireUserEventTriggered(final Object event) {
        ObjectUtil.checkNotNull(event, "event");
        AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.userEventTriggered(next, event);
                    } else if (handler instanceof ChannelInboundHandlerAdapter) {
                        ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
                    } else {
                        ((ChannelInboundHandler) handler).userEventTriggered(next, event);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireUserEventTriggered(event);
            }
        } else {
            // Re-run this whole method (including the lookup of "next") on the target executor.
            next.executor().execute(() -> fireUserEventTriggered(event));
        }
        return this;
    }
338 
    /**
     * Propagates a received message to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_READ)}.
     *
     * <p>When the calling thread is in that context's event loop, the message is first passed
     * through {@code pipeline.touch(msg, next)} and the result is what the handler receives.
     * Any {@link Throwable} thrown by the handler is passed to that context's
     * {@code invokeExceptionCaught}. If {@code next.invokeHandler()} returns {@code false}, the
     * touched message is propagated further from that context. When the caller is not in the
     * event loop, this method is re-submitted to that context's executor with the original message.
     *
     * @param msg the message to propagate
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
        if (next.executor().inEventLoop()) {
            final Object m = pipeline.touch(msg, next);
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelRead(next, m);
                    } else if (handler instanceof ChannelDuplexHandler) {
                        ((ChannelDuplexHandler) handler).channelRead(next, m);
                    } else {
                        ((ChannelInboundHandler) handler).channelRead(next, m);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelRead(m);
            }
        } else {
            // Re-run this whole method (including the lookup of "next") on the target executor.
            next.executor().execute(() -> fireChannelRead(msg));
        }
        return this;
    }
369 
    /**
     * Propagates a {@code channelReadComplete} event to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_READ_COMPLETE)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, the {@code invokeChannelReadCompleteTask} obtained
     * from {@code getInvokeTasks()} is submitted to that context's executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelReadComplete() {
        AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelReadComplete(next);
                    } else if (handler instanceof ChannelDuplexHandler) {
                        ((ChannelDuplexHandler) handler).channelReadComplete(next);
                    } else {
                        ((ChannelInboundHandler) handler).channelReadComplete(next);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelReadComplete();
            }
        } else {
            // Uses the task object from getInvokeTasks() instead of creating a new Runnable here.
            next.executor().execute(getInvokeTasks().invokeChannelReadCompleteTask);
        }
        return this;
    }
399 
    /**
     * Propagates a {@code channelWritabilityChanged} event to the context returned by
     * {@code findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED)}.
     *
     * <p>When the calling thread is in that context's event loop the handler is called directly;
     * any {@link Throwable} it throws is passed to that context's {@code invokeExceptionCaught}.
     * If {@code next.invokeHandler()} returns {@code false}, propagation continues from that context.
     * When the caller is not in the event loop, the {@code invokeChannelWritableStateChangedTask}
     * obtained from {@code getInvokeTasks()} is submitted to that context's executor.
     *
     * @return this context
     */
    @Override
    public ChannelHandlerContext fireChannelWritabilityChanged() {
        AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
        if (next.executor().inEventLoop()) {
            if (next.invokeHandler()) {
                try {
                    // DON'T CHANGE
                    // Duplex handlers implements both out/in interfaces causing a scalability issue
                    // see https://bugs.openjdk.org/browse/JDK-8180450
                    final ChannelHandler handler = next.handler();
                    final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                    if (handler == headContext) {
                        headContext.channelWritabilityChanged(next);
                    } else if (handler instanceof ChannelInboundHandlerAdapter) {
                        ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
                    } else {
                        ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
                    }
                } catch (Throwable t) {
                    next.invokeExceptionCaught(t);
                }
            } else {
                // The handler of "next" must not be called right now; keep propagating from it.
                next.fireChannelWritabilityChanged();
            }
        } else {
            // Uses the task object from getInvokeTasks() instead of creating a new Runnable here.
            next.executor().execute(getInvokeTasks().invokeChannelWritableStateChangedTask);
        }
        return this;
    }
429 
430     @Override
431     public ChannelFuture bind(SocketAddress localAddress) {
432         return bind(localAddress, newPromise());
433     }
434 
435     @Override
436     public ChannelFuture connect(SocketAddress remoteAddress) {
437         return connect(remoteAddress, newPromise());
438     }
439 
440     @Override
441     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
442         return connect(remoteAddress, localAddress, newPromise());
443     }
444 
445     @Override
446     public ChannelFuture disconnect() {
447         return disconnect(newPromise());
448     }
449 
450     @Override
451     public ChannelFuture close() {
452         return close(newPromise());
453     }
454 
455     @Override
456     public ChannelFuture deregister() {
457         return deregister(newPromise());
458     }
459 
460     @Override
461     public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
462         ObjectUtil.checkNotNull(localAddress, "localAddress");
463         if (isNotValidPromise(promise, false)) {
464             // cancelled
465             return promise;
466         }
467 
468         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
469         EventExecutor executor = next.executor();
470         if (executor.inEventLoop()) {
471             next.invokeBind(localAddress, promise);
472         } else {
473             safeExecute(executor, new Runnable() {
474                 @Override
475                 public void run() {
476                     next.invokeBind(localAddress, promise);
477                 }
478             }, promise, null, false);
479         }
480         return promise;
481     }
482 
    /**
     * Calls {@code bind} on this context's handler, or forwards the request with
     * {@link #bind(SocketAddress, ChannelPromise)} when {@code invokeHandler()} returns {@code false}.
     *
     * <p>A {@link Throwable} thrown by the handler is passed, together with {@code promise}, to
     * {@code notifyOutboundHandlerException}.
     *
     * @param localAddress the address to bind to
     * @param promise      the promise associated with the operation
     */
    private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.bind(this, localAddress, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
                } else if (handler instanceof ChannelOutboundHandlerAdapter) {
                    ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
                } else {
                    ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
                }
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // This context's handler must not be called right now; pass the request on.
            bind(localAddress, promise);
        }
    }
507 
508     @Override
509     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
510         return connect(remoteAddress, null, promise);
511     }
512 
513     @Override
514     public ChannelFuture connect(
515             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
516         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
517 
518         if (isNotValidPromise(promise, false)) {
519             // cancelled
520             return promise;
521         }
522 
523         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
524         EventExecutor executor = next.executor();
525         if (executor.inEventLoop()) {
526             next.invokeConnect(remoteAddress, localAddress, promise);
527         } else {
528             safeExecute(executor, new Runnable() {
529                 @Override
530                 public void run() {
531                     next.invokeConnect(remoteAddress, localAddress, promise);
532                 }
533             }, promise, null, false);
534         }
535         return promise;
536     }
537 
    /**
     * Calls {@code connect} on this context's handler, or forwards the request with
     * {@link #connect(SocketAddress, SocketAddress, ChannelPromise)} when {@code invokeHandler()}
     * returns {@code false}.
     *
     * <p>A {@link Throwable} thrown by the handler is passed, together with {@code promise}, to
     * {@code notifyOutboundHandlerException}.
     *
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to use for the connection
     * @param promise       the promise associated with the operation
     */
    private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.connect(this, remoteAddress, localAddress, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
                } else if (handler instanceof ChannelOutboundHandlerAdapter) {
                    ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
                } else {
                    ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
                }
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // This context's handler must not be called right now; pass the request on.
            connect(remoteAddress, localAddress, promise);
        }
    }
562 
563     @Override
564     public ChannelFuture disconnect(final ChannelPromise promise) {
565         if (!channel().metadata().hasDisconnect()) {
566             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
567             // So far, UDP/IP is the only transport that has such behavior.
568             return close(promise);
569         }
570         if (isNotValidPromise(promise, false)) {
571             // cancelled
572             return promise;
573         }
574 
575         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
576         EventExecutor executor = next.executor();
577         if (executor.inEventLoop()) {
578             next.invokeDisconnect(promise);
579         } else {
580             safeExecute(executor, new Runnable() {
581                 @Override
582                 public void run() {
583                     next.invokeDisconnect(promise);
584                 }
585             }, promise, null, false);
586         }
587         return promise;
588     }
589 
    /**
     * Calls {@code disconnect} on this context's handler, or forwards the request with
     * {@link #disconnect(ChannelPromise)} when {@code invokeHandler()} returns {@code false}.
     *
     * <p>A {@link Throwable} thrown by the handler is passed, together with {@code promise}, to
     * {@code notifyOutboundHandlerException}.
     *
     * @param promise the promise associated with the operation
     */
    private void invokeDisconnect(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.disconnect(this, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).disconnect(this, promise);
                } else if (handler instanceof ChannelOutboundHandlerAdapter) {
                    ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
                } else {
                    ((ChannelOutboundHandler) handler).disconnect(this, promise);
                }
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // This context's handler must not be called right now; pass the request on.
            disconnect(promise);
        }
    }
614 
615     @Override
616     public ChannelFuture close(final ChannelPromise promise) {
617         if (isNotValidPromise(promise, false)) {
618             // cancelled
619             return promise;
620         }
621 
622         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
623         EventExecutor executor = next.executor();
624         if (executor.inEventLoop()) {
625             next.invokeClose(promise);
626         } else {
627             safeExecute(executor, new Runnable() {
628                 @Override
629                 public void run() {
630                     next.invokeClose(promise);
631                 }
632             }, promise, null, false);
633         }
634 
635         return promise;
636     }
637 
    /**
     * Calls {@code close} on this context's handler, or forwards the request with
     * {@link #close(ChannelPromise)} when {@code invokeHandler()} returns {@code false}.
     *
     * <p>A {@link Throwable} thrown by the handler is passed, together with {@code promise}, to
     * {@code notifyOutboundHandlerException}.
     *
     * @param promise the promise associated with the operation
     */
    private void invokeClose(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.close(this, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).close(this, promise);
                } else if (handler instanceof ChannelOutboundHandlerAdapter) {
                    ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
                } else {
                    ((ChannelOutboundHandler) handler).close(this, promise);
                }
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // This context's handler must not be called right now; pass the request on.
            close(promise);
        }
    }
662 
663     @Override
664     public ChannelFuture deregister(final ChannelPromise promise) {
665         if (isNotValidPromise(promise, false)) {
666             // cancelled
667             return promise;
668         }
669 
670         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
671         EventExecutor executor = next.executor();
672         if (executor.inEventLoop()) {
673             next.invokeDeregister(promise);
674         } else {
675             safeExecute(executor, new Runnable() {
676                 @Override
677                 public void run() {
678                     next.invokeDeregister(promise);
679                 }
680             }, promise, null, false);
681         }
682 
683         return promise;
684     }
685 
    /**
     * Calls {@code deregister} on this context's handler, or forwards the request with
     * {@link #deregister(ChannelPromise)} when {@code invokeHandler()} returns {@code false}.
     *
     * <p>A {@link Throwable} thrown by the handler is passed, together with {@code promise}, to
     * {@code notifyOutboundHandlerException}.
     *
     * @param promise the promise associated with the operation
     */
    private void invokeDeregister(ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                // DON'T CHANGE
                // Duplex handlers implements both out/in interfaces causing a scalability issue
                // see https://bugs.openjdk.org/browse/JDK-8180450
                final ChannelHandler handler = handler();
                final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
                if (handler == headContext) {
                    headContext.deregister(this, promise);
                } else if (handler instanceof ChannelDuplexHandler) {
                    ((ChannelDuplexHandler) handler).deregister(this, promise);
                } else if (handler instanceof ChannelOutboundHandlerAdapter) {
                    ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
                } else {
                    ((ChannelOutboundHandler) handler).deregister(this, promise);
                }
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            // This context's handler must not be called right now; pass the request on.
            deregister(promise);
        }
    }
710 
711     @Override
712     public ChannelHandlerContext read() {
713         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
714         if (next.executor().inEventLoop()) {
715             if (next.invokeHandler()) {
716                 try {
717                     // DON'T CHANGE
718                     // Duplex handlers implements both out/in interfaces causing a scalability issue
719                     // see https://bugs.openjdk.org/browse/JDK-8180450
720                     final ChannelHandler handler = next.handler();
721                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
722                     if (handler == headContext) {
723                         headContext.read(next);
724                     } else if (handler instanceof ChannelDuplexHandler) {
725                         ((ChannelDuplexHandler) handler).read(next);
726                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
727                         ((ChannelOutboundHandlerAdapter) handler).read(next);
728                     } else {
729                         ((ChannelOutboundHandler) handler).read(next);
730                     }
731                 } catch (Throwable t) {
732                     invokeExceptionCaught(t);
733                 }
734             } else {
735                 next.read();
736             }
737         } else {
738             next.executor().execute(getInvokeTasks().invokeReadTask);
739         }
740         return this;
741     }
742 
743     @Override
744     public ChannelFuture write(Object msg) {
745         ChannelPromise promise = newPromise();
746         write(msg, false, promise);
747         return promise;
748     }
749 
750     @Override
751     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
752         write(msg, false, promise);
753         return promise;
754     }
755 
756     @Override
757     public ChannelHandlerContext flush() {
758         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
759         EventExecutor executor = next.executor();
760         if (executor.inEventLoop()) {
761             next.invokeFlush();
762         } else {
763             Tasks tasks = next.invokeTasks;
764             if (tasks == null) {
765                 next.invokeTasks = tasks = new Tasks(next);
766             }
767             safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
768         }
769 
770         return this;
771     }
772 
773     private void invokeFlush() {
774         if (invokeHandler()) {
775             invokeFlush0();
776         } else {
777             flush();
778         }
779     }
780 
781     private void invokeFlush0() {
782         try {
783             // DON'T CHANGE
784             // Duplex handlers implements both out/in interfaces causing a scalability issue
785             // see https://bugs.openjdk.org/browse/JDK-8180450
786             final ChannelHandler handler = handler();
787             final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
788             if (handler == headContext) {
789                 headContext.flush(this);
790             } else if (handler instanceof ChannelDuplexHandler) {
791                 ((ChannelDuplexHandler) handler).flush(this);
792             } else if (handler instanceof ChannelOutboundHandlerAdapter) {
793                 ((ChannelOutboundHandlerAdapter) handler).flush(this);
794             } else {
795                 ((ChannelOutboundHandler) handler).flush(this);
796             }
797         } catch (Throwable t) {
798             invokeExceptionCaught(t);
799         }
800     }
801 
802     @Override
803     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
804         write(msg, true, promise);
805         return promise;
806     }
807 
808     void write(Object msg, boolean flush, ChannelPromise promise) {
809         if (validateWrite(msg, promise)) {
810             final AbstractChannelHandlerContext next = findContextOutbound(flush ?
811                     MASK_WRITE | MASK_FLUSH : MASK_WRITE);
812             final Object m = pipeline.touch(msg, next);
813             EventExecutor executor = next.executor();
814             if (executor.inEventLoop()) {
815                 if (next.invokeHandler()) {
816                     try {
817                         // DON'T CHANGE
818                         // Duplex handlers implements both out/in interfaces causing a scalability issue
819                         // see https://bugs.openjdk.org/browse/JDK-8180450
820                         final ChannelHandler handler = next.handler();
821                         final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
822                         if (handler == headContext) {
823                             headContext.write(next, msg, promise);
824                         } else if (handler instanceof ChannelDuplexHandler) {
825                             ((ChannelDuplexHandler) handler).write(next, msg, promise);
826                         } else if (handler instanceof ChannelOutboundHandlerAdapter) {
827                             ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
828                         } else {
829                             ((ChannelOutboundHandler) handler).write(next, msg, promise);
830                         }
831                     } catch (Throwable t) {
832                         notifyOutboundHandlerException(t, promise);
833                     }
834                     if (flush) {
835                         next.invokeFlush0();
836                     }
837                 } else {
838                     next.write(msg, flush, promise);
839                 }
840             } else {
841                 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
842                 if (!safeExecute(executor, task, promise, m, !flush)) {
843                     // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
844                     // and put it back in the Recycler for re-use later.
845                     //
846                     // See https://github.com/netty/netty/issues/8343.
847                     task.cancel();
848                 }
849             }
850         }
851     }
852 
853     private boolean validateWrite(Object msg, ChannelPromise promise) {
854         ObjectUtil.checkNotNull(msg, "msg");
855         try {
856             if (isNotValidPromise(promise, true)) {
857                 ReferenceCountUtil.release(msg);
858                 return false; // cancelled
859             }
860         } catch (RuntimeException e) {
861             ReferenceCountUtil.release(msg);
862             throw e;
863         }
864         return true;
865     }
866 
867     @Override
868     public ChannelFuture writeAndFlush(Object msg) {
869         return writeAndFlush(msg, newPromise());
870     }
871 
872     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
873         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
874         // false.
875         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
876     }
877 
878     @Override
879     public ChannelPromise newPromise() {
880         return new DefaultChannelPromise(channel(), executor());
881     }
882 
883     @Override
884     public ChannelProgressivePromise newProgressivePromise() {
885         return new DefaultChannelProgressivePromise(channel(), executor());
886     }
887 
888     @Override
889     public ChannelFuture newSucceededFuture() {
890         ChannelFuture succeededFuture = this.succeededFuture;
891         if (succeededFuture == null) {
892             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
893         }
894         return succeededFuture;
895     }
896 
897     @Override
898     public ChannelFuture newFailedFuture(Throwable cause) {
899         return new FailedChannelFuture(channel(), executor(), cause);
900     }
901 
902     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
903         ObjectUtil.checkNotNull(promise, "promise");
904 
905         if (promise.isDone()) {
906             // Check if the promise was cancelled and if so signal that the processing of the operation
907             // should not be performed.
908             //
909             // See https://github.com/netty/netty/issues/2349
910             if (promise.isCancelled()) {
911                 return true;
912             }
913             throw new IllegalArgumentException("promise already done: " + promise);
914         }
915 
916         if (promise.channel() != channel()) {
917             throw new IllegalArgumentException(String.format(
918                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
919         }
920 
921         if (promise.getClass() == DefaultChannelPromise.class) {
922             return false;
923         }
924 
925         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
926             throw new IllegalArgumentException(
927                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
928         }
929 
930         if (promise instanceof AbstractChannel.CloseFuture) {
931             throw new IllegalArgumentException(
932                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
933         }
934         return false;
935     }
936 
937     private AbstractChannelHandlerContext findContextInbound(int mask) {
938         AbstractChannelHandlerContext ctx = this;
939         EventExecutor currentExecutor = executor();
940         do {
941             ctx = ctx.next;
942         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
943         return ctx;
944     }
945 
946     private AbstractChannelHandlerContext findContextOutbound(int mask) {
947         AbstractChannelHandlerContext ctx = this;
948         EventExecutor currentExecutor = executor();
949         do {
950             ctx = ctx.prev;
951         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
952         return ctx;
953     }
954 
955     private static boolean skipContext(
956             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
957         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
958         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
959                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
960                 // everything to preserve ordering.
961                 //
962                 // See https://github.com/netty/netty/issues/10067
963                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
964     }
965 
966     @Override
967     public ChannelPromise voidPromise() {
968         return channel().voidPromise();
969     }
970 
971     final void setRemoved() {
972         handlerState = REMOVE_COMPLETE;
973     }
974 
975     final boolean setAddComplete() {
976         for (;;) {
977             int oldState = handlerState;
978             if (oldState == REMOVE_COMPLETE) {
979                 return false;
980             }
981             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
982             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
983             // exposing ordering guarantees.
984             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
985                 return true;
986             }
987         }
988     }
989 
990     final void setAddPending() {
991         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
992         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
993     }
994 
995     final void callHandlerAdded() throws Exception {
996         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
997         // any pipeline events ctx.handler() will miss them because the state will not allow it.
998         if (setAddComplete()) {
999             handler().handlerAdded(this);
1000         }
1001     }
1002 
1003     final void callHandlerRemoved() throws Exception {
1004         try {
1005             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
1006             if (handlerState == ADD_COMPLETE) {
1007                 handler().handlerRemoved(this);
1008             }
1009         } finally {
1010             // Mark the handler as removed in any case.
1011             setRemoved();
1012         }
1013     }
1014 
1015     /**
1016      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
1017      * yet. If not return {@code false} and if called or could not detect return {@code true}.
1018      *
1019      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
1020      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
1021      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
1022      */
1023     boolean invokeHandler() {
1024         // Store in local variable to reduce volatile reads.
1025         int handlerState = this.handlerState;
1026         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1027     }
1028 
1029     @Override
1030     public boolean isRemoved() {
1031         return handlerState == REMOVE_COMPLETE;
1032     }
1033 
1034     @Override
1035     public <T> Attribute<T> attr(AttributeKey<T> key) {
1036         return channel().attr(key);
1037     }
1038 
1039     @Override
1040     public <T> boolean hasAttr(AttributeKey<T> key) {
1041         return channel().hasAttr(key);
1042     }
1043 
1044     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1045             ChannelPromise promise, Object msg, boolean lazy) {
1046         try {
1047             if (lazy && executor instanceof AbstractEventExecutor) {
1048                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1049             } else {
1050                 executor.execute(runnable);
1051             }
1052             return true;
1053         } catch (Throwable cause) {
1054             try {
1055                 if (msg != null) {
1056                     ReferenceCountUtil.release(msg);
1057                 }
1058             } finally {
1059                 promise.setFailure(cause);
1060             }
1061             return false;
1062         }
1063     }
1064 
1065     @Override
1066     public String toHintString() {
1067         return '\'' + name + "' will handle the message from this point.";
1068     }
1069 
1070     @Override
1071     public String toString() {
1072         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1073     }
1074 
1075     Tasks getInvokeTasks() {
1076         Tasks tasks = invokeTasks;
1077         if (tasks == null) {
1078             invokeTasks = tasks = new Tasks(this);
1079         }
1080         return tasks;
1081     }
1082 
1083     static final class WriteTask implements Runnable {
1084         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1085             @Override
1086             protected WriteTask newObject(Handle<WriteTask> handle) {
1087                 return new WriteTask(handle);
1088             }
1089         };
1090 
1091         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1092                 Object msg, ChannelPromise promise, boolean flush) {
1093             WriteTask task = RECYCLER.get();
1094             init(task, ctx, msg, promise, flush);
1095             return task;
1096         }
1097 
1098         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1099                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1100 
1101         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1102         private static final int WRITE_TASK_OVERHEAD =
1103                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1104 
1105         private final Handle<WriteTask> handle;
1106         private AbstractChannelHandlerContext ctx;
1107         private Object msg;
1108         private ChannelPromise promise;
1109         private int size; // sign bit controls flush
1110 
1111         private WriteTask(Handle<WriteTask> handle) {
1112             this.handle = handle;
1113         }
1114 
1115         static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1116                                    Object msg, ChannelPromise promise, boolean flush) {
1117             task.ctx = ctx;
1118             task.msg = msg;
1119             task.promise = promise;
1120 
1121             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1122                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1123                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1124             } else {
1125                 task.size = 0;
1126             }
1127             if (flush) {
1128                 task.size |= Integer.MIN_VALUE;
1129             }
1130         }
1131 
1132         @Override
1133         public void run() {
1134             try {
1135                 decrementPendingOutboundBytes();
1136                 ctx.write(msg, size < 0, promise);
1137             } finally {
1138                 recycle();
1139             }
1140         }
1141 
1142         void cancel() {
1143             try {
1144                 decrementPendingOutboundBytes();
1145             } finally {
1146                 recycle();
1147             }
1148         }
1149 
1150         private void decrementPendingOutboundBytes() {
1151             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1152                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1153             }
1154         }
1155 
1156         private void recycle() {
1157             // Set to null so the GC can collect them directly
1158             ctx = null;
1159             msg = null;
1160             promise = null;
1161             handle.recycle(this);
1162         }
1163     }
1164 
1165     static final class Tasks {
1166         final Runnable invokeChannelReadCompleteTask;
1167         private final Runnable invokeReadTask;
1168         private final Runnable invokeChannelWritableStateChangedTask;
1169         private final Runnable invokeFlushTask;
1170 
1171         Tasks(AbstractChannelHandlerContext ctx) {
1172             invokeChannelReadCompleteTask = ctx::fireChannelReadComplete;
1173             invokeReadTask = ctx::read;
1174             invokeChannelWritableStateChangedTask = ctx::fireChannelWritabilityChanged;
1175             invokeFlushTask = ctx::invokeFlush;
1176         }
1177     }
1178 }