View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.buffer.ByteBufAllocator;
19  import io.netty.util.Attribute;
20  import io.netty.util.AttributeKey;
21  import io.netty.util.Recycler;
22  import io.netty.util.ReferenceCountUtil;
23  import io.netty.util.ResourceLeakHint;
24  import io.netty.util.concurrent.AbstractEventExecutor;
25  import io.netty.util.concurrent.EventExecutor;
26  import io.netty.util.concurrent.OrderedEventExecutor;
27  import io.netty.util.concurrent.PromiseNotifier;
28  import io.netty.util.internal.ObjectPool.Handle;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PromiseNotificationUtil;
31  import io.netty.util.internal.StringUtil;
32  import io.netty.util.internal.SystemPropertyUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.net.SocketAddress;
37  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38  
39  import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
40  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
41  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
42  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
43  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
44  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
45  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
46  import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
47  import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
48  import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
49  import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
50  import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
51  import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
52  import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
53  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
54  import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
55  import static io.netty.channel.ChannelHandlerMask.MASK_READ;
56  import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
57  import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
58  import static io.netty.channel.ChannelHandlerMask.mask;
59  
60  abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
61  
62      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
63      volatile AbstractChannelHandlerContext next;
64      volatile AbstractChannelHandlerContext prev;
65  
66      private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
67              AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
68  
69      /**
70       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
71       */
72      private static final int ADD_PENDING = 1;
73      /**
74       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
75       */
76      private static final int ADD_COMPLETE = 2;
77      /**
78       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
79       */
80      private static final int REMOVE_COMPLETE = 3;
81      /**
82       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
83       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
84       */
85      private static final int INIT = 0;
86  
87      private final DefaultChannelPipeline pipeline;
88      private final String name;
89      private final boolean ordered;
90      private final int executionMask;
91  
92      // Will be set to null if no child executor should be used, otherwise it will be set to the
93      // child executor.
94      final EventExecutor childExecutor;
95      // Cache the concrete value for the executor() method. This method is in the hot-path,
96      // and it's a profitable optimisation to avoid as many dependent-loads as possible.
97      // It does not need to be volatile, because it's always the same value for a given context,
98      // within the lifetime of its registration with an event loop, and deregistering will clear it.
99      EventExecutor contextExecutor;
100     private ChannelFuture succeededFuture;
101 
102     // Lazily instantiated tasks used to trigger events to a handler with different executor.
103     // There is no need to make this volatile as at worse it will just create a few more instances then needed.
104     private Tasks invokeTasks;
105 
106     private volatile int handlerState = INIT;
107 
108     AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
109                                   String name, Class<? extends ChannelHandler> handlerClass) {
110         this.name = ObjectUtil.checkNotNull(name, "name");
111         this.pipeline = pipeline;
112         childExecutor = executor;
113         executionMask = mask(handlerClass);
114         // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
115         ordered = executor == null || executor instanceof OrderedEventExecutor;
116     }
117 
118     @Override
119     public Channel channel() {
120         return pipeline.channel();
121     }
122 
123     @Override
124     public ChannelPipeline pipeline() {
125         return pipeline;
126     }
127 
128     @Override
129     public ByteBufAllocator alloc() {
130         return channel().config().getAllocator();
131     }
132 
133     @Override
134     public EventExecutor executor() {
135         EventExecutor ex = contextExecutor;
136         if (ex == null) {
137             contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
138         }
139         return ex;
140     }
141 
142     @Override
143     public String name() {
144         return name;
145     }
146 
147     @Override
148     public ChannelHandlerContext fireChannelRegistered() {
149         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
150         if (next.executor().inEventLoop()) {
151             if (next.invokeHandler()) {
152                 try {
153                     // DON'T CHANGE
154                     // Duplex handlers implements both out/in interfaces causing a scalability issue
155                     // see https://bugs.openjdk.org/browse/JDK-8180450
156                     final ChannelHandler handler = next.handler();
157                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
158                     if (handler == headContext) {
159                         headContext.channelRegistered(next);
160                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
161                         ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
162                     } else {
163                         ((ChannelInboundHandler) handler).channelRegistered(next);
164                     }
165                 } catch (Throwable t) {
166                     next.invokeExceptionCaught(t);
167                 }
168             } else {
169                 next.fireChannelRegistered();
170             }
171         } else {
172             next.executor().execute(this::fireChannelRegistered);
173         }
174         return this;
175     }
176 
177     @Override
178     public ChannelHandlerContext fireChannelUnregistered() {
179         final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
180         if (next.executor().inEventLoop()) {
181             if (next.invokeHandler()) {
182                 try {
183                     // DON'T CHANGE
184                     // Duplex handlers implements both out/in interfaces causing a scalability issue
185                     // see https://bugs.openjdk.org/browse/JDK-8180450
186                     final ChannelHandler handler = next.handler();
187                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
188                     if (handler == headContext) {
189                         headContext.channelUnregistered(next);
190                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
191                         ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
192                     } else {
193                         ((ChannelInboundHandler) handler).channelUnregistered(next);
194                     }
195                 } catch (Throwable t) {
196                     next.invokeExceptionCaught(t);
197                 }
198             } else {
199                 next.fireChannelUnregistered();
200             }
201         } else {
202             next.executor().execute(this::fireChannelUnregistered);
203         }
204         return this;
205     }
206 
207     @Override
208     public ChannelHandlerContext fireChannelActive() {
209         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
210         if (next.executor().inEventLoop()) {
211             if (next.invokeHandler()) {
212                 try {
213                     // DON'T CHANGE
214                     // Duplex handlers implements both out/in interfaces causing a scalability issue
215                     // see https://bugs.openjdk.org/browse/JDK-8180450
216                     final ChannelHandler handler = next.handler();
217                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
218                     if (handler == headContext) {
219                         headContext.channelActive(next);
220                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
221                         ((ChannelInboundHandlerAdapter) handler).channelActive(next);
222                     } else {
223                         ((ChannelInboundHandler) handler).channelActive(next);
224                     }
225                 } catch (Throwable t) {
226                     next.invokeExceptionCaught(t);
227                 }
228             } else {
229                 next.fireChannelActive();
230             }
231         } else {
232             next.executor().execute(this::fireChannelActive);
233         }
234         return this;
235     }
236 
237     @Override
238     public ChannelHandlerContext fireChannelInactive() {
239         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
240         if (next.executor().inEventLoop()) {
241             if (next.invokeHandler()) {
242                 try {
243                     // DON'T CHANGE
244                     // Duplex handlers implements both out/in interfaces causing a scalability issue
245                     // see https://bugs.openjdk.org/browse/JDK-8180450
246                     final ChannelHandler handler = next.handler();
247                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
248                     if (handler == headContext) {
249                         headContext.channelInactive(next);
250                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
251                         ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
252                     } else {
253                         ((ChannelInboundHandler) handler).channelInactive(next);
254                     }
255                 } catch (Throwable t) {
256                     next.invokeExceptionCaught(t);
257                 }
258             } else {
259                 next.fireChannelInactive();
260             }
261         } else {
262             next.executor().execute(this::fireChannelInactive);
263         }
264         return this;
265     }
266 
267     @Override
268     public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
269         AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
270         ObjectUtil.checkNotNull(cause, "cause");
271         if (next.executor().inEventLoop()) {
272             next.invokeExceptionCaught(cause);
273         } else {
274             try {
275                 next.executor().execute(() -> next.invokeExceptionCaught(cause));
276             } catch (Throwable t) {
277                 if (logger.isWarnEnabled()) {
278                     logger.warn("Failed to submit an exceptionCaught() event.", t);
279                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
280                 }
281             }
282         }
283         return this;
284     }
285 
286     @SuppressWarnings("deprecation")
287     private void invokeExceptionCaught(final Throwable cause) {
288         if (invokeHandler()) {
289             try {
290                 handler().exceptionCaught(this, cause);
291             } catch (Throwable error) {
292                 if (logger.isDebugEnabled()) {
293                     logger.debug(
294                         "An exception " +
295                         "was thrown by a user handler's exceptionCaught() " +
296                         "method while handling the following exception:", cause);
297                 } else if (logger.isWarnEnabled()) {
298                     logger.warn(
299                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
300                         "was thrown by a user handler's exceptionCaught() " +
301                         "method while handling the following exception:", error, cause);
302                 }
303             }
304         } else {
305             fireExceptionCaught(cause);
306         }
307     }
308 
309     @Override
310     public ChannelHandlerContext fireUserEventTriggered(final Object event) {
311         ObjectUtil.checkNotNull(event, "event");
312         AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
313         if (next.executor().inEventLoop()) {
314             if (next.invokeHandler()) {
315                 try {
316                     // DON'T CHANGE
317                     // Duplex handlers implements both out/in interfaces causing a scalability issue
318                     // see https://bugs.openjdk.org/browse/JDK-8180450
319                     final ChannelHandler handler = next.handler();
320                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
321                     if (handler == headContext) {
322                         headContext.userEventTriggered(next, event);
323                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
324                         ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
325                     } else {
326                         ((ChannelInboundHandler) handler).userEventTriggered(next, event);
327                     }
328                 } catch (Throwable t) {
329                     next.invokeExceptionCaught(t);
330                 }
331             } else {
332                 next.fireUserEventTriggered(event);
333             }
334         } else {
335             next.executor().execute(() -> fireUserEventTriggered(event));
336         }
337         return this;
338     }
339 
340     @Override
341     public ChannelHandlerContext fireChannelRead(final Object msg) {
342         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
343         if (next.executor().inEventLoop()) {
344             final Object m = pipeline.touch(msg, next);
345             if (next.invokeHandler()) {
346                 try {
347                     // DON'T CHANGE
348                     // Duplex handlers implements both out/in interfaces causing a scalability issue
349                     // see https://bugs.openjdk.org/browse/JDK-8180450
350                     final ChannelHandler handler = next.handler();
351                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
352                     if (handler == headContext) {
353                         headContext.channelRead(next, m);
354                     } else if (handler instanceof ChannelDuplexHandler) {
355                         ((ChannelDuplexHandler) handler).channelRead(next, m);
356                     } else {
357                         ((ChannelInboundHandler) handler).channelRead(next, m);
358                     }
359                 } catch (Throwable t) {
360                     next.invokeExceptionCaught(t);
361                 }
362             } else {
363                 next.fireChannelRead(m);
364             }
365         } else {
366             next.executor().execute(() -> fireChannelRead(msg));
367         }
368         return this;
369     }
370 
371     @Override
372     public ChannelHandlerContext fireChannelReadComplete() {
373         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
374         if (next.executor().inEventLoop()) {
375             if (next.invokeHandler()) {
376                 try {
377                     // DON'T CHANGE
378                     // Duplex handlers implements both out/in interfaces causing a scalability issue
379                     // see https://bugs.openjdk.org/browse/JDK-8180450
380                     final ChannelHandler handler = next.handler();
381                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
382                     if (handler == headContext) {
383                         headContext.channelReadComplete(next);
384                     } else if (handler instanceof ChannelDuplexHandler) {
385                         ((ChannelDuplexHandler) handler).channelReadComplete(next);
386                     } else {
387                         ((ChannelInboundHandler) handler).channelReadComplete(next);
388                     }
389                 } catch (Throwable t) {
390                     next.invokeExceptionCaught(t);
391                 }
392             } else {
393                 next.fireChannelReadComplete();
394             }
395         } else {
396             next.executor().execute(getInvokeTasks().fireChannelReadCompleteTask);
397         }
398         return this;
399     }
400 
401     @Override
402     public ChannelHandlerContext fireChannelWritabilityChanged() {
403         AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
404         if (next.executor().inEventLoop()) {
405             if (next.invokeHandler()) {
406                 try {
407                     // DON'T CHANGE
408                     // Duplex handlers implements both out/in interfaces causing a scalability issue
409                     // see https://bugs.openjdk.org/browse/JDK-8180450
410                     final ChannelHandler handler = next.handler();
411                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
412                     if (handler == headContext) {
413                         headContext.channelWritabilityChanged(next);
414                     } else if (handler instanceof ChannelInboundHandlerAdapter) {
415                         ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
416                     } else {
417                         ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
418                     }
419                 } catch (Throwable t) {
420                     next.invokeExceptionCaught(t);
421                 }
422             } else {
423                 next.fireChannelWritabilityChanged();
424             }
425         } else {
426             next.executor().execute(getInvokeTasks().fireChannelWritabilityChangedTask);
427         }
428         return this;
429     }
430 
431     @Override
432     public ChannelFuture bind(SocketAddress localAddress) {
433         return bind(localAddress, newPromise());
434     }
435 
436     @Override
437     public ChannelFuture connect(SocketAddress remoteAddress) {
438         return connect(remoteAddress, newPromise());
439     }
440 
441     @Override
442     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
443         return connect(remoteAddress, localAddress, newPromise());
444     }
445 
446     @Override
447     public ChannelFuture disconnect() {
448         return disconnect(newPromise());
449     }
450 
451     @Override
452     public ChannelFuture close() {
453         return close(newPromise());
454     }
455 
456     @Override
457     public ChannelFuture deregister() {
458         return deregister(newPromise());
459     }
460 
461     /**
462      * If possible check if the given {@link ChannelPromise} is using the same {@link EventExecutor} as this
463      * {@link ChannelHandlerContext} and if not return a new {@link ChannelPromise} that runs on the same
464      * {@link EventExecutor} as this {@link ChannelHandlerContext}. The result of the new {@link ChannelPromise} is
465      * cascaded to the old {@link ChannelPromise}.
466      *
467      * This is done to ensure that {@link ChannelFutureListener}s that are added to the {@link ChannelPromise} by an
468      * {@link ChannelOutboundHandler} are executed in the same thread as the handler itself. By doing so we can
469      * ensure that there are not issues even if fields etc that are stored in the handler are modified by the listener.
470      */
471     private ChannelPromise ensurePromiseUseCorrectExecutor(ChannelPromise promise) {
472         if (promise instanceof DefaultChannelPromise && !((DefaultChannelPromise) promise).executor().inEventLoop()) {
473             ChannelPromise newPromise = newPromise();
474             PromiseNotifier.cascade(newPromise, promise);
475             return newPromise;
476         }
477         return promise;
478     }
479 
480     @Override
481     public ChannelFuture bind(final SocketAddress localAddress, ChannelPromise promise) {
482         ObjectUtil.checkNotNull(localAddress, "localAddress");
483         if (isNotValidPromise(promise, false)) {
484             // cancelled
485             return promise;
486         }
487 
488         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
489         EventExecutor executor = next.executor();
490         if (executor.inEventLoop()) {
491             if (next.invokeHandler()) {
492                 promise = ensurePromiseUseCorrectExecutor(promise);
493                 try {
494                     // DON'T CHANGE
495                     // Duplex handlers implements both out/in interfaces causing a scalability issue
496                     // see https://bugs.openjdk.org/browse/JDK-8180450
497                     final ChannelHandler handler = next.handler();
498                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
499                     if (handler == headContext) {
500                         headContext.bind(next, localAddress, promise);
501                     } else if (handler instanceof ChannelDuplexHandler) {
502                         ((ChannelDuplexHandler) handler).bind(next, localAddress, promise);
503                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
504                         ((ChannelOutboundHandlerAdapter) handler).bind(next, localAddress, promise);
505                     } else {
506                         ((ChannelOutboundHandler) handler).bind(next, localAddress, promise);
507                     }
508                 } catch (Throwable t) {
509                     notifyOutboundHandlerException(t, promise);
510                 }
511             } else {
512                 next.bind(localAddress, promise);
513             }
514         } else {
515             final ChannelPromise p = promise;
516             safeExecute(executor, () -> bind(localAddress, p), promise, null, false);
517         }
518         return promise;
519     }
520 
521     @Override
522     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
523         return connect(remoteAddress, null, promise);
524     }
525 
526     @Override
527     public ChannelFuture connect(
528             final SocketAddress remoteAddress, final SocketAddress localAddress, ChannelPromise promise) {
529         ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
530 
531         if (isNotValidPromise(promise, false)) {
532             // cancelled
533             return promise;
534         }
535 
536         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
537         EventExecutor executor = next.executor();
538         if (executor.inEventLoop()) {
539             if (next.invokeHandler()) {
540                 promise = ensurePromiseUseCorrectExecutor(promise);
541                 try {
542                     // DON'T CHANGE
543                     // Duplex handlers implements both out/in interfaces causing a scalability issue
544                     // see https://bugs.openjdk.org/browse/JDK-8180450
545                     final ChannelHandler handler = next.handler();
546                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
547                     if (handler == headContext) {
548                         headContext.connect(next, remoteAddress, localAddress, promise);
549                     } else if (handler instanceof ChannelDuplexHandler) {
550                         ((ChannelDuplexHandler) handler).connect(next, remoteAddress, localAddress, promise);
551                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
552                         ((ChannelOutboundHandlerAdapter) handler).connect(next, remoteAddress, localAddress, promise);
553                     } else {
554                         ((ChannelOutboundHandler) handler).connect(next, remoteAddress, localAddress, promise);
555                     }
556                 } catch (Throwable t) {
557                     notifyOutboundHandlerException(t, promise);
558                 }
559             } else {
560                 next.connect(remoteAddress, localAddress, promise);
561             }
562         } else {
563             final ChannelPromise p = promise;
564             safeExecute(executor, () -> connect(remoteAddress, localAddress, p), promise, null, false);
565         }
566         return promise;
567     }
568 
569     @Override
570     public ChannelFuture disconnect(ChannelPromise promise) {
571         if (!channel().metadata().hasDisconnect()) {
572             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
573             // So far, UDP/IP is the only transport that has such behavior.
574             return close(promise);
575         }
576         if (isNotValidPromise(promise, false)) {
577             // cancelled
578             return promise;
579         }
580 
581         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
582         EventExecutor executor = next.executor();
583         if (executor.inEventLoop()) {
584             if (next.invokeHandler()) {
585                 promise = ensurePromiseUseCorrectExecutor(promise);
586                 try {
587                     // DON'T CHANGE
588                     // Duplex handlers implements both out/in interfaces causing a scalability issue
589                     // see https://bugs.openjdk.org/browse/JDK-8180450
590                     final ChannelHandler handler = next.handler();
591                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
592                     if (handler == headContext) {
593                         headContext.disconnect(next, promise);
594                     } else if (handler instanceof ChannelDuplexHandler) {
595                         ((ChannelDuplexHandler) handler).disconnect(next, promise);
596                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
597                         ((ChannelOutboundHandlerAdapter) handler).disconnect(next, promise);
598                     } else {
599                         ((ChannelOutboundHandler) handler).disconnect(next, promise);
600                     }
601                 } catch (Throwable t) {
602                     notifyOutboundHandlerException(t, promise);
603                 }
604             } else {
605                 next.disconnect(promise);
606             }
607         } else {
608             final ChannelPromise p = promise;
609             safeExecute(executor, () -> disconnect(p), promise, null, false);
610         }
611         return promise;
612     }
613 
614     @Override
615     public ChannelFuture close(ChannelPromise promise) {
616         if (isNotValidPromise(promise, false)) {
617             // cancelled
618             return promise;
619         }
620 
621         final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
622         EventExecutor executor = next.executor();
623         if (executor.inEventLoop()) {
624             if (next.invokeHandler()) {
625                 promise = ensurePromiseUseCorrectExecutor(promise);
626                 try {
627                     // DON'T CHANGE
628                     // Duplex handlers implements both out/in interfaces causing a scalability issue
629                     // see https://bugs.openjdk.org/browse/JDK-8180450
630                     final ChannelHandler handler = next.handler();
631                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
632                     if (handler == headContext) {
633                         headContext.close(next, promise);
634                     } else if (handler instanceof ChannelDuplexHandler) {
635                         ((ChannelDuplexHandler) handler).close(next, promise);
636                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
637                         ((ChannelOutboundHandlerAdapter) handler).close(next, promise);
638                     } else {
639                         ((ChannelOutboundHandler) handler).close(next, promise);
640                     }
641                 } catch (Throwable t) {
642                     notifyOutboundHandlerException(t, promise);
643                 }
644             } else {
645                 next.close(promise);
646             }
647         } else {
648             final ChannelPromise p = promise;
649             safeExecute(executor, () -> close(p), promise, null, false);
650         }
651 
652         return promise;
653     }
654 
655     @Override
656     public ChannelFuture deregister(ChannelPromise promise) {
657         if (isNotValidPromise(promise, false)) {
658             // cancelled
659             return promise;
660         }
661 
662         final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
663         EventExecutor executor = next.executor();
664         if (executor.inEventLoop()) {
665             if (next.invokeHandler()) {
666                 promise = ensurePromiseUseCorrectExecutor(promise);
667                 try {
668                     // DON'T CHANGE
669                     // Duplex handlers implements both out/in interfaces causing a scalability issue
670                     // see https://bugs.openjdk.org/browse/JDK-8180450
671                     final ChannelHandler handler = next.handler();
672                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
673                     if (handler == headContext) {
674                         headContext.deregister(next, promise);
675                     } else if (handler instanceof ChannelDuplexHandler) {
676                         ((ChannelDuplexHandler) handler).deregister(next, promise);
677                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
678                         ((ChannelOutboundHandlerAdapter) handler).deregister(next, promise);
679                     } else {
680                         ((ChannelOutboundHandler) handler).deregister(next, promise);
681                     }
682                 } catch (Throwable t) {
683                     notifyOutboundHandlerException(t, promise);
684                 }
685             } else {
686                 deregister(promise);
687             }
688         } else {
689             final ChannelPromise p = promise;
690             safeExecute(executor, () -> deregister(p), promise, null, false);
691         }
692 
693         return promise;
694     }
695 
696     @Override
697     public ChannelHandlerContext read() {
698         final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
699         if (next.executor().inEventLoop()) {
700             if (next.invokeHandler()) {
701                 try {
702                     // DON'T CHANGE
703                     // Duplex handlers implements both out/in interfaces causing a scalability issue
704                     // see https://bugs.openjdk.org/browse/JDK-8180450
705                     final ChannelHandler handler = next.handler();
706                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
707                     if (handler == headContext) {
708                         headContext.read(next);
709                     } else if (handler instanceof ChannelDuplexHandler) {
710                         ((ChannelDuplexHandler) handler).read(next);
711                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
712                         ((ChannelOutboundHandlerAdapter) handler).read(next);
713                     } else {
714                         ((ChannelOutboundHandler) handler).read(next);
715                     }
716                 } catch (Throwable t) {
717                     next.invokeExceptionCaught(t);
718                 }
719             } else {
720                 next.read();
721             }
722         } else {
723             next.executor().execute(getInvokeTasks().readTask);
724         }
725         return this;
726     }
727 
728     @Override
729     public ChannelFuture write(Object msg) {
730         ChannelPromise promise = newPromise();
731         write(msg, false, promise);
732         return promise;
733     }
734 
735     @Override
736     public ChannelFuture write(final Object msg, final ChannelPromise promise) {
737         write(msg, false, promise);
738         return promise;
739     }
740 
741     @Override
742     public ChannelHandlerContext flush() {
743         final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
744         EventExecutor executor = next.executor();
745         if (executor.inEventLoop()) {
746             if (next.invokeHandler()) {
747                 try {
748                     // DON'T CHANGE
749                     // Duplex handlers implements both out/in interfaces causing a scalability issue
750                     // see https://bugs.openjdk.org/browse/JDK-8180450
751                     final ChannelHandler handler = next.handler();
752                     final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
753                     if (handler == headContext) {
754                         headContext.flush(next);
755                     } else if (handler instanceof ChannelDuplexHandler) {
756                         ((ChannelDuplexHandler) handler).flush(next);
757                     } else if (handler instanceof ChannelOutboundHandlerAdapter) {
758                         ((ChannelOutboundHandlerAdapter) handler).flush(next);
759                     } else {
760                         ((ChannelOutboundHandler) handler).flush(next);
761                     }
762                 } catch (Throwable t) {
763                     next.invokeExceptionCaught(t);
764                 }
765             } else {
766                 next.flush();
767             }
768         } else {
769             safeExecute(executor, getInvokeTasks().flushTask, channel().voidPromise(), null, false);
770         }
771         return this;
772     }
773 
774     @Override
775     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
776         write(msg, true, promise);
777         return promise;
778     }
779 
780     void write(Object msg, boolean flush, ChannelPromise promise) {
781         if (validateWrite(msg, promise)) {
782             final AbstractChannelHandlerContext next = findContextOutbound(flush ?
783                     MASK_WRITE | MASK_FLUSH : MASK_WRITE);
784             final Object m = pipeline.touch(msg, next);
785             EventExecutor executor = next.executor();
786             if (executor.inEventLoop()) {
787                 if (next.invokeHandler()) {
788                     promise = ensurePromiseUseCorrectExecutor(promise);
789                     try {
790                         // DON'T CHANGE
791                         // Duplex handlers implements both out/in interfaces causing a scalability issue
792                         // see https://bugs.openjdk.org/browse/JDK-8180450
793                         final ChannelHandler handler = next.handler();
794                         final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
795                         if (handler == headContext) {
796                             headContext.write(next, msg, promise);
797                         } else if (handler instanceof ChannelDuplexHandler) {
798                             ((ChannelDuplexHandler) handler).write(next, msg, promise);
799                         } else if (handler instanceof ChannelOutboundHandlerAdapter) {
800                             ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
801                         } else {
802                             ((ChannelOutboundHandler) handler).write(next, msg, promise);
803                         }
804                     } catch (Throwable t) {
805                         notifyOutboundHandlerException(t, promise);
806                     }
807                     if (flush) {
808                         try {
809                             // DON'T CHANGE
810                             // Duplex handlers implements both out/in interfaces causing a scalability issue
811                             // see https://bugs.openjdk.org/browse/JDK-8180450
812                             final ChannelHandler handler = next.handler();
813                             final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
814                             if (handler == headContext) {
815                                 headContext.flush(next);
816                             } else if (handler instanceof ChannelDuplexHandler) {
817                                 ((ChannelDuplexHandler) handler).flush(next);
818                             } else if (handler instanceof ChannelOutboundHandlerAdapter) {
819                                 ((ChannelOutboundHandlerAdapter) handler).flush(next);
820                             } else {
821                                 ((ChannelOutboundHandler) handler).flush(next);
822                             }
823                         } catch (Throwable t) {
824                             next.invokeExceptionCaught(t);
825                         }
826                     }
827                 } else {
828                     next.write(msg, flush, promise);
829                 }
830             } else {
831                 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
832                 if (!safeExecute(executor, task, promise, m, !flush)) {
833                     // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
834                     // and put it back in the Recycler for re-use later.
835                     //
836                     // See https://github.com/netty/netty/issues/8343.
837                     task.cancel();
838                 }
839             }
840         }
841     }
842 
843     private boolean validateWrite(Object msg, ChannelPromise promise) {
844         ObjectUtil.checkNotNull(msg, "msg");
845         try {
846             if (isNotValidPromise(promise, true)) {
847                 ReferenceCountUtil.release(msg);
848                 return false; // cancelled
849             }
850         } catch (RuntimeException e) {
851             ReferenceCountUtil.release(msg);
852             throw e;
853         }
854         return true;
855     }
856 
857     @Override
858     public ChannelFuture writeAndFlush(Object msg) {
859         return writeAndFlush(msg, newPromise());
860     }
861 
862     private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
863         // Only log if the given promise is not of type VoidChannelPromise as tryFailure(...) is expected to return
864         // false.
865         PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
866     }
867 
868     @Override
869     public ChannelPromise newPromise() {
870         return new DefaultChannelPromise(channel(), executor());
871     }
872 
873     @Override
874     public ChannelProgressivePromise newProgressivePromise() {
875         return new DefaultChannelProgressivePromise(channel(), executor());
876     }
877 
878     @Override
879     public ChannelFuture newSucceededFuture() {
880         ChannelFuture succeededFuture = this.succeededFuture;
881         if (succeededFuture == null) {
882             this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
883         }
884         return succeededFuture;
885     }
886 
887     @Override
888     public ChannelFuture newFailedFuture(Throwable cause) {
889         return new FailedChannelFuture(channel(), executor(), cause);
890     }
891 
892     private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
893         ObjectUtil.checkNotNull(promise, "promise");
894 
895         if (promise.isDone()) {
896             // Check if the promise was cancelled and if so signal that the processing of the operation
897             // should not be performed.
898             //
899             // See https://github.com/netty/netty/issues/2349
900             if (promise.isCancelled()) {
901                 return true;
902             }
903             throw new IllegalArgumentException("promise already done: " + promise);
904         }
905 
906         if (promise.channel() != channel()) {
907             throw new IllegalArgumentException(String.format(
908                     "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
909         }
910 
911         if (promise.getClass() == DefaultChannelPromise.class) {
912             return false;
913         }
914 
915         if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
916             throw new IllegalArgumentException(
917                     StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
918         }
919 
920         if (promise instanceof AbstractChannel.CloseFuture) {
921             throw new IllegalArgumentException(
922                     StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
923         }
924         return false;
925     }
926 
927     private AbstractChannelHandlerContext findContextInbound(int mask) {
928         AbstractChannelHandlerContext ctx = this;
929         EventExecutor currentExecutor = executor();
930         do {
931             ctx = ctx.next;
932         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
933         return ctx;
934     }
935 
936     private AbstractChannelHandlerContext findContextOutbound(int mask) {
937         AbstractChannelHandlerContext ctx = this;
938         EventExecutor currentExecutor = executor();
939         do {
940             ctx = ctx.prev;
941         } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
942         return ctx;
943     }
944 
945     private static boolean skipContext(
946             AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
947         // Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
948         return (ctx.executionMask & (onlyMask | mask)) == 0 ||
949                 // We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
950                 // everything to preserve ordering.
951                 //
952                 // See https://github.com/netty/netty/issues/10067
953                 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
954     }
955 
956     @Override
957     public ChannelPromise voidPromise() {
958         return channel().voidPromise();
959     }
960 
961     final void setRemoved() {
962         handlerState = REMOVE_COMPLETE;
963     }
964 
965     final boolean setAddComplete() {
966         for (;;) {
967             int oldState = handlerState;
968             if (oldState == REMOVE_COMPLETE) {
969                 return false;
970             }
971             // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
972             // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
973             // exposing ordering guarantees.
974             if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
975                 return true;
976             }
977         }
978     }
979 
980     final void setAddPending() {
981         boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
982         assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
983     }
984 
985     final void callHandlerAdded() throws Exception {
986         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
987         // any pipeline events ctx.handler() will miss them because the state will not allow it.
988         if (setAddComplete()) {
989             handler().handlerAdded(this);
990         }
991     }
992 
993     final void callHandlerRemoved() throws Exception {
994         try {
995             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
996             if (handlerState == ADD_COMPLETE) {
997                 handler().handlerRemoved(this);
998             }
999         } finally {
1000             // Mark the handler as removed in any case.
1001             setRemoved();
1002         }
1003     }
1004 
1005     /**
1006      * Makes best possible effort to detect if {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called
1007      * yet. If not return {@code false} and if called or could not detect return {@code true}.
1008      *
1009      * If this method returns {@code false} we will not invoke the {@link ChannelHandler} but just forward the event.
1010      * This is needed as {@link DefaultChannelPipeline} may already put the {@link ChannelHandler} in the linked-list
1011      * but not called {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}.
1012      */
1013     boolean invokeHandler() {
1014         // Store in local variable to reduce volatile reads.
1015         int handlerState = this.handlerState;
1016         return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1017     }
1018 
1019     @Override
1020     public boolean isRemoved() {
1021         return handlerState == REMOVE_COMPLETE;
1022     }
1023 
1024     @Override
1025     public <T> Attribute<T> attr(AttributeKey<T> key) {
1026         return channel().attr(key);
1027     }
1028 
1029     @Override
1030     public <T> boolean hasAttr(AttributeKey<T> key) {
1031         return channel().hasAttr(key);
1032     }
1033 
1034     private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1035             ChannelPromise promise, Object msg, boolean lazy) {
1036         try {
1037             if (lazy && executor instanceof AbstractEventExecutor) {
1038                 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1039             } else {
1040                 executor.execute(runnable);
1041             }
1042             return true;
1043         } catch (Throwable cause) {
1044             try {
1045                 if (msg != null) {
1046                     ReferenceCountUtil.release(msg);
1047                 }
1048             } finally {
1049                 promise.setFailure(cause);
1050             }
1051             return false;
1052         }
1053     }
1054 
1055     @Override
1056     public String toHintString() {
1057         return '\'' + name + "' will handle the message from this point.";
1058     }
1059 
1060     @Override
1061     public String toString() {
1062         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1063     }
1064 
1065     Tasks getInvokeTasks() {
1066         Tasks tasks = invokeTasks;
1067         if (tasks == null) {
1068             invokeTasks = tasks = new Tasks(this);
1069         }
1070         return tasks;
1071     }
1072 
1073     static final class WriteTask implements Runnable {
1074         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1075             @Override
1076             protected WriteTask newObject(Handle<WriteTask> handle) {
1077                 return new WriteTask(handle);
1078             }
1079         };
1080 
1081         static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1082                 Object msg, ChannelPromise promise, boolean flush) {
1083             WriteTask task = RECYCLER.get();
1084             init(task, ctx, msg, promise, flush);
1085             return task;
1086         }
1087 
1088         private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1089                 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1090 
1091         // Assuming compressed oops, 12 bytes obj header, 4 ref fields and one int field
1092         private static final int WRITE_TASK_OVERHEAD =
1093                 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1094 
1095         private final Handle<WriteTask> handle;
1096         private AbstractChannelHandlerContext ctx;
1097         private Object msg;
1098         private ChannelPromise promise;
1099         private int size; // sign bit controls flush
1100 
1101         private WriteTask(Handle<WriteTask> handle) {
1102             this.handle = handle;
1103         }
1104 
1105         static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1106                                    Object msg, ChannelPromise promise, boolean flush) {
1107             task.ctx = ctx;
1108             task.msg = msg;
1109             task.promise = promise;
1110 
1111             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1112                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1113                 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1114             } else {
1115                 task.size = 0;
1116             }
1117             if (flush) {
1118                 task.size |= Integer.MIN_VALUE;
1119             }
1120         }
1121 
1122         @Override
1123         public void run() {
1124             try {
1125                 decrementPendingOutboundBytes();
1126                 ctx.write(msg, size < 0, promise);
1127             } finally {
1128                 recycle();
1129             }
1130         }
1131 
1132         void cancel() {
1133             try {
1134                 decrementPendingOutboundBytes();
1135             } finally {
1136                 recycle();
1137             }
1138         }
1139 
1140         private void decrementPendingOutboundBytes() {
1141             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1142                 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1143             }
1144         }
1145 
1146         private void recycle() {
1147             // Set to null so the GC can collect them directly
1148             ctx = null;
1149             msg = null;
1150             promise = null;
1151             handle.recycle(this);
1152         }
1153     }
1154 
1155     static final class Tasks {
1156         final Runnable fireChannelReadCompleteTask;
1157         private final Runnable readTask;
1158         private final Runnable fireChannelWritabilityChangedTask;
1159         private final Runnable flushTask;
1160 
1161         Tasks(AbstractChannelHandlerContext ctx) {
1162             fireChannelReadCompleteTask = ctx::fireChannelReadComplete;
1163             readTask = ctx::read;
1164             fireChannelWritabilityChangedTask = ctx::fireChannelWritabilityChanged;
1165             flushTask = ctx::flush;
1166         }
1167     }
1168 }