1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.ReferenceCountUtil;
22 import io.netty.util.ResourceLeakHint;
23 import io.netty.util.concurrent.AbstractEventExecutor;
24 import io.netty.util.concurrent.EventExecutor;
25 import io.netty.util.concurrent.OrderedEventExecutor;
26 import io.netty.util.internal.ObjectPool;
27 import io.netty.util.internal.ObjectPool.Handle;
28 import io.netty.util.internal.ObjectPool.ObjectCreator;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.PromiseNotificationUtil;
31 import io.netty.util.internal.StringUtil;
32 import io.netty.util.internal.SystemPropertyUtil;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.net.SocketAddress;
37 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38
39 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
40 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
48 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
49 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
50 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
51 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
52 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
53 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
54 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
55 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
56 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
57 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
58 import static io.netty.channel.ChannelHandlerMask.mask;
59
60 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
61
62 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
63 volatile AbstractChannelHandlerContext next;
64 volatile AbstractChannelHandlerContext prev;
65
66 private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
67 AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
68
69
70
71
72 private static final int ADD_PENDING = 1;
73
74
75
76 private static final int ADD_COMPLETE = 2;
77
78
79
80 private static final int REMOVE_COMPLETE = 3;
81
82
83
84
85 private static final int INIT = 0;
86
87 private final DefaultChannelPipeline pipeline;
88 private final String name;
89 private final boolean ordered;
90 private final int executionMask;
91
92
93
94 final EventExecutor childExecutor;
95
96
97
98
99 EventExecutor contextExecutor;
100 private ChannelFuture succeededFuture;
101
102
103
104 private Tasks invokeTasks;
105
106 private volatile int handlerState = INIT;
107
108 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
109 String name, Class<? extends ChannelHandler> handlerClass) {
110 this.name = ObjectUtil.checkNotNull(name, "name");
111 this.pipeline = pipeline;
112 childExecutor = executor;
113 executionMask = mask(handlerClass);
114
115 ordered = executor == null || executor instanceof OrderedEventExecutor;
116 }
117
118 @Override
119 public Channel channel() {
120 return pipeline.channel();
121 }
122
123 @Override
124 public ChannelPipeline pipeline() {
125 return pipeline;
126 }
127
128 @Override
129 public ByteBufAllocator alloc() {
130 return channel().config().getAllocator();
131 }
132
133 @Override
134 public EventExecutor executor() {
135 EventExecutor ex = contextExecutor;
136 if (ex == null) {
137 contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
138 }
139 return ex;
140 }
141
142 @Override
143 public String name() {
144 return name;
145 }
146
147 @Override
148 public ChannelHandlerContext fireChannelRegistered() {
149 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
150 if (next.executor().inEventLoop()) {
151 if (next.invokeHandler()) {
152 try {
153
154
155
156 final ChannelHandler handler = next.handler();
157 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
158 if (handler == headContext) {
159 headContext.channelRegistered(next);
160 } else if (handler instanceof ChannelInboundHandlerAdapter) {
161 ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
162 } else {
163 ((ChannelInboundHandler) handler).channelRegistered(next);
164 }
165 } catch (Throwable t) {
166 next.invokeExceptionCaught(t);
167 }
168 } else {
169 next.fireChannelRegistered();
170 }
171 } else {
172 next.executor().execute(this::fireChannelRegistered);
173 }
174 return this;
175 }
176
177 @Override
178 public ChannelHandlerContext fireChannelUnregistered() {
179 final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
180 if (next.executor().inEventLoop()) {
181 if (next.invokeHandler()) {
182 try {
183
184
185
186 final ChannelHandler handler = next.handler();
187 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
188 if (handler == headContext) {
189 headContext.channelUnregistered(next);
190 } else if (handler instanceof ChannelInboundHandlerAdapter) {
191 ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
192 } else {
193 ((ChannelInboundHandler) handler).channelUnregistered(next);
194 }
195 } catch (Throwable t) {
196 next.invokeExceptionCaught(t);
197 }
198 } else {
199 next.fireChannelUnregistered();
200 }
201 } else {
202 next.executor().execute(this::fireChannelUnregistered);
203 }
204 return this;
205 }
206
207 @Override
208 public ChannelHandlerContext fireChannelActive() {
209 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
210 if (next.executor().inEventLoop()) {
211 if (next.invokeHandler()) {
212 try {
213
214
215
216 final ChannelHandler handler = next.handler();
217 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
218 if (handler == headContext) {
219 headContext.channelActive(next);
220 } else if (handler instanceof ChannelInboundHandlerAdapter) {
221 ((ChannelInboundHandlerAdapter) handler).channelActive(next);
222 } else {
223 ((ChannelInboundHandler) handler).channelActive(next);
224 }
225 } catch (Throwable t) {
226 next.invokeExceptionCaught(t);
227 }
228 } else {
229 next.fireChannelActive();
230 }
231 } else {
232 next.executor().execute(this::fireChannelActive);
233 }
234 return this;
235 }
236
237 @Override
238 public ChannelHandlerContext fireChannelInactive() {
239 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
240 if (next.executor().inEventLoop()) {
241 if (next.invokeHandler()) {
242 try {
243
244
245
246 final ChannelHandler handler = next.handler();
247 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
248 if (handler == headContext) {
249 headContext.channelInactive(next);
250 } else if (handler instanceof ChannelInboundHandlerAdapter) {
251 ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
252 } else {
253 ((ChannelInboundHandler) handler).channelInactive(next);
254 }
255 } catch (Throwable t) {
256 next.invokeExceptionCaught(t);
257 }
258 } else {
259 next.fireChannelInactive();
260 }
261 } else {
262 next.executor().execute(this::fireChannelInactive);
263 }
264 return this;
265 }
266
267 @Override
268 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
269 AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
270 ObjectUtil.checkNotNull(cause, "cause");
271 if (next.executor().inEventLoop()) {
272 next.invokeExceptionCaught(cause);
273 } else {
274 try {
275 next.executor().execute(() -> next.invokeExceptionCaught(cause));
276 } catch (Throwable t) {
277 if (logger.isWarnEnabled()) {
278 logger.warn("Failed to submit an exceptionCaught() event.", t);
279 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
280 }
281 }
282 }
283 return this;
284 }
285
286 @SuppressWarnings("deprecation")
287 private void invokeExceptionCaught(final Throwable cause) {
288 if (invokeHandler()) {
289 try {
290 handler().exceptionCaught(this, cause);
291 } catch (Throwable error) {
292 if (logger.isDebugEnabled()) {
293 logger.debug(
294 "An exception " +
295 "was thrown by a user handler's exceptionCaught() " +
296 "method while handling the following exception:", cause);
297 } else if (logger.isWarnEnabled()) {
298 logger.warn(
299 "An exception '{}' [enable DEBUG level for full stacktrace] " +
300 "was thrown by a user handler's exceptionCaught() " +
301 "method while handling the following exception:", error, cause);
302 }
303 }
304 } else {
305 fireExceptionCaught(cause);
306 }
307 }
308
309 @Override
310 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
311 ObjectUtil.checkNotNull(event, "event");
312 AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
313 if (next.executor().inEventLoop()) {
314 if (next.invokeHandler()) {
315 try {
316
317
318
319 final ChannelHandler handler = next.handler();
320 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
321 if (handler == headContext) {
322 headContext.userEventTriggered(next, event);
323 } else if (handler instanceof ChannelInboundHandlerAdapter) {
324 ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
325 } else {
326 ((ChannelInboundHandler) handler).userEventTriggered(next, event);
327 }
328 } catch (Throwable t) {
329 next.invokeExceptionCaught(t);
330 }
331 } else {
332 next.fireUserEventTriggered(event);
333 }
334 } else {
335 next.executor().execute(() -> fireUserEventTriggered(event));
336 }
337 return this;
338 }
339
340 @Override
341 public ChannelHandlerContext fireChannelRead(final Object msg) {
342 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
343 if (next.executor().inEventLoop()) {
344 final Object m = pipeline.touch(msg, next);
345 if (next.invokeHandler()) {
346 try {
347
348
349
350 final ChannelHandler handler = next.handler();
351 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
352 if (handler == headContext) {
353 headContext.channelRead(next, m);
354 } else if (handler instanceof ChannelDuplexHandler) {
355 ((ChannelDuplexHandler) handler).channelRead(next, m);
356 } else {
357 ((ChannelInboundHandler) handler).channelRead(next, m);
358 }
359 } catch (Throwable t) {
360 next.invokeExceptionCaught(t);
361 }
362 } else {
363 next.fireChannelRead(m);
364 }
365 } else {
366 next.executor().execute(() -> fireChannelRead(msg));
367 }
368 return this;
369 }
370
371 @Override
372 public ChannelHandlerContext fireChannelReadComplete() {
373 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
374 if (next.executor().inEventLoop()) {
375 if (next.invokeHandler()) {
376 try {
377
378
379
380 final ChannelHandler handler = next.handler();
381 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
382 if (handler == headContext) {
383 headContext.channelReadComplete(next);
384 } else if (handler instanceof ChannelDuplexHandler) {
385 ((ChannelDuplexHandler) handler).channelReadComplete(next);
386 } else {
387 ((ChannelInboundHandler) handler).channelReadComplete(next);
388 }
389 } catch (Throwable t) {
390 next.invokeExceptionCaught(t);
391 }
392 } else {
393 next.fireChannelReadComplete();
394 }
395 } else {
396 next.executor().execute(getInvokeTasks().invokeChannelReadCompleteTask);
397 }
398 return this;
399 }
400
401 @Override
402 public ChannelHandlerContext fireChannelWritabilityChanged() {
403 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
404 if (next.executor().inEventLoop()) {
405 if (next.invokeHandler()) {
406 try {
407
408
409
410 final ChannelHandler handler = next.handler();
411 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
412 if (handler == headContext) {
413 headContext.channelWritabilityChanged(next);
414 } else if (handler instanceof ChannelInboundHandlerAdapter) {
415 ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
416 } else {
417 ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
418 }
419 } catch (Throwable t) {
420 next.invokeExceptionCaught(t);
421 }
422 } else {
423 next.fireChannelWritabilityChanged();
424 }
425 } else {
426 next.executor().execute(getInvokeTasks().invokeChannelWritableStateChangedTask);
427 }
428 return this;
429 }
430
431 @Override
432 public ChannelFuture bind(SocketAddress localAddress) {
433 return bind(localAddress, newPromise());
434 }
435
436 @Override
437 public ChannelFuture connect(SocketAddress remoteAddress) {
438 return connect(remoteAddress, newPromise());
439 }
440
441 @Override
442 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
443 return connect(remoteAddress, localAddress, newPromise());
444 }
445
446 @Override
447 public ChannelFuture disconnect() {
448 return disconnect(newPromise());
449 }
450
451 @Override
452 public ChannelFuture close() {
453 return close(newPromise());
454 }
455
456 @Override
457 public ChannelFuture deregister() {
458 return deregister(newPromise());
459 }
460
461 @Override
462 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
463 ObjectUtil.checkNotNull(localAddress, "localAddress");
464 if (isNotValidPromise(promise, false)) {
465
466 return promise;
467 }
468
469 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
470 EventExecutor executor = next.executor();
471 if (executor.inEventLoop()) {
472 next.invokeBind(localAddress, promise);
473 } else {
474 safeExecute(executor, new Runnable() {
475 @Override
476 public void run() {
477 next.invokeBind(localAddress, promise);
478 }
479 }, promise, null, false);
480 }
481 return promise;
482 }
483
484 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
485 if (invokeHandler()) {
486 try {
487
488
489
490 final ChannelHandler handler = handler();
491 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
492 if (handler == headContext) {
493 headContext.bind(this, localAddress, promise);
494 } else if (handler instanceof ChannelDuplexHandler) {
495 ((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
496 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
497 ((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
498 } else {
499 ((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
500 }
501 } catch (Throwable t) {
502 notifyOutboundHandlerException(t, promise);
503 }
504 } else {
505 bind(localAddress, promise);
506 }
507 }
508
509 @Override
510 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
511 return connect(remoteAddress, null, promise);
512 }
513
514 @Override
515 public ChannelFuture connect(
516 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
517 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
518
519 if (isNotValidPromise(promise, false)) {
520
521 return promise;
522 }
523
524 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
525 EventExecutor executor = next.executor();
526 if (executor.inEventLoop()) {
527 next.invokeConnect(remoteAddress, localAddress, promise);
528 } else {
529 safeExecute(executor, new Runnable() {
530 @Override
531 public void run() {
532 next.invokeConnect(remoteAddress, localAddress, promise);
533 }
534 }, promise, null, false);
535 }
536 return promise;
537 }
538
539 private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
540 if (invokeHandler()) {
541 try {
542
543
544
545 final ChannelHandler handler = handler();
546 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
547 if (handler == headContext) {
548 headContext.connect(this, remoteAddress, localAddress, promise);
549 } else if (handler instanceof ChannelDuplexHandler) {
550 ((ChannelDuplexHandler) handler).connect(this, remoteAddress, localAddress, promise);
551 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
552 ((ChannelOutboundHandlerAdapter) handler).connect(this, remoteAddress, localAddress, promise);
553 } else {
554 ((ChannelOutboundHandler) handler).connect(this, remoteAddress, localAddress, promise);
555 }
556 } catch (Throwable t) {
557 notifyOutboundHandlerException(t, promise);
558 }
559 } else {
560 connect(remoteAddress, localAddress, promise);
561 }
562 }
563
564 @Override
565 public ChannelFuture disconnect(final ChannelPromise promise) {
566 if (!channel().metadata().hasDisconnect()) {
567
568
569 return close(promise);
570 }
571 if (isNotValidPromise(promise, false)) {
572
573 return promise;
574 }
575
576 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
577 EventExecutor executor = next.executor();
578 if (executor.inEventLoop()) {
579 next.invokeDisconnect(promise);
580 } else {
581 safeExecute(executor, new Runnable() {
582 @Override
583 public void run() {
584 next.invokeDisconnect(promise);
585 }
586 }, promise, null, false);
587 }
588 return promise;
589 }
590
591 private void invokeDisconnect(ChannelPromise promise) {
592 if (invokeHandler()) {
593 try {
594
595
596
597 final ChannelHandler handler = handler();
598 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
599 if (handler == headContext) {
600 headContext.disconnect(this, promise);
601 } else if (handler instanceof ChannelDuplexHandler) {
602 ((ChannelDuplexHandler) handler).disconnect(this, promise);
603 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
604 ((ChannelOutboundHandlerAdapter) handler).disconnect(this, promise);
605 } else {
606 ((ChannelOutboundHandler) handler).disconnect(this, promise);
607 }
608 } catch (Throwable t) {
609 notifyOutboundHandlerException(t, promise);
610 }
611 } else {
612 disconnect(promise);
613 }
614 }
615
616 @Override
617 public ChannelFuture close(final ChannelPromise promise) {
618 if (isNotValidPromise(promise, false)) {
619
620 return promise;
621 }
622
623 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
624 EventExecutor executor = next.executor();
625 if (executor.inEventLoop()) {
626 next.invokeClose(promise);
627 } else {
628 safeExecute(executor, new Runnable() {
629 @Override
630 public void run() {
631 next.invokeClose(promise);
632 }
633 }, promise, null, false);
634 }
635
636 return promise;
637 }
638
639 private void invokeClose(ChannelPromise promise) {
640 if (invokeHandler()) {
641 try {
642
643
644
645 final ChannelHandler handler = handler();
646 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
647 if (handler == headContext) {
648 headContext.close(this, promise);
649 } else if (handler instanceof ChannelDuplexHandler) {
650 ((ChannelDuplexHandler) handler).close(this, promise);
651 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
652 ((ChannelOutboundHandlerAdapter) handler).close(this, promise);
653 } else {
654 ((ChannelOutboundHandler) handler).close(this, promise);
655 }
656 } catch (Throwable t) {
657 notifyOutboundHandlerException(t, promise);
658 }
659 } else {
660 close(promise);
661 }
662 }
663
664 @Override
665 public ChannelFuture deregister(final ChannelPromise promise) {
666 if (isNotValidPromise(promise, false)) {
667
668 return promise;
669 }
670
671 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
672 EventExecutor executor = next.executor();
673 if (executor.inEventLoop()) {
674 next.invokeDeregister(promise);
675 } else {
676 safeExecute(executor, new Runnable() {
677 @Override
678 public void run() {
679 next.invokeDeregister(promise);
680 }
681 }, promise, null, false);
682 }
683
684 return promise;
685 }
686
687 private void invokeDeregister(ChannelPromise promise) {
688 if (invokeHandler()) {
689 try {
690
691
692
693 final ChannelHandler handler = handler();
694 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
695 if (handler == headContext) {
696 headContext.deregister(this, promise);
697 } else if (handler instanceof ChannelDuplexHandler) {
698 ((ChannelDuplexHandler) handler).deregister(this, promise);
699 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
700 ((ChannelOutboundHandlerAdapter) handler).deregister(this, promise);
701 } else {
702 ((ChannelOutboundHandler) handler).deregister(this, promise);
703 }
704 } catch (Throwable t) {
705 notifyOutboundHandlerException(t, promise);
706 }
707 } else {
708 deregister(promise);
709 }
710 }
711
712 @Override
713 public ChannelHandlerContext read() {
714 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
715 if (next.executor().inEventLoop()) {
716 if (next.invokeHandler()) {
717 try {
718
719
720
721 final ChannelHandler handler = next.handler();
722 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
723 if (handler == headContext) {
724 headContext.read(next);
725 } else if (handler instanceof ChannelDuplexHandler) {
726 ((ChannelDuplexHandler) handler).read(next);
727 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
728 ((ChannelOutboundHandlerAdapter) handler).read(next);
729 } else {
730 ((ChannelOutboundHandler) handler).read(next);
731 }
732 } catch (Throwable t) {
733 invokeExceptionCaught(t);
734 }
735 } else {
736 next.read();
737 }
738 } else {
739 next.executor().execute(getInvokeTasks().invokeReadTask);
740 }
741 return this;
742 }
743
744 @Override
745 public ChannelFuture write(Object msg) {
746 ChannelPromise promise = newPromise();
747 write(msg, false, promise);
748 return promise;
749 }
750
751 @Override
752 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
753 write(msg, false, promise);
754 return promise;
755 }
756
757 @Override
758 public ChannelHandlerContext flush() {
759 final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
760 EventExecutor executor = next.executor();
761 if (executor.inEventLoop()) {
762 next.invokeFlush();
763 } else {
764 Tasks tasks = next.invokeTasks;
765 if (tasks == null) {
766 next.invokeTasks = tasks = new Tasks(next);
767 }
768 safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
769 }
770
771 return this;
772 }
773
774 private void invokeFlush() {
775 if (invokeHandler()) {
776 invokeFlush0();
777 } else {
778 flush();
779 }
780 }
781
782 private void invokeFlush0() {
783 try {
784
785
786
787 final ChannelHandler handler = handler();
788 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
789 if (handler == headContext) {
790 headContext.flush(this);
791 } else if (handler instanceof ChannelDuplexHandler) {
792 ((ChannelDuplexHandler) handler).flush(this);
793 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
794 ((ChannelOutboundHandlerAdapter) handler).flush(this);
795 } else {
796 ((ChannelOutboundHandler) handler).flush(this);
797 }
798 } catch (Throwable t) {
799 invokeExceptionCaught(t);
800 }
801 }
802
803 @Override
804 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
805 write(msg, true, promise);
806 return promise;
807 }
808
809 void write(Object msg, boolean flush, ChannelPromise promise) {
810 if (validateWrite(msg, promise)) {
811 final AbstractChannelHandlerContext next = findContextOutbound(flush ?
812 MASK_WRITE | MASK_FLUSH : MASK_WRITE);
813 final Object m = pipeline.touch(msg, next);
814 EventExecutor executor = next.executor();
815 if (executor.inEventLoop()) {
816 if (next.invokeHandler()) {
817 try {
818
819
820
821 final ChannelHandler handler = next.handler();
822 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
823 if (handler == headContext) {
824 headContext.write(next, msg, promise);
825 } else if (handler instanceof ChannelDuplexHandler) {
826 ((ChannelDuplexHandler) handler).write(next, msg, promise);
827 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
828 ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
829 } else {
830 ((ChannelOutboundHandler) handler).write(next, msg, promise);
831 }
832 } catch (Throwable t) {
833 notifyOutboundHandlerException(t, promise);
834 }
835 if (flush) {
836 next.invokeFlush0();
837 }
838 } else {
839 next.write(msg, flush, promise);
840 }
841 } else {
842 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
843 if (!safeExecute(executor, task, promise, m, !flush)) {
844
845
846
847
848 task.cancel();
849 }
850 }
851 }
852 }
853
854 private boolean validateWrite(Object msg, ChannelPromise promise) {
855 ObjectUtil.checkNotNull(msg, "msg");
856 try {
857 if (isNotValidPromise(promise, true)) {
858 ReferenceCountUtil.release(msg);
859 return false;
860 }
861 } catch (RuntimeException e) {
862 ReferenceCountUtil.release(msg);
863 throw e;
864 }
865 return true;
866 }
867
868 @Override
869 public ChannelFuture writeAndFlush(Object msg) {
870 return writeAndFlush(msg, newPromise());
871 }
872
873 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
874
875
876 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
877 }
878
879 @Override
880 public ChannelPromise newPromise() {
881 return new DefaultChannelPromise(channel(), executor());
882 }
883
884 @Override
885 public ChannelProgressivePromise newProgressivePromise() {
886 return new DefaultChannelProgressivePromise(channel(), executor());
887 }
888
889 @Override
890 public ChannelFuture newSucceededFuture() {
891 ChannelFuture succeededFuture = this.succeededFuture;
892 if (succeededFuture == null) {
893 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
894 }
895 return succeededFuture;
896 }
897
898 @Override
899 public ChannelFuture newFailedFuture(Throwable cause) {
900 return new FailedChannelFuture(channel(), executor(), cause);
901 }
902
903 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
904 ObjectUtil.checkNotNull(promise, "promise");
905
906 if (promise.isDone()) {
907
908
909
910
911 if (promise.isCancelled()) {
912 return true;
913 }
914 throw new IllegalArgumentException("promise already done: " + promise);
915 }
916
917 if (promise.channel() != channel()) {
918 throw new IllegalArgumentException(String.format(
919 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
920 }
921
922 if (promise.getClass() == DefaultChannelPromise.class) {
923 return false;
924 }
925
926 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
927 throw new IllegalArgumentException(
928 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
929 }
930
931 if (promise instanceof AbstractChannel.CloseFuture) {
932 throw new IllegalArgumentException(
933 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
934 }
935 return false;
936 }
937
938 private AbstractChannelHandlerContext findContextInbound(int mask) {
939 AbstractChannelHandlerContext ctx = this;
940 EventExecutor currentExecutor = executor();
941 do {
942 ctx = ctx.next;
943 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
944 return ctx;
945 }
946
947 private AbstractChannelHandlerContext findContextOutbound(int mask) {
948 AbstractChannelHandlerContext ctx = this;
949 EventExecutor currentExecutor = executor();
950 do {
951 ctx = ctx.prev;
952 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
953 return ctx;
954 }
955
956 private static boolean skipContext(
957 AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
958
959 return (ctx.executionMask & (onlyMask | mask)) == 0 ||
960
961
962
963
964 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
965 }
966
967 @Override
968 public ChannelPromise voidPromise() {
969 return channel().voidPromise();
970 }
971
972 final void setRemoved() {
973 handlerState = REMOVE_COMPLETE;
974 }
975
976 final boolean setAddComplete() {
977 for (;;) {
978 int oldState = handlerState;
979 if (oldState == REMOVE_COMPLETE) {
980 return false;
981 }
982
983
984
985 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
986 return true;
987 }
988 }
989 }
990
991 final void setAddPending() {
992 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
993 assert updated;
994 }
995
996 final void callHandlerAdded() throws Exception {
997
998
999 if (setAddComplete()) {
1000 handler().handlerAdded(this);
1001 }
1002 }
1003
1004 final void callHandlerRemoved() throws Exception {
1005 try {
1006
1007 if (handlerState == ADD_COMPLETE) {
1008 handler().handlerRemoved(this);
1009 }
1010 } finally {
1011
1012 setRemoved();
1013 }
1014 }
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024 boolean invokeHandler() {
1025
1026 int handlerState = this.handlerState;
1027 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1028 }
1029
1030 @Override
1031 public boolean isRemoved() {
1032 return handlerState == REMOVE_COMPLETE;
1033 }
1034
1035 @Override
1036 public <T> Attribute<T> attr(AttributeKey<T> key) {
1037 return channel().attr(key);
1038 }
1039
1040 @Override
1041 public <T> boolean hasAttr(AttributeKey<T> key) {
1042 return channel().hasAttr(key);
1043 }
1044
1045 private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1046 ChannelPromise promise, Object msg, boolean lazy) {
1047 try {
1048 if (lazy && executor instanceof AbstractEventExecutor) {
1049 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1050 } else {
1051 executor.execute(runnable);
1052 }
1053 return true;
1054 } catch (Throwable cause) {
1055 try {
1056 if (msg != null) {
1057 ReferenceCountUtil.release(msg);
1058 }
1059 } finally {
1060 promise.setFailure(cause);
1061 }
1062 return false;
1063 }
1064 }
1065
1066 @Override
1067 public String toHintString() {
1068 return '\'' + name + "' will handle the message from this point.";
1069 }
1070
1071 @Override
1072 public String toString() {
1073 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1074 }
1075
1076 Tasks getInvokeTasks() {
1077 Tasks tasks = invokeTasks;
1078 if (tasks == null) {
1079 invokeTasks = tasks = new Tasks(this);
1080 }
1081 return tasks;
1082 }
1083
1084 static final class WriteTask implements Runnable {
1085 private static final ObjectPool<WriteTask> RECYCLER = ObjectPool.newPool(new ObjectCreator<WriteTask>() {
1086 @Override
1087 public WriteTask newObject(Handle<WriteTask> handle) {
1088 return new WriteTask(handle);
1089 }
1090 });
1091
1092 static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1093 Object msg, ChannelPromise promise, boolean flush) {
1094 WriteTask task = RECYCLER.get();
1095 init(task, ctx, msg, promise, flush);
1096 return task;
1097 }
1098
1099 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1100 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1101
1102
1103 private static final int WRITE_TASK_OVERHEAD =
1104 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1105
1106 private final Handle<WriteTask> handle;
1107 private AbstractChannelHandlerContext ctx;
1108 private Object msg;
1109 private ChannelPromise promise;
1110 private int size;
1111
1112 private WriteTask(Handle<WriteTask> handle) {
1113 this.handle = handle;
1114 }
1115
1116 static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1117 Object msg, ChannelPromise promise, boolean flush) {
1118 task.ctx = ctx;
1119 task.msg = msg;
1120 task.promise = promise;
1121
1122 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1123 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1124 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1125 } else {
1126 task.size = 0;
1127 }
1128 if (flush) {
1129 task.size |= Integer.MIN_VALUE;
1130 }
1131 }
1132
1133 @Override
1134 public void run() {
1135 try {
1136 decrementPendingOutboundBytes();
1137 ctx.write(msg, size < 0, promise);
1138 } finally {
1139 recycle();
1140 }
1141 }
1142
1143 void cancel() {
1144 try {
1145 decrementPendingOutboundBytes();
1146 } finally {
1147 recycle();
1148 }
1149 }
1150
1151 private void decrementPendingOutboundBytes() {
1152 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1153 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1154 }
1155 }
1156
1157 private void recycle() {
1158
1159 ctx = null;
1160 msg = null;
1161 promise = null;
1162 handle.recycle(this);
1163 }
1164 }
1165
1166 static final class Tasks {
1167 final Runnable invokeChannelReadCompleteTask;
1168 private final Runnable invokeReadTask;
1169 private final Runnable invokeChannelWritableStateChangedTask;
1170 private final Runnable invokeFlushTask;
1171
1172 Tasks(AbstractChannelHandlerContext ctx) {
1173 invokeChannelReadCompleteTask = ctx::fireChannelReadComplete;
1174 invokeReadTask = ctx::read;
1175 invokeChannelWritableStateChangedTask = ctx::fireChannelWritabilityChanged;
1176 invokeFlushTask = ctx::invokeFlush;
1177 }
1178 }
1179 }