1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.util.Resource;
19 import io.netty5.util.ResourceLeakHint;
20 import io.netty5.util.concurrent.EventExecutor;
21 import io.netty5.util.concurrent.Future;
22 import io.netty5.util.concurrent.Promise;
23 import io.netty5.util.internal.ObjectPool;
24 import io.netty5.util.internal.StringUtil;
25 import io.netty5.util.internal.SystemPropertyUtil;
26 import io.netty5.util.internal.ThrowableUtil;
27 import io.netty5.util.internal.logging.InternalLogger;
28 import io.netty5.util.internal.logging.InternalLoggerFactory;
29
30 import java.net.SocketAddress;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.TimeUnit;
33
34 import static io.netty5.channel.ChannelHandlerMask.MASK_BIND;
35 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
36 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
37 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
38 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
39 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
40 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_SHUTDOWN;
41 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
42 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
43 import static io.netty5.channel.ChannelHandlerMask.MASK_CLOSE;
44 import static io.netty5.channel.ChannelHandlerMask.MASK_CONNECT;
45 import static io.netty5.channel.ChannelHandlerMask.MASK_DEREGISTER;
46 import static io.netty5.channel.ChannelHandlerMask.MASK_DISCONNECT;
47 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_EXCEPTION_CAUGHT;
48 import static io.netty5.channel.ChannelHandlerMask.MASK_FLUSH;
49 import static io.netty5.channel.ChannelHandlerMask.MASK_READ;
50 import static io.netty5.channel.ChannelHandlerMask.MASK_REGISTER;
51 import static io.netty5.channel.ChannelHandlerMask.MASK_SHUTDOWN;
52 import static io.netty5.channel.ChannelHandlerMask.MASK_CHANNEL_INBOUND_EVENT;
53 import static io.netty5.channel.ChannelHandlerMask.MASK_SEND_OUTBOUND_EVENT;
54 import static io.netty5.channel.ChannelHandlerMask.MASK_WRITE;
55 import static io.netty5.channel.ChannelHandlerMask.MASK_PENDING_OUTBOUND_BYTES;
56 import static io.netty5.channel.ChannelHandlerMask.mask;
57 import static java.util.Objects.requireNonNull;
58
59 final class DefaultChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
60
61 private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelHandlerContext.class);
62
63
64
65
66
67 private static final int INIT = 0;
68
69
70
71
72 private static final int ADD_COMPLETE = 1;
73
74
75
76
77 private static final int REMOVE_STARTED = 2;
78
79
80
81
82 private static final int REMOVE_COMPLETE = 3;
83
84 private final int executionMask;
85 private final DefaultChannelPipeline pipeline;
86 private final ChannelHandler handler;
87 private final String name;
88
89
90 private final DefaultChannelHandlerContextAwareEventExecutor executor;
91 private long currentPendingBytes;
92
93
94
95 private Tasks invokeTasks;
96 private int handlerState = INIT;
97
98 private volatile boolean removed;
99
100 DefaultChannelHandlerContext next;
101 DefaultChannelHandlerContext prev;
102
103 DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, String name,
104 ChannelHandler handler) {
105 this.name = requireNonNull(name, "name");
106 this.pipeline = pipeline;
107 executionMask = mask(handler.getClass());
108 this.handler = handler;
109
110
111
112 this.executor = handlesPendingOutboundBytes(executionMask) ?
113 new DefaultChannelHandlerContextAwareEventExecutor(pipeline.executor(), this) : null;
114 }
115
116 private static boolean handlesPendingOutboundBytes(int mask) {
117 return (mask & MASK_PENDING_OUTBOUND_BYTES) != 0;
118 }
119
120 private static Future<Void> failRemoved(DefaultChannelHandlerContext ctx) {
121 return ctx.newFailedFuture(newRemovedException(ctx, null));
122 }
123
124 private void notifyHandlerRemovedAlready() {
125 notifyHandlerRemovedAlready(null);
126 }
127
128 private void notifyHandlerRemovedAlready(Throwable cause) {
129 pipeline().fireChannelExceptionCaught(newRemovedException(this, cause));
130 }
131
132 private static ChannelPipelineException newRemovedException(ChannelHandlerContext ctx, Throwable cause) {
133 return new ChannelPipelineException("Context " + ctx + " already removed", cause);
134 }
135
136 private Tasks invokeTasks() {
137 Tasks tasks = invokeTasks;
138 if (tasks == null) {
139 invokeTasks = tasks = new Tasks(this);
140 }
141 return tasks;
142 }
143
144 @Override
145 public EventExecutor executor() {
146 return executor == null ? pipeline().executor() : executor;
147 }
148
149 @Override
150 public ChannelHandler handler() {
151 return handler;
152 }
153
154 @Override
155 public ChannelPipeline pipeline() {
156 return pipeline;
157 }
158
159 @Override
160 public String name() {
161 return name;
162 }
163
164 private EventExecutor originalExecutor() {
165 return executor == null ? pipeline().executor() : executor.wrappedExecutor();
166 }
167
168 @Override
169 public ChannelHandlerContext fireChannelRegistered() {
170 EventExecutor executor = originalExecutor();
171 if (executor.inEventLoop()) {
172 findAndInvokeChannelRegistered();
173 } else {
174 executor.execute(this::findAndInvokeChannelRegistered);
175 }
176 return this;
177 }
178
179 private void findAndInvokeChannelRegistered() {
180 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_REGISTERED);
181 if (ctx == null) {
182 notifyHandlerRemovedAlready();
183 return;
184 }
185 ctx.invokeChannelRegistered();
186 }
187
188 void invokeChannelRegistered() {
189 try {
190 if (!saveCurrentPendingBytesIfNeededInbound()) {
191 return;
192 }
193 handler().channelRegistered(this);
194 } catch (Throwable t) {
195 invokeChannelExceptionCaught(t);
196 } finally {
197 updatePendingBytesIfNeeded();
198 }
199 }
200
201 @Override
202 public ChannelHandlerContext fireChannelUnregistered() {
203 EventExecutor executor = originalExecutor();
204 if (executor.inEventLoop()) {
205 findAndInvokeChannelUnregistered();
206 } else {
207 executor.execute(this::findAndInvokeChannelUnregistered);
208 }
209 return this;
210 }
211
212 private void findAndInvokeChannelUnregistered() {
213 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_UNREGISTERED);
214 if (ctx == null) {
215 notifyHandlerRemovedAlready();
216 return;
217 }
218 ctx.invokeChannelUnregistered();
219 }
220
221 void invokeChannelUnregistered() {
222 if (!saveCurrentPendingBytesIfNeededInbound()) {
223 return;
224 }
225 try {
226 handler().channelUnregistered(this);
227 } catch (Throwable t) {
228 invokeChannelExceptionCaught(t);
229 } finally {
230 updatePendingBytesIfNeeded();
231 }
232 }
233
234 @Override
235 public ChannelHandlerContext fireChannelActive() {
236 EventExecutor executor = originalExecutor();
237 if (executor.inEventLoop()) {
238 findAndInvokeChannelActive();
239 } else {
240 executor.execute(this::findAndInvokeChannelActive);
241 }
242 return this;
243 }
244
245 private void findAndInvokeChannelActive() {
246 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_ACTIVE);
247 if (ctx == null) {
248 notifyHandlerRemovedAlready();
249 return;
250 }
251 ctx.invokeChannelActive();
252 }
253
254 void invokeChannelActive() {
255 if (!saveCurrentPendingBytesIfNeededInbound()) {
256 return;
257 }
258 try {
259 handler().channelActive(this);
260 } catch (Throwable t) {
261 invokeChannelExceptionCaught(t);
262 } finally {
263 updatePendingBytesIfNeeded();
264 }
265 }
266
267 @Override
268 public ChannelHandlerContext fireChannelInactive() {
269 EventExecutor executor = originalExecutor();
270 if (executor.inEventLoop()) {
271 findAndInvokeChannelInactive();
272 } else {
273 executor.execute(this::findAndInvokeChannelInactive);
274 }
275 return this;
276 }
277
278 private void findAndInvokeChannelInactive() {
279 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_INACTIVE);
280 if (ctx == null) {
281 notifyHandlerRemovedAlready();
282 return;
283 }
284 ctx.invokeChannelInactive();
285 }
286
287 void invokeChannelInactive() {
288 if (!saveCurrentPendingBytesIfNeededInbound()) {
289 return;
290 }
291 try {
292 handler().channelInactive(this);
293 } catch (Throwable t) {
294 invokeChannelExceptionCaught(t);
295 } finally {
296 updatePendingBytesIfNeeded();
297 }
298 }
299
300 @Override
301 public ChannelHandlerContext fireChannelShutdown(ChannelShutdownDirection direction) {
302 EventExecutor executor = originalExecutor();
303 if (executor.inEventLoop()) {
304 findAndInvokeChannelShutdown(direction);
305 } else {
306 executor.execute(() -> findAndInvokeChannelShutdown(direction));
307 }
308 return this;
309 }
310
311 private void findAndInvokeChannelShutdown(ChannelShutdownDirection direction) {
312 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_SHUTDOWN);
313 if (ctx == null) {
314 notifyHandlerRemovedAlready();
315 return;
316 }
317 ctx.invokeChannelShutdown(direction);
318 }
319
320 void invokeChannelShutdown(ChannelShutdownDirection direction) {
321 if (!saveCurrentPendingBytesIfNeededInbound()) {
322 return;
323 }
324 try {
325 handler().channelShutdown(this, direction);
326 } catch (Throwable t) {
327 invokeChannelExceptionCaught(t);
328 } finally {
329 updatePendingBytesIfNeeded();
330 }
331 }
332
333 @Override
334 public ChannelHandlerContext fireChannelExceptionCaught(Throwable cause) {
335 requireNonNull(cause, "cause");
336 EventExecutor executor = originalExecutor();
337 if (executor.inEventLoop()) {
338 findAndInvokeChannelExceptionCaught(cause);
339 } else {
340 try {
341 executor.execute(() -> findAndInvokeChannelExceptionCaught(cause));
342 } catch (Throwable t) {
343 if (logger.isWarnEnabled()) {
344 logger.warn("Failed to submit an exceptionCaught() event.", t);
345 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
346 }
347 }
348 }
349 return this;
350 }
351
352 private void findAndInvokeChannelExceptionCaught(Throwable cause) {
353 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_EXCEPTION_CAUGHT);
354 if (ctx == null) {
355 notifyHandlerRemovedAlready(cause);
356 return;
357 }
358 ctx.invokeChannelExceptionCaught(cause);
359 }
360
361 void invokeChannelExceptionCaught(final Throwable cause) {
362 if (!saveCurrentPendingBytesIfNeededInbound()) {
363 return;
364 }
365 try {
366 handler().channelExceptionCaught(this, cause);
367 } catch (Throwable error) {
368 if (logger.isDebugEnabled()) {
369 logger.debug(
370 "An exception {}" +
371 "was thrown by a user handler's exceptionCaught() " +
372 "method while handling the following exception:",
373 ThrowableUtil.stackTraceToString(error), cause);
374 } else if (logger.isWarnEnabled()) {
375 logger.warn(
376 "An exception '{}' [enable DEBUG level for full stacktrace] " +
377 "was thrown by a user handler's exceptionCaught() " +
378 "method while handling the following exception:", error, cause);
379 }
380 } finally {
381 updatePendingBytesIfNeeded();
382 }
383 }
384
385 @Override
386 public ChannelHandlerContext fireChannelInboundEvent(Object event) {
387 requireNonNull(event, "event");
388 EventExecutor executor = originalExecutor();
389 if (executor.inEventLoop()) {
390 findAndInvokeChannelInboundEvent(event);
391 } else {
392 executor.execute(() -> findAndInvokeChannelInboundEvent(event));
393 }
394 return this;
395 }
396
397 private void findAndInvokeChannelInboundEvent(Object event) {
398 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_INBOUND_EVENT);
399 if (ctx == null) {
400 Resource.dispose(event);
401 notifyHandlerRemovedAlready();
402 return;
403 }
404 ctx.invokeChannelInboundEvent(event);
405 }
406
407 void invokeChannelInboundEvent(Object event) {
408 if (!saveCurrentPendingBytesIfNeededInbound()) {
409 Resource.dispose(event);
410 return;
411 }
412 try {
413 handler().channelInboundEvent(this, event);
414 } catch (Throwable t) {
415 invokeChannelExceptionCaught(t);
416 } finally {
417 updatePendingBytesIfNeeded();
418 }
419 }
420
421 @Override
422 public ChannelHandlerContext fireChannelRead(final Object msg) {
423 requireNonNull(msg, "msg");
424 EventExecutor executor = originalExecutor();
425 if (executor.inEventLoop()) {
426 findAndInvokeChannelRead(msg);
427 } else {
428 try {
429 executor.execute(() -> findAndInvokeChannelRead(msg));
430 } catch (Throwable cause) {
431 Resource.dispose(msg);
432 throw cause;
433 }
434 }
435 return this;
436 }
437
438 private void findAndInvokeChannelRead(Object msg) {
439 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_READ);
440 if (ctx == null) {
441 Resource.dispose(msg);
442 notifyHandlerRemovedAlready();
443 return;
444 }
445 ctx.invokeChannelRead(msg);
446 }
447
448 void invokeChannelRead(Object msg) {
449 final Object m = pipeline.touch(requireNonNull(msg, "msg"), this);
450 if (!saveCurrentPendingBytesIfNeededInbound()) {
451 Resource.dispose(m);
452 return;
453 }
454 try {
455 handler().channelRead(this, m);
456 } catch (Throwable t) {
457 invokeChannelExceptionCaught(t);
458 } finally {
459 updatePendingBytesIfNeeded();
460 }
461 }
462
463 @Override
464 public ChannelHandlerContext fireChannelReadComplete() {
465 EventExecutor executor = originalExecutor();
466 if (executor.inEventLoop()) {
467 findAndInvokeChannelReadComplete();
468 } else {
469 Tasks tasks = invokeTasks();
470 executor.execute(tasks.invokeChannelReadCompleteTask);
471 }
472 return this;
473 }
474
475 private void findAndInvokeChannelReadComplete() {
476 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
477 if (ctx == null) {
478 notifyHandlerRemovedAlready();
479 return;
480 }
481 ctx.invokeChannelReadComplete();
482 }
483
484 void invokeChannelReadComplete() {
485 if (!saveCurrentPendingBytesIfNeededInbound()) {
486 return;
487 }
488 try {
489 handler().channelReadComplete(this);
490 } catch (Throwable t) {
491 invokeChannelExceptionCaught(t);
492 } finally {
493 updatePendingBytesIfNeeded();
494 }
495 }
496
497 @Override
498 public ChannelHandlerContext fireChannelWritabilityChanged() {
499 EventExecutor executor = originalExecutor();
500 if (executor.inEventLoop()) {
501 findAndInvokeChannelWritabilityChanged();
502 } else {
503 Tasks tasks = invokeTasks();
504 executor.execute(tasks.invokeChannelWritableStateChangedTask);
505 }
506 return this;
507 }
508
509 private void findAndInvokeChannelWritabilityChanged() {
510 DefaultChannelHandlerContext ctx = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
511 if (ctx == null) {
512 notifyHandlerRemovedAlready();
513 return;
514 }
515 ctx.invokeChannelWritabilityChanged();
516 }
517
518 void invokeChannelWritabilityChanged() {
519 if (!saveCurrentPendingBytesIfNeededInbound()) {
520 return;
521 }
522 try {
523 handler().channelWritabilityChanged(this);
524 } catch (Throwable t) {
525 invokeChannelExceptionCaught(t);
526 } finally {
527 updatePendingBytesIfNeeded();
528 }
529 }
530
531 @Override
532 public Future<Void> bind(SocketAddress localAddress) {
533 requireNonNull(localAddress, "localAddress");
534
535 EventExecutor executor = originalExecutor();
536 if (executor.inEventLoop()) {
537 return findAndInvokeBind(localAddress);
538 }
539
540 Promise<Void> promise = newPromise();
541 safeExecute(executor, () -> findAndInvokeBind(localAddress).cascadeTo(promise), promise, null);
542 return promise.asFuture();
543 }
544
545 @Override
546 public Future<Void> connect(SocketAddress remoteAddress) {
547 return connect(remoteAddress, null);
548 }
549
550 @Override
551 public Future<Void> deregister() {
552 EventExecutor executor = originalExecutor();
553 if (executor.inEventLoop()) {
554 return findAndInvokeDeregister();
555 }
556 Promise<Void> promise = newPromise();
557 safeExecute(executor, () -> findAndInvokeDeregister().cascadeTo(promise), promise, null);
558 return promise.asFuture();
559 }
560 private Future<Void> findAndInvokeBind(SocketAddress localAddress) {
561 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_BIND);
562 if (ctx == null) {
563 return failRemoved(this);
564 }
565 return ctx.invokeBind(localAddress);
566 }
567
568 private Future<Void> invokeBind(SocketAddress localAddress) {
569 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
570 if (failed != null) {
571 return failed;
572 }
573
574 try {
575 return handler().bind(this, localAddress);
576 } catch (Throwable t) {
577 return handleOutboundHandlerException(t, false);
578 } finally {
579 updatePendingBytesIfNeeded();
580 }
581 }
582
583 @Override
584 public Future<Void> connect(
585 final SocketAddress remoteAddress, final SocketAddress localAddress) {
586 requireNonNull(remoteAddress, "remoteAddress");
587 EventExecutor executor = originalExecutor();
588 if (executor.inEventLoop()) {
589 return findAndInvokeConnect(remoteAddress, localAddress);
590 }
591 Promise<Void> promise = newPromise();
592 safeExecute(executor, () ->
593 findAndInvokeConnect(remoteAddress, localAddress).cascadeTo(promise), promise, null);
594
595 return promise.asFuture();
596 }
597
598 private Future<Void> findAndInvokeConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
599 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_CONNECT);
600 if (ctx == null) {
601 return failRemoved(this);
602 }
603 return ctx.invokeConnect(remoteAddress, localAddress);
604 }
605
606 private Future<Void> invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
607 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
608 if (failed != null) {
609 return failed;
610 }
611
612 try {
613 return handler().connect(this, remoteAddress, localAddress);
614 } catch (Throwable t) {
615 return handleOutboundHandlerException(t, false);
616 } finally {
617 updatePendingBytesIfNeeded();
618 }
619 }
620
621 @Override
622 public Future<Void> disconnect() {
623 if (!channel().metadata().hasDisconnect()) {
624
625
626 return close();
627 }
628
629 EventExecutor executor = originalExecutor();
630 if (executor.inEventLoop()) {
631 return findAndInvokeDisconnect();
632 }
633 Promise<Void> promise = newPromise();
634 safeExecute(executor, () -> findAndInvokeDisconnect().cascadeTo(promise), promise, null);
635 return promise.asFuture();
636 }
637
638 private Future<Void> findAndInvokeDisconnect() {
639 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_DISCONNECT);
640 if (ctx == null) {
641 return failRemoved(this);
642 }
643 return ctx.invokeDisconnect();
644 }
645
646 private Future<Void> invokeDisconnect() {
647 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
648 if (failed != null) {
649 return failed;
650 }
651
652 try {
653 return handler().disconnect(this);
654 } catch (Throwable t) {
655 return handleOutboundHandlerException(t, false);
656 } finally {
657 updatePendingBytesIfNeeded();
658 }
659 }
660
661 @Override
662 public Future<Void> close() {
663 EventExecutor executor = originalExecutor();
664 if (executor.inEventLoop()) {
665 return findAndInvokeClose();
666 }
667 Promise<Void> promise = newPromise();
668 safeExecute(executor, () -> findAndInvokeClose().cascadeTo(promise), promise, null);
669 return promise.asFuture();
670 }
671
672 private Future<Void> findAndInvokeClose() {
673 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_CLOSE);
674 if (ctx == null) {
675 return failRemoved(this);
676 }
677 return ctx.invokeClose();
678 }
679
680 private Future<Void> invokeClose() {
681 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
682 if (failed != null) {
683 return failed;
684 }
685
686 try {
687 return handler().close(this);
688 } catch (Throwable t) {
689 return handleOutboundHandlerException(t, true);
690 } finally {
691 updatePendingBytesIfNeeded();
692 }
693 }
694
695 @Override
696 public Future<Void> shutdown(ChannelShutdownDirection direction) {
697 EventExecutor executor = originalExecutor();
698 if (executor.inEventLoop()) {
699 return findAndInvokeShutdown(direction);
700 }
701 Promise<Void> promise = newPromise();
702 safeExecute(executor, () -> findAndInvokeShutdown(direction).cascadeTo(promise), promise, null);
703 return promise.asFuture();
704 }
705
706 private Future<Void> findAndInvokeShutdown(ChannelShutdownDirection direction) {
707 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_SHUTDOWN);
708 if (ctx == null) {
709 return failRemoved(this);
710 }
711 return ctx.invokeShutdown(direction);
712 }
713
714 private Future<Void> invokeShutdown(ChannelShutdownDirection direction) {
715 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
716 if (failed != null) {
717 return failed;
718 }
719
720 try {
721 return handler().shutdown(this, direction);
722 } catch (Throwable t) {
723 return handleOutboundHandlerException(t, true);
724 } finally {
725 updatePendingBytesIfNeeded();
726 }
727 }
728
729 @Override
730 public Future<Void> register() {
731 EventExecutor executor = originalExecutor();
732 if (executor.inEventLoop()) {
733 return findAndInvokeRegister();
734 }
735 Promise<Void> promise = newPromise();
736 safeExecute(executor, () -> findAndInvokeRegister().cascadeTo(promise), promise, null);
737 return promise.asFuture();
738 }
739
740 private Future<Void> findAndInvokeRegister() {
741 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_REGISTER);
742 if (ctx == null) {
743 return failRemoved(this);
744 }
745 return ctx.invokeRegister();
746 }
747
748 private Future<Void> invokeRegister() {
749 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
750 if (failed != null) {
751 return failed;
752 }
753
754 try {
755 return handler().register(this);
756 } catch (Throwable t) {
757 return handleOutboundHandlerException(t, false);
758 } finally {
759 updatePendingBytesIfNeeded();
760 }
761 }
762
763 private Future<Void> findAndInvokeDeregister() {
764 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_DEREGISTER);
765 if (ctx == null) {
766 return failRemoved(this);
767 }
768 return ctx.invokeDeregister();
769 }
770
771 private Future<Void> invokeDeregister() {
772 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
773 if (failed != null) {
774 return failed;
775 }
776
777 try {
778 return handler().deregister(this);
779 } catch (Throwable t) {
780 return handleOutboundHandlerException(t, false);
781 } finally {
782 updatePendingBytesIfNeeded();
783 }
784 }
785
786 @Override
787 public ChannelHandlerContext read() {
788 EventExecutor executor = originalExecutor();
789 if (executor.inEventLoop()) {
790 findAndInvokeRead();
791 } else {
792 Tasks tasks = invokeTasks();
793 executor.execute(tasks.invokeReadTask);
794 }
795 return this;
796 }
797
798 private void findAndInvokeRead() {
799 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_READ);
800 if (ctx != null) {
801 ctx.invokeRead();
802 }
803 }
804
805 private void invokeRead() {
806 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
807 if (failed != null) {
808 return;
809 }
810
811 try {
812 handler().read(this);
813 } catch (Throwable t) {
814 handleOutboundHandlerException(t, false);
815 } finally {
816 updatePendingBytesIfNeeded();
817 }
818 }
819
820 @Override
821 public Future<Void> write(Object msg) {
822 return write(msg, false);
823 }
824
825 private Future<Void> invokeWrite(Object msg) {
826 final Object m = pipeline.touch(msg, this);
827 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
828 if (failed != null) {
829 Resource.dispose(m);
830 return failed;
831 }
832
833 try {
834 return handler().write(this, m);
835 } catch (Throwable t) {
836 return handleOutboundHandlerException(t, false);
837 } finally {
838 updatePendingBytesIfNeeded();
839 }
840 }
841
842 @Override
843 public ChannelHandlerContext flush() {
844 EventExecutor executor = originalExecutor();
845 if (executor.inEventLoop()) {
846 findAndInvokeFlush();
847 } else {
848 Tasks tasks = invokeTasks();
849 Promise<Void> promise = newPromise();
850 promise.asFuture().addListener(channel(), ChannelFutureListeners.FIRE_EXCEPTION_ON_FAILURE);
851
852
853 safeExecute(executor, tasks.invokeFlushTask, promise, null);
854 }
855
856 return this;
857 }
858
859 private void findAndInvokeFlush() {
860 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_FLUSH);
861 if (ctx != null) {
862 ctx.invokeFlush();
863 }
864 }
865
866 private void invokeFlush() {
867 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
868 if (failed != null) {
869 return;
870 }
871
872 try {
873 handler().flush(this);
874 } catch (Throwable t) {
875 handleOutboundHandlerException(t, false);
876 } finally {
877 updatePendingBytesIfNeeded();
878 }
879 }
880
881 @Override
882 public Future<Void> writeAndFlush(Object msg) {
883 return write(msg, true);
884 }
885
886 private Future<Void> invokeWriteAndFlush(Object msg) {
887 Future<Void> f = invokeWrite(msg);
888 invokeFlush();
889 return f;
890 }
891
892 private Future<Void> write(Object msg, boolean flush) {
893 requireNonNull(msg, "msg");
894
895 EventExecutor executor = originalExecutor();
896 if (executor.inEventLoop()) {
897 final DefaultChannelHandlerContext next = findContextOutbound(flush ?
898 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
899 if (next == null) {
900 Resource.dispose(msg);
901 return failRemoved(this);
902 }
903 if (flush) {
904 return next.invokeWriteAndFlush(msg);
905 }
906 return next.invokeWrite(msg);
907 } else {
908 Promise<Void> promise = newPromise();
909 final AbstractWriteTask task;
910 if (flush) {
911 task = WriteAndFlushTask.newInstance(this, msg, promise);
912 } else {
913 task = WriteTask.newInstance(this, msg, promise);
914 }
915 if (task != null && !safeExecute(executor, task, promise, msg)) {
916
917
918
919
920 task.cancel();
921 }
922 return promise.asFuture();
923 }
924 }
925
926 @Override
927 public Future<Void> sendOutboundEvent(Object event) {
928 EventExecutor executor = originalExecutor();
929 if (executor.inEventLoop()) {
930 return findAndInvokeSendOutboundEvent(event);
931 }
932 Promise<Void> promise = newPromise();
933 safeExecute(executor, () -> findAndInvokeSendOutboundEvent(event).cascadeTo(promise), promise, event);
934 return promise.asFuture();
935 }
936
937 private Future<Void> findAndInvokeSendOutboundEvent(Object event) {
938 DefaultChannelHandlerContext ctx = findContextOutbound(MASK_SEND_OUTBOUND_EVENT);
939 if (ctx == null) {
940 return failRemoved(this);
941 }
942 return ctx.invokeSendOutboundEvent(event);
943 }
944
945 private Future<Void> invokeSendOutboundEvent(Object event) {
946 Future<Void> failed = saveCurrentPendingBytesIfNeededOutbound();
947 if (failed != null) {
948 Resource.dispose(event);
949 return failed;
950 }
951
952 try {
953 return handler().sendOutboundEvent(this, event);
954 } catch (Throwable t) {
955 return handleOutboundHandlerException(t, false);
956 } finally {
957 updatePendingBytesIfNeeded();
958 }
959 }
960
961 private Future<Void> handleOutboundHandlerException(Throwable cause, boolean closeDidThrow) {
962 String msg = handler() + " threw an exception while handling an outbound event. This is most likely a bug";
963
964 logger.warn("{}. This is most likely a bug, closing the channel.", msg, cause);
965 if (closeDidThrow) {
966
967
968 close();
969 } else {
970
971
972 channel().close();
973 }
974 return newFailedFuture(new IllegalStateException(msg, cause));
975 }
976
977 private DefaultChannelHandlerContext findContextInbound(int mask) {
978 DefaultChannelHandlerContext ctx = this;
979 if (ctx.next == null) {
980 return null;
981 }
982 do {
983 ctx = ctx.next;
984 } while ((ctx.executionMask & mask) == 0 || ctx.handlerState == REMOVE_STARTED);
985 return ctx;
986 }
987
988 private DefaultChannelHandlerContext findContextOutbound(int mask) {
989 DefaultChannelHandlerContext ctx = this;
990 if (ctx.prev == null) {
991 return null;
992 }
993 do {
994 ctx = ctx.prev;
995 } while ((ctx.executionMask & mask) == 0 || ctx.handlerState == REMOVE_STARTED);
996 return ctx;
997 }
998
999 boolean setAddComplete() {
1000
1001
1002
1003 if (handlerState == INIT) {
1004 handlerState = ADD_COMPLETE;
1005 return true;
1006 }
1007 return false;
1008 }
1009
1010 void callHandlerAdded() throws Exception {
1011
1012
1013 if (setAddComplete()) {
1014 handler().handlerAdded(this);
1015 if (handlesPendingOutboundBytes(executionMask)) {
1016 long pending = pendingOutboundBytes();
1017 currentPendingBytes = -1;
1018 if (pending > 0) {
1019 pipeline.incrementPendingOutboundBytes(pending);
1020 }
1021 }
1022 }
1023 }
1024
1025 void callHandlerRemoved() throws Exception {
1026 try {
1027
1028 if (handlerState == ADD_COMPLETE) {
1029 handlerState = REMOVE_STARTED;
1030 try {
1031 handler().handlerRemoved(this);
1032 } finally {
1033 if (handlesPendingOutboundBytes(executionMask)) {
1034 long pending = pendingOutboundBytes();
1035 currentPendingBytes = -1;
1036 if (pending > 0) {
1037 pipeline.decrementPendingOutboundBytes(pending);
1038 }
1039 }
1040 }
1041 }
1042 } finally {
1043
1044 handlerState = REMOVE_COMPLETE;
1045 removed = true;
1046 }
1047 }
1048
1049 @Override
1050 public boolean isRemoved() {
1051 return removed;
1052 }
1053
1054 void remove(boolean relink) {
1055 assert handlerState == REMOVE_COMPLETE;
1056 if (relink) {
1057 DefaultChannelHandlerContext prev = this.prev;
1058 DefaultChannelHandlerContext next = this.next;
1059
1060 if (prev != null) {
1061 prev.next = next;
1062 }
1063 if (next != null) {
1064 next.prev = prev;
1065 }
1066 }
1067
1068 prev = null;
1069 next = null;
1070 }
1071
1072 static boolean safeExecute(EventExecutor executor, Runnable runnable, Promise<Void> promise, Object msg) {
1073 try {
1074 executor.execute(runnable);
1075 return true;
1076 } catch (Throwable cause) {
1077 try {
1078 if (msg != null) {
1079 Resource.dispose(msg);
1080 }
1081 } finally {
1082 if (promise != null) {
1083 promise.setFailure(cause);
1084 }
1085 }
1086 return false;
1087 }
1088 }
1089
1090 @Override
1091 public String toHintString() {
1092 return '\'' + name + "' will handle the message from this point.";
1093 }
1094
1095 @Override
1096 public String toString() {
1097 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1098 }
1099
1100 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1101 SystemPropertyUtil.getBoolean("io.netty5.transport.estimateSizeOnSubmit", true);
1102
1103
1104 private static final int WRITE_TASK_OVERHEAD =
1105 SystemPropertyUtil.getInt("io.netty5.transport.writeTaskSizeOverhead", 48);
1106
1107 abstract static class AbstractWriteTask implements Runnable {
1108
1109 private final ObjectPool.Handle<AbstractWriteTask> handle;
1110 private DefaultChannelHandlerContext ctx;
1111 private Object msg;
1112 private Promise<Void> promise;
1113 private int size;
1114
1115 @SuppressWarnings("unchecked")
1116 private AbstractWriteTask(ObjectPool.Handle<? extends AbstractWriteTask> handle) {
1117 this.handle = (ObjectPool.Handle<AbstractWriteTask>) handle;
1118 }
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130 protected static boolean init(AbstractWriteTask task, DefaultChannelHandlerContext ctx,
1131 Object msg, Promise<Void> promise) {
1132 task.ctx = ctx;
1133 task.msg = msg;
1134 task.promise = promise;
1135
1136 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1137 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1138 try {
1139 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1140 } catch (IllegalStateException e) {
1141 task.recycle();
1142 Resource.dispose(msg);
1143 promise.setFailure(e);
1144 return false;
1145 }
1146 } else {
1147 task.size = 0;
1148 }
1149 return true;
1150 }
1151
1152 protected abstract DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx);
1153 @Override
1154 public final void run() {
1155 try {
1156 decrementPendingOutboundBytes();
1157 if (promise.isCancelled()) {
1158 Resource.dispose(msg);
1159 return;
1160 }
1161 DefaultChannelHandlerContext next = findContext(ctx);
1162 if (next == null) {
1163 Resource.dispose(msg);
1164 failRemoved(ctx).cascadeTo(promise);
1165 return;
1166 }
1167 write(next, msg, promise);
1168 } finally {
1169 recycle();
1170 }
1171 }
1172
1173 void cancel() {
1174 try {
1175 decrementPendingOutboundBytes();
1176 } finally {
1177 recycle();
1178 }
1179 }
1180
1181 private void decrementPendingOutboundBytes() {
1182 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1183
1184 ctx.pipeline.decrementPendingOutboundBytes(size);
1185 }
1186 }
1187
1188 private void recycle() {
1189
1190 ctx = null;
1191 msg = null;
1192 promise = null;
1193 handle.recycle(this);
1194 }
1195
1196 protected void write(DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1197 ctx.invokeWrite(msg).cascadeTo(promise);
1198 }
1199 }
1200
1201 static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1202
1203 private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(WriteTask::new);
1204
1205 static WriteTask newInstance(
1206 DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1207 WriteTask task = RECYCLER.get();
1208 if (!init(task, ctx, msg, promise)) {
1209 return null;
1210 }
1211 return task;
1212 }
1213
1214 @Override
1215 protected DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx) {
1216 return ctx.findContextOutbound(MASK_WRITE);
1217 }
1218
1219 private WriteTask(ObjectPool.Handle<WriteTask> handle) {
1220 super(handle);
1221 }
1222 }
1223
1224 static final class WriteAndFlushTask extends AbstractWriteTask {
1225
1226 private static final ObjectPool<WriteAndFlushTask> RECYCLER = ObjectPool.newPool(WriteAndFlushTask::new);
1227
1228 static WriteAndFlushTask newInstance(
1229 DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1230 WriteAndFlushTask task = RECYCLER.get();
1231 if (!init(task, ctx, msg, promise)) {
1232 return null;
1233 }
1234 return task;
1235 }
1236
1237 private WriteAndFlushTask(ObjectPool.Handle<WriteAndFlushTask> handle) {
1238 super(handle);
1239 }
1240
1241 @Override
1242 protected DefaultChannelHandlerContext findContext(DefaultChannelHandlerContext ctx) {
1243 return ctx.findContextOutbound(MASK_WRITE | MASK_FLUSH);
1244 }
1245
1246 @Override
1247 public void write(DefaultChannelHandlerContext ctx, Object msg, Promise<Void> promise) {
1248 super.write(ctx, msg, promise);
1249 ctx.invokeFlush();
1250 }
1251 }
1252
1253 private static final class Tasks {
1254 private final Runnable invokeChannelReadCompleteTask;
1255 private final Runnable invokeReadTask;
1256 private final Runnable invokeChannelWritableStateChangedTask;
1257 private final Runnable invokeFlushTask;
1258
1259 Tasks(DefaultChannelHandlerContext ctx) {
1260 invokeChannelReadCompleteTask = ctx::findAndInvokeChannelReadComplete;
1261 invokeReadTask = ctx::findAndInvokeRead;
1262 invokeChannelWritableStateChangedTask = ctx::invokeChannelWritabilityChanged;
1263 invokeFlushTask = ctx::findAndInvokeFlush;
1264 }
1265 }
1266
1267 private boolean saveCurrentPendingBytesIfNeededInbound() {
1268 IllegalStateException e = saveCurrentPendingBytesIfNeeded();
1269 if (e != null) {
1270 logger.error(e);
1271 return false;
1272 }
1273 return true;
1274 }
1275
1276 private Future<Void> saveCurrentPendingBytesIfNeededOutbound() {
1277 IllegalStateException e = saveCurrentPendingBytesIfNeeded();
1278 if (e != null) {
1279 logger.error(e);
1280 return newFailedFuture(e);
1281 }
1282 return null;
1283 }
1284
1285 private IllegalStateException saveCurrentPendingBytesIfNeeded() {
1286 if (!handlesPendingOutboundBytes(executionMask)) {
1287 assert currentPendingBytes == 0;
1288 return null;
1289 }
1290
1291
1292 if (currentPendingBytes == -1) {
1293 try {
1294 currentPendingBytes = pendingOutboundBytes();
1295 } catch (IllegalStateException e) {
1296 return e;
1297 }
1298 }
1299 return null;
1300 }
1301
1302 private long pendingOutboundBytes() {
1303 long pending = handler().pendingOutboundBytes(this);
1304 if (pending < 0) {
1305 pipeline.forceCloseTransport();
1306 String message = StringUtil.simpleClassName(handler.getClass()) +
1307 ".pendingOutboundBytes(ChannelHandlerContext) returned a negative value: " +
1308 pending + ". Force closed transport.";
1309 throw new IllegalStateException(message);
1310 }
1311 return pending;
1312 }
1313
1314 private void updatePendingBytesIfNeeded() {
1315 if (!handlesPendingOutboundBytes(executionMask)) {
1316 assert currentPendingBytes == 0;
1317 return;
1318 }
1319 long current = currentPendingBytes;
1320 if (current == -1) {
1321 return;
1322 }
1323 this.currentPendingBytes = -1;
1324 try {
1325 long newPendingBytes = pendingOutboundBytes();
1326 long delta = current - newPendingBytes;
1327 if (delta == 0) {
1328
1329 return;
1330 }
1331 if (delta > 0) {
1332 pipeline.decrementPendingOutboundBytes(delta);
1333 } else {
1334 pipeline.incrementPendingOutboundBytes(-delta);
1335 }
1336 } catch (IllegalStateException e) {
1337 logger.error(e);
1338 }
1339 }
1340
1341 private static final class DefaultChannelHandlerContextAwareEventExecutor implements EventExecutor {
1342
1343 private final EventExecutor executor;
1344 private final DefaultChannelHandlerContext ctx;
1345
1346 DefaultChannelHandlerContextAwareEventExecutor(EventExecutor executor, DefaultChannelHandlerContext ctx) {
1347 this.executor = executor;
1348 this.ctx = ctx;
1349 }
1350
1351 EventExecutor wrappedExecutor() {
1352 return executor;
1353 }
1354
1355 @Override
1356 public boolean inEventLoop() {
1357 return executor.inEventLoop();
1358 }
1359
1360 @Override
1361 public boolean inEventLoop(Thread thread) {
1362 return executor.inEventLoop(thread);
1363 }
1364
1365 @Override
1366 public boolean isShuttingDown() {
1367 return executor.isShuttingDown();
1368 }
1369
1370 @Override
1371 public boolean isShutdown() {
1372 return executor.isShutdown();
1373 }
1374
1375 @Override
1376 public boolean isTerminated() {
1377 return executor.isTerminated();
1378 }
1379
1380 @Override
1381 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
1382 return executor.awaitTermination(timeout, unit);
1383 }
1384
1385 @Override
1386 public Future<Void> shutdownGracefully() {
1387 return executor.shutdownGracefully();
1388 }
1389
1390 @Override
1391 public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
1392 return executor.shutdownGracefully(quietPeriod, timeout, unit);
1393 }
1394
1395 @Override
1396 public Future<Void> terminationFuture() {
1397 return executor.terminationFuture();
1398 }
1399
1400 @Override
1401 public Future<Void> submit(Runnable task) {
1402 return executor.submit(new DefaultHandlerContextRunnable(task));
1403 }
1404
1405 @Override
1406 public <T> Future<T> submit(Runnable task, T result) {
1407 return executor.submit(new DefaultHandlerContextRunnable(task), result);
1408 }
1409
1410 @Override
1411 public <T> Future<T> submit(Callable<T> task) {
1412 return executor.submit(new DefaultHandlerContextCallable<>(task));
1413 }
1414
1415 @Override
1416 public Future<Void> schedule(Runnable task, long delay, TimeUnit unit) {
1417 return executor.schedule(new DefaultHandlerContextRunnable(task), delay, unit);
1418 }
1419
1420 @Override
1421 public <V> Future<V> schedule(Callable<V> task, long delay, TimeUnit unit) {
1422 return executor.schedule(new DefaultHandlerContextCallable<>(task), delay, unit);
1423 }
1424
1425 @Override
1426 public Future<Void> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
1427 return executor.scheduleAtFixedRate(
1428 new DefaultHandlerContextRunnable(task), initialDelay, period, unit);
1429 }
1430
1431 @Override
1432 public Future<Void> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) {
1433 return executor.scheduleWithFixedDelay(
1434 new DefaultHandlerContextRunnable(task), initialDelay, delay, unit);
1435 }
1436
1437 @Override
1438 public void execute(Runnable task) {
1439 executor.execute(new DefaultHandlerContextRunnable(task));
1440 }
1441
1442 private final class DefaultHandlerContextCallable<V> implements Callable<V> {
1443
1444 private final Callable<V> task;
1445
1446 DefaultHandlerContextCallable(Callable<V> task) {
1447 this.task = task;
1448 }
1449
1450 @Override
1451 public V call() throws Exception {
1452 IllegalStateException e = ctx.saveCurrentPendingBytesIfNeeded();
1453 try {
1454 V value = null;
1455 try {
1456 value = task.call();
1457 return value;
1458 } finally {
1459 if (e != null) {
1460 Resource.dispose(value);
1461 logger.error(e);
1462 throw e;
1463 }
1464 }
1465 } finally {
1466 if (e == null) {
1467 ctx.updatePendingBytesIfNeeded();
1468 }
1469 }
1470 }
1471 }
1472
1473 private final class DefaultHandlerContextRunnable implements Runnable {
1474 private final Runnable task;
1475 DefaultHandlerContextRunnable(Runnable task) {
1476 this.task = task;
1477 }
1478
1479 @Override
1480 public void run() {
1481 IllegalStateException e = ctx.saveCurrentPendingBytesIfNeeded();
1482 try {
1483 task.run();
1484 if (e != null) {
1485 logger.error(e);
1486 throw e;
1487 }
1488 } finally {
1489 if (e == null) {
1490 ctx.updatePendingBytesIfNeeded();
1491 }
1492 }
1493 }
1494 }
1495 }
1496 }