1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.Recycler;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.ResourceLeakHint;
24 import io.netty.util.concurrent.AbstractEventExecutor;
25 import io.netty.util.concurrent.EventExecutor;
26 import io.netty.util.concurrent.OrderedEventExecutor;
27 import io.netty.util.internal.ObjectPool.Handle;
28 import io.netty.util.internal.ObjectUtil;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.StringUtil;
31 import io.netty.util.internal.SystemPropertyUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.net.SocketAddress;
36 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37
38 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
39 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
40 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
48 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
49 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
50 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
51 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
52 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
53 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
54 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
55 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
56 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
57 import static io.netty.channel.ChannelHandlerMask.mask;
58
59 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
60
// Logger used for failures that cannot be reported through a promise or the pipeline.
61 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// Neighbouring contexts: findContextInbound() walks `next`, findContextOutbound() walks `prev`.
62 volatile AbstractChannelHandlerContext next;
63 volatile AbstractChannelHandlerContext prev;
64
// Performs the compare-and-set transitions on the volatile `handlerState` field.
65 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
66 AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
67
68
69
70
// State entered via setAddPending(); invokeHandler() only accepts it when the executor is not ordered.
71 private static final int ADD_PENDING = 1;
72
73
74
// State entered via setAddComplete(), which callHandlerAdded() performs before calling handlerAdded(...).
75 private static final int ADD_COMPLETE = 2;
76
77
78
// State entered via setRemoved(); isRemoved() reports true once it is reached.
79 private static final int REMOVE_COMPLETE = 3;
80
81
82
83
// Initial state of a freshly constructed context.
84 private static final int INIT = 0;
85
86 private final DefaultChannelPipeline pipeline;
87 private final String name;
// True when no explicit executor was given or the given one is an OrderedEventExecutor (see constructor).
88 private final boolean ordered;
// Result of mask(handlerClass); consulted by skipContext() to decide whether this context handles an event.
89 private final int executionMask;
90
91
92
// Executor passed to the constructor; may be null, in which case executor() falls back to the channel's event loop.
93 final EventExecutor childExecutor;
94
95
96
97
// Lazily cached result of executor().
98 EventExecutor contextExecutor;
// Lazily created by newSucceededFuture() and reused afterwards.
99 private ChannelFuture succeededFuture;
100
101
102
// Lazily created holder of reusable Runnables (see getInvokeTasks()).
103 private Tasks invokeTasks;
104
// One of INIT, ADD_PENDING, ADD_COMPLETE or REMOVE_COMPLETE.
105 private volatile int handlerState = INIT;
106
/**
 * Creates a context bound to the given pipeline.
 *
 * @param pipeline     the pipeline this context belongs to
 * @param executor     the executor to run the handler on, or {@code null} to use the channel's event loop
 * @param name         the context name; must not be {@code null}
 * @param handlerClass the handler type, used to compute the execution mask
 */
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.childExecutor = executor;
    this.executionMask = mask(handlerClass);

    // Ordered when no dedicated executor was supplied, or when the supplied one is an OrderedEventExecutor.
    if (executor == null) {
        this.ordered = true;
    } else {
        this.ordered = executor instanceof OrderedEventExecutor;
    }
}
116
117 @Override
118 public Channel channel() {
119 return pipeline.channel();
120 }
121
122 @Override
123 public ChannelPipeline pipeline() {
124 return pipeline;
125 }
126
127 @Override
128 public ByteBufAllocator alloc() {
129 return channel().config().getAllocator();
130 }
131
132 @Override
133 public EventExecutor executor() {
134 EventExecutor ex = contextExecutor;
135 if (ex == null) {
136 contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
137 }
138 return ex;
139 }
140
141 @Override
142 public String name() {
143 return name;
144 }
145
146 @Override
147 public ChannelHandlerContext fireChannelRegistered() {
148 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
149 if (next.executor().inEventLoop()) {
150 if (next.invokeHandler()) {
151 try {
152
153
154
155 final ChannelHandler handler = next.handler();
156 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
157 if (handler == headContext) {
158 headContext.channelRegistered(next);
159 } else if (handler instanceof ChannelInboundHandlerAdapter) {
160 ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
161 } else {
162 ((ChannelInboundHandler) handler).channelRegistered(next);
163 }
164 } catch (Throwable t) {
165 next.invokeExceptionCaught(t);
166 }
167 } else {
168 next.fireChannelRegistered();
169 }
170 } else {
171 next.executor().execute(this::fireChannelRegistered);
172 }
173 return this;
174 }
175
176 @Override
177 public ChannelHandlerContext fireChannelUnregistered() {
178 final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
179 if (next.executor().inEventLoop()) {
180 if (next.invokeHandler()) {
181 try {
182
183
184
185 final ChannelHandler handler = next.handler();
186 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
187 if (handler == headContext) {
188 headContext.channelUnregistered(next);
189 } else if (handler instanceof ChannelInboundHandlerAdapter) {
190 ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
191 } else {
192 ((ChannelInboundHandler) handler).channelUnregistered(next);
193 }
194 } catch (Throwable t) {
195 next.invokeExceptionCaught(t);
196 }
197 } else {
198 next.fireChannelUnregistered();
199 }
200 } else {
201 next.executor().execute(this::fireChannelUnregistered);
202 }
203 return this;
204 }
205
206 @Override
207 public ChannelHandlerContext fireChannelActive() {
208 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
209 if (next.executor().inEventLoop()) {
210 if (next.invokeHandler()) {
211 try {
212
213
214
215 final ChannelHandler handler = next.handler();
216 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
217 if (handler == headContext) {
218 headContext.channelActive(next);
219 } else if (handler instanceof ChannelInboundHandlerAdapter) {
220 ((ChannelInboundHandlerAdapter) handler).channelActive(next);
221 } else {
222 ((ChannelInboundHandler) handler).channelActive(next);
223 }
224 } catch (Throwable t) {
225 next.invokeExceptionCaught(t);
226 }
227 } else {
228 next.fireChannelActive();
229 }
230 } else {
231 next.executor().execute(this::fireChannelActive);
232 }
233 return this;
234 }
235
236 @Override
237 public ChannelHandlerContext fireChannelInactive() {
238 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
239 if (next.executor().inEventLoop()) {
240 if (next.invokeHandler()) {
241 try {
242
243
244
245 final ChannelHandler handler = next.handler();
246 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
247 if (handler == headContext) {
248 headContext.channelInactive(next);
249 } else if (handler instanceof ChannelInboundHandlerAdapter) {
250 ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
251 } else {
252 ((ChannelInboundHandler) handler).channelInactive(next);
253 }
254 } catch (Throwable t) {
255 next.invokeExceptionCaught(t);
256 }
257 } else {
258 next.fireChannelInactive();
259 }
260 } else {
261 next.executor().execute(this::fireChannelInactive);
262 }
263 return this;
264 }
265
266 @Override
267 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
268 AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
269 ObjectUtil.checkNotNull(cause, "cause");
270 if (next.executor().inEventLoop()) {
271 next.invokeExceptionCaught(cause);
272 } else {
273 try {
274 next.executor().execute(() -> next.invokeExceptionCaught(cause));
275 } catch (Throwable t) {
276 if (logger.isWarnEnabled()) {
277 logger.warn("Failed to submit an exceptionCaught() event.", t);
278 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
279 }
280 }
281 }
282 return this;
283 }
284
285 @SuppressWarnings("deprecation")
286 private void invokeExceptionCaught(final Throwable cause) {
287 if (invokeHandler()) {
288 try {
289 handler().exceptionCaught(this, cause);
290 } catch (Throwable error) {
291 if (logger.isDebugEnabled()) {
292 logger.debug(
293 "An exception " +
294 "was thrown by a user handler's exceptionCaught() " +
295 "method while handling the following exception:", cause);
296 } else if (logger.isWarnEnabled()) {
297 logger.warn(
298 "An exception '{}' [enable DEBUG level for full stacktrace] " +
299 "was thrown by a user handler's exceptionCaught() " +
300 "method while handling the following exception:", error, cause);
301 }
302 }
303 } else {
304 fireExceptionCaught(cause);
305 }
306 }
307
308 @Override
309 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
310 ObjectUtil.checkNotNull(event, "event");
311 AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
312 if (next.executor().inEventLoop()) {
313 if (next.invokeHandler()) {
314 try {
315
316
317
318 final ChannelHandler handler = next.handler();
319 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
320 if (handler == headContext) {
321 headContext.userEventTriggered(next, event);
322 } else if (handler instanceof ChannelInboundHandlerAdapter) {
323 ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
324 } else {
325 ((ChannelInboundHandler) handler).userEventTriggered(next, event);
326 }
327 } catch (Throwable t) {
328 next.invokeExceptionCaught(t);
329 }
330 } else {
331 next.fireUserEventTriggered(event);
332 }
333 } else {
334 next.executor().execute(() -> fireUserEventTriggered(event));
335 }
336 return this;
337 }
338
339 @Override
340 public ChannelHandlerContext fireChannelRead(final Object msg) {
341 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
342 if (next.executor().inEventLoop()) {
343 final Object m = pipeline.touch(msg, next);
344 if (next.invokeHandler()) {
345 try {
346
347
348
349 final ChannelHandler handler = next.handler();
350 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
351 if (handler == headContext) {
352 headContext.channelRead(next, m);
353 } else if (handler instanceof ChannelDuplexHandler) {
354 ((ChannelDuplexHandler) handler).channelRead(next, m);
355 } else {
356 ((ChannelInboundHandler) handler).channelRead(next, m);
357 }
358 } catch (Throwable t) {
359 next.invokeExceptionCaught(t);
360 }
361 } else {
362 next.fireChannelRead(m);
363 }
364 } else {
365 next.executor().execute(() -> fireChannelRead(msg));
366 }
367 return this;
368 }
369
370 @Override
371 public ChannelHandlerContext fireChannelReadComplete() {
372 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
373 if (next.executor().inEventLoop()) {
374 if (next.invokeHandler()) {
375 try {
376
377
378
379 final ChannelHandler handler = next.handler();
380 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
381 if (handler == headContext) {
382 headContext.channelReadComplete(next);
383 } else if (handler instanceof ChannelDuplexHandler) {
384 ((ChannelDuplexHandler) handler).channelReadComplete(next);
385 } else {
386 ((ChannelInboundHandler) handler).channelReadComplete(next);
387 }
388 } catch (Throwable t) {
389 next.invokeExceptionCaught(t);
390 }
391 } else {
392 next.fireChannelReadComplete();
393 }
394 } else {
395 next.executor().execute(getInvokeTasks().invokeChannelReadCompleteTask);
396 }
397 return this;
398 }
399
400 @Override
401 public ChannelHandlerContext fireChannelWritabilityChanged() {
402 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
403 if (next.executor().inEventLoop()) {
404 if (next.invokeHandler()) {
405 try {
406
407
408
409 final ChannelHandler handler = next.handler();
410 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
411 if (handler == headContext) {
412 headContext.channelWritabilityChanged(next);
413 } else if (handler instanceof ChannelInboundHandlerAdapter) {
414 ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
415 } else {
416 ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
417 }
418 } catch (Throwable t) {
419 next.invokeExceptionCaught(t);
420 }
421 } else {
422 next.fireChannelWritabilityChanged();
423 }
424 } else {
425 next.executor().execute(getInvokeTasks().invokeChannelWritableStateChangedTask);
426 }
427 return this;
428 }
429
430 @Override
431 public ChannelFuture bind(SocketAddress localAddress) {
432 return bind(localAddress, newPromise());
433 }
434
435 @Override
436 public ChannelFuture connect(SocketAddress remoteAddress) {
437 return connect(remoteAddress, newPromise());
438 }
439
440 @Override
441 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
442 return connect(remoteAddress, localAddress, newPromise());
443 }
444
445 @Override
446 public ChannelFuture disconnect() {
447 return disconnect(newPromise());
448 }
449
450 @Override
451 public ChannelFuture close() {
452 return close(newPromise());
453 }
454
455 @Override
456 public ChannelFuture deregister() {
457 return deregister(newPromise());
458 }
459
460 @Override
461 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
462 ObjectUtil.checkNotNull(localAddress, "localAddress");
463 if (isNotValidPromise(promise, false)) {
464
465 return promise;
466 }
467
468 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
469 EventExecutor executor = next.executor();
470 if (executor.inEventLoop()) {
471 next.invokeBind(localAddress, promise);
472 } else {
473 safeExecute(executor, new Runnable() {
474 @Override
475 public void run() {
476 next.invokeBind(localAddress, promise);
477 }
478 }, promise, null, false);
479 }
480 return promise;
481 }
482
483 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
484 if (invokeHandler()) {
485 try {
486
487
488
489 final ChannelHandler handler = handler();
490 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
491 if (handler == headContext) {
492 headContext.bind(this, localAddress, promise);
493 } else if (handler instanceof ChannelDuplexHandler) {
494 ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
495 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
496 ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
497 } else {
498 ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
499 }
500 } catch (Throwable t) {
501 notifyOutboundHandlerException(t, promise);
502 }
503 } else {
504 bind(localAddress, promise);
505 }
506 }
507
508 @Override
509 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
510 return connect(remoteAddress, null, promise);
511 }
512
513 @Override
514 public ChannelFuture connect(
515 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
516 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
517
518 if (isNotValidPromise(promise, false)) {
519
520 return promise;
521 }
522
523 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
524 EventExecutor executor = next.executor();
525 if (executor.inEventLoop()) {
526 next.invokeConnect(remoteAddress, localAddress, promise);
527 } else {
528 safeExecute(executor, new Runnable() {
529 @Override
530 public void run() {
531 next.invokeConnect(remoteAddress, localAddress, promise);
532 }
533 }, promise, null, false);
534 }
535 return promise;
536 }
537
538 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
539 if (invokeHandler()) {
540 try {
541
542
543
544 final ChannelHandler handler = handler();
545 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
546 if (handler == headContext) {
547 headContext.connect(this, remoteAddress, localAddress, promise);
548 } else if (handler instanceof ChannelDuplexHandler) {
549 ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
550 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
551 ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
552 } else {
553 ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
554 }
555 } catch (Throwable t) {
556 notifyOutboundHandlerException(t, promise);
557 }
558 } else {
559 connect(remoteAddress, localAddress, promise);
560 }
561 }
562
563 @Override
564 public ChannelFuture disconnect(final ChannelPromise promise) {
565 if (!channel().metadata().hasDisconnect()) {
566
567
568 return close(promise);
569 }
570 if (isNotValidPromise(promise, false)) {
571
572 return promise;
573 }
574
575 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
576 EventExecutor executor = next.executor();
577 if (executor.inEventLoop()) {
578 next.invokeDisconnect(promise);
579 } else {
580 safeExecute(executor, new Runnable() {
581 @Override
582 public void run() {
583 next.invokeDisconnect(promise);
584 }
585 }, promise, null, false);
586 }
587 return promise;
588 }
589
590 private void invokeDisconnect(ChannelPromise promise) {
591 if (invokeHandler()) {
592 try {
593
594
595
596 final ChannelHandler handler = handler();
597 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
598 if (handler == headContext) {
599 headContext.disconnect(this, promise);
600 } else if (handler instanceof ChannelDuplexHandler) {
601 ((ChannelDuplexHandler) handler).disconnect(this, promise);
602 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
603 ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
604 } else {
605 ((ChannelOutboundHandler) handler).disconnect(this, promise);
606 }
607 } catch (Throwable t) {
608 notifyOutboundHandlerException(t, promise);
609 }
610 } else {
611 disconnect(promise);
612 }
613 }
614
615 @Override
616 public ChannelFuture close(final ChannelPromise promise) {
617 if (isNotValidPromise(promise, false)) {
618
619 return promise;
620 }
621
622 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
623 EventExecutor executor = next.executor();
624 if (executor.inEventLoop()) {
625 next.invokeClose(promise);
626 } else {
627 safeExecute(executor, new Runnable() {
628 @Override
629 public void run() {
630 next.invokeClose(promise);
631 }
632 }, promise, null, false);
633 }
634
635 return promise;
636 }
637
638 private void invokeClose(ChannelPromise promise) {
639 if (invokeHandler()) {
640 try {
641
642
643
644 final ChannelHandler handler = handler();
645 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
646 if (handler == headContext) {
647 headContext.close(this, promise);
648 } else if (handler instanceof ChannelDuplexHandler) {
649 ((ChannelDuplexHandler) handler).close(this, promise);
650 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
651 ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
652 } else {
653 ((ChannelOutboundHandler) handler).close(this, promise);
654 }
655 } catch (Throwable t) {
656 notifyOutboundHandlerException(t, promise);
657 }
658 } else {
659 close(promise);
660 }
661 }
662
663 @Override
664 public ChannelFuture deregister(final ChannelPromise promise) {
665 if (isNotValidPromise(promise, false)) {
666
667 return promise;
668 }
669
670 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
671 EventExecutor executor = next.executor();
672 if (executor.inEventLoop()) {
673 next.invokeDeregister(promise);
674 } else {
675 safeExecute(executor, new Runnable() {
676 @Override
677 public void run() {
678 next.invokeDeregister(promise);
679 }
680 }, promise, null, false);
681 }
682
683 return promise;
684 }
685
686 private void invokeDeregister(ChannelPromise promise) {
687 if (invokeHandler()) {
688 try {
689
690
691
692 final ChannelHandler handler = handler();
693 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
694 if (handler == headContext) {
695 headContext.deregister(this, promise);
696 } else if (handler instanceof ChannelDuplexHandler) {
697 ((ChannelDuplexHandler) handler).deregister(this, promise);
698 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
699 ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
700 } else {
701 ((ChannelOutboundHandler) handler).deregister(this, promise);
702 }
703 } catch (Throwable t) {
704 notifyOutboundHandlerException(t, promise);
705 }
706 } else {
707 deregister(promise);
708 }
709 }
710
711 @Override
712 public ChannelHandlerContext read() {
713 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
714 if (next.executor().inEventLoop()) {
715 if (next.invokeHandler()) {
716 try {
717
718
719
720 final ChannelHandler handler = next.handler();
721 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
722 if (handler == headContext) {
723 headContext.read(next);
724 } else if (handler instanceof ChannelDuplexHandler) {
725 ((ChannelDuplexHandler) handler).read(next);
726 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
727 ((ChannelOutboundHandlerAdapter) handler).read(next);
728 } else {
729 ((ChannelOutboundHandler) handler).read(next);
730 }
731 } catch (Throwable t) {
732 invokeExceptionCaught(t);
733 }
734 } else {
735 next.read();
736 }
737 } else {
738 next.executor().execute(getInvokeTasks().invokeReadTask);
739 }
740 return this;
741 }
742
743 @Override
744 public ChannelFuture write(Object msg) {
745 ChannelPromise promise = newPromise();
746 write(msg, false, promise);
747 return promise;
748 }
749
750 @Override
751 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
752 write(msg, false, promise);
753 return promise;
754 }
755
756 @Override
757 public ChannelHandlerContext flush() {
758 final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
759 EventExecutor executor = next.executor();
760 if (executor.inEventLoop()) {
761 next.invokeFlush();
762 } else {
763 Tasks tasks = next.invokeTasks;
764 if (tasks == null) {
765 next.invokeTasks = tasks = new Tasks(next);
766 }
767 safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
768 }
769
770 return this;
771 }
772
773 private void invokeFlush() {
774 if (invokeHandler()) {
775 invokeFlush0();
776 } else {
777 flush();
778 }
779 }
780
781 private void invokeFlush0() {
782 try {
783
784
785
786 final ChannelHandler handler = handler();
787 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
788 if (handler == headContext) {
789 headContext.flush(this);
790 } else if (handler instanceof ChannelDuplexHandler) {
791 ((ChannelDuplexHandler) handler).flush(this);
792 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
793 ((ChannelOutboundHandlerAdapter) handler).flush(this);
794 } else {
795 ((ChannelOutboundHandler) handler).flush(this);
796 }
797 } catch (Throwable t) {
798 invokeExceptionCaught(t);
799 }
800 }
801
802 @Override
803 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
804 write(msg, true, promise);
805 return promise;
806 }
807
808 void write(Object msg, boolean flush, ChannelPromise promise) {
809 if (validateWrite(msg, promise)) {
810 final AbstractChannelHandlerContext next = findContextOutbound(flush ?
811 MASK_WRITE | MASK_FLUSH : MASK_WRITE);
812 final Object m = pipeline.touch(msg, next);
813 EventExecutor executor = next.executor();
814 if (executor.inEventLoop()) {
815 if (next.invokeHandler()) {
816 try {
817
818
819
820 final ChannelHandler handler = next.handler();
821 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
822 if (handler == headContext) {
823 headContext.write(next, msg, promise);
824 } else if (handler instanceof ChannelDuplexHandler) {
825 ((ChannelDuplexHandler) handler).write(next, msg, promise);
826 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
827 ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
828 } else {
829 ((ChannelOutboundHandler) handler).write(next, msg, promise);
830 }
831 } catch (Throwable t) {
832 notifyOutboundHandlerException(t, promise);
833 }
834 if (flush) {
835 next.invokeFlush0();
836 }
837 } else {
838 next.write(msg, flush, promise);
839 }
840 } else {
841 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
842 if (!safeExecute(executor, task, promise, m, !flush)) {
843
844
845
846
847 task.cancel();
848 }
849 }
850 }
851 }
852
853 private boolean validateWrite(Object msg, ChannelPromise promise) {
854 ObjectUtil.checkNotNull(msg, "msg");
855 try {
856 if (isNotValidPromise(promise, true)) {
857 ReferenceCountUtil.release(msg);
858 return false;
859 }
860 } catch (RuntimeException e) {
861 ReferenceCountUtil.release(msg);
862 throw e;
863 }
864 return true;
865 }
866
867 @Override
868 public ChannelFuture writeAndFlush(Object msg) {
869 return writeAndFlush(msg, newPromise());
870 }
871
872 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
873
874
875 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
876 }
877
878 @Override
879 public ChannelPromise newPromise() {
880 return new DefaultChannelPromise(channel(), executor());
881 }
882
883 @Override
884 public ChannelProgressivePromise newProgressivePromise() {
885 return new DefaultChannelProgressivePromise(channel(), executor());
886 }
887
888 @Override
889 public ChannelFuture newSucceededFuture() {
890 ChannelFuture succeededFuture = this.succeededFuture;
891 if (succeededFuture == null) {
892 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
893 }
894 return succeededFuture;
895 }
896
897 @Override
898 public ChannelFuture newFailedFuture(Throwable cause) {
899 return new FailedChannelFuture(channel(), executor(), cause);
900 }
901
902 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
903 ObjectUtil.checkNotNull(promise, "promise");
904
905 if (promise.isDone()) {
906
907
908
909
910 if (promise.isCancelled()) {
911 return true;
912 }
913 throw new IllegalArgumentException("promise already done: " + promise);
914 }
915
916 if (promise.channel() != channel()) {
917 throw new IllegalArgumentException(String.format(
918 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
919 }
920
921 if (promise.getClass() == DefaultChannelPromise.class) {
922 return false;
923 }
924
925 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
926 throw new IllegalArgumentException(
927 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
928 }
929
930 if (promise instanceof AbstractChannel.CloseFuture) {
931 throw new IllegalArgumentException(
932 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
933 }
934 return false;
935 }
936
937 private AbstractChannelHandlerContext findContextInbound(int mask) {
938 AbstractChannelHandlerContext ctx = this;
939 EventExecutor currentExecutor = executor();
940 do {
941 ctx = ctx.next;
942 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
943 return ctx;
944 }
945
946 private AbstractChannelHandlerContext findContextOutbound(int mask) {
947 AbstractChannelHandlerContext ctx = this;
948 EventExecutor currentExecutor = executor();
949 do {
950 ctx = ctx.prev;
951 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
952 return ctx;
953 }
954
955 private static boolean skipContext(
956 AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
957
958 return (ctx.executionMask & (onlyMask | mask)) == 0 ||
959
960
961
962
963 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
964 }
965
966 @Override
967 public ChannelPromise voidPromise() {
968 return channel().voidPromise();
969 }
970
971 final void setRemoved() {
972 handlerState = REMOVE_COMPLETE;
973 }
974
975 final boolean setAddComplete() {
976 for (;;) {
977 int oldState = handlerState;
978 if (oldState == REMOVE_COMPLETE) {
979 return false;
980 }
981
982
983
984 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
985 return true;
986 }
987 }
988 }
989
990 final void setAddPending() {
991 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
992 assert updated;
993 }
994
995 final void callHandlerAdded() throws Exception {
996
997
998 if (setAddComplete()) {
999 handler().handlerAdded(this);
1000 }
1001 }
1002
1003 final void callHandlerRemoved() throws Exception {
1004 try {
1005
1006 if (handlerState == ADD_COMPLETE) {
1007 handler().handlerRemoved(this);
1008 }
1009 } finally {
1010
1011 setRemoved();
1012 }
1013 }
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023 boolean invokeHandler() {
1024
1025 int handlerState = this.handlerState;
1026 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1027 }
1028
1029 @Override
1030 public boolean isRemoved() {
1031 return handlerState == REMOVE_COMPLETE;
1032 }
1033
1034 @Override
1035 public <T> Attribute<T> attr(AttributeKey<T> key) {
1036 return channel().attr(key);
1037 }
1038
1039 @Override
1040 public <T> boolean hasAttr(AttributeKey<T> key) {
1041 return channel().hasAttr(key);
1042 }
1043
1044 private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1045 ChannelPromise promise, Object msg, boolean lazy) {
1046 try {
1047 if (lazy && executor instanceof AbstractEventExecutor) {
1048 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1049 } else {
1050 executor.execute(runnable);
1051 }
1052 return true;
1053 } catch (Throwable cause) {
1054 try {
1055 if (msg != null) {
1056 ReferenceCountUtil.release(msg);
1057 }
1058 } finally {
1059 promise.setFailure(cause);
1060 }
1061 return false;
1062 }
1063 }
1064
1065 @Override
1066 public String toHintString() {
1067 return '\'' + name + "' will handle the message from this point.";
1068 }
1069
1070 @Override
1071 public String toString() {
1072 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1073 }
1074
1075 Tasks getInvokeTasks() {
1076 Tasks tasks = invokeTasks;
1077 if (tasks == null) {
1078 invokeTasks = tasks = new Tasks(this);
1079 }
1080 return tasks;
1081 }
1082
1083 static final class WriteTask implements Runnable {
1084 private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1085 @Override
1086 protected WriteTask newObject(Handle<WriteTask> handle) {
1087 return new WriteTask(handle);
1088 }
1089 };
1090
1091 static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1092 Object msg, ChannelPromise promise, boolean flush) {
1093 WriteTask task = RECYCLER.get();
1094 init(task, ctx, msg, promise, flush);
1095 return task;
1096 }
1097
1098 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1099 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1100
1101
1102 private static final int WRITE_TASK_OVERHEAD =
1103 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1104
1105 private final Handle<WriteTask> handle;
1106 private AbstractChannelHandlerContext ctx;
1107 private Object msg;
1108 private ChannelPromise promise;
1109 private int size;
1110
1111 private WriteTask(Handle<WriteTask> handle) {
1112 this.handle = handle;
1113 }
1114
1115 static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1116 Object msg, ChannelPromise promise, boolean flush) {
1117 task.ctx = ctx;
1118 task.msg = msg;
1119 task.promise = promise;
1120
1121 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1122 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1123 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1124 } else {
1125 task.size = 0;
1126 }
1127 if (flush) {
1128 task.size |= Integer.MIN_VALUE;
1129 }
1130 }
1131
1132 @Override
1133 public void run() {
1134 try {
1135 decrementPendingOutboundBytes();
1136 ctx.write(msg, size < 0, promise);
1137 } finally {
1138 recycle();
1139 }
1140 }
1141
1142 void cancel() {
1143 try {
1144 decrementPendingOutboundBytes();
1145 } finally {
1146 recycle();
1147 }
1148 }
1149
1150 private void decrementPendingOutboundBytes() {
1151 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1152 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1153 }
1154 }
1155
1156 private void recycle() {
1157
1158 ctx = null;
1159 msg = null;
1160 promise = null;
1161 handle.recycle(this);
1162 }
1163 }
1164
1165 static final class Tasks {
1166 final Runnable invokeChannelReadCompleteTask;
1167 private final Runnable invokeReadTask;
1168 private final Runnable invokeChannelWritableStateChangedTask;
1169 private final Runnable invokeFlushTask;
1170
1171 Tasks(AbstractChannelHandlerContext ctx) {
1172 invokeChannelReadCompleteTask = ctx::fireChannelReadComplete;
1173 invokeReadTask = ctx::read;
1174 invokeChannelWritableStateChangedTask = ctx::fireChannelWritabilityChanged;
1175 invokeFlushTask = ctx::invokeFlush;
1176 }
1177 }
1178 }