View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.util.Resource;
19  import io.netty5.util.ResourceLeakHint;
20  import io.netty5.util.concurrent.EventExecutor;
21  import io.netty5.util.concurrent.Future;
22  import io.netty5.util.concurrent.Promise;
23  import io.netty5.util.internal.ObjectPool;
24  import io.netty5.util.internal.StringUtil;
25  import io.netty5.util.internal.SystemPropertyUtil;
26  import io.netty5.util.internal.ThrowableUtil;
27  import io.netty5.util.internal.logging.InternalLogger;
28  import io.netty5.util.internal.logging.InternalLoggerFactory;
29  
30  import java.net.SocketAddress;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.TimeUnit;
33  
34  import static io.netty5.channel.ChannelHandlerMask.MASK_BIND;
35  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
36  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
37  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
38  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
39  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
40  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_SHUTDOWN;
41  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
42  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
43  import static io.netty5.channel.ChannelHandlerMask.MASK_CLOSE;
44  import static io.netty5.channel.ChannelHandlerMask.MASK_CONNECT;
45  import static io.netty5.channel.ChannelHandlerMask.MASK_DEREGISTER;
46  import static io.netty5.channel.ChannelHandlerMask.MASK_DISCONNECT;
47  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_EXCEPTION_CAUGHT;
48  import static io.netty5.channel.ChannelHandlerMask.MASK_FLUSH;
49  import static io.netty5.channel.ChannelHandlerMask.MASK_READ;
50  import static io.netty5.channel.ChannelHandlerMask.MASK_REGISTER;
51  import static io.netty5.channel.ChannelHandlerMask.MASK_SHUTDOWN;
52  import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_INBOUND_EVENT;
53  import static io.netty5.channel.ChannelHandlerMask.MASK_SEND_OUTBOUND_EVENT;
54  import static io.netty5.channel.ChannelHandlerMask.MASK_WRITE;
55  import static io.netty5.channel.ChannelHandlerMask.MASK_PENDING_OUTBOUND_BYTES;
56  import static io.netty5.channel.ChannelHandlerMask.mask;
57  import static java.util.Objects.requireNonNull;
58  
59  final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
60  
61      private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelHandlerContext.class);
62  
63      /**
64       * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
65       * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
66       */
67      private static final int INIT = 0;
68  
69      /**
70       * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
71       */
72      private static final int ADD_COMPLETE = 1;
73  
74      /**
75       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} is about to be called.
76       */
77      private static final int REMOVE_STARTED = 2;
78  
79      /**
80       * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
81       */
82      private static final int REMOVE_COMPLETE = 3;
83  
84      private final int executionMask;
85      private final DefaultChannelPipeline pipeline;
86      private final ChannelHandler handler;
87      private final String name;
88  
89      // Is null if the ChannelHandler not implements pendingOutboundBytes(...).
90      private final DefaultChannelHandlerContextAwareEventExecutor executor;
91      private long currentPendingBytes;
92  
93      // Lazily instantiated tasks used to trigger events to a handler with different executor.
94      // There is no need to make this volatile as at worse it will just create a few more instances than needed.
95      private Tasks invokeTasks;
96      private int handlerState = INIT;
97  
98      private volatile boolean removed;
99  
100     DefaultChannelHandlerContext next;
101     DefaultChannelHandlerContext prev;
102 
103     DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name,
104                                  ChannelHandler handler) {
105         this.name = requireNonNull(name, "name");
106         this.pipeline = pipeline;
107         executionMask = mask(handler.getClass());
108         this.handler = handler;
109         // Wrap the executor if the ChannelHandler implements pendingOutboundBytes(ChannelHandlerContext) so we are
110         // sure that the pending bytes will be updated correctly in all cases. Otherwise, we don't need any special
111         // wrapping and so can save some work (which is true most of the time).
112         this.executor = handlesPendingOutboundBytes(executionMask) ?
113                 new DefaultChannelHandlerContextAwareEventExecutor(pipeline.executor(), this) : null;
114     }
115 
116     private static boolean handlesPendingOutboundBytes(int mask) {
117         return (mask & MASK_PENDING_OUTBOUND_BYTES) != 0;
118     }
119 
120     private static Future<Void> failRemoved(DefaultChannelHandlerContext ctx) {
121         return ctx.newFailedFuture(newRemovedException(ctx, null));
122     }
123 
124     private void notifyHandlerRemovedAlready() {
125         notifyHandlerRemovedAlready(null);
126     }
127 
128     private void notifyHandlerRemovedAlready(Throwable cause) {
129         pipeline().fireChannelExceptionCaught(newRemovedException(this, cause));
130     }
131 
132     private static ChannelPipelineException newRemovedException(ChannelHandlerContext ctx, Throwable cause) {
133         return new ChannelPipelineException("Context " + ctx + " already removed", cause);
134     }
135 
136     private Tasks invokeTasks() {
137         Tasks tasks = invokeTasks;
138         if (tasks == null) {
139             invokeTasks = tasks = new Tasks(this);
140         }
141         return tasks;
142     }
143 
144     @Override
145     public EventExecutor executor() {
146         return executor == null ? pipeline().executor() : executor;
147     }
148 
149     @Override
150     public ChannelHandler handler() {
151         return handler;
152     }
153 
154     @Override
155     public ChannelPipeline pipeline() {
156         return pipeline;
157     }
158 
159     @Override
160     public String name() {
161         return name;
162     }
163 
164     private EventExecutor originalExecutor() {
165         return executor == null ? pipeline().executor() : executor.wrappedExecutor();
166     }
167 
168     @Override
169     public ChannelHandlerContext fireChannelRegistered() {
170         EventExecutor executor = originalExecutor();
171         if (executor.inEventLoop()) {
172             findAndInvokeChannelRegistered();
173         } else {
174             executor.execute(this::findAndInvokeChannelRegistered);
175         }
176         return this;
177     }
178 
179     private void findAndInvokeChannelRegistered() {
180         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_REGISTERED);
181         if (ctx == null) {
182             notifyHandlerRemovedAlready();
183             return;
184         }
185         ctx.invokeChannelRegistered();
186     }
187 
188     void invokeChannelRegistered() {
189         try {
190             if (!saveCurrentPendingBytesIfNeededInbound()) {
191                 return;
192             }
193             handler().channelRegistered(this);
194         } catch (Throwable t) {
195             invokeChannelExceptionCaught(t);
196         } finally {
197             updatePendingBytesIfNeeded();
198         }
199     }
200 
201     @Override
202     public ChannelHandlerContext fireChannelUnregistered() {
203         EventExecutor executor = originalExecutor();
204         if (executor.inEventLoop()) {
205             findAndInvokeChannelUnregistered();
206         } else {
207             executor.execute(this::findAndInvokeChannelUnregistered);
208         }
209         return this;
210     }
211 
212     private void findAndInvokeChannelUnregistered() {
213         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_UNREGISTERED);
214         if (ctx == null) {
215             notifyHandlerRemovedAlready();
216             return;
217         }
218         ctx.invokeChannelUnregistered();
219     }
220 
221     void invokeChannelUnregistered() {
222         if (!saveCurrentPendingBytesIfNeededInbound()) {
223             return;
224         }
225         try {
226             handler().channelUnregistered(this);
227         } catch (Throwable t) {
228             invokeChannelExceptionCaught(t);
229         } finally {
230             updatePendingBytesIfNeeded();
231         }
232     }
233 
234     @Override
235     public ChannelHandlerContext fireChannelActive() {
236         EventExecutor executor = originalExecutor();
237         if (executor.inEventLoop()) {
238             findAndInvokeChannelActive();
239         } else {
240             executor.execute(this::findAndInvokeChannelActive);
241         }
242         return this;
243     }
244 
245     private void findAndInvokeChannelActive() {
246         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_ACTIVE);
247         if (ctx == null) {
248             notifyHandlerRemovedAlready();
249             return;
250         }
251         ctx.invokeChannelActive();
252     }
253 
254     void invokeChannelActive() {
255         if (!saveCurrentPendingBytesIfNeededInbound()) {
256             return;
257         }
258         try {
259             handler().channelActive(this);
260         } catch (Throwable t) {
261             invokeChannelExceptionCaught(t);
262         } finally {
263             updatePendingBytesIfNeeded();
264         }
265     }
266 
267     @Override
268     public ChannelHandlerContext fireChannelInactive() {
269         EventExecutor executor = originalExecutor();
270         if (executor.inEventLoop()) {
271             findAndInvokeChannelInactive();
272         } else {
273             executor.execute(this::findAndInvokeChannelInactive);
274         }
275         return this;
276     }
277 
278     private void findAndInvokeChannelInactive() {
279         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_INACTIVE);
280         if (ctx == null) {
281             notifyHandlerRemovedAlready();
282             return;
283         }
284         ctx.invokeChannelInactive();
285     }
286 
287     void invokeChannelInactive() {
288         if (!saveCurrentPendingBytesIfNeededInbound()) {
289             return;
290         }
291         try {
292             handler().channelInactive(this);
293         } catch (Throwable t) {
294             invokeChannelExceptionCaught(t);
295         } finally {
296             updatePendingBytesIfNeeded();
297         }
298     }
299 
300     @Override
301     public ChannelHandlerContext fireChannelShutdown(ChannelShutdownDirection direction) {
302         EventExecutor executor = originalExecutor();
303         if (executor.inEventLoop()) {
304             findAndInvokeChannelShutdown(direction);
305         } else {
306             executor.execute(() -> findAndInvokeChannelShutdown(direction));
307         }
308         return this;
309     }
310 
311     private void findAndInvokeChannelShutdown(ChannelShutdownDirection direction) {
312         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_SHUTDOWN);
313         if (ctx == null) {
314             notifyHandlerRemovedAlready();
315             return;
316         }
317         ctx.invokeChannelShutdown(direction);
318     }
319 
320     void invokeChannelShutdown(ChannelShutdownDirection direction) {
321         if (!saveCurrentPendingBytesIfNeededInbound()) {
322             return;
323         }
324         try {
325             handler().channelShutdown(this, direction);
326         } catch (Throwable t) {
327             invokeChannelExceptionCaught(t);
328         } finally {
329             updatePendingBytesIfNeeded();
330         }
331     }
332 
333     @Override
334     public ChannelHandlerContext fireChannelExceptionCaught(Throwable cause) {
335         requireNonNull(cause, "cause");
336         EventExecutor executor = originalExecutor();
337         if (executor.inEventLoop()) {
338             findAndInvokeChannelExceptionCaught(cause);
339         } else {
340             try {
341                 executor.execute(() -> findAndInvokeChannelExceptionCaught(cause));
342             } catch (Throwable t) {
343                 if (logger.isWarnEnabled()) {
344                     logger.warn("Failed to submit an exceptionCaught() event.", t);
345                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
346                 }
347             }
348         }
349         return this;
350     }
351 
352     private void findAndInvokeChannelExceptionCaught(Throwable cause) {
353         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_EXCEPTION_CAUGHT);
354         if (ctx == null) {
355             notifyHandlerRemovedAlready(cause);
356             return;
357         }
358         ctx.invokeChannelExceptionCaught(cause);
359     }
360 
361     void invokeChannelExceptionCaught(final Throwable cause) {
362         if (!saveCurrentPendingBytesIfNeededInbound()) {
363             return;
364         }
365         try {
366             handler().channelExceptionCaught(this, cause);
367         } catch (Throwable error) {
368             if (logger.isDebugEnabled()) {
369                 logger.debug(
370                         "An exception {}" +
371                                 "was thrown by a user handler's exceptionCaught() " +
372                                 "method while handling the following exception:",
373                         ThrowableUtil.stackTraceToString(error), cause);
374             } else if (logger.isWarnEnabled()) {
375                 logger.warn(
376                         "An exception '{}' [enable DEBUG level for full stacktrace] " +
377                                 "was thrown by a user handler's exceptionCaught() " +
378                                 "method while handling the following exception:", error, cause);
379             }
380         } finally {
381             updatePendingBytesIfNeeded();
382         }
383     }
384 
385     @Override
386     public ChannelHandlerContext fireChannelInboundEvent(Object event) {
387         requireNonNull(event, "event");
388         EventExecutor executor = originalExecutor();
389         if (executor.inEventLoop()) {
390             findAndInvokeChannelInboundEvent(event);
391         } else {
392             executor.execute(() -> findAndInvokeChannelInboundEvent(event));
393         }
394         return this;
395     }
396 
397     private void findAndInvokeChannelInboundEvent(Object event) {
398         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_INBOUND_EVENT);
399         if (ctx == null) {
400             Resource.dispose(event);
401             notifyHandlerRemovedAlready();
402             return;
403         }
404         ctx.invokeChannelInboundEvent(event);
405     }
406 
407     void invokeChannelInboundEvent(Object event) {
408         if (!saveCurrentPendingBytesIfNeededInbound()) {
409             Resource.dispose(event);
410             return;
411         }
412         try {
413             handler().channelInboundEvent(this, event);
414         } catch (Throwable t) {
415             invokeChannelExceptionCaught(t);
416         } finally {
417             updatePendingBytesIfNeeded();
418         }
419     }
420 
421     @Override
422     public ChannelHandlerContext fireChannelRead(final Object msg) {
423         requireNonNull(msg, "msg");
424         EventExecutor executor = originalExecutor();
425         if (executor.inEventLoop()) {
426             findAndInvokeChannelRead(msg);
427         } else {
428             try {
429                 executor.execute(() -> findAndInvokeChannelRead(msg));
430             } catch (Throwable cause) {
431                 Resource.dispose(msg);
432                 throw cause;
433             }
434         }
435         return this;
436     }
437 
438     private void findAndInvokeChannelRead(Object msg) {
439         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_READ);
440         if (ctx == null) {
441             Resource.dispose(msg);
442             notifyHandlerRemovedAlready();
443             return;
444         }
445         ctx.invokeChannelRead(msg);
446     }
447 
448     void invokeChannelRead(Object msg) {
449         final Object m = pipeline.touch(requireNonNull(msg, "msg"), this);
450         if (!saveCurrentPendingBytesIfNeededInbound()) {
451             Resource.dispose(m);
452             return;
453         }
454         try {
455             handler().channelRead(this, m);
456         } catch (Throwable t) {
457             invokeChannelExceptionCaught(t);
458         } finally {
459             updatePendingBytesIfNeeded();
460         }
461     }
462 
463     @Override
464     public ChannelHandlerContext fireChannelReadComplete() {
465         EventExecutor executor = originalExecutor();
466         if (executor.inEventLoop()) {
467             findAndInvokeChannelReadComplete();
468         } else {
469             Tasks tasks = invokeTasks();
470             executor.execute(tasks.invokeChannelReadCompleteTask);
471         }
472         return this;
473     }
474 
475     private void findAndInvokeChannelReadComplete() {
476         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
477         if (ctx == null) {
478             notifyHandlerRemovedAlready();
479             return;
480         }
481         ctx.invokeChannelReadComplete();
482     }
483 
484     void invokeChannelReadComplete() {
485         if (!saveCurrentPendingBytesIfNeededInbound()) {
486             return;
487         }
488         try {
489             handler().channelReadComplete(this);
490         } catch (Throwable t) {
491             invokeChannelExceptionCaught(t);
492         } finally {
493             updatePendingBytesIfNeeded();
494         }
495     }
496 
497     @Override
498     public ChannelHandlerContext fireChannelWritabilityChanged() {
499         EventExecutor executor = originalExecutor();
500         if (executor.inEventLoop()) {
501             findAndInvokeChannelWritabilityChanged();
502         } else {
503             Tasks tasks = invokeTasks();
504             executor.execute(tasks.invokeChannelWritableStateChangedTask);
505         }
506         return this;
507     }
508 
509     private void findAndInvokeChannelWritabilityChanged() {
510         DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
511         if (ctx == null) {
512             notifyHandlerRemovedAlready();
513             return;
514         }
515         ctx.invokeChannelWritabilityChanged();
516     }
517 
518     void invokeChannelWritabilityChanged() {
519         if (!saveCurrentPendingBytesIfNeededInbound()) {
520             return;
521         }
522         try {
523             handler().channelWritabilityChanged(this);
524         } catch (Throwable t) {
525             invokeChannelExceptionCaught(t);
526         } finally {
527             updatePendingBytesIfNeeded();
528         }
529     }
530 
531     @Override
532     public Future<Void> bind(SocketAddress localAddress) {
533         requireNonNull(localAddress, "localAddress");
534 
535         EventExecutor executor = originalExecutor();
536         if (executor.inEventLoop()) {
537             return findAndInvokeBind(localAddress);
538         }
539 
540         Promise<Void> promise  = newPromise();
541         safeExecute(executor, () -> findAndInvokeBind(localAddress).cascadeTo(promise), promise, null);
542         return promise.asFuture();
543     }
544 
545     @Override
546     public Future<Void> connect(SocketAddress remoteAddress) {
547         return connect(remoteAddress, null);
548     }
549 
550     @Override
551     public Future<Void> deregister() {
552         EventExecutor executor = originalExecutor();
553         if (executor.inEventLoop()) {
554             return findAndInvokeDeregister();
555         }
556         Promise<Void> promise  = newPromise();
557         safeExecute(executor, () -> findAndInvokeDeregister().cascadeTo(promise), promise, null);
558         return promise.asFuture();
559     }
560     private Future<Void> findAndInvokeBind(SocketAddress localAddress) {
561         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_BIND);
562         if (ctx == null) {
563             return failRemoved(this);
564         }
565         return ctx.invokeBind(localAddress);
566     }
567 
568     private Future<Void> invokeBind(SocketAddress localAddress) {
569         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
570         if (failed != null) {
571             return failed;
572         }
573 
574         try {
575             return handler().bind(this, localAddress);
576         } catch (Throwable t) {
577             return handleOutboundHandlerException(t, false);
578         } finally {
579             updatePendingBytesIfNeeded();
580         }
581     }
582 
583     @Override
584     public Future<Void> connect(
585             final SocketAddress remoteAddress, final SocketAddress localAddress) {
586         requireNonNull(remoteAddress, "remoteAddress");
587         EventExecutor executor = originalExecutor();
588         if (executor.inEventLoop()) {
589             return findAndInvokeConnect(remoteAddress, localAddress);
590         }
591         Promise<Void> promise  = newPromise();
592         safeExecute(executor, () ->
593                 findAndInvokeConnect(remoteAddress, localAddress).cascadeTo(promise), promise, null);
594 
595         return promise.asFuture();
596     }
597 
598     private Future<Void> findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
599         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_CONNECT);
600         if (ctx == null) {
601             return failRemoved(this);
602         }
603         return ctx.invokeConnect(remoteAddress, localAddress);
604     }
605 
606     private Future<Void> invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
607         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
608         if (failed != null) {
609             return failed;
610         }
611 
612         try {
613             return handler().connect(this, remoteAddress, localAddress);
614         } catch (Throwable t) {
615             return handleOutboundHandlerException(t, false);
616         } finally {
617             updatePendingBytesIfNeeded();
618         }
619     }
620 
621     @Override
622     public Future<Void> disconnect() {
623         if (!channel().metadata().hasDisconnect()) {
624             // Translate disconnect to close if the channel has no notion of disconnect-reconnect.
625             // So far, UDP/IP is the only transport that has such behavior.
626             return close();
627         }
628 
629         EventExecutor executor = originalExecutor();
630         if (executor.inEventLoop()) {
631             return findAndInvokeDisconnect();
632         }
633         Promise<Void> promise  = newPromise();
634         safeExecute(executor, () -> findAndInvokeDisconnect().cascadeTo(promise), promise, null);
635         return promise.asFuture();
636     }
637 
638     private Future<Void> findAndInvokeDisconnect() {
639         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_DISCONNECT);
640         if (ctx == null) {
641             return failRemoved(this);
642         }
643         return ctx.invokeDisconnect();
644     }
645 
646     private Future<Void> invokeDisconnect() {
647         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
648         if (failed != null) {
649             return failed;
650         }
651 
652         try {
653             return handler().disconnect(this);
654         } catch (Throwable t) {
655             return handleOutboundHandlerException(t, false);
656         } finally {
657             updatePendingBytesIfNeeded();
658         }
659     }
660 
661     @Override
662     public Future<Void> close() {
663         EventExecutor executor = originalExecutor();
664         if (executor.inEventLoop()) {
665             return findAndInvokeClose();
666         }
667         Promise<Void> promise  = newPromise();
668         safeExecute(executor, () -> findAndInvokeClose().cascadeTo(promise), promise, null);
669         return promise.asFuture();
670     }
671 
672     private Future<Void> findAndInvokeClose() {
673         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_CLOSE);
674         if (ctx == null) {
675             return failRemoved(this);
676         }
677         return ctx.invokeClose();
678     }
679 
680     private Future<Void> invokeClose() {
681         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
682         if (failed != null) {
683             return failed;
684         }
685 
686         try {
687             return handler().close(this);
688         } catch (Throwable t) {
689             return handleOutboundHandlerException(t, true);
690         } finally {
691             updatePendingBytesIfNeeded();
692         }
693     }
694 
695     @Override
696     public Future<Void> shutdown(ChannelShutdownDirection direction) {
697         EventExecutor executor = originalExecutor();
698         if (executor.inEventLoop()) {
699             return findAndInvokeShutdown(direction);
700         }
701         Promise<Void> promise  = newPromise();
702         safeExecute(executor, () -> findAndInvokeShutdown(direction).cascadeTo(promise), promise, null);
703         return promise.asFuture();
704     }
705 
706     private Future<Void> findAndInvokeShutdown(ChannelShutdownDirection direction) {
707         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_SHUTDOWN);
708         if (ctx == null) {
709             return failRemoved(this);
710         }
711         return ctx.invokeShutdown(direction);
712     }
713 
714     private Future<Void> invokeShutdown(ChannelShutdownDirection direction) {
715         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
716         if (failed != null) {
717             return failed;
718         }
719 
720         try {
721             return handler().shutdown(this, direction);
722         } catch (Throwable t) {
723             return handleOutboundHandlerException(t, true);
724         } finally {
725             updatePendingBytesIfNeeded();
726         }
727     }
728 
729     @Override
730     public Future<Void> register() {
731         EventExecutor executor = originalExecutor();
732         if (executor.inEventLoop()) {
733             return findAndInvokeRegister();
734         }
735         Promise<Void> promise  = newPromise();
736         safeExecute(executor, () -> findAndInvokeRegister().cascadeTo(promise), promise, null);
737         return promise.asFuture();
738     }
739 
740     private Future<Void> findAndInvokeRegister() {
741         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_REGISTER);
742         if (ctx == null) {
743             return failRemoved(this);
744         }
745         return ctx.invokeRegister();
746     }
747 
748     private Future<Void> invokeRegister() {
749         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
750         if (failed != null) {
751             return failed;
752         }
753 
754         try {
755             return handler().register(this);
756         } catch (Throwable t) {
757             return handleOutboundHandlerException(t, false);
758         } finally {
759             updatePendingBytesIfNeeded();
760         }
761     }
762 
763     private Future<Void> findAndInvokeDeregister() {
764         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_DEREGISTER);
765         if (ctx == null) {
766             return failRemoved(this);
767         }
768         return ctx.invokeDeregister();
769     }
770 
771     private Future<Void> invokeDeregister() {
772         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
773         if (failed != null) {
774             return failed;
775         }
776 
777         try {
778             return handler().deregister(this);
779         } catch (Throwable t) {
780             return handleOutboundHandlerException(t, false);
781         } finally {
782             updatePendingBytesIfNeeded();
783         }
784     }
785 
786     @Override
787     public ChannelHandlerContext read() {
788         EventExecutor executor = originalExecutor();
789         if (executor.inEventLoop()) {
790             findAndInvokeRead();
791         } else {
792             Tasks tasks = invokeTasks();
793             executor.execute(tasks.invokeReadTask);
794         }
795         return this;
796     }
797 
798     private void findAndInvokeRead() {
799         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_READ);
800         if (ctx != null) {
801             ctx.invokeRead();
802         }
803     }
804 
805     private void invokeRead() {
806         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
807         if (failed != null) {
808             return;
809         }
810 
811         try {
812             handler().read(this);
813         } catch (Throwable t) {
814             handleOutboundHandlerException(t, false);
815         } finally {
816             updatePendingBytesIfNeeded();
817         }
818     }
819 
820     @Override
821     public Future<Void> write(Object msg) {
822         return write(msg, false);
823     }
824 
825     private Future<Void> invokeWrite(Object msg) {
826         final Object m = pipeline.touch(msg, this);
827         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
828         if (failed != null) {
829             Resource.dispose(m);
830             return failed;
831         }
832 
833         try {
834             return handler().write(this, m);
835         } catch (Throwable t) {
836             return handleOutboundHandlerException(t, false);
837         } finally {
838             updatePendingBytesIfNeeded();
839         }
840     }
841 
842     @Override
843     public ChannelHandlerContext flush() {
844         EventExecutor executor = originalExecutor();
845         if (executor.inEventLoop()) {
846             findAndInvokeFlush();
847         } else {
848             Tasks tasks = invokeTasks();
849             Promise<Void> promise = newPromise();
850             promise.asFuture().addListener(channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
851             // If flush throws we want to at least propagate the exception through the ChannelPipeline
852             // as otherwise the user will not be made aware of the failure at all.
853             safeExecute(executor, tasks.invokeFlushTask, promise, null);
854         }
855 
856         return this;
857     }
858 
859     private void findAndInvokeFlush() {
860         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_FLUSH);
861         if (ctx != null) {
862             ctx.invokeFlush();
863         }
864     }
865 
866     private void invokeFlush() {
867         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
868         if (failed != null) {
869             return;
870         }
871 
872         try {
873             handler().flush(this);
874         } catch (Throwable t) {
875             handleOutboundHandlerException(t, false);
876         } finally {
877             updatePendingBytesIfNeeded();
878         }
879     }
880 
881     @Override
882     public Future<Void> writeAndFlush(Object msg) {
883         return write(msg, true);
884     }
885 
886     private Future<Void> invokeWriteAndFlush(Object msg) {
887         Future<Void> f = invokeWrite(msg);
888         invokeFlush();
889         return f;
890     }
891 
892     private Future<Void> write(Object msg, boolean flush) {
893         requireNonNull(msg, "msg");
894 
895         EventExecutor executor = originalExecutor();
896         if (executor.inEventLoop()) {
897             final DefaultChannelHandlerContext next = findContextOutbound(flush ?
898                     (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
899             if (next == null) {
900                 Resource.dispose(msg);
901                 return failRemoved(this);
902             }
903             if (flush) {
904                 return next.invokeWriteAndFlush(msg);
905             }
906             return next.invokeWrite(msg);
907         } else {
908             Promise<Void> promise  = newPromise();
909             final AbstractWriteTask task;
910             if (flush) {
911                 task = WriteAndFlushTask.newInstance(this, msg, promise);
912             }  else {
913                 task = WriteTask.newInstance(this, msg, promise);
914             }
915             if (task != null && !safeExecute(executor, task, promise, msg)) {
916                 // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
917                 // and put it back in the Recycler for re-use later.
918                 //
919                 // See https://github.com/netty/netty/issues/8343.
920                 task.cancel();
921             }
922             return promise.asFuture();
923         }
924     }
925 
926     @Override
927     public Future<Void> sendOutboundEvent(Object event) {
928         EventExecutor executor = originalExecutor();
929         if (executor.inEventLoop()) {
930             return findAndInvokeSendOutboundEvent(event);
931         }
932         Promise<Void> promise  = newPromise();
933         safeExecute(executor, () -> findAndInvokeSendOutboundEvent(event).cascadeTo(promise), promise, event);
934         return promise.asFuture();
935     }
936 
937     private Future<Void> findAndInvokeSendOutboundEvent(Object event) {
938         DefaultChannelHandlerContext ctx = findContextOutbound(MASK_SEND_OUTBOUND_EVENT);
939         if (ctx == null) {
940             return failRemoved(this);
941         }
942         return ctx.invokeSendOutboundEvent(event);
943     }
944 
945     private Future<Void> invokeSendOutboundEvent(Object event) {
946         Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
947         if (failed != null) {
948             Resource.dispose(event);
949             return failed;
950         }
951 
952         try {
953             return handler().sendOutboundEvent(this, event);
954         } catch (Throwable t) {
955             return handleOutboundHandlerException(t, false);
956         } finally {
957             updatePendingBytesIfNeeded();
958         }
959     }
960 
961     private Future<Void> handleOutboundHandlerException(Throwable cause, boolean closeDidThrow) {
962         String msg = handler() + " threw an exception while handling an outbound event. This is most likely a bug";
963 
964         logger.warn("{}. This is most likely a bug, closing the channel.", msg, cause);
965         if (closeDidThrow) {
966             // Close itself did throw, just call close() directly and so have the next handler invoked. If we would
967             // call close() on the Channel we would risk an infinite-loop.
968             close();
969         } else {
970             // Let's close the channel. Calling close on the Channel ensure we start from the end of the pipeline
971             // and so give all handlers the chance to do something during close.
972             channel().close();
973         }
974         return newFailedFuture(new IllegalStateException(msg, cause));
975     }
976 
977     private DefaultChannelHandlerContext findContextInbound(int mask) {
978         DefaultChannelHandlerContext ctx = this;
979         if (ctx.next == null) {
980             return null;
981         }
982         do {
983             ctx = ctx.next;
984         } while ((ctx.executionMask & mask) == 0 || ctx.handlerState == REMOVE_STARTED);
985         return ctx;
986     }
987 
988     private DefaultChannelHandlerContext findContextOutbound(int mask) {
989         DefaultChannelHandlerContext ctx = this;
990         if (ctx.prev == null) {
991             return null;
992         }
993         do {
994             ctx = ctx.prev;
995         } while ((ctx.executionMask & mask) == 0 || ctx.handlerState == REMOVE_STARTED);
996         return ctx;
997     }
998 
999     boolean setAddComplete() {
1000         // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
1001         // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
1002         // exposing ordering guarantees.
1003         if (handlerState == INIT) {
1004             handlerState = ADD_COMPLETE;
1005             return true;
1006         }
1007         return false;
1008     }
1009 
1010     void callHandlerAdded() throws Exception {
1011         // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
1012         // any pipeline events ctx.handler() will miss them because the state will not allow it.
1013         if (setAddComplete()) {
1014             handler().handlerAdded(this);
1015             if (handlesPendingOutboundBytes(executionMask)) {
1016                 long pending = pendingOutboundBytes();
1017                 currentPendingBytes = -1;
1018                 if (pending > 0) {
1019                     pipeline.incrementPendingOutboundBytes(pending);
1020                 }
1021             }
1022         }
1023     }
1024 
1025     void callHandlerRemoved() throws Exception {
1026         try {
1027             // Only call handlerRemoved(...) if we called handlerAdded(...) before.
1028             if (handlerState == ADD_COMPLETE) {
1029                 handlerState = REMOVE_STARTED;
1030                 try {
1031                     handler().handlerRemoved(this);
1032                 } finally {
1033                     if (handlesPendingOutboundBytes(executionMask)) {
1034                         long pending = pendingOutboundBytes();
1035                         currentPendingBytes = -1;
1036                         if (pending > 0) {
1037                             pipeline.decrementPendingOutboundBytes(pending);
1038                         }
1039                     }
1040                 }
1041             }
1042         } finally {
1043             // Mark the handler as removed in any case.
1044             handlerState = REMOVE_COMPLETE;
1045             removed = true;
1046         }
1047     }
1048 
1049     @Override
1050     public boolean isRemoved() {
1051         return removed;
1052     }
1053 
1054     void remove(boolean relink) {
1055         assert handlerState == REMOVE_COMPLETE;
1056         if (relink) {
1057             DefaultChannelHandlerContext prev = this.prev;
1058             DefaultChannelHandlerContext next = this.next;
1059             // prev and next may be null if the handler was never really added to the pipeline
1060             if (prev != null) {
1061                 prev.next = next;
1062             }
1063             if (next != null) {
1064                 next.prev = prev;
1065             }
1066         }
1067 
1068         prev = null;
1069         next = null;
1070     }
1071 
1072     static boolean safeExecute(EventExecutor executor, Runnable runnable, Promise<Void> promise, Object msg) {
1073         try {
1074             executor.execute(runnable);
1075             return true;
1076         } catch (Throwable cause) {
1077             try {
1078                 if (msg != null) {
1079                     Resource.dispose(msg);
1080                 }
1081             } finally {
1082                 if (promise != null) {
1083                     promise.setFailure(cause);
1084                 }
1085             }
1086             return false;
1087         }
1088     }
1089 
1090     @Override
1091     public String toHintString() {
1092         return '\'' + name + "' will handle the message from this point.";
1093     }
1094 
1095     @Override
1096     public String toString() {
1097         return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1098     }
1099 
1100     private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1101             SystemPropertyUtil.getBoolean("io.netty5.transport.estimateSizeOnSubmit", true);
1102 
1103     // Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
1104     private static final int WRITE_TASK_OVERHEAD =
1105             SystemPropertyUtil.getInt("io.netty5.transport.writeTaskSizeOverhead", 48);
1106 
1107     abstract static class AbstractWriteTask implements Runnable {
1108 
1109         private final ObjectPool.Handle<AbstractWriteTask> handle;
1110         private DefaultChannelHandlerContext ctx;
1111         private Object msg;
1112         private Promise<Void> promise;
1113         private int size;
1114 
1115         @SuppressWarnings("unchecked")
1116         private AbstractWriteTask(ObjectPool.Handle<? extends AbstractWriteTask> handle) {
1117             this.handle = (ObjectPool.Handle<AbstractWriteTask>) handle;
1118         }
1119 
1120         /**
1121          * Init the given {@link AbstractWriteTask} if possible.
1122          *
1123          * @param task      the task.
1124          * @param ctx       the context.
1125          * @param msg       the message
1126          * @param promise   the promise that will be notified.
1127          * @return          {@code true} if the task could be init successfully, {@code false} otherwise. If the init
1128          *                  failed it will automatically release the msg, fail the promise and recycle the task itself.
1129          */
1130         protected static boolean init(AbstractWriteTask task, DefaultChannelHandlerContext ctx,
1131                                    Object msg, Promise<Void> promise) {
1132             task.ctx = ctx;
1133             task.msg = msg;
1134             task.promise = promise;
1135 
1136             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1137                 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1138                 try {
1139                     ctx.pipeline.incrementPendingOutboundBytes(task.size);
1140                 } catch (IllegalStateException e) {
1141                     task.recycle();
1142                     Resource.dispose(msg);
1143                     promise.setFailure(e);
1144                     return false;
1145                 }
1146             } else {
1147                 task.size = 0;
1148             }
1149             return true;
1150         }
1151 
1152         protected abstract DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx);
1153         @Override
1154         public final void run() {
1155             try {
1156                 decrementPendingOutboundBytes();
1157                 if (promise.isCancelled()) {
1158                     Resource.dispose(msg);
1159                     return;
1160                 }
1161                 DefaultChannelHandlerContext next = findContext(ctx);
1162                 if (next == null) {
1163                     Resource.dispose(msg);
1164                     failRemoved(ctx).cascadeTo(promise);
1165                     return;
1166                 }
1167                 write(next, msg, promise);
1168             } finally {
1169                 recycle();
1170             }
1171         }
1172 
1173         void cancel() {
1174             try {
1175                 decrementPendingOutboundBytes();
1176             } finally {
1177                 recycle();
1178             }
1179         }
1180 
1181         private void decrementPendingOutboundBytes() {
1182             if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1183                 // Update the pending bytes
1184                 ctx.pipeline.decrementPendingOutboundBytes(size);
1185             }
1186         }
1187 
1188         private void recycle() {
1189             // Set to null so the GC can collect them directly
1190             ctx = null;
1191             msg = null;
1192             promise = null;
1193             handle.recycle(this);
1194         }
1195 
1196         protected void write(DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1197             ctx.invokeWrite(msg).cascadeTo(promise);
1198         }
1199     }
1200 
1201     static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1202 
1203         private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(WriteTask::new);
1204 
1205         static WriteTask newInstance(
1206                 DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1207             WriteTask task = RECYCLER.get();
1208             if (!init(task, ctx, msg, promise)) {
1209                 return null;
1210             }
1211             return task;
1212         }
1213 
1214         @Override
1215         protected DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx) {
1216             return ctx.findContextOutbound(MASK_WRITE);
1217         }
1218 
1219         private WriteTask(ObjectPool.Handle<WriteTask> handle) {
1220             super(handle);
1221         }
1222     }
1223 
1224     static final class WriteAndFlushTask extends AbstractWriteTask {
1225 
1226         private static final ObjectPool<WriteAndFlushTask> RECYCLER = ObjectPool.newPool(WriteAndFlushTask::new);
1227 
1228         static WriteAndFlushTask newInstance(
1229                 DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1230             WriteAndFlushTask task = RECYCLER.get();
1231             if (!init(task, ctx, msg, promise)) {
1232                 return null;
1233             }
1234             return task;
1235         }
1236 
1237         private WriteAndFlushTask(ObjectPool.Handle<WriteAndFlushTask> handle) {
1238             super(handle);
1239         }
1240 
1241         @Override
1242         protected DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx) {
1243             return ctx.findContextOutbound(MASK_WRITE | MASK_FLUSH);
1244         }
1245 
1246         @Override
1247         public void write(DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1248             super.write(ctx, msg, promise);
1249             ctx.invokeFlush();
1250         }
1251     }
1252 
1253     private static final class Tasks {
1254         private final Runnable invokeChannelReadCompleteTask;
1255         private final Runnable invokeReadTask;
1256         private final Runnable invokeChannelWritableStateChangedTask;
1257         private final Runnable invokeFlushTask;
1258 
1259         Tasks(DefaultChannelHandlerContext ctx) {
1260             invokeChannelReadCompleteTask = ctx::findAndInvokeChannelReadComplete;
1261             invokeReadTask = ctx::findAndInvokeRead;
1262             invokeChannelWritableStateChangedTask = ctx::invokeChannelWritabilityChanged;
1263             invokeFlushTask = ctx::findAndInvokeFlush;
1264         }
1265     }
1266 
1267     private boolean saveCurrentPendingBytesIfNeededInbound() {
1268         IllegalStateException e = saveCurrentPendingBytesIfNeeded();
1269         if (e != null) {
1270             logger.error(e);
1271             return false;
1272         }
1273         return true;
1274     }
1275 
1276     private Future<Void> saveCurrentPendingBytesIfNeededOutbound() {
1277         IllegalStateException e = saveCurrentPendingBytesIfNeeded();
1278         if (e != null) {
1279             logger.error(e);
1280             return newFailedFuture(e);
1281         }
1282         return null;
1283     }
1284 
1285     private IllegalStateException saveCurrentPendingBytesIfNeeded() {
1286         if (!handlesPendingOutboundBytes(executionMask)) {
1287             assert currentPendingBytes == 0;
1288             return null;
1289         }
1290         // We only save the current pending bytes if not already done before.
1291         // This is important as otherwise we might run into issues in case of reentrancy.
1292         if (currentPendingBytes == -1) {
1293             try {
1294                 currentPendingBytes = pendingOutboundBytes();
1295             } catch (IllegalStateException e) {
1296                 return e;
1297             }
1298         }
1299         return null;
1300     }
1301 
1302     private long pendingOutboundBytes() {
1303         long pending = handler().pendingOutboundBytes(this);
1304         if (pending < 0) {
1305             pipeline.forceCloseTransport();
1306             String message = StringUtil.simpleClassName(handler.getClass()) +
1307                     ".pendingOutboundBytes(ChannelHandlerContext) returned a negative value: " +
1308                     pending + ". Force closed transport.";
1309             throw new IllegalStateException(message);
1310         }
1311         return pending;
1312     }
1313 
1314     private void updatePendingBytesIfNeeded() {
1315         if (!handlesPendingOutboundBytes(executionMask)) {
1316             assert currentPendingBytes == 0;
1317             return;
1318         }
1319         long current = currentPendingBytes;
1320         if (current == -1) {
1321             return;
1322         }
1323         this.currentPendingBytes = -1;
1324         try {
1325             long newPendingBytes = pendingOutboundBytes();
1326             long delta = current - newPendingBytes;
1327             if (delta == 0) {
1328                 // No changes
1329                 return;
1330             }
1331             if (delta > 0) {
1332                 pipeline.decrementPendingOutboundBytes(delta);
1333             } else {
1334                 pipeline.incrementPendingOutboundBytes(-delta);
1335             }
1336         } catch (IllegalStateException e) {
1337             logger.error(e);
1338         }
1339     }
1340 
1341     private static final class DefaultChannelHandlerContextAwareEventExecutor implements EventExecutor {
1342 
1343         private final EventExecutor executor;
1344         private final DefaultChannelHandlerContext ctx;
1345 
1346         DefaultChannelHandlerContextAwareEventExecutor(EventExecutor executor, DefaultChannelHandlerContext ctx) {
1347             this.executor = executor;
1348             this.ctx = ctx;
1349         }
1350 
1351         EventExecutor wrappedExecutor() {
1352             return executor;
1353         }
1354 
1355         @Override
1356         public boolean inEventLoop() {
1357             return executor.inEventLoop();
1358         }
1359 
1360         @Override
1361         public boolean inEventLoop(Thread thread) {
1362             return executor.inEventLoop(thread);
1363         }
1364 
1365         @Override
1366         public boolean isShuttingDown() {
1367             return executor.isShuttingDown();
1368         }
1369 
1370         @Override
1371         public boolean isShutdown() {
1372             return executor.isShutdown();
1373         }
1374 
1375         @Override
1376         public boolean isTerminated() {
1377             return executor.isTerminated();
1378         }
1379 
1380         @Override
1381         public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
1382             return executor.awaitTermination(timeout, unit);
1383         }
1384 
1385         @Override
1386         public Future<Void> shutdownGracefully() {
1387             return executor.shutdownGracefully();
1388         }
1389 
1390         @Override
1391         public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
1392             return executor.shutdownGracefully(quietPeriod, timeout, unit);
1393         }
1394 
1395         @Override
1396         public Future<Void> terminationFuture() {
1397             return executor.terminationFuture();
1398         }
1399 
1400         @Override
1401         public Future<Void> submit(Runnable task) {
1402             return executor.submit(new DefaultHandlerContextRunnable(task));
1403         }
1404 
1405         @Override
1406         public <T> Future<T> submit(Runnable task, T result) {
1407             return executor.submit(new DefaultHandlerContextRunnable(task), result);
1408         }
1409 
1410         @Override
1411         public <T> Future<T> submit(Callable<T> task) {
1412             return executor.submit(new DefaultHandlerContextCallable<>(task));
1413         }
1414 
1415         @Override
1416         public Future<Void> schedule(Runnable task, long delay, TimeUnit unit) {
1417             return executor.schedule(new DefaultHandlerContextRunnable(task), delay, unit);
1418         }
1419 
1420         @Override
1421         public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
1422             return executor.schedule(new DefaultHandlerContextCallable<>(task), delay, unit);
1423         }
1424 
1425         @Override
1426         public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
1427             return executor.scheduleAtFixedRate(
1428                     new DefaultHandlerContextRunnable(task), initialDelay, period, unit);
1429         }
1430 
1431         @Override
1432         public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
1433             return executor.scheduleWithFixedDelay(
1434                     new DefaultHandlerContextRunnable(task), initialDelay, delay, unit);
1435         }
1436 
1437         @Override
1438         public void execute(Runnable task) {
1439             executor.execute(new DefaultHandlerContextRunnable(task));
1440         }
1441 
1442         private final class DefaultHandlerContextCallable<V> implements Callable<V> {
1443 
1444             private final Callable<V> task;
1445 
1446             DefaultHandlerContextCallable(Callable<V> task) {
1447                 this.task = task;
1448             }
1449 
1450             @Override
1451             public V call() throws Exception {
1452                 IllegalStateException e = ctx.saveCurrentPendingBytesIfNeeded();
1453                 try {
1454                     V value = null;
1455                     try {
1456                         value = task.call();
1457                         return value;
1458                     } finally {
1459                         if (e != null) {
1460                             Resource.dispose(value);
1461                             logger.error(e);
1462                             throw e;
1463                         }
1464                     }
1465                 } finally {
1466                     if (e == null) {
1467                         ctx.updatePendingBytesIfNeeded();
1468                     }
1469                 }
1470             }
1471         }
1472 
1473         private final class DefaultHandlerContextRunnable implements Runnable {
1474             private final Runnable task;
1475             DefaultHandlerContextRunnable(Runnable task) {
1476                 this.task = task;
1477             }
1478 
1479             @Override
1480             public void run() {
1481                 IllegalStateException e = ctx.saveCurrentPendingBytesIfNeeded();
1482                 try {
1483                     task.run();
1484                     if (e != null) {
1485                         logger.error(e);
1486                         throw e;
1487                     }
1488                 } finally {
1489                     if (e == null) {
1490                         ctx.updatePendingBytesIfNeeded();
1491                     }
1492                 }
1493             }
1494         }
1495     }
1496 }