1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.ResourceLeakHint;
23 import io.netty.util.concurrent.AbstractEventExecutor;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.OrderedEventExecutor;
26 import io.netty.util.internal.ObjectPool;
27 import io.netty.util.internal.ObjectPool.Handle;
28 import io.netty.util.internal.ObjectPool.ObjectCreator;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.ThrowableUtil;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.net.SocketAddress;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39
40 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59 import static io.netty.channel.ChannelHandlerMask.mask;
60
61 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62
    // Class-wide logger; used below when an exceptionCaught() event cannot be submitted or handled.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
    // Neighbouring contexts. findContextInbound() follows `next`, findContextOutbound() follows `prev`.
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;

    // Atomic accessor for the volatile `handlerState` field (CAS in setAddPending()/setAddComplete()).
    private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

    // State entered from INIT via setAddPending(). In this state invokeHandler() only returns true
    // when the context is not `ordered`.
    private static final int ADD_PENDING = 1;

    // State set by setAddComplete(); invokeHandler() returns true in this state and
    // callHandlerRemoved() only notifies the handler when this state was reached.
    private static final int ADD_COMPLETE = 2;

    // Terminal state set by setRemoved(); setAddComplete() refuses to leave it and
    // isRemoved() reports it.
    private static final int REMOVE_COMPLETE = 3;

    // Initial value of `handlerState`, before setAddPending()/setAddComplete() ran.
    private static final int INIT = 0;

    private final DefaultChannelPipeline pipeline;
    private final String name;
    // True when no executor was supplied or the supplied one is an OrderedEventExecutor.
    private final boolean ordered;
    // Result of mask(handlerClass); consulted by skipContext() to decide which events to skip.
    private final int executionMask;

    // Executor passed to the constructor; may be null, in which case executor() falls back to
    // the channel's event loop.
    final EventExecutor childExecutor;

    // Cache for the value computed by executor(); null until executor() is first called.
    EventExecutor contextExecutor;
    // Cache for newSucceededFuture(); created on first use.
    private ChannelFuture succeededFuture;

    // Reusable Runnables for off-event-loop dispatch; created on first use by
    // getInvokeTasks() or flush().
    private Tasks invokeTasks;

    private volatile int handlerState = INIT;
108
109 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
110 String name, Class<? extends ChannelHandler> handlerClass) {
111 this.name = ObjectUtil.checkNotNull(name, "name");
112 this.pipeline = pipeline;
113 childExecutor = executor;
114 executionMask = mask(handlerClass);
115
116 ordered = executor == null || executor instanceof OrderedEventExecutor;
117 }
118
119 @Override
120 public Channel channel() {
121 return pipeline.channel();
122 }
123
124 @Override
125 public ChannelPipeline pipeline() {
126 return pipeline;
127 }
128
129 @Override
130 public ByteBufAllocator alloc() {
131 return channel().config().getAllocator();
132 }
133
134 @Override
135 public EventExecutor executor() {
136 EventExecutor ex = contextExecutor;
137 if (ex == null) {
138 contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
139 }
140 return ex;
141 }
142
143 @Override
144 public String name() {
145 return name;
146 }
147
148 @Override
149 public ChannelHandlerContext fireChannelRegistered() {
150 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
151 if (next.executor().inEventLoop()) {
152 if (next.invokeHandler()) {
153 try {
154
155
156
157 final ChannelHandler handler = next.handler();
158 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
159 if (handler == headContext) {
160 headContext.channelRegistered(next);
161 } else if (handler instanceof ChannelInboundHandlerAdapter) {
162 ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
163 } else {
164 ((ChannelInboundHandler) handler).channelRegistered(next);
165 }
166 } catch (Throwable t) {
167 next.invokeExceptionCaught(t);
168 }
169 } else {
170 next.fireChannelRegistered();
171 }
172 } else {
173 next.executor().execute(this::fireChannelRegistered);
174 }
175 return this;
176 }
177
178 @Override
179 public ChannelHandlerContext fireChannelUnregistered() {
180 final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
181 if (next.executor().inEventLoop()) {
182 if (next.invokeHandler()) {
183 try {
184
185
186
187 final ChannelHandler handler = next.handler();
188 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
189 if (handler == headContext) {
190 headContext.channelUnregistered(next);
191 } else if (handler instanceof ChannelInboundHandlerAdapter) {
192 ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
193 } else {
194 ((ChannelInboundHandler) handler).channelUnregistered(next);
195 }
196 } catch (Throwable t) {
197 next.invokeExceptionCaught(t);
198 }
199 } else {
200 next.fireChannelUnregistered();
201 }
202 } else {
203 next.executor().execute(this::fireChannelUnregistered);
204 }
205 return this;
206 }
207
208 @Override
209 public ChannelHandlerContext fireChannelActive() {
210 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
211 if (next.executor().inEventLoop()) {
212 if (next.invokeHandler()) {
213 try {
214
215
216
217 final ChannelHandler handler = next.handler();
218 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
219 if (handler == headContext) {
220 headContext.channelActive(next);
221 } else if (handler instanceof ChannelInboundHandlerAdapter) {
222 ((ChannelInboundHandlerAdapter) handler).channelActive(next);
223 } else {
224 ((ChannelInboundHandler) handler).channelActive(next);
225 }
226 } catch (Throwable t) {
227 next.invokeExceptionCaught(t);
228 }
229 } else {
230 next.fireChannelActive();
231 }
232 } else {
233 next.executor().execute(this::fireChannelActive);
234 }
235 return this;
236 }
237
238 @Override
239 public ChannelHandlerContext fireChannelInactive() {
240 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
241 if (next.executor().inEventLoop()) {
242 if (next.invokeHandler()) {
243 try {
244
245
246
247 final ChannelHandler handler = next.handler();
248 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
249 if (handler == headContext) {
250 headContext.channelInactive(next);
251 } else if (handler instanceof ChannelInboundHandlerAdapter) {
252 ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
253 } else {
254 ((ChannelInboundHandler) handler).channelInactive(next);
255 }
256 } catch (Throwable t) {
257 next.invokeExceptionCaught(t);
258 }
259 } else {
260 next.fireChannelInactive();
261 }
262 } else {
263 next.executor().execute(this::fireChannelInactive);
264 }
265 return this;
266 }
267
268 @Override
269 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
270 AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
271 ObjectUtil.checkNotNull(cause, "cause");
272 if (next.executor().inEventLoop()) {
273 next.invokeExceptionCaught(cause);
274 } else {
275 try {
276 next.executor().execute(() -> next.invokeExceptionCaught(cause));
277 } catch (Throwable t) {
278 if (logger.isWarnEnabled()) {
279 logger.warn("Failed to submit an exceptionCaught() event.", t);
280 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
281 }
282 }
283 }
284 return this;
285 }
286
287 @SuppressWarnings("deprecation")
288 private void invokeExceptionCaught(final Throwable cause) {
289 if (invokeHandler()) {
290 try {
291 handler().exceptionCaught(this, cause);
292 } catch (Throwable error) {
293 if (logger.isDebugEnabled()) {
294 logger.debug(
295 "An exception {}" +
296 "was thrown by a user handler's exceptionCaught() " +
297 "method while handling the following exception:",
298 ThrowableUtil.stackTraceToString(error), cause);
299 } else if (logger.isWarnEnabled()) {
300 logger.warn(
301 "An exception '{}' [enable DEBUG level for full stacktrace] " +
302 "was thrown by a user handler's exceptionCaught() " +
303 "method while handling the following exception:", error, cause);
304 }
305 }
306 } else {
307 fireExceptionCaught(cause);
308 }
309 }
310
311 @Override
312 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
313 ObjectUtil.checkNotNull(event, "event");
314 AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
315 if (next.executor().inEventLoop()) {
316 if (next.invokeHandler()) {
317 try {
318
319
320
321 final ChannelHandler handler = next.handler();
322 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
323 if (handler == headContext) {
324 headContext.userEventTriggered(next, event);
325 } else if (handler instanceof ChannelInboundHandlerAdapter) {
326 ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
327 } else {
328 ((ChannelInboundHandler) handler).userEventTriggered(next, event);
329 }
330 } catch (Throwable t) {
331 next.invokeExceptionCaught(t);
332 }
333 } else {
334 next.fireUserEventTriggered(event);
335 }
336 } else {
337 next.executor().execute(() -> fireUserEventTriggered(event));
338 }
339 return this;
340 }
341
342 @Override
343 public ChannelHandlerContext fireChannelRead(final Object msg) {
344 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
345 if (next.executor().inEventLoop()) {
346 final Object m = pipeline.touch(msg, next);
347 if (next.invokeHandler()) {
348 try {
349
350
351
352 final ChannelHandler handler = next.handler();
353 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
354 if (handler == headContext) {
355 headContext.channelRead(next, m);
356 } else if (handler instanceof ChannelDuplexHandler) {
357 ((ChannelDuplexHandler) handler).channelRead(next, m);
358 } else {
359 ((ChannelInboundHandler) handler).channelRead(next, m);
360 }
361 } catch (Throwable t) {
362 next.invokeExceptionCaught(t);
363 }
364 } else {
365 next.fireChannelRead(m);
366 }
367 } else {
368 next.executor().execute(() -> fireChannelRead(msg));
369 }
370 return this;
371 }
372
373 @Override
374 public ChannelHandlerContext fireChannelReadComplete() {
375 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
376 if (next.executor().inEventLoop()) {
377 if (next.invokeHandler()) {
378 try {
379
380
381
382 final ChannelHandler handler = next.handler();
383 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
384 if (handler == headContext) {
385 headContext.channelReadComplete(next);
386 } else if (handler instanceof ChannelDuplexHandler) {
387 ((ChannelDuplexHandler) handler).channelReadComplete(next);
388 } else {
389 ((ChannelInboundHandler) handler).channelReadComplete(next);
390 }
391 } catch (Throwable t) {
392 next.invokeExceptionCaught(t);
393 }
394 } else {
395 next.fireChannelReadComplete();
396 }
397 } else {
398 next.executor().execute(getInvokeTasks().invokeChannelReadCompleteTask);
399 }
400 return this;
401 }
402
403 @Override
404 public ChannelHandlerContext fireChannelWritabilityChanged() {
405 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
406 if (next.executor().inEventLoop()) {
407 if (next.invokeHandler()) {
408 try {
409
410
411
412 final ChannelHandler handler = next.handler();
413 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
414 if (handler == headContext) {
415 headContext.channelWritabilityChanged(next);
416 } else if (handler instanceof ChannelInboundHandlerAdapter) {
417 ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
418 } else {
419 ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
420 }
421 } catch (Throwable t) {
422 next.invokeExceptionCaught(t);
423 }
424 } else {
425 next.fireChannelWritabilityChanged();
426 }
427 } else {
428 next.executor().execute(getInvokeTasks().invokeChannelWritableStateChangedTask);
429 }
430 return this;
431 }
432
433 @Override
434 public ChannelFuture bind(SocketAddress localAddress) {
435 return bind(localAddress, newPromise());
436 }
437
438 @Override
439 public ChannelFuture connect(SocketAddress remoteAddress) {
440 return connect(remoteAddress, newPromise());
441 }
442
443 @Override
444 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
445 return connect(remoteAddress, localAddress, newPromise());
446 }
447
448 @Override
449 public ChannelFuture disconnect() {
450 return disconnect(newPromise());
451 }
452
453 @Override
454 public ChannelFuture close() {
455 return close(newPromise());
456 }
457
458 @Override
459 public ChannelFuture deregister() {
460 return deregister(newPromise());
461 }
462
463 @Override
464 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
465 ObjectUtil.checkNotNull(localAddress, "localAddress");
466 if (isNotValidPromise(promise, false)) {
467
468 return promise;
469 }
470
471 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
472 EventExecutor executor = next.executor();
473 if (executor.inEventLoop()) {
474 next.invokeBind(localAddress, promise);
475 } else {
476 safeExecute(executor, new Runnable() {
477 @Override
478 public void run() {
479 next.invokeBind(localAddress, promise);
480 }
481 }, promise, null, false);
482 }
483 return promise;
484 }
485
486 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
487 if (invokeHandler()) {
488 try {
489
490
491
492 final ChannelHandler handler = handler();
493 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
494 if (handler == headContext) {
495 headContext.bind(this, localAddress, promise);
496 } else if (handler instanceof ChannelDuplexHandler) {
497 ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
498 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
499 ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
500 } else {
501 ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
502 }
503 } catch (Throwable t) {
504 notifyOutboundHandlerException(t, promise);
505 }
506 } else {
507 bind(localAddress, promise);
508 }
509 }
510
511 @Override
512 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
513 return connect(remoteAddress, null, promise);
514 }
515
516 @Override
517 public ChannelFuture connect(
518 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
519 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
520
521 if (isNotValidPromise(promise, false)) {
522
523 return promise;
524 }
525
526 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
527 EventExecutor executor = next.executor();
528 if (executor.inEventLoop()) {
529 next.invokeConnect(remoteAddress, localAddress, promise);
530 } else {
531 safeExecute(executor, new Runnable() {
532 @Override
533 public void run() {
534 next.invokeConnect(remoteAddress, localAddress, promise);
535 }
536 }, promise, null, false);
537 }
538 return promise;
539 }
540
541 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
542 if (invokeHandler()) {
543 try {
544
545
546
547 final ChannelHandler handler = handler();
548 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
549 if (handler == headContext) {
550 headContext.connect(this, remoteAddress, localAddress, promise);
551 } else if (handler instanceof ChannelDuplexHandler) {
552 ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
553 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
554 ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
555 } else {
556 ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
557 }
558 } catch (Throwable t) {
559 notifyOutboundHandlerException(t, promise);
560 }
561 } else {
562 connect(remoteAddress, localAddress, promise);
563 }
564 }
565
566 @Override
567 public ChannelFuture disconnect(final ChannelPromise promise) {
568 if (!channel().metadata().hasDisconnect()) {
569
570
571 return close(promise);
572 }
573 if (isNotValidPromise(promise, false)) {
574
575 return promise;
576 }
577
578 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
579 EventExecutor executor = next.executor();
580 if (executor.inEventLoop()) {
581 next.invokeDisconnect(promise);
582 } else {
583 safeExecute(executor, new Runnable() {
584 @Override
585 public void run() {
586 next.invokeDisconnect(promise);
587 }
588 }, promise, null, false);
589 }
590 return promise;
591 }
592
593 private void invokeDisconnect(ChannelPromise promise) {
594 if (invokeHandler()) {
595 try {
596
597
598
599 final ChannelHandler handler = handler();
600 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
601 if (handler == headContext) {
602 headContext.disconnect(this, promise);
603 } else if (handler instanceof ChannelDuplexHandler) {
604 ((ChannelDuplexHandler) handler).disconnect(this, promise);
605 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
606 ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
607 } else {
608 ((ChannelOutboundHandler) handler).disconnect(this, promise);
609 }
610 } catch (Throwable t) {
611 notifyOutboundHandlerException(t, promise);
612 }
613 } else {
614 disconnect(promise);
615 }
616 }
617
618 @Override
619 public ChannelFuture close(final ChannelPromise promise) {
620 if (isNotValidPromise(promise, false)) {
621
622 return promise;
623 }
624
625 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
626 EventExecutor executor = next.executor();
627 if (executor.inEventLoop()) {
628 next.invokeClose(promise);
629 } else {
630 safeExecute(executor, new Runnable() {
631 @Override
632 public void run() {
633 next.invokeClose(promise);
634 }
635 }, promise, null, false);
636 }
637
638 return promise;
639 }
640
641 private void invokeClose(ChannelPromise promise) {
642 if (invokeHandler()) {
643 try {
644
645
646
647 final ChannelHandler handler = handler();
648 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
649 if (handler == headContext) {
650 headContext.close(this, promise);
651 } else if (handler instanceof ChannelDuplexHandler) {
652 ((ChannelDuplexHandler) handler).close(this, promise);
653 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
654 ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
655 } else {
656 ((ChannelOutboundHandler) handler).close(this, promise);
657 }
658 } catch (Throwable t) {
659 notifyOutboundHandlerException(t, promise);
660 }
661 } else {
662 close(promise);
663 }
664 }
665
666 @Override
667 public ChannelFuture deregister(final ChannelPromise promise) {
668 if (isNotValidPromise(promise, false)) {
669
670 return promise;
671 }
672
673 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
674 EventExecutor executor = next.executor();
675 if (executor.inEventLoop()) {
676 next.invokeDeregister(promise);
677 } else {
678 safeExecute(executor, new Runnable() {
679 @Override
680 public void run() {
681 next.invokeDeregister(promise);
682 }
683 }, promise, null, false);
684 }
685
686 return promise;
687 }
688
689 private void invokeDeregister(ChannelPromise promise) {
690 if (invokeHandler()) {
691 try {
692
693
694
695 final ChannelHandler handler = handler();
696 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
697 if (handler == headContext) {
698 headContext.deregister(this, promise);
699 } else if (handler instanceof ChannelDuplexHandler) {
700 ((ChannelDuplexHandler) handler).deregister(this, promise);
701 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
702 ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
703 } else {
704 ((ChannelOutboundHandler) handler).deregister(this, promise);
705 }
706 } catch (Throwable t) {
707 notifyOutboundHandlerException(t, promise);
708 }
709 } else {
710 deregister(promise);
711 }
712 }
713
714 @Override
715 public ChannelHandlerContext read() {
716 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
717 if (next.executor().inEventLoop()) {
718 if (next.invokeHandler()) {
719 try {
720
721
722
723 final ChannelHandler handler = next.handler();
724 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
725 if (handler == headContext) {
726 headContext.read(next);
727 } else if (handler instanceof ChannelDuplexHandler) {
728 ((ChannelDuplexHandler) handler).read(next);
729 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
730 ((ChannelOutboundHandlerAdapter) handler).read(next);
731 } else {
732 ((ChannelOutboundHandler) handler).read(next);
733 }
734 } catch (Throwable t) {
735 invokeExceptionCaught(t);
736 }
737 } else {
738 next.read();
739 }
740 } else {
741 next.executor().execute(getInvokeTasks().invokeReadTask);
742 }
743 return this;
744 }
745
746 @Override
747 public ChannelFuture write(Object msg) {
748 ChannelPromise promise = newPromise();
749 write(msg, false, promise);
750 return promise;
751 }
752
753 @Override
754 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
755 write(msg, false, promise);
756 return promise;
757 }
758
759 @Override
760 public ChannelHandlerContext flush() {
761 final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
762 EventExecutor executor = next.executor();
763 if (executor.inEventLoop()) {
764 next.invokeFlush();
765 } else {
766 Tasks tasks = next.invokeTasks;
767 if (tasks == null) {
768 next.invokeTasks = tasks = new Tasks(next);
769 }
770 safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
771 }
772
773 return this;
774 }
775
776 private void invokeFlush() {
777 if (invokeHandler()) {
778 invokeFlush0();
779 } else {
780 flush();
781 }
782 }
783
784 private void invokeFlush0() {
785 try {
786
787
788
789 final ChannelHandler handler = handler();
790 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
791 if (handler == headContext) {
792 headContext.flush(this);
793 } else if (handler instanceof ChannelDuplexHandler) {
794 ((ChannelDuplexHandler) handler).flush(this);
795 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
796 ((ChannelOutboundHandlerAdapter) handler).flush(this);
797 } else {
798 ((ChannelOutboundHandler) handler).flush(this);
799 }
800 } catch (Throwable t) {
801 invokeExceptionCaught(t);
802 }
803 }
804
805 @Override
806 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
807 write(msg, true, promise);
808 return promise;
809 }
810
811 void write(Object msg, boolean flush, ChannelPromise promise) {
812 if (validateWrite(msg, promise)) {
813 final AbstractChannelHandlerContext next = findContextOutbound(flush ?
814 MASK_WRITE | MASK_FLUSH : MASK_WRITE);
815 final Object m = pipeline.touch(msg, next);
816 EventExecutor executor = next.executor();
817 if (executor.inEventLoop()) {
818 if (next.invokeHandler()) {
819 try {
820
821
822
823 final ChannelHandler handler = next.handler();
824 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
825 if (handler == headContext) {
826 headContext.write(next, msg, promise);
827 } else if (handler instanceof ChannelDuplexHandler) {
828 ((ChannelDuplexHandler) handler).write(next, msg, promise);
829 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
830 ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
831 } else {
832 ((ChannelOutboundHandler) handler).write(next, msg, promise);
833 }
834 } catch (Throwable t) {
835 notifyOutboundHandlerException(t, promise);
836 }
837 if (flush) {
838 next.invokeFlush0();
839 }
840 } else {
841 next.write(msg, flush, promise);
842 }
843 } else {
844 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
845 if (!safeExecute(executor, task, promise, m, !flush)) {
846
847
848
849
850 task.cancel();
851 }
852 }
853 }
854 }
855
856 private boolean validateWrite(Object msg, ChannelPromise promise) {
857 ObjectUtil.checkNotNull(msg, "msg");
858 try {
859 if (isNotValidPromise(promise, true)) {
860 ReferenceCountUtil.release(msg);
861 return false;
862 }
863 } catch (RuntimeException e) {
864 ReferenceCountUtil.release(msg);
865 throw e;
866 }
867 return true;
868 }
869
870 @Override
871 public ChannelFuture writeAndFlush(Object msg) {
872 return writeAndFlush(msg, newPromise());
873 }
874
875 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
876
877
878 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
879 }
880
881 @Override
882 public ChannelPromise newPromise() {
883 return new DefaultChannelPromise(channel(), executor());
884 }
885
886 @Override
887 public ChannelProgressivePromise newProgressivePromise() {
888 return new DefaultChannelProgressivePromise(channel(), executor());
889 }
890
891 @Override
892 public ChannelFuture newSucceededFuture() {
893 ChannelFuture succeededFuture = this.succeededFuture;
894 if (succeededFuture == null) {
895 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
896 }
897 return succeededFuture;
898 }
899
900 @Override
901 public ChannelFuture newFailedFuture(Throwable cause) {
902 return new FailedChannelFuture(channel(), executor(), cause);
903 }
904
905 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
906 ObjectUtil.checkNotNull(promise, "promise");
907
908 if (promise.isDone()) {
909
910
911
912
913 if (promise.isCancelled()) {
914 return true;
915 }
916 throw new IllegalArgumentException("promise already done: " + promise);
917 }
918
919 if (promise.channel() != channel()) {
920 throw new IllegalArgumentException(String.format(
921 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
922 }
923
924 if (promise.getClass() == DefaultChannelPromise.class) {
925 return false;
926 }
927
928 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
929 throw new IllegalArgumentException(
930 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
931 }
932
933 if (promise instanceof AbstractChannel.CloseFuture) {
934 throw new IllegalArgumentException(
935 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
936 }
937 return false;
938 }
939
940 private AbstractChannelHandlerContext findContextInbound(int mask) {
941 AbstractChannelHandlerContext ctx = this;
942 EventExecutor currentExecutor = executor();
943 do {
944 ctx = ctx.next;
945 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
946 return ctx;
947 }
948
949 private AbstractChannelHandlerContext findContextOutbound(int mask) {
950 AbstractChannelHandlerContext ctx = this;
951 EventExecutor currentExecutor = executor();
952 do {
953 ctx = ctx.prev;
954 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
955 return ctx;
956 }
957
958 private static boolean skipContext(
959 AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
960
961 return (ctx.executionMask & (onlyMask | mask)) == 0 ||
962
963
964
965
966 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
967 }
968
969 @Override
970 public ChannelPromise voidPromise() {
971 return channel().voidPromise();
972 }
973
974 final void setRemoved() {
975 handlerState = REMOVE_COMPLETE;
976 }
977
978 final boolean setAddComplete() {
979 for (;;) {
980 int oldState = handlerState;
981 if (oldState == REMOVE_COMPLETE) {
982 return false;
983 }
984
985
986
987 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
988 return true;
989 }
990 }
991 }
992
993 final void setAddPending() {
994 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
995 assert updated;
996 }
997
998 final void callHandlerAdded() throws Exception {
999
1000
1001 if (setAddComplete()) {
1002 handler().handlerAdded(this);
1003 }
1004 }
1005
1006 final void callHandlerRemoved() throws Exception {
1007 try {
1008
1009 if (handlerState == ADD_COMPLETE) {
1010 handler().handlerRemoved(this);
1011 }
1012 } finally {
1013
1014 setRemoved();
1015 }
1016 }
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026 boolean invokeHandler() {
1027
1028 int handlerState = this.handlerState;
1029 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1030 }
1031
1032 @Override
1033 public boolean isRemoved() {
1034 return handlerState == REMOVE_COMPLETE;
1035 }
1036
1037 @Override
1038 public <T> Attribute<T> attr(AttributeKey<T> key) {
1039 return channel().attr(key);
1040 }
1041
1042 @Override
1043 public <T> boolean hasAttr(AttributeKey<T> key) {
1044 return channel().hasAttr(key);
1045 }
1046
1047 private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1048 ChannelPromise promise, Object msg, boolean lazy) {
1049 try {
1050 if (lazy && executor instanceof AbstractEventExecutor) {
1051 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1052 } else {
1053 executor.execute(runnable);
1054 }
1055 return true;
1056 } catch (Throwable cause) {
1057 try {
1058 if (msg != null) {
1059 ReferenceCountUtil.release(msg);
1060 }
1061 } finally {
1062 promise.setFailure(cause);
1063 }
1064 return false;
1065 }
1066 }
1067
1068 @Override
1069 public String toHintString() {
1070 return '\'' + name + "' will handle the message from this point.";
1071 }
1072
1073 @Override
1074 public String toString() {
1075 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1076 }
1077
1078 Tasks getInvokeTasks() {
1079 Tasks tasks = invokeTasks;
1080 if (tasks == null) {
1081 invokeTasks = tasks = new Tasks(this);
1082 }
1083 return tasks;
1084 }
1085
1086 static final class WriteTask implements Runnable {
1087 private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1088 @Override
1089 public WriteTask newObject(Handle<WriteTask> handle) {
1090 return new WriteTask(handle);
1091 }
1092 });
1093
1094 static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1095 Object msg, ChannelPromise promise, boolean flush) {
1096 WriteTask task = RECYCLER.get();
1097 init(task, ctx, msg, promise, flush);
1098 return task;
1099 }
1100
1101 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1102 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1103
1104
1105 private static final int WRITE_TASK_OVERHEAD =
1106 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1107
1108 private final Handle<WriteTask> handle;
1109 private AbstractChannelHandlerContext ctx;
1110 private Object msg;
1111 private ChannelPromise promise;
1112 private int size;
1113
1114 private WriteTask(Handle<WriteTask> handle) {
1115 this.handle = handle;
1116 }
1117
1118 static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1119 Object msg, ChannelPromise promise, boolean flush) {
1120 task.ctx = ctx;
1121 task.msg = msg;
1122 task.promise = promise;
1123
1124 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1125 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1126 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1127 } else {
1128 task.size = 0;
1129 }
1130 if (flush) {
1131 task.size |= Integer.MIN_VALUE;
1132 }
1133 }
1134
1135 @Override
1136 public void run() {
1137 try {
1138 decrementPendingOutboundBytes();
1139 ctx.write(msg, size < 0, promise);
1140 } finally {
1141 recycle();
1142 }
1143 }
1144
1145 void cancel() {
1146 try {
1147 decrementPendingOutboundBytes();
1148 } finally {
1149 recycle();
1150 }
1151 }
1152
1153 private void decrementPendingOutboundBytes() {
1154 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1155 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1156 }
1157 }
1158
1159 private void recycle() {
1160
1161 ctx = null;
1162 msg = null;
1163 promise = null;
1164 handle.recycle(this);
1165 }
1166 }
1167
1168 static final class Tasks {
1169 final Runnable invokeChannelReadCompleteTask;
1170 private final Runnable invokeReadTask;
1171 private final Runnable invokeChannelWritableStateChangedTask;
1172 private final Runnable invokeFlushTask;
1173
1174 Tasks(AbstractChannelHandlerContext ctx) {
1175 invokeChannelReadCompleteTask = ctx::fireChannelReadComplete;
1176 invokeReadTask = ctx::read;
1177 invokeChannelWritableStateChangedTask = ctx::fireChannelWritabilityChanged;
1178 invokeFlushTask = ctx::invokeFlush;
1179 }
1180 }
1181 }