1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.ResourceLeakHint;
23 import io.netty.util.concurrent.AbstractEventExecutor;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.OrderedEventExecutor;
26 import io.netty.util.internal.ObjectPool;
27 import io.netty.util.internal.ObjectPool.Handle;
28 import io.netty.util.internal.ObjectPool.ObjectCreator;
29 import io.netty.util.internal.PromiseNotificationUtil;
30 import io.netty.util.internal.ThrowableUtil;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.StringUtil;
33 import io.netty.util.internal.SystemPropertyUtil;
34 import io.netty.util.internal.logging.InternalLogger;
35 import io.netty.util.internal.logging.InternalLoggerFactory;
36
37 import java.net.SocketAddress;
38 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39
40 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
48 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
49 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
50 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
51 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
52 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
53 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
54 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
55 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
56 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
57 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
58 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
59 import static io.netty.channel.ChannelHandlerMask.mask;
60
61 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
62
// Class-wide logger; in this class it reports failures while delivering or handling exceptionCaught().
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// Neighbouring contexts. findContextInbound() follows `next`, findContextOutbound() follows `prev`.
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

// Atomic accessor for the volatile `handlerState` field; used for compare-and-set state transitions.
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

/**
 * State entered from {@link #INIT} by setAddPending(). invokeHandler() only treats this state as
 * invocable when the context is not {@code ordered}.
 */
private static final int ADD_PENDING = 1;

/**
 * State set by setAddComplete(); callHandlerAdded() sets it right before invoking
 * {@code handlerAdded}. invokeHandler() always allows invocation in this state.
 */
private static final int ADD_COMPLETE = 2;

/**
 * Terminal state set by setRemoved(); callHandlerRemoved() sets it after the (optional)
 * {@code handlerRemoved} call. setAddComplete() refuses to leave this state.
 */
private static final int REMOVE_COMPLETE = 3;

/**
 * Initial value of {@code handlerState}, before setAddPending()/setAddComplete() ran.
 */
private static final int INIT = 0;

private final DefaultChannelPipeline pipeline;
private final String name;
// True when no explicit executor was given or the executor is an OrderedEventExecutor (see constructor).
private final boolean ordered;
// Bit mask computed from the handler class via ChannelHandlerMask.mask(); consulted by skipContext().
private final int executionMask;

// Explicitly assigned executor; may be null, in which case executor() falls back to the
// channel's event loop.
final EventExecutor executor;
// Lazily created and cached by newSucceededFuture().
private ChannelFuture succeededFuture;

// Lazily created holder of reusable Runnables (read-complete, read, writability-changed, flush),
// allocated by the dispatch methods the first time they have to hop to another executor.
private Tasks invokeTasks;

private volatile int handlerState = INIT;
103
104 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
105 String name, Class<? extends ChannelHandler> handlerClass) {
106 this.name = ObjectUtil.checkNotNull(name, "name");
107 this.pipeline = pipeline;
108 this.executor = executor;
109 this.executionMask = mask(handlerClass);
110
111 ordered = executor == null || executor instanceof OrderedEventExecutor;
112 }
113
114 @Override
115 public Channel channel() {
116 return pipeline.channel();
117 }
118
/**
 * Returns the pipeline this context was created with.
 */
@Override
public ChannelPipeline pipeline() {
    return pipeline;
}
123
124 @Override
125 public ByteBufAllocator alloc() {
126 return channel().config().getAllocator();
127 }
128
129 @Override
130 public EventExecutor executor() {
131 if (executor == null) {
132 return channel().eventLoop();
133 } else {
134 return executor;
135 }
136 }
137
/**
 * Returns the name this context was created with (never {@code null}; checked in the constructor).
 */
@Override
public String name() {
    return name;
}
142
143 @Override
144 public ChannelHandlerContext fireChannelRegistered() {
145 invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
146 return this;
147 }
148
149 static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
150 EventExecutor executor = next.executor();
151 if (executor.inEventLoop()) {
152 next.invokeChannelRegistered();
153 } else {
154 executor.execute(new Runnable() {
155 @Override
156 public void run() {
157 next.invokeChannelRegistered();
158 }
159 });
160 }
161 }
162
163 private void invokeChannelRegistered() {
164 if (invokeHandler()) {
165 try {
166
167
168
169 final ChannelHandler handler = handler();
170 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
171 if (handler == headContext) {
172 headContext.channelRegistered(this);
173 } else if (handler instanceof ChannelInboundHandlerAdapter) {
174 ((ChannelInboundHandlerAdapter) handler).channelRegistered(this);
175 } else {
176 ((ChannelInboundHandler) handler).channelRegistered(this);
177 }
178 } catch (Throwable t) {
179 invokeExceptionCaught(t);
180 }
181 } else {
182 fireChannelRegistered();
183 }
184 }
185
186 @Override
187 public ChannelHandlerContext fireChannelUnregistered() {
188 invokeChannelUnregistered(findContextInbound(MASK_CHANNEL_UNREGISTERED));
189 return this;
190 }
191
192 static void invokeChannelUnregistered(final AbstractChannelHandlerContext next) {
193 EventExecutor executor = next.executor();
194 if (executor.inEventLoop()) {
195 next.invokeChannelUnregistered();
196 } else {
197 executor.execute(new Runnable() {
198 @Override
199 public void run() {
200 next.invokeChannelUnregistered();
201 }
202 });
203 }
204 }
205
206 private void invokeChannelUnregistered() {
207 if (invokeHandler()) {
208 try {
209
210
211
212 final ChannelHandler handler = handler();
213 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
214 if (handler == headContext) {
215 headContext.channelUnregistered(this);
216 } else if (handler instanceof ChannelInboundHandlerAdapter) {
217 ((ChannelInboundHandlerAdapter) handler).channelUnregistered(this);
218 } else {
219 ((ChannelInboundHandler) handler).channelUnregistered(this);
220 }
221 } catch (Throwable t) {
222 invokeExceptionCaught(t);
223 }
224 } else {
225 fireChannelUnregistered();
226 }
227 }
228
229 @Override
230 public ChannelHandlerContext fireChannelActive() {
231 invokeChannelActive(findContextInbound(MASK_CHANNEL_ACTIVE));
232 return this;
233 }
234
235 static void invokeChannelActive(final AbstractChannelHandlerContext next) {
236 EventExecutor executor = next.executor();
237 if (executor.inEventLoop()) {
238 next.invokeChannelActive();
239 } else {
240 executor.execute(new Runnable() {
241 @Override
242 public void run() {
243 next.invokeChannelActive();
244 }
245 });
246 }
247 }
248
249 private void invokeChannelActive() {
250 if (invokeHandler()) {
251 try {
252
253
254
255 final ChannelHandler handler = handler();
256 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
257 if (handler == headContext) {
258 headContext.channelActive(this);
259 } else if (handler instanceof ChannelInboundHandlerAdapter) {
260 ((ChannelInboundHandlerAdapter) handler).channelActive(this);
261 } else {
262 ((ChannelInboundHandler) handler).channelActive(this);
263 }
264 } catch (Throwable t) {
265 invokeExceptionCaught(t);
266 }
267 } else {
268 fireChannelActive();
269 }
270 }
271
272 @Override
273 public ChannelHandlerContext fireChannelInactive() {
274 invokeChannelInactive(findContextInbound(MASK_CHANNEL_INACTIVE));
275 return this;
276 }
277
278 static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
279 EventExecutor executor = next.executor();
280 if (executor.inEventLoop()) {
281 next.invokeChannelInactive();
282 } else {
283 executor.execute(new Runnable() {
284 @Override
285 public void run() {
286 next.invokeChannelInactive();
287 }
288 });
289 }
290 }
291
292 private void invokeChannelInactive() {
293 if (invokeHandler()) {
294 try {
295
296
297
298 final ChannelHandler handler = handler();
299 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
300 if (handler == headContext) {
301 headContext.channelInactive(this);
302 } else if (handler instanceof ChannelInboundHandlerAdapter) {
303 ((ChannelInboundHandlerAdapter) handler).channelInactive(this);
304 } else {
305 ((ChannelInboundHandler) handler).channelInactive(this);
306 }
307 } catch (Throwable t) {
308 invokeExceptionCaught(t);
309 }
310 } else {
311 fireChannelInactive();
312 }
313 }
314
315 @Override
316 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
317 invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);
318 return this;
319 }
320
321 static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
322 ObjectUtil.checkNotNull(cause, "cause");
323 EventExecutor executor = next.executor();
324 if (executor.inEventLoop()) {
325 next.invokeExceptionCaught(cause);
326 } else {
327 try {
328 executor.execute(new Runnable() {
329 @Override
330 public void run() {
331 next.invokeExceptionCaught(cause);
332 }
333 });
334 } catch (Throwable t) {
335 if (logger.isWarnEnabled()) {
336 logger.warn("Failed to submit an exceptionCaught() event.", t);
337 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
338 }
339 }
340 }
341 }
342
343 private void invokeExceptionCaught(final Throwable cause) {
344 if (invokeHandler()) {
345 try {
346 handler().exceptionCaught(this, cause);
347 } catch (Throwable error) {
348 if (logger.isDebugEnabled()) {
349 logger.debug(
350 "An exception {}" +
351 "was thrown by a user handler's exceptionCaught() " +
352 "method while handling the following exception:",
353 ThrowableUtil.stackTraceToString(error), cause);
354 } else if (logger.isWarnEnabled()) {
355 logger.warn(
356 "An exception '{}' [enable DEBUG level for full stacktrace] " +
357 "was thrown by a user handler's exceptionCaught() " +
358 "method while handling the following exception:", error, cause);
359 }
360 }
361 } else {
362 fireExceptionCaught(cause);
363 }
364 }
365
366 @Override
367 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
368 invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
369 return this;
370 }
371
372 static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
373 ObjectUtil.checkNotNull(event, "event");
374 EventExecutor executor = next.executor();
375 if (executor.inEventLoop()) {
376 next.invokeUserEventTriggered(event);
377 } else {
378 executor.execute(new Runnable() {
379 @Override
380 public void run() {
381 next.invokeUserEventTriggered(event);
382 }
383 });
384 }
385 }
386
387 private void invokeUserEventTriggered(Object event) {
388 if (invokeHandler()) {
389 try {
390
391
392
393 final ChannelHandler handler = handler();
394 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
395 if (handler == headContext) {
396 headContext.userEventTriggered(this, event);
397 } else if (handler instanceof ChannelInboundHandlerAdapter) {
398 ((ChannelInboundHandlerAdapter) handler).userEventTriggered(this, event);
399 } else {
400 ((ChannelInboundHandler) handler).userEventTriggered(this, event);
401 }
402 } catch (Throwable t) {
403 invokeExceptionCaught(t);
404 }
405 } else {
406 fireUserEventTriggered(event);
407 }
408 }
409
410 @Override
411 public ChannelHandlerContext fireChannelRead(final Object msg) {
412 invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
413 return this;
414 }
415
416 static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
417 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
418 EventExecutor executor = next.executor();
419 if (executor.inEventLoop()) {
420 next.invokeChannelRead(m);
421 } else {
422 executor.execute(new Runnable() {
423 @Override
424 public void run() {
425 next.invokeChannelRead(m);
426 }
427 });
428 }
429 }
430
431 private void invokeChannelRead(Object msg) {
432 if (invokeHandler()) {
433 try {
434
435
436
437 final ChannelHandler handler = handler();
438 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
439 if (handler == headContext) {
440 headContext.channelRead(this, msg);
441 } else if (handler instanceof ChannelDuplexHandler) {
442 ((ChannelDuplexHandler) handler).channelRead(this, msg);
443 } else {
444 ((ChannelInboundHandler) handler).channelRead(this, msg);
445 }
446 } catch (Throwable t) {
447 invokeExceptionCaught(t);
448 }
449 } else {
450 fireChannelRead(msg);
451 }
452 }
453
454 @Override
455 public ChannelHandlerContext fireChannelReadComplete() {
456 invokeChannelReadComplete(findContextInbound(MASK_CHANNEL_READ_COMPLETE));
457 return this;
458 }
459
460 static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
461 EventExecutor executor = next.executor();
462 if (executor.inEventLoop()) {
463 next.invokeChannelReadComplete();
464 } else {
465 Tasks tasks = next.invokeTasks;
466 if (tasks == null) {
467 next.invokeTasks = tasks = new Tasks(next);
468 }
469 executor.execute(tasks.invokeChannelReadCompleteTask);
470 }
471 }
472
473 private void invokeChannelReadComplete() {
474 if (invokeHandler()) {
475 try {
476
477
478
479 final ChannelHandler handler = handler();
480 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
481 if (handler == headContext) {
482 headContext.channelReadComplete(this);
483 } else if (handler instanceof ChannelDuplexHandler) {
484 ((ChannelDuplexHandler) handler).channelReadComplete(this);
485 } else {
486 ((ChannelInboundHandler) handler).channelReadComplete(this);
487 }
488 } catch (Throwable t) {
489 invokeExceptionCaught(t);
490 }
491 } else {
492 fireChannelReadComplete();
493 }
494 }
495
496 @Override
497 public ChannelHandlerContext fireChannelWritabilityChanged() {
498 invokeChannelWritabilityChanged(findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED));
499 return this;
500 }
501
502 static void invokeChannelWritabilityChanged(final AbstractChannelHandlerContext next) {
503 EventExecutor executor = next.executor();
504 if (executor.inEventLoop()) {
505 next.invokeChannelWritabilityChanged();
506 } else {
507 Tasks tasks = next.invokeTasks;
508 if (tasks == null) {
509 next.invokeTasks = tasks = new Tasks(next);
510 }
511 executor.execute(tasks.invokeChannelWritableStateChangedTask);
512 }
513 }
514
515 private void invokeChannelWritabilityChanged() {
516 if (invokeHandler()) {
517 try {
518
519
520
521 final ChannelHandler handler = handler();
522 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
523 if (handler == headContext) {
524 headContext.channelWritabilityChanged(this);
525 } else if (handler instanceof ChannelInboundHandlerAdapter) {
526 ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(this);
527 } else {
528 ((ChannelInboundHandler) handler).channelWritabilityChanged(this);
529 }
530 } catch (Throwable t) {
531 invokeExceptionCaught(t);
532 }
533 } else {
534 fireChannelWritabilityChanged();
535 }
536 }
537
538 @Override
539 public ChannelFuture bind(SocketAddress localAddress) {
540 return bind(localAddress, newPromise());
541 }
542
543 @Override
544 public ChannelFuture connect(SocketAddress remoteAddress) {
545 return connect(remoteAddress, newPromise());
546 }
547
548 @Override
549 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
550 return connect(remoteAddress, localAddress, newPromise());
551 }
552
553 @Override
554 public ChannelFuture disconnect() {
555 return disconnect(newPromise());
556 }
557
558 @Override
559 public ChannelFuture close() {
560 return close(newPromise());
561 }
562
563 @Override
564 public ChannelFuture deregister() {
565 return deregister(newPromise());
566 }
567
568 @Override
569 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
570 ObjectUtil.checkNotNull(localAddress, "localAddress");
571 if (isNotValidPromise(promise, false)) {
572
573 return promise;
574 }
575
576 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
577 EventExecutor executor = next.executor();
578 if (executor.inEventLoop()) {
579 next.invokeBind(localAddress, promise);
580 } else {
581 safeExecute(executor, new Runnable() {
582 @Override
583 public void run() {
584 next.invokeBind(localAddress, promise);
585 }
586 }, promise, null, false);
587 }
588 return promise;
589 }
590
591 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
592 if (invokeHandler()) {
593 try {
594
595
596
597 final ChannelHandler handler = handler();
598 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
599 if (handler == headContext) {
600 headContext.bind(this, localAddress, promise);
601 } else if (handler instanceof ChannelDuplexHandler) {
602 ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
603 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
604 ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
605 } else {
606 ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
607 }
608 } catch (Throwable t) {
609 notifyOutboundHandlerException(t, promise);
610 }
611 } else {
612 bind(localAddress, promise);
613 }
614 }
615
616 @Override
617 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
618 return connect(remoteAddress, null, promise);
619 }
620
621 @Override
622 public ChannelFuture connect(
623 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
624 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
625
626 if (isNotValidPromise(promise, false)) {
627
628 return promise;
629 }
630
631 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
632 EventExecutor executor = next.executor();
633 if (executor.inEventLoop()) {
634 next.invokeConnect(remoteAddress, localAddress, promise);
635 } else {
636 safeExecute(executor, new Runnable() {
637 @Override
638 public void run() {
639 next.invokeConnect(remoteAddress, localAddress, promise);
640 }
641 }, promise, null, false);
642 }
643 return promise;
644 }
645
646 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
647 if (invokeHandler()) {
648 try {
649
650
651
652 final ChannelHandler handler = handler();
653 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
654 if (handler == headContext) {
655 headContext.connect(this, remoteAddress, localAddress, promise);
656 } else if (handler instanceof ChannelDuplexHandler) {
657 ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
658 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
659 ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
660 } else {
661 ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
662 }
663 } catch (Throwable t) {
664 notifyOutboundHandlerException(t, promise);
665 }
666 } else {
667 connect(remoteAddress, localAddress, promise);
668 }
669 }
670
671 @Override
672 public ChannelFuture disconnect(final ChannelPromise promise) {
673 if (!channel().metadata().hasDisconnect()) {
674
675
676 return close(promise);
677 }
678 if (isNotValidPromise(promise, false)) {
679
680 return promise;
681 }
682
683 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
684 EventExecutor executor = next.executor();
685 if (executor.inEventLoop()) {
686 next.invokeDisconnect(promise);
687 } else {
688 safeExecute(executor, new Runnable() {
689 @Override
690 public void run() {
691 next.invokeDisconnect(promise);
692 }
693 }, promise, null, false);
694 }
695 return promise;
696 }
697
698 private void invokeDisconnect(ChannelPromise promise) {
699 if (invokeHandler()) {
700 try {
701
702
703
704 final ChannelHandler handler = handler();
705 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
706 if (handler == headContext) {
707 headContext.disconnect(this, promise);
708 } else if (handler instanceof ChannelDuplexHandler) {
709 ((ChannelDuplexHandler) handler).disconnect(this, promise);
710 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
711 ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
712 } else {
713 ((ChannelOutboundHandler) handler).disconnect(this, promise);
714 }
715 } catch (Throwable t) {
716 notifyOutboundHandlerException(t, promise);
717 }
718 } else {
719 disconnect(promise);
720 }
721 }
722
723 @Override
724 public ChannelFuture close(final ChannelPromise promise) {
725 if (isNotValidPromise(promise, false)) {
726
727 return promise;
728 }
729
730 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
731 EventExecutor executor = next.executor();
732 if (executor.inEventLoop()) {
733 next.invokeClose(promise);
734 } else {
735 safeExecute(executor, new Runnable() {
736 @Override
737 public void run() {
738 next.invokeClose(promise);
739 }
740 }, promise, null, false);
741 }
742
743 return promise;
744 }
745
746 private void invokeClose(ChannelPromise promise) {
747 if (invokeHandler()) {
748 try {
749
750
751
752 final ChannelHandler handler = handler();
753 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
754 if (handler == headContext) {
755 headContext.close(this, promise);
756 } else if (handler instanceof ChannelDuplexHandler) {
757 ((ChannelDuplexHandler) handler).close(this, promise);
758 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
759 ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
760 } else {
761 ((ChannelOutboundHandler) handler).close(this, promise);
762 }
763 } catch (Throwable t) {
764 notifyOutboundHandlerException(t, promise);
765 }
766 } else {
767 close(promise);
768 }
769 }
770
771 @Override
772 public ChannelFuture deregister(final ChannelPromise promise) {
773 if (isNotValidPromise(promise, false)) {
774
775 return promise;
776 }
777
778 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
779 EventExecutor executor = next.executor();
780 if (executor.inEventLoop()) {
781 next.invokeDeregister(promise);
782 } else {
783 safeExecute(executor, new Runnable() {
784 @Override
785 public void run() {
786 next.invokeDeregister(promise);
787 }
788 }, promise, null, false);
789 }
790
791 return promise;
792 }
793
794 private void invokeDeregister(ChannelPromise promise) {
795 if (invokeHandler()) {
796 try {
797
798
799
800 final ChannelHandler handler = handler();
801 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
802 if (handler == headContext) {
803 headContext.deregister(this, promise);
804 } else if (handler instanceof ChannelDuplexHandler) {
805 ((ChannelDuplexHandler) handler).deregister(this, promise);
806 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
807 ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
808 } else {
809 ((ChannelOutboundHandler) handler).deregister(this, promise);
810 }
811 } catch (Throwable t) {
812 notifyOutboundHandlerException(t, promise);
813 }
814 } else {
815 deregister(promise);
816 }
817 }
818
819 @Override
820 public ChannelHandlerContext read() {
821 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
822 EventExecutor executor = next.executor();
823 if (executor.inEventLoop()) {
824 next.invokeRead();
825 } else {
826 Tasks tasks = next.invokeTasks;
827 if (tasks == null) {
828 next.invokeTasks = tasks = new Tasks(next);
829 }
830 executor.execute(tasks.invokeReadTask);
831 }
832
833 return this;
834 }
835
836 private void invokeRead() {
837 if (invokeHandler()) {
838 try {
839
840
841
842 final ChannelHandler handler = handler();
843 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
844 if (handler == headContext) {
845 headContext.read(this);
846 } else if (handler instanceof ChannelDuplexHandler) {
847 ((ChannelDuplexHandler) handler).read(this);
848 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
849 ((ChannelOutboundHandlerAdapter) handler).read(this);
850 } else {
851 ((ChannelOutboundHandler) handler).read(this);
852 }
853 } catch (Throwable t) {
854 invokeExceptionCaught(t);
855 }
856 } else {
857 read();
858 }
859 }
860
861 @Override
862 public ChannelFuture write(Object msg) {
863 return write(msg, newPromise());
864 }
865
866 @Override
867 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
868 write(msg, false, promise);
869
870 return promise;
871 }
872
873 void invokeWrite(Object msg, ChannelPromise promise) {
874 if (invokeHandler()) {
875 invokeWrite0(msg, promise);
876 } else {
877 write(msg, promise);
878 }
879 }
880
881 private void invokeWrite0(Object msg, ChannelPromise promise) {
882 try {
883
884
885
886 final ChannelHandler handler = handler();
887 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
888 if (handler == headContext) {
889 headContext.write(this, msg, promise);
890 } else if (handler instanceof ChannelDuplexHandler) {
891 ((ChannelDuplexHandler) handler).write(this, msg, promise);
892 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
893 ((ChannelOutboundHandlerAdapter) handler).write(this, msg, promise);
894 } else {
895 ((ChannelOutboundHandler) handler).write(this, msg, promise);
896 }
897 } catch (Throwable t) {
898 notifyOutboundHandlerException(t, promise);
899 }
900 }
901
902 @Override
903 public ChannelHandlerContext flush() {
904 final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
905 EventExecutor executor = next.executor();
906 if (executor.inEventLoop()) {
907 next.invokeFlush();
908 } else {
909 Tasks tasks = next.invokeTasks;
910 if (tasks == null) {
911 next.invokeTasks = tasks = new Tasks(next);
912 }
913 safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
914 }
915
916 return this;
917 }
918
919 private void invokeFlush() {
920 if (invokeHandler()) {
921 invokeFlush0();
922 } else {
923 flush();
924 }
925 }
926
927 private void invokeFlush0() {
928 try {
929
930
931
932 final ChannelHandler handler = handler();
933 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
934 if (handler == headContext) {
935 headContext.flush(this);
936 } else if (handler instanceof ChannelDuplexHandler) {
937 ((ChannelDuplexHandler) handler).flush(this);
938 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
939 ((ChannelOutboundHandlerAdapter) handler).flush(this);
940 } else {
941 ((ChannelOutboundHandler) handler).flush(this);
942 }
943 } catch (Throwable t) {
944 invokeExceptionCaught(t);
945 }
946 }
947
948 @Override
949 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
950 write(msg, true, promise);
951 return promise;
952 }
953
954 void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
955 if (invokeHandler()) {
956 invokeWrite0(msg, promise);
957 invokeFlush0();
958 } else {
959 writeAndFlush(msg, promise);
960 }
961 }
962
963 private void write(Object msg, boolean flush, ChannelPromise promise) {
964 ObjectUtil.checkNotNull(msg, "msg");
965 try {
966 if (isNotValidPromise(promise, true)) {
967 ReferenceCountUtil.release(msg);
968
969 return;
970 }
971 } catch (RuntimeException e) {
972 ReferenceCountUtil.release(msg);
973 throw e;
974 }
975
976 final AbstractChannelHandlerContext next = findContextOutbound(flush ?
977 (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
978 final Object m = pipeline.touch(msg, next);
979 EventExecutor executor = next.executor();
980 if (executor.inEventLoop()) {
981 if (flush) {
982 next.invokeWriteAndFlush(m, promise);
983 } else {
984 next.invokeWrite(m, promise);
985 }
986 } else {
987 final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
988 if (!safeExecute(executor, task, promise, m, !flush)) {
989
990
991
992
993 task.cancel();
994 }
995 }
996 }
997
998 @Override
999 public ChannelFuture writeAndFlush(Object msg) {
1000 return writeAndFlush(msg, newPromise());
1001 }
1002
1003 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
1004
1005
1006 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
1007 }
1008
1009 @Override
1010 public ChannelPromise newPromise() {
1011 return new DefaultChannelPromise(channel(), executor());
1012 }
1013
1014 @Override
1015 public ChannelProgressivePromise newProgressivePromise() {
1016 return new DefaultChannelProgressivePromise(channel(), executor());
1017 }
1018
1019 @Override
1020 public ChannelFuture newSucceededFuture() {
1021 ChannelFuture succeededFuture = this.succeededFuture;
1022 if (succeededFuture == null) {
1023 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
1024 }
1025 return succeededFuture;
1026 }
1027
1028 @Override
1029 public ChannelFuture newFailedFuture(Throwable cause) {
1030 return new FailedChannelFuture(channel(), executor(), cause);
1031 }
1032
1033 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
1034 ObjectUtil.checkNotNull(promise, "promise");
1035
1036 if (promise.isDone()) {
1037
1038
1039
1040
1041 if (promise.isCancelled()) {
1042 return true;
1043 }
1044 throw new IllegalArgumentException("promise already done: " + promise);
1045 }
1046
1047 if (promise.channel() != channel()) {
1048 throw new IllegalArgumentException(String.format(
1049 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
1050 }
1051
1052 if (promise.getClass() == DefaultChannelPromise.class) {
1053 return false;
1054 }
1055
1056 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
1057 throw new IllegalArgumentException(
1058 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
1059 }
1060
1061 if (promise instanceof AbstractChannel.CloseFuture) {
1062 throw new IllegalArgumentException(
1063 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
1064 }
1065 return false;
1066 }
1067
1068 private AbstractChannelHandlerContext findContextInbound(int mask) {
1069 AbstractChannelHandlerContext ctx = this;
1070 EventExecutor currentExecutor = executor();
1071 do {
1072 ctx = ctx.next;
1073 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
1074 return ctx;
1075 }
1076
1077 private AbstractChannelHandlerContext findContextOutbound(int mask) {
1078 AbstractChannelHandlerContext ctx = this;
1079 EventExecutor currentExecutor = executor();
1080 do {
1081 ctx = ctx.prev;
1082 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
1083 return ctx;
1084 }
1085
1086 private static boolean skipContext(
1087 AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
1088
1089 return (ctx.executionMask & (onlyMask | mask)) == 0 ||
1090
1091
1092
1093
1094 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
1095 }
1096
1097 @Override
1098 public ChannelPromise voidPromise() {
1099 return channel().voidPromise();
1100 }
1101
/**
 * Unconditionally moves this context to the REMOVE_COMPLETE state (plain volatile write,
 * no compare-and-set).
 */
final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}
1105
1106 final boolean setAddComplete() {
1107 for (;;) {
1108 int oldState = handlerState;
1109 if (oldState == REMOVE_COMPLETE) {
1110 return false;
1111 }
1112
1113
1114
1115 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
1116 return true;
1117 }
1118 }
1119 }
1120
1121 final void setAddPending() {
1122 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
1123 assert updated;
1124 }
1125
1126 final void callHandlerAdded() throws Exception {
1127
1128
1129 if (setAddComplete()) {
1130 handler().handlerAdded(this);
1131 }
1132 }
1133
1134 final void callHandlerRemoved() throws Exception {
1135 try {
1136
1137 if (handlerState == ADD_COMPLETE) {
1138 handler().handlerRemoved(this);
1139 }
1140 } finally {
1141
1142 setRemoved();
1143 }
1144 }
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154 private boolean invokeHandler() {
1155
1156 int handlerState = this.handlerState;
1157 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1158 }
1159
1160 @Override
1161 public boolean isRemoved() {
1162 return handlerState == REMOVE_COMPLETE;
1163 }
1164
1165 @Override
1166 public <T> Attribute<T> attr(AttributeKey<T> key) {
1167 return channel().attr(key);
1168 }
1169
1170 @Override
1171 public <T> boolean hasAttr(AttributeKey<T> key) {
1172 return channel().hasAttr(key);
1173 }
1174
1175 private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1176 ChannelPromise promise, Object msg, boolean lazy) {
1177 try {
1178 if (lazy && executor instanceof AbstractEventExecutor) {
1179 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1180 } else {
1181 executor.execute(runnable);
1182 }
1183 return true;
1184 } catch (Throwable cause) {
1185 try {
1186 if (msg != null) {
1187 ReferenceCountUtil.release(msg);
1188 }
1189 } finally {
1190 promise.setFailure(cause);
1191 }
1192 return false;
1193 }
1194 }
1195
1196 @Override
1197 public String toHintString() {
1198 return '\'' + name + "' will handle the message from this point.";
1199 }
1200
1201 @Override
1202 public String toString() {
1203 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1204 }
1205
1206 static final class WriteTask implements Runnable {
1207 private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1208 @Override
1209 public WriteTask newObject(Handle<WriteTask> handle) {
1210 return new WriteTask(handle);
1211 }
1212 });
1213
1214 static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1215 Object msg, ChannelPromise promise, boolean flush) {
1216 WriteTask task = RECYCLER.get();
1217 init(task, ctx, msg, promise, flush);
1218 return task;
1219 }
1220
1221 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1222 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1223
1224
1225 private static final int WRITE_TASK_OVERHEAD =
1226 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1227
1228 private final Handle<WriteTask> handle;
1229 private AbstractChannelHandlerContext ctx;
1230 private Object msg;
1231 private ChannelPromise promise;
1232 private int size;
1233
1234 @SuppressWarnings("unchecked")
1235 private WriteTask(Handle<? extends WriteTask> handle) {
1236 this.handle = (Handle<WriteTask>) handle;
1237 }
1238
1239 protected static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1240 Object msg, ChannelPromise promise, boolean flush) {
1241 task.ctx = ctx;
1242 task.msg = msg;
1243 task.promise = promise;
1244
1245 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1246 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1247 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1248 } else {
1249 task.size = 0;
1250 }
1251 if (flush) {
1252 task.size |= Integer.MIN_VALUE;
1253 }
1254 }
1255
1256 @Override
1257 public void run() {
1258 try {
1259 decrementPendingOutboundBytes();
1260 if (size >= 0) {
1261 ctx.invokeWrite(msg, promise);
1262 } else {
1263 ctx.invokeWriteAndFlush(msg, promise);
1264 }
1265 } finally {
1266 recycle();
1267 }
1268 }
1269
1270 void cancel() {
1271 try {
1272 decrementPendingOutboundBytes();
1273 } finally {
1274 recycle();
1275 }
1276 }
1277
1278 private void decrementPendingOutboundBytes() {
1279 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1280 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1281 }
1282 }
1283
1284 private void recycle() {
1285
1286 ctx = null;
1287 msg = null;
1288 promise = null;
1289 handle.recycle(this);
1290 }
1291 }
1292
1293 private static final class Tasks {
1294 private final AbstractChannelHandlerContext next;
1295 private final Runnable invokeChannelReadCompleteTask = new Runnable() {
1296 @Override
1297 public void run() {
1298 next.invokeChannelReadComplete();
1299 }
1300 };
1301 private final Runnable invokeReadTask = new Runnable() {
1302 @Override
1303 public void run() {
1304 next.invokeRead();
1305 }
1306 };
1307 private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
1308 @Override
1309 public void run() {
1310 next.invokeChannelWritabilityChanged();
1311 }
1312 };
1313 private final Runnable invokeFlushTask = new Runnable() {
1314 @Override
1315 public void run() {
1316 next.invokeFlush();
1317 }
1318 };
1319
1320 Tasks(AbstractChannelHandlerContext next) {
1321 this.next = next;
1322 }
1323 }
1324 }