1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.DefaultAttributeMap;
20 import io.netty.util.Recycler;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.concurrent.EventExecutor;
23 import io.netty.util.concurrent.OrderedEventExecutor;
24 import io.netty.util.internal.PromiseNotificationUtil;
25 import io.netty.util.internal.ThrowableUtil;
26 import io.netty.util.internal.ObjectUtil;
27 import io.netty.util.internal.StringUtil;
28 import io.netty.util.internal.SystemPropertyUtil;
29
30 import java.net.SocketAddress;
31 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
32
33 import static io.netty.channel.DefaultChannelPipeline.*;
34
35 abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
36
// Neighbouring contexts. findContextInbound() walks `next`, findContextOutbound() walks `prev`.
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

// Atomic accessor for the `handlerState` field; used for the compare-and-set transitions in
// setAddComplete() and setAddPending().
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

/**
 * State entered via setAddPending(). In this state invokeHandler() only lets the handler be
 * called when the context is not "ordered".
 * NOTE(review): presumably means the handler's added-callback is scheduled but has not run yet - confirm.
 */
private static final int ADD_PENDING = 1;

/**
 * State entered via setAddComplete(). invokeHandler() always returns true in this state.
 */
private static final int ADD_COMPLETE = 2;

/**
 * State entered via setRemoved(). isRemoved() reports true and setAddComplete() no longer changes the state.
 */
private static final int REMOVE_COMPLETE = 3;

/**
 * Initial value of `handlerState`; setAddPending() expects to find the context in this state.
 */
private static final int INIT = 0;

// Whether findContextInbound() / findContextOutbound() may stop at this context.
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
// True when no executor was given or the given one is an OrderedEventExecutor (see constructor).
private final boolean ordered;

// May be null; executor() then falls back to channel().eventLoop().
final EventExecutor executor;
// Lazily created and cached by newSucceededFuture().
private ChannelFuture succeededFuture;

// Lazily instantiated tasks, cached so that handing the corresponding event or operation to
// another executor does not allocate a new Runnable each time.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;

// One of INIT, ADD_PENDING, ADD_COMPLETE, REMOVE_COMPLETE.
private volatile int handlerState = INIT;
80
/**
 * Creates a new context.
 *
 * @param pipeline the pipeline this context belongs to
 * @param executor the executor to run the handler on, or {@code null} to fall back to the
 *                 channel's event loop (see {@link #executor()})
 * @param name     the context name; must not be {@code null}
 * @param inbound  whether this context is considered when looking up inbound contexts
 * @param outbound whether this context is considered when looking up outbound contexts
 * @throws NullPointerException if {@code name} is {@code null}
 */
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.inbound = inbound;
    this.outbound = outbound;
    this.pipeline = pipeline;
    this.executor = executor;

    // A missing executor means the channel's event loop is used, which is treated as ordered.
    this.ordered = (executor == null) || (executor instanceof OrderedEventExecutor);
}
91
92 @Override
93 public Channel channel() {
94 return pipeline.channel();
95 }
96
97 @Override
98 public ChannelPipeline pipeline() {
99 return pipeline;
100 }
101
102 @Override
103 public ByteBufAllocator alloc() {
104 return channel().config().getAllocator();
105 }
106
107 @Override
108 public EventExecutor executor() {
109 if (executor == null) {
110 return channel().eventLoop();
111 } else {
112 return executor;
113 }
114 }
115
116 @Override
117 public String name() {
118 return name;
119 }
120
121 @Override
122 public ChannelHandlerContext fireChannelRegistered() {
123 invokeChannelRegistered(findContextInbound());
124 return this;
125 }
126
127 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
128 EventExecutor executor = next.executor();
129 if (executor.inEventLoop()) {
130 next.invokeChannelRegistered();
131 } else {
132 executor.execute(new Runnable() {
133 @Override
134 public void run() {
135 next.invokeChannelRegistered();
136 }
137 });
138 }
139 }
140
141 private void invokeChannelRegistered() {
142 if (invokeHandler()) {
143 try {
144 ((ChannelInboundHandler) handler()).channelRegistered(this);
145 } catch (Throwable t) {
146 notifyHandlerException(t);
147 }
148 } else {
149 fireChannelRegistered();
150 }
151 }
152
153 @Override
154 public ChannelHandlerContext fireChannelUnregistered() {
155 invokeChannelUnregistered(findContextInbound());
156 return this;
157 }
158
159 static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
160 EventExecutor executor = next.executor();
161 if (executor.inEventLoop()) {
162 next.invokeChannelUnregistered();
163 } else {
164 executor.execute(new Runnable() {
165 @Override
166 public void run() {
167 next.invokeChannelUnregistered();
168 }
169 });
170 }
171 }
172
173 private void invokeChannelUnregistered() {
174 if (invokeHandler()) {
175 try {
176 ((ChannelInboundHandler) handler()).channelUnregistered(this);
177 } catch (Throwable t) {
178 notifyHandlerException(t);
179 }
180 } else {
181 fireChannelUnregistered();
182 }
183 }
184
185 @Override
186 public ChannelHandlerContext fireChannelActive() {
187 invokeChannelActive(findContextInbound());
188 return this;
189 }
190
191 static void invokeChannelActive(final AbstractChannelHandlerContext next) {
192 EventExecutor executor = next.executor();
193 if (executor.inEventLoop()) {
194 next.invokeChannelActive();
195 } else {
196 executor.execute(new Runnable() {
197 @Override
198 public void run() {
199 next.invokeChannelActive();
200 }
201 });
202 }
203 }
204
205 private void invokeChannelActive() {
206 if (invokeHandler()) {
207 try {
208 ((ChannelInboundHandler) handler()).channelActive(this);
209 } catch (Throwable t) {
210 notifyHandlerException(t);
211 }
212 } else {
213 fireChannelActive();
214 }
215 }
216
217 @Override
218 public ChannelHandlerContext fireChannelInactive() {
219 invokeChannelInactive(findContextInbound());
220 return this;
221 }
222
223 static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
224 EventExecutor executor = next.executor();
225 if (executor.inEventLoop()) {
226 next.invokeChannelInactive();
227 } else {
228 executor.execute(new Runnable() {
229 @Override
230 public void run() {
231 next.invokeChannelInactive();
232 }
233 });
234 }
235 }
236
237 private void invokeChannelInactive() {
238 if (invokeHandler()) {
239 try {
240 ((ChannelInboundHandler) handler()).channelInactive(this);
241 } catch (Throwable t) {
242 notifyHandlerException(t);
243 }
244 } else {
245 fireChannelInactive();
246 }
247 }
248
249 @Override
250 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
251 invokeExceptionCaught(next, cause);
252 return this;
253 }
254
255 static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
256 ObjectUtil.checkNotNull(cause, "cause");
257 EventExecutor executor = next.executor();
258 if (executor.inEventLoop()) {
259 next.invokeExceptionCaught(cause);
260 } else {
261 try {
262 executor.execute(new Runnable() {
263 @Override
264 public void run() {
265 next.invokeExceptionCaught(cause);
266 }
267 });
268 } catch (Throwable t) {
269 if (logger.isWarnEnabled()) {
270 logger.warn("Failed to submit an exceptionCaught() event.", t);
271 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
272 }
273 }
274 }
275 }
276
277 private void invokeExceptionCaught(final Throwable cause) {
278 if (invokeHandler()) {
279 try {
280 handler().exceptionCaught(this, cause);
281 } catch (Throwable error) {
282 if (logger.isDebugEnabled()) {
283 logger.debug(
284 "An exception {}" +
285 "was thrown by a user handler's exceptionCaught() " +
286 "method while handling the following exception:",
287 ThrowableUtil.stackTraceToString(error), cause);
288 } else if (logger.isWarnEnabled()) {
289 logger.warn(
290 "An exception '{}' [enable DEBUG level for full stacktrace] " +
291 "was thrown by a user handler's exceptionCaught() " +
292 "method while handling the following exception:", error, cause);
293 }
294 }
295 } else {
296 fireExceptionCaught(cause);
297 }
298 }
299
300 @Override
301 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
302 invokeUserEventTriggered(findContextInbound(), event);
303 return this;
304 }
305
306 static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
307 ObjectUtil.checkNotNull(event, "event");
308 EventExecutor executor = next.executor();
309 if (executor.inEventLoop()) {
310 next.invokeUserEventTriggered(event);
311 } else {
312 executor.execute(new Runnable() {
313 @Override
314 public void run() {
315 next.invokeUserEventTriggered(event);
316 }
317 });
318 }
319 }
320
321 private void invokeUserEventTriggered(Object event) {
322 if (invokeHandler()) {
323 try {
324 ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
325 } catch (Throwable t) {
326 notifyHandlerException(t);
327 }
328 } else {
329 fireUserEventTriggered(event);
330 }
331 }
332
333 @Override
334 public ChannelHandlerContext fireChannelRead(final Object msg) {
335 invokeChannelRead(findContextInbound(), msg);
336 return this;
337 }
338 static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) {
339 ObjectUtil.checkNotNull(msg, "msg");
340 EventExecutor executor = next.executor();
341 if (executor.inEventLoop()) {
342 next.invokeChannelRead(msg);
343 } else {
344 executor.execute(new Runnable() {
345 @Override
346 public void run() {
347 next.invokeChannelRead(msg);
348 }
349 });
350 }
351 }
352
353 private void invokeChannelRead(Object msg) {
354 if (invokeHandler()) {
355 try {
356 ((ChannelInboundHandler) handler()).channelRead(this, msg);
357 } catch (Throwable t) {
358 notifyHandlerException(t);
359 }
360 } else {
361 fireChannelRead(msg);
362 }
363 }
364
365 @Override
366 public ChannelHandlerContext fireChannelReadComplete() {
367 invokeChannelReadComplete(findContextInbound());
368 return this;
369 }
370
371 static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
372 EventExecutor executor = next.executor();
373 if (executor.inEventLoop()) {
374 next.invokeChannelReadComplete();
375 } else {
376 Runnable task = next.invokeChannelReadCompleteTask;
377 if (task == null) {
378 next.invokeChannelReadCompleteTask = task = new Runnable() {
379 @Override
380 public void run() {
381 next.invokeChannelReadComplete();
382 }
383 };
384 }
385 executor.execute(task);
386 }
387 }
388
389 private void invokeChannelReadComplete() {
390 if (invokeHandler()) {
391 try {
392 ((ChannelInboundHandler) handler()).channelReadComplete(this);
393 } catch (Throwable t) {
394 notifyHandlerException(t);
395 }
396 } else {
397 fireChannelReadComplete();
398 }
399 }
400
401 @Override
402 public ChannelHandlerContext fireChannelWritabilityChanged() {
403 invokeChannelWritabilityChanged(findContextInbound());
404 return this;
405 }
406
407 static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
408 EventExecutor executor = next.executor();
409 if (executor.inEventLoop()) {
410 next.invokeChannelWritabilityChanged();
411 } else {
412 Runnable task = next.invokeChannelWritableStateChangedTask;
413 if (task == null) {
414 next.invokeChannelWritableStateChangedTask = task = new Runnable() {
415 @Override
416 public void run() {
417 next.invokeChannelWritabilityChanged();
418 }
419 };
420 }
421 executor.execute(task);
422 }
423 }
424
425 private void invokeChannelWritabilityChanged() {
426 if (invokeHandler()) {
427 try {
428 ((ChannelInboundHandler) handler()).channelWritabilityChanged(this);
429 } catch (Throwable t) {
430 notifyHandlerException(t);
431 }
432 } else {
433 fireChannelWritabilityChanged();
434 }
435 }
436
437 @Override
438 public ChannelFuture bind(SocketAddress localAddress) {
439 return bind(localAddress, newPromise());
440 }
441
442 @Override
443 public ChannelFuture connect(SocketAddress remoteAddress) {
444 return connect(remoteAddress, newPromise());
445 }
446
447 @Override
448 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
449 return connect(remoteAddress, localAddress, newPromise());
450 }
451
452 @Override
453 public ChannelFuture disconnect() {
454 return disconnect(newPromise());
455 }
456
457 @Override
458 public ChannelFuture close() {
459 return close(newPromise());
460 }
461
462 @Override
463 public ChannelFuture deregister() {
464 return deregister(newPromise());
465 }
466
467 @Override
468 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
469 if (localAddress == null) {
470 throw new NullPointerException("localAddress");
471 }
472 if (isNotValidPromise(promise, false)) {
473
474 return promise;
475 }
476
477 final AbstractChannelHandlerContext next = findContextOutbound();
478 EventExecutor executor = next.executor();
479 if (executor.inEventLoop()) {
480 next.invokeBind(localAddress, promise);
481 } else {
482 safeExecute(executor, new Runnable() {
483 @Override
484 public void run() {
485 next.invokeBind(localAddress, promise);
486 }
487 }, promise, null);
488 }
489 return promise;
490 }
491
492 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
493 if (invokeHandler()) {
494 try {
495 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
496 } catch (Throwable t) {
497 notifyOutboundHandlerException(t, promise);
498 }
499 } else {
500 bind(localAddress, promise);
501 }
502 }
503
504 @Override
505 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
506 return connect(remoteAddress, null, promise);
507 }
508
509 @Override
510 public ChannelFuture connect(
511 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
512
513 if (remoteAddress == null) {
514 throw new NullPointerException("remoteAddress");
515 }
516 if (isNotValidPromise(promise, false)) {
517
518 return promise;
519 }
520
521 final AbstractChannelHandlerContext next = findContextOutbound();
522 EventExecutor executor = next.executor();
523 if (executor.inEventLoop()) {
524 next.invokeConnect(remoteAddress, localAddress, promise);
525 } else {
526 safeExecute(executor, new Runnable() {
527 @Override
528 public void run() {
529 next.invokeConnect(remoteAddress, localAddress, promise);
530 }
531 }, promise, null);
532 }
533 return promise;
534 }
535
536 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
537 if (invokeHandler()) {
538 try {
539 ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
540 } catch (Throwable t) {
541 notifyOutboundHandlerException(t, promise);
542 }
543 } else {
544 connect(remoteAddress, localAddress, promise);
545 }
546 }
547
548 @Override
549 public ChannelFuture disconnect(final ChannelPromise promise) {
550 if (isNotValidPromise(promise, false)) {
551
552 return promise;
553 }
554
555 final AbstractChannelHandlerContext next = findContextOutbound();
556 EventExecutor executor = next.executor();
557 if (executor.inEventLoop()) {
558
559
560 if (!channel().metadata().hasDisconnect()) {
561 next.invokeClose(promise);
562 } else {
563 next.invokeDisconnect(promise);
564 }
565 } else {
566 safeExecute(executor, new Runnable() {
567 @Override
568 public void run() {
569 if (!channel().metadata().hasDisconnect()) {
570 next.invokeClose(promise);
571 } else {
572 next.invokeDisconnect(promise);
573 }
574 }
575 }, promise, null);
576 }
577 return promise;
578 }
579
580 private void invokeDisconnect(ChannelPromise promise) {
581 if (invokeHandler()) {
582 try {
583 ((ChannelOutboundHandler) handler()).disconnect(this, promise);
584 } catch (Throwable t) {
585 notifyOutboundHandlerException(t, promise);
586 }
587 } else {
588 disconnect(promise);
589 }
590 }
591
592 @Override
593 public ChannelFuture close(final ChannelPromise promise) {
594 if (isNotValidPromise(promise, false)) {
595
596 return promise;
597 }
598
599 final AbstractChannelHandlerContext next = findContextOutbound();
600 EventExecutor executor = next.executor();
601 if (executor.inEventLoop()) {
602 next.invokeClose(promise);
603 } else {
604 safeExecute(executor, new Runnable() {
605 @Override
606 public void run() {
607 next.invokeClose(promise);
608 }
609 }, promise, null);
610 }
611
612 return promise;
613 }
614
615 private void invokeClose(ChannelPromise promise) {
616 if (invokeHandler()) {
617 try {
618 ((ChannelOutboundHandler) handler()).close(this, promise);
619 } catch (Throwable t) {
620 notifyOutboundHandlerException(t, promise);
621 }
622 } else {
623 close(promise);
624 }
625 }
626
627 @Override
628 public ChannelFuture deregister(final ChannelPromise promise) {
629 if (isNotValidPromise(promise, false)) {
630
631 return promise;
632 }
633
634 final AbstractChannelHandlerContext next = findContextOutbound();
635 EventExecutor executor = next.executor();
636 if (executor.inEventLoop()) {
637 next.invokeDeregister(promise);
638 } else {
639 safeExecute(executor, new Runnable() {
640 @Override
641 public void run() {
642 next.invokeDeregister(promise);
643 }
644 }, promise, null);
645 }
646
647 return promise;
648 }
649
650 private void invokeDeregister(ChannelPromise promise) {
651 if (invokeHandler()) {
652 try {
653 ((ChannelOutboundHandler) handler()).deregister(this, promise);
654 } catch (Throwable t) {
655 notifyOutboundHandlerException(t, promise);
656 }
657 } else {
658 deregister(promise);
659 }
660 }
661
662 @Override
663 public ChannelHandlerContext read() {
664 final AbstractChannelHandlerContext next = findContextOutbound();
665 EventExecutor executor = next.executor();
666 if (executor.inEventLoop()) {
667 next.invokeRead();
668 } else {
669 Runnable task = next.invokeReadTask;
670 if (task == null) {
671 next.invokeReadTask = task = new Runnable() {
672 @Override
673 public void run() {
674 next.invokeRead();
675 }
676 };
677 }
678 executor.execute(task);
679 }
680
681 return this;
682 }
683
684 private void invokeRead() {
685 if (invokeHandler()) {
686 try {
687 ((ChannelOutboundHandler) handler()).read(this);
688 } catch (Throwable t) {
689 notifyHandlerException(t);
690 }
691 } else {
692 read();
693 }
694 }
695
696 @Override
697 public ChannelFuture write(Object msg) {
698 return write(msg, newPromise());
699 }
700
701 @Override
702 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
703 if (msg == null) {
704 throw new NullPointerException("msg");
705 }
706
707 try {
708 if (isNotValidPromise(promise, true)) {
709 ReferenceCountUtil.release(msg);
710
711 return promise;
712 }
713 } catch (RuntimeException e) {
714 ReferenceCountUtil.release(msg);
715 throw e;
716 }
717 write(msg, false, promise);
718
719 return promise;
720 }
721
722 private void invokeWrite(Object msg, ChannelPromise promise) {
723 if (invokeHandler()) {
724 invokeWrite0(msg, promise);
725 } else {
726 write(msg, promise);
727 }
728 }
729
730 private void invokeWrite0(Object msg, ChannelPromise promise) {
731 try {
732 ((ChannelOutboundHandler) handler()).write(this, msg, promise);
733 } catch (Throwable t) {
734 notifyOutboundHandlerException(t, promise);
735 }
736 }
737
738 @Override
739 public ChannelHandlerContext flush() {
740 final AbstractChannelHandlerContext next = findContextOutbound();
741 EventExecutor executor = next.executor();
742 if (executor.inEventLoop()) {
743 next.invokeFlush();
744 } else {
745 Runnable task = next.invokeFlushTask;
746 if (task == null) {
747 next.invokeFlushTask = task = new Runnable() {
748 @Override
749 public void run() {
750 next.invokeFlush();
751 }
752 };
753 }
754 safeExecute(executor, task, channel().voidPromise(), null);
755 }
756
757 return this;
758 }
759
760 private void invokeFlush() {
761 if (invokeHandler()) {
762 invokeFlush0();
763 } else {
764 flush();
765 }
766 }
767
768 private void invokeFlush0() {
769 try {
770 ((ChannelOutboundHandler) handler()).flush(this);
771 } catch (Throwable t) {
772 notifyHandlerException(t);
773 }
774 }
775
776 @Override
777 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
778 if (msg == null) {
779 throw new NullPointerException("msg");
780 }
781
782 if (isNotValidPromise(promise, true)) {
783 ReferenceCountUtil.release(msg);
784
785 return promise;
786 }
787
788 write(msg, true, promise);
789
790 return promise;
791 }
792
793 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
794 if (invokeHandler()) {
795 invokeWrite0(msg, promise);
796 invokeFlush0();
797 } else {
798 writeAndFlush(msg, promise);
799 }
800 }
801
802 private void write(Object msg, boolean flush, ChannelPromise promise) {
803 AbstractChannelHandlerContext next = findContextOutbound();
804 EventExecutor executor = next.executor();
805 if (executor.inEventLoop()) {
806 if (flush) {
807 next.invokeWriteAndFlush(msg, promise);
808 } else {
809 next.invokeWrite(msg, promise);
810 }
811 } else {
812 AbstractWriteTask task;
813 if (flush) {
814 task = WriteAndFlushTask.newInstance(next, msg, promise);
815 } else {
816 task = WriteTask.newInstance(next, msg, promise);
817 }
818 safeExecute(executor, task, promise, msg);
819 }
820 }
821
822 @Override
823 public ChannelFuture writeAndFlush(Object msg) {
824 return writeAndFlush(msg, newPromise());
825 }
826
827 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
828
829
830 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
831 }
832
833 private void notifyHandlerException(Throwable cause) {
834 if (inExceptionCaught(cause)) {
835 if (logger.isWarnEnabled()) {
836 logger.warn(
837 "An exception was thrown by a user handler " +
838 "while handling an exceptionCaught event", cause);
839 }
840 return;
841 }
842
843 invokeExceptionCaught(cause);
844 }
845
846 private static boolean inExceptionCaught(Throwable cause) {
847 do {
848 StackTraceElement[] trace = cause.getStackTrace();
849 if (trace != null) {
850 for (StackTraceElement t : trace) {
851 if (t == null) {
852 break;
853 }
854 if ("exceptionCaught".equals(t.getMethodName())) {
855 return true;
856 }
857 }
858 }
859
860 cause = cause.getCause();
861 } while (cause != null);
862
863 return false;
864 }
865
866 @Override
867 public ChannelPromise newPromise() {
868 return new DefaultChannelPromise(channel(), executor());
869 }
870
871 @Override
872 public ChannelProgressivePromise newProgressivePromise() {
873 return new DefaultChannelProgressivePromise(channel(), executor());
874 }
875
876 @Override
877 public ChannelFuture newSucceededFuture() {
878 ChannelFuture succeededFuture = this.succeededFuture;
879 if (succeededFuture == null) {
880 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
881 }
882 return succeededFuture;
883 }
884
885 @Override
886 public ChannelFuture newFailedFuture(Throwable cause) {
887 return new FailedChannelFuture(channel(), executor(), cause);
888 }
889
890 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
891 if (promise == null) {
892 throw new NullPointerException("promise");
893 }
894
895 if (promise.isDone()) {
896
897
898
899
900 if (promise.isCancelled()) {
901 return true;
902 }
903 throw new IllegalArgumentException("promise already done: " + promise);
904 }
905
906 if (promise.channel() != channel()) {
907 throw new IllegalArgumentException(String.format(
908 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
909 }
910
911 if (promise.getClass() == DefaultChannelPromise.class) {
912 return false;
913 }
914
915 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
916 throw new IllegalArgumentException(
917 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
918 }
919
920 if (promise instanceof AbstractChannel.CloseFuture) {
921 throw new IllegalArgumentException(
922 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
923 }
924 return false;
925 }
926
927 private AbstractChannelHandlerContext findContextInbound() {
928 AbstractChannelHandlerContext ctx = this;
929 do {
930 ctx = ctx.next;
931 } while (!ctx.inbound);
932 return ctx;
933 }
934
935 private AbstractChannelHandlerContext findContextOutbound() {
936 AbstractChannelHandlerContext ctx = this;
937 do {
938 ctx = ctx.prev;
939 } while (!ctx.outbound);
940 return ctx;
941 }
942
943 @Override
944 public ChannelPromise voidPromise() {
945 return channel().voidPromise();
946 }
947
948 final void setRemoved() {
949 handlerState = REMOVE_COMPLETE;
950 }
951
952 final void setAddComplete() {
953 for (;;) {
954 int oldState = handlerState;
955
956
957
958 if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
959 return;
960 }
961 }
962 }
963
964 final void setAddPending() {
965 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
966 assert updated;
967 }
968
969
970
971
972
973
974
975
976
977 private boolean invokeHandler() {
978
979 int handlerState = this.handlerState;
980 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
981 }
982
983 @Override
984 public boolean isRemoved() {
985 return handlerState == REMOVE_COMPLETE;
986 }
987
988 private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
989 try {
990 executor.execute(runnable);
991 } catch (Throwable cause) {
992 try {
993 promise.setFailure(cause);
994 } finally {
995 if (msg != null) {
996 ReferenceCountUtil.release(msg);
997 }
998 }
999 }
1000 }
1001
1002 abstract static class AbstractWriteTask implements Runnable {
1003
1004 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1005 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1006
1007
1008 private static final int WRITE_TASK_OVERHEAD =
1009 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
1010
1011 private final Recycler.Handle handle;
1012 private AbstractChannelHandlerContext ctx;
1013 private Object msg;
1014 private ChannelPromise promise;
1015 private int size;
1016
1017 private AbstractWriteTask(Recycler.Handle handle) {
1018 this.handle = handle;
1019 }
1020
1021 protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
1022 Object msg, ChannelPromise promise) {
1023 task.ctx = ctx;
1024 task.msg = msg;
1025 task.promise = promise;
1026
1027 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1028 ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
1029
1030
1031 if (buffer != null) {
1032 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1033 buffer.incrementPendingOutboundBytes(task.size);
1034 } else {
1035 task.size = 0;
1036 }
1037 } else {
1038 task.size = 0;
1039 }
1040 }
1041
1042 @Override
1043 public final void run() {
1044 try {
1045 ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
1046
1047 if (ESTIMATE_TASK_SIZE_ON_SUBMIT && buffer != null) {
1048 buffer.decrementPendingOutboundBytes(size);
1049 }
1050 write(ctx, msg, promise);
1051 } finally {
1052
1053 ctx = null;
1054 msg = null;
1055 promise = null;
1056 recycle(handle);
1057 }
1058 }
1059
1060 protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1061 ctx.invokeWrite(msg, promise);
1062 }
1063
1064 protected abstract void recycle(Recycler.Handle handle);
1065 }
1066
1067 static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
1068
1069 private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1070 @Override
1071 protected WriteTask newObject(Handle handle) {
1072 return new WriteTask(handle);
1073 }
1074 };
1075
1076 private static WriteTask newInstance(
1077 AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1078 WriteTask task = RECYCLER.get();
1079 init(task, ctx, msg, promise);
1080 return task;
1081 }
1082
1083 private WriteTask(Recycler.Handle handle) {
1084 super(handle);
1085 }
1086
1087 @Override
1088 protected void recycle(Recycler.Handle handle) {
1089 RECYCLER.recycle(this, handle);
1090 }
1091 }
1092
1093 static final class WriteAndFlushTask extends AbstractWriteTask {
1094
1095 private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
1096 @Override
1097 protected WriteAndFlushTask newObject(Handle handle) {
1098 return new WriteAndFlushTask(handle);
1099 }
1100 };
1101
1102 private static WriteAndFlushTask newInstance(
1103 AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1104 WriteAndFlushTask task = RECYCLER.get();
1105 init(task, ctx, msg, promise);
1106 return task;
1107 }
1108
1109 private WriteAndFlushTask(Recycler.Handle handle) {
1110 super(handle);
1111 }
1112
1113 @Override
1114 public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
1115 super.write(ctx, msg, promise);
1116 ctx.invokeFlush();
1117 }
1118
1119 @Override
1120 protected void recycle(Recycler.Handle handle) {
1121 RECYCLER.recycle(this, handle);
1122 }
1123 }
1124 }