1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.Attribute;
20 import io.netty.util.AttributeKey;
21 import io.netty.util.Recycler;
22 import io.netty.util.ReferenceCountUtil;
23 import io.netty.util.ResourceLeakHint;
24 import io.netty.util.concurrent.AbstractEventExecutor;
25 import io.netty.util.concurrent.EventExecutor;
26 import io.netty.util.concurrent.OrderedEventExecutor;
27 import io.netty.util.concurrent.PromiseNotifier;
28 import io.netty.util.internal.ObjectPool.Handle;
29 import io.netty.util.internal.ObjectUtil;
30 import io.netty.util.internal.PromiseNotificationUtil;
31 import io.netty.util.internal.StringUtil;
32 import io.netty.util.internal.SystemPropertyUtil;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.net.SocketAddress;
37 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
38
39 import static io.netty.channel.ChannelHandlerMask.MASK_BIND;
40 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_ACTIVE;
41 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_INACTIVE;
42 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ;
43 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_READ_COMPLETE;
44 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_REGISTERED;
45 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_UNREGISTERED;
46 import static io.netty.channel.ChannelHandlerMask.MASK_CHANNEL_WRITABILITY_CHANGED;
47 import static io.netty.channel.ChannelHandlerMask.MASK_CLOSE;
48 import static io.netty.channel.ChannelHandlerMask.MASK_CONNECT;
49 import static io.netty.channel.ChannelHandlerMask.MASK_DEREGISTER;
50 import static io.netty.channel.ChannelHandlerMask.MASK_DISCONNECT;
51 import static io.netty.channel.ChannelHandlerMask.MASK_EXCEPTION_CAUGHT;
52 import static io.netty.channel.ChannelHandlerMask.MASK_FLUSH;
53 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_INBOUND;
54 import static io.netty.channel.ChannelHandlerMask.MASK_ONLY_OUTBOUND;
55 import static io.netty.channel.ChannelHandlerMask.MASK_READ;
56 import static io.netty.channel.ChannelHandlerMask.MASK_USER_EVENT_TRIGGERED;
57 import static io.netty.channel.ChannelHandlerMask.MASK_WRITE;
58 import static io.netty.channel.ChannelHandlerMask.mask;
59
60 abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
61
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// Neighbouring contexts: findContextInbound() walks "next", findContextOutbound() walks "prev".
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

// Allows compare-and-set updates of the volatile handlerState field (see setAddComplete/setAddPending).
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

// State entered from INIT by setAddPending(); invokeHandler() accepts it only when the context is not ordered.
private static final int ADD_PENDING = 1;

// State set by setAddComplete(), which callHandlerAdded() performs right before handlerAdded(...) is called.
private static final int ADD_COMPLETE = 2;

// State set by setRemoved(); setAddComplete() refuses to leave this state.
private static final int REMOVE_COMPLETE = 3;

// Initial value of handlerState for a newly constructed context.
private static final int INIT = 0;

private final DefaultChannelPipeline pipeline;
private final String name;
// True when no dedicated executor was supplied or the supplied one is an OrderedEventExecutor.
private final boolean ordered;
// Bit mask computed by ChannelHandlerMask.mask(handlerClass); consulted by skipContext().
private final int executionMask;

// Executor passed to the constructor; may be null, in which case executor() falls back to channel().eventLoop().
final EventExecutor childExecutor;

// Cached result of executor(); filled in lazily on first call, without synchronization.
EventExecutor contextExecutor;
// Lazily created by newSucceededFuture().
private ChannelFuture succeededFuture;

// Lazily created by getInvokeTasks(); holds reusable Runnables for argument-less events.
private Tasks invokeTasks;

// One of INIT, ADD_PENDING, ADD_COMPLETE, REMOVE_COMPLETE.
private volatile int handlerState = INIT;
107
108 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
109 String name, Class<? extends ChannelHandler> handlerClass) {
110 this.name = ObjectUtil.checkNotNull(name, "name");
111 this.pipeline = pipeline;
112 childExecutor = executor;
113 executionMask = mask(handlerClass);
114
115 ordered = executor == null || executor instanceof OrderedEventExecutor;
116 }
117
118 @Override
119 public Channel channel() {
120 return pipeline.channel();
121 }
122
123 @Override
124 public ChannelPipeline pipeline() {
125 return pipeline;
126 }
127
128 @Override
129 public ByteBufAllocator alloc() {
130 return channel().config().getAllocator();
131 }
132
133 @Override
134 public EventExecutor executor() {
135 EventExecutor ex = contextExecutor;
136 if (ex == null) {
137 contextExecutor = ex = childExecutor != null ? childExecutor : channel().eventLoop();
138 }
139 return ex;
140 }
141
142 @Override
143 public String name() {
144 return name;
145 }
146
147 @Override
148 public ChannelHandlerContext fireChannelRegistered() {
149 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_REGISTERED);
150 if (next.executor().inEventLoop()) {
151 if (next.invokeHandler()) {
152 try {
153
154
155
156 final ChannelHandler handler = next.handler();
157 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
158 if (handler == headContext) {
159 headContext.channelRegistered(next);
160 } else if (handler instanceof ChannelInboundHandlerAdapter) {
161 ((ChannelInboundHandlerAdapter) handler).channelRegistered(next);
162 } else {
163 ((ChannelInboundHandler) handler).channelRegistered(next);
164 }
165 } catch (Throwable t) {
166 next.invokeExceptionCaught(t);
167 }
168 } else {
169 next.fireChannelRegistered();
170 }
171 } else {
172 next.executor().execute(this::fireChannelRegistered);
173 }
174 return this;
175 }
176
177 @Override
178 public ChannelHandlerContext fireChannelUnregistered() {
179 final AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_UNREGISTERED);
180 if (next.executor().inEventLoop()) {
181 if (next.invokeHandler()) {
182 try {
183
184
185
186 final ChannelHandler handler = next.handler();
187 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
188 if (handler == headContext) {
189 headContext.channelUnregistered(next);
190 } else if (handler instanceof ChannelInboundHandlerAdapter) {
191 ((ChannelInboundHandlerAdapter) handler).channelUnregistered(next);
192 } else {
193 ((ChannelInboundHandler) handler).channelUnregistered(next);
194 }
195 } catch (Throwable t) {
196 next.invokeExceptionCaught(t);
197 }
198 } else {
199 next.fireChannelUnregistered();
200 }
201 } else {
202 next.executor().execute(this::fireChannelUnregistered);
203 }
204 return this;
205 }
206
207 @Override
208 public ChannelHandlerContext fireChannelActive() {
209 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE);
210 if (next.executor().inEventLoop()) {
211 if (next.invokeHandler()) {
212 try {
213
214
215
216 final ChannelHandler handler = next.handler();
217 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
218 if (handler == headContext) {
219 headContext.channelActive(next);
220 } else if (handler instanceof ChannelInboundHandlerAdapter) {
221 ((ChannelInboundHandlerAdapter) handler).channelActive(next);
222 } else {
223 ((ChannelInboundHandler) handler).channelActive(next);
224 }
225 } catch (Throwable t) {
226 next.invokeExceptionCaught(t);
227 }
228 } else {
229 next.fireChannelActive();
230 }
231 } else {
232 next.executor().execute(this::fireChannelActive);
233 }
234 return this;
235 }
236
237 @Override
238 public ChannelHandlerContext fireChannelInactive() {
239 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_INACTIVE);
240 if (next.executor().inEventLoop()) {
241 if (next.invokeHandler()) {
242 try {
243
244
245
246 final ChannelHandler handler = next.handler();
247 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
248 if (handler == headContext) {
249 headContext.channelInactive(next);
250 } else if (handler instanceof ChannelInboundHandlerAdapter) {
251 ((ChannelInboundHandlerAdapter) handler).channelInactive(next);
252 } else {
253 ((ChannelInboundHandler) handler).channelInactive(next);
254 }
255 } catch (Throwable t) {
256 next.invokeExceptionCaught(t);
257 }
258 } else {
259 next.fireChannelInactive();
260 }
261 } else {
262 next.executor().execute(this::fireChannelInactive);
263 }
264 return this;
265 }
266
267 @Override
268 public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
269 AbstractChannelHandlerContext next = findContextInbound(MASK_EXCEPTION_CAUGHT);
270 ObjectUtil.checkNotNull(cause, "cause");
271 if (next.executor().inEventLoop()) {
272 next.invokeExceptionCaught(cause);
273 } else {
274 try {
275 next.executor().execute(() -> next.invokeExceptionCaught(cause));
276 } catch (Throwable t) {
277 if (logger.isWarnEnabled()) {
278 logger.warn("Failed to submit an exceptionCaught() event.", t);
279 logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
280 }
281 }
282 }
283 return this;
284 }
285
286 @SuppressWarnings("deprecation")
287 private void invokeExceptionCaught(final Throwable cause) {
288 if (invokeHandler()) {
289 try {
290 handler().exceptionCaught(this, cause);
291 } catch (Throwable error) {
292 if (logger.isDebugEnabled()) {
293 logger.debug(
294 "An exception " +
295 "was thrown by a user handler's exceptionCaught() " +
296 "method while handling the following exception:", cause);
297 } else if (logger.isWarnEnabled()) {
298 logger.warn(
299 "An exception '{}' [enable DEBUG level for full stacktrace] " +
300 "was thrown by a user handler's exceptionCaught() " +
301 "method while handling the following exception:", error, cause);
302 }
303 }
304 } else {
305 fireExceptionCaught(cause);
306 }
307 }
308
309 @Override
310 public ChannelHandlerContext fireUserEventTriggered(final Object event) {
311 ObjectUtil.checkNotNull(event, "event");
312 AbstractChannelHandlerContext next = findContextInbound(MASK_USER_EVENT_TRIGGERED);
313 if (next.executor().inEventLoop()) {
314 if (next.invokeHandler()) {
315 try {
316
317
318
319 final ChannelHandler handler = next.handler();
320 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
321 if (handler == headContext) {
322 headContext.userEventTriggered(next, event);
323 } else if (handler instanceof ChannelInboundHandlerAdapter) {
324 ((ChannelInboundHandlerAdapter) handler).userEventTriggered(next, event);
325 } else {
326 ((ChannelInboundHandler) handler).userEventTriggered(next, event);
327 }
328 } catch (Throwable t) {
329 next.invokeExceptionCaught(t);
330 }
331 } else {
332 next.fireUserEventTriggered(event);
333 }
334 } else {
335 next.executor().execute(() -> fireUserEventTriggered(event));
336 }
337 return this;
338 }
339
340 @Override
341 public ChannelHandlerContext fireChannelRead(final Object msg) {
342 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ);
343 if (next.executor().inEventLoop()) {
344 final Object m = pipeline.touch(msg, next);
345 if (next.invokeHandler()) {
346 try {
347
348
349
350 final ChannelHandler handler = next.handler();
351 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
352 if (handler == headContext) {
353 headContext.channelRead(next, m);
354 } else if (handler instanceof ChannelDuplexHandler) {
355 ((ChannelDuplexHandler) handler).channelRead(next, m);
356 } else {
357 ((ChannelInboundHandler) handler).channelRead(next, m);
358 }
359 } catch (Throwable t) {
360 next.invokeExceptionCaught(t);
361 }
362 } else {
363 next.fireChannelRead(m);
364 }
365 } else {
366 next.executor().execute(() -> fireChannelRead(msg));
367 }
368 return this;
369 }
370
371 @Override
372 public ChannelHandlerContext fireChannelReadComplete() {
373 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_READ_COMPLETE);
374 if (next.executor().inEventLoop()) {
375 if (next.invokeHandler()) {
376 try {
377
378
379
380 final ChannelHandler handler = next.handler();
381 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
382 if (handler == headContext) {
383 headContext.channelReadComplete(next);
384 } else if (handler instanceof ChannelDuplexHandler) {
385 ((ChannelDuplexHandler) handler).channelReadComplete(next);
386 } else {
387 ((ChannelInboundHandler) handler).channelReadComplete(next);
388 }
389 } catch (Throwable t) {
390 next.invokeExceptionCaught(t);
391 }
392 } else {
393 next.fireChannelReadComplete();
394 }
395 } else {
396 next.executor().execute(getInvokeTasks().fireChannelReadCompleteTask);
397 }
398 return this;
399 }
400
401 @Override
402 public ChannelHandlerContext fireChannelWritabilityChanged() {
403 AbstractChannelHandlerContext next = findContextInbound(MASK_CHANNEL_WRITABILITY_CHANGED);
404 if (next.executor().inEventLoop()) {
405 if (next.invokeHandler()) {
406 try {
407
408
409
410 final ChannelHandler handler = next.handler();
411 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
412 if (handler == headContext) {
413 headContext.channelWritabilityChanged(next);
414 } else if (handler instanceof ChannelInboundHandlerAdapter) {
415 ((ChannelInboundHandlerAdapter) handler).channelWritabilityChanged(next);
416 } else {
417 ((ChannelInboundHandler) handler).channelWritabilityChanged(next);
418 }
419 } catch (Throwable t) {
420 next.invokeExceptionCaught(t);
421 }
422 } else {
423 next.fireChannelWritabilityChanged();
424 }
425 } else {
426 next.executor().execute(getInvokeTasks().fireChannelWritabilityChangedTask);
427 }
428 return this;
429 }
430
431 @Override
432 public ChannelFuture bind(SocketAddress localAddress) {
433 return bind(localAddress, newPromise());
434 }
435
436 @Override
437 public ChannelFuture connect(SocketAddress remoteAddress) {
438 return connect(remoteAddress, newPromise());
439 }
440
441 @Override
442 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
443 return connect(remoteAddress, localAddress, newPromise());
444 }
445
446 @Override
447 public ChannelFuture disconnect() {
448 return disconnect(newPromise());
449 }
450
451 @Override
452 public ChannelFuture close() {
453 return close(newPromise());
454 }
455
456 @Override
457 public ChannelFuture deregister() {
458 return deregister(newPromise());
459 }
460
461
462
463
464
465
466
467
468
469
470
471 private ChannelPromise ensurePromiseUseCorrectExecutor(ChannelPromise promise) {
472 if (promise instanceof DefaultChannelPromise && !((DefaultChannelPromise) promise).executor().inEventLoop()) {
473 ChannelPromise newPromise = newPromise();
474 PromiseNotifier.cascade(newPromise, promise);
475 return newPromise;
476 }
477 return promise;
478 }
479
480 @Override
481 public ChannelFuture bind(final SocketAddress localAddress, ChannelPromise promise) {
482 ObjectUtil.checkNotNull(localAddress, "localAddress");
483 if (isNotValidPromise(promise, false)) {
484
485 return promise;
486 }
487
488 final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
489 EventExecutor executor = next.executor();
490 if (executor.inEventLoop()) {
491 if (next.invokeHandler()) {
492 promise = ensurePromiseUseCorrectExecutor(promise);
493 try {
494
495
496
497 final ChannelHandler handler = next.handler();
498 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
499 if (handler == headContext) {
500 headContext.bind(next, localAddress, promise);
501 } else if (handler instanceof ChannelDuplexHandler) {
502 ((ChannelDuplexHandler) handler).bind(next, localAddress, promise);
503 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
504 ((ChannelOutboundHandlerAdapter) handler).bind(next, localAddress, promise);
505 } else {
506 ((ChannelOutboundHandler) handler).bind(next, localAddress, promise);
507 }
508 } catch (Throwable t) {
509 notifyOutboundHandlerException(t, promise);
510 }
511 } else {
512 next.bind(localAddress, promise);
513 }
514 } else {
515 final ChannelPromise p = promise;
516 safeExecute(executor, () -> bind(localAddress, p), promise, null, false);
517 }
518 return promise;
519 }
520
521 @Override
522 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
523 return connect(remoteAddress, null, promise);
524 }
525
526 @Override
527 public ChannelFuture connect(
528 final SocketAddress remoteAddress, final SocketAddress localAddress, ChannelPromise promise) {
529 ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
530
531 if (isNotValidPromise(promise, false)) {
532
533 return promise;
534 }
535
536 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
537 EventExecutor executor = next.executor();
538 if (executor.inEventLoop()) {
539 if (next.invokeHandler()) {
540 promise = ensurePromiseUseCorrectExecutor(promise);
541 try {
542
543
544
545 final ChannelHandler handler = next.handler();
546 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
547 if (handler == headContext) {
548 headContext.connect(next, remoteAddress, localAddress, promise);
549 } else if (handler instanceof ChannelDuplexHandler) {
550 ((ChannelDuplexHandler) handler).connect(next, remoteAddress, localAddress, promise);
551 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
552 ((ChannelOutboundHandlerAdapter) handler).connect(next, remoteAddress, localAddress, promise);
553 } else {
554 ((ChannelOutboundHandler) handler).connect(next, remoteAddress, localAddress, promise);
555 }
556 } catch (Throwable t) {
557 notifyOutboundHandlerException(t, promise);
558 }
559 } else {
560 next.connect(remoteAddress, localAddress, promise);
561 }
562 } else {
563 final ChannelPromise p = promise;
564 safeExecute(executor, () -> connect(remoteAddress, localAddress, p), promise, null, false);
565 }
566 return promise;
567 }
568
569 @Override
570 public ChannelFuture disconnect(ChannelPromise promise) {
571 if (!channel().metadata().hasDisconnect()) {
572
573
574 return close(promise);
575 }
576 if (isNotValidPromise(promise, false)) {
577
578 return promise;
579 }
580
581 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DISCONNECT);
582 EventExecutor executor = next.executor();
583 if (executor.inEventLoop()) {
584 if (next.invokeHandler()) {
585 promise = ensurePromiseUseCorrectExecutor(promise);
586 try {
587
588
589
590 final ChannelHandler handler = next.handler();
591 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
592 if (handler == headContext) {
593 headContext.disconnect(next, promise);
594 } else if (handler instanceof ChannelDuplexHandler) {
595 ((ChannelDuplexHandler) handler).disconnect(next, promise);
596 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
597 ((ChannelOutboundHandlerAdapter) handler).disconnect(next, promise);
598 } else {
599 ((ChannelOutboundHandler) handler).disconnect(next, promise);
600 }
601 } catch (Throwable t) {
602 notifyOutboundHandlerException(t, promise);
603 }
604 } else {
605 next.disconnect(promise);
606 }
607 } else {
608 final ChannelPromise p = promise;
609 safeExecute(executor, () -> disconnect(p), promise, null, false);
610 }
611 return promise;
612 }
613
614 @Override
615 public ChannelFuture close(ChannelPromise promise) {
616 if (isNotValidPromise(promise, false)) {
617
618 return promise;
619 }
620
621 final AbstractChannelHandlerContext next = findContextOutbound(MASK_CLOSE);
622 EventExecutor executor = next.executor();
623 if (executor.inEventLoop()) {
624 if (next.invokeHandler()) {
625 promise = ensurePromiseUseCorrectExecutor(promise);
626 try {
627
628
629
630 final ChannelHandler handler = next.handler();
631 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
632 if (handler == headContext) {
633 headContext.close(next, promise);
634 } else if (handler instanceof ChannelDuplexHandler) {
635 ((ChannelDuplexHandler) handler).close(next, promise);
636 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
637 ((ChannelOutboundHandlerAdapter) handler).close(next, promise);
638 } else {
639 ((ChannelOutboundHandler) handler).close(next, promise);
640 }
641 } catch (Throwable t) {
642 notifyOutboundHandlerException(t, promise);
643 }
644 } else {
645 next.close(promise);
646 }
647 } else {
648 final ChannelPromise p = promise;
649 safeExecute(executor, () -> close(p), promise, null, false);
650 }
651
652 return promise;
653 }
654
655 @Override
656 public ChannelFuture deregister(ChannelPromise promise) {
657 if (isNotValidPromise(promise, false)) {
658
659 return promise;
660 }
661
662 final AbstractChannelHandlerContext next = findContextOutbound(MASK_DEREGISTER);
663 EventExecutor executor = next.executor();
664 if (executor.inEventLoop()) {
665 if (next.invokeHandler()) {
666 promise = ensurePromiseUseCorrectExecutor(promise);
667 try {
668
669
670
671 final ChannelHandler handler = next.handler();
672 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
673 if (handler == headContext) {
674 headContext.deregister(next, promise);
675 } else if (handler instanceof ChannelDuplexHandler) {
676 ((ChannelDuplexHandler) handler).deregister(next, promise);
677 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
678 ((ChannelOutboundHandlerAdapter) handler).deregister(next, promise);
679 } else {
680 ((ChannelOutboundHandler) handler).deregister(next, promise);
681 }
682 } catch (Throwable t) {
683 notifyOutboundHandlerException(t, promise);
684 }
685 } else {
686 deregister(promise);
687 }
688 } else {
689 final ChannelPromise p = promise;
690 safeExecute(executor, () -> deregister(p), promise, null, false);
691 }
692
693 return promise;
694 }
695
696 @Override
697 public ChannelHandlerContext read() {
698 final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
699 if (next.executor().inEventLoop()) {
700 if (next.invokeHandler()) {
701 try {
702
703
704
705 final ChannelHandler handler = next.handler();
706 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
707 if (handler == headContext) {
708 headContext.read(next);
709 } else if (handler instanceof ChannelDuplexHandler) {
710 ((ChannelDuplexHandler) handler).read(next);
711 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
712 ((ChannelOutboundHandlerAdapter) handler).read(next);
713 } else {
714 ((ChannelOutboundHandler) handler).read(next);
715 }
716 } catch (Throwable t) {
717 next.invokeExceptionCaught(t);
718 }
719 } else {
720 next.read();
721 }
722 } else {
723 next.executor().execute(getInvokeTasks().readTask);
724 }
725 return this;
726 }
727
728 @Override
729 public ChannelFuture write(Object msg) {
730 ChannelPromise promise = newPromise();
731 write(msg, false, promise);
732 return promise;
733 }
734
735 @Override
736 public ChannelFuture write(final Object msg, final ChannelPromise promise) {
737 write(msg, false, promise);
738 return promise;
739 }
740
741 @Override
742 public ChannelHandlerContext flush() {
743 final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
744 EventExecutor executor = next.executor();
745 if (executor.inEventLoop()) {
746 if (next.invokeHandler()) {
747 try {
748
749
750
751 final ChannelHandler handler = next.handler();
752 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
753 if (handler == headContext) {
754 headContext.flush(next);
755 } else if (handler instanceof ChannelDuplexHandler) {
756 ((ChannelDuplexHandler) handler).flush(next);
757 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
758 ((ChannelOutboundHandlerAdapter) handler).flush(next);
759 } else {
760 ((ChannelOutboundHandler) handler).flush(next);
761 }
762 } catch (Throwable t) {
763 next.invokeExceptionCaught(t);
764 }
765 } else {
766 next.flush();
767 }
768 } else {
769 safeExecute(executor, getInvokeTasks().flushTask, channel().voidPromise(), null, false);
770 }
771 return this;
772 }
773
774 @Override
775 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
776 write(msg, true, promise);
777 return promise;
778 }
779
780 void write(Object msg, boolean flush, ChannelPromise promise) {
781 if (validateWrite(msg, promise)) {
782 final AbstractChannelHandlerContext next = findContextOutbound(flush ?
783 MASK_WRITE | MASK_FLUSH : MASK_WRITE);
784 final Object m = pipeline.touch(msg, next);
785 EventExecutor executor = next.executor();
786 if (executor.inEventLoop()) {
787 if (next.invokeHandler()) {
788 promise = ensurePromiseUseCorrectExecutor(promise);
789 try {
790
791
792
793 final ChannelHandler handler = next.handler();
794 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
795 if (handler == headContext) {
796 headContext.write(next, msg, promise);
797 } else if (handler instanceof ChannelDuplexHandler) {
798 ((ChannelDuplexHandler) handler).write(next, msg, promise);
799 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
800 ((ChannelOutboundHandlerAdapter) handler).write(next, msg, promise);
801 } else {
802 ((ChannelOutboundHandler) handler).write(next, msg, promise);
803 }
804 } catch (Throwable t) {
805 notifyOutboundHandlerException(t, promise);
806 }
807 if (flush) {
808 try {
809
810
811
812 final ChannelHandler handler = next.handler();
813 final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
814 if (handler == headContext) {
815 headContext.flush(next);
816 } else if (handler instanceof ChannelDuplexHandler) {
817 ((ChannelDuplexHandler) handler).flush(next);
818 } else if (handler instanceof ChannelOutboundHandlerAdapter) {
819 ((ChannelOutboundHandlerAdapter) handler).flush(next);
820 } else {
821 ((ChannelOutboundHandler) handler).flush(next);
822 }
823 } catch (Throwable t) {
824 next.invokeExceptionCaught(t);
825 }
826 }
827 } else {
828 next.write(msg, flush, promise);
829 }
830 } else {
831 final WriteTask task = WriteTask.newInstance(this, m, promise, flush);
832 if (!safeExecute(executor, task, promise, m, !flush)) {
833
834
835
836
837 task.cancel();
838 }
839 }
840 }
841 }
842
843 private boolean validateWrite(Object msg, ChannelPromise promise) {
844 ObjectUtil.checkNotNull(msg, "msg");
845 try {
846 if (isNotValidPromise(promise, true)) {
847 ReferenceCountUtil.release(msg);
848 return false;
849 }
850 } catch (RuntimeException e) {
851 ReferenceCountUtil.release(msg);
852 throw e;
853 }
854 return true;
855 }
856
857 @Override
858 public ChannelFuture writeAndFlush(Object msg) {
859 return writeAndFlush(msg, newPromise());
860 }
861
862 private static void notifyOutboundHandlerException(Throwable cause, ChannelPromise promise) {
863
864
865 PromiseNotificationUtil.tryFailure(promise, cause, promise instanceof VoidChannelPromise ? null : logger);
866 }
867
868 @Override
869 public ChannelPromise newPromise() {
870 return new DefaultChannelPromise(channel(), executor());
871 }
872
873 @Override
874 public ChannelProgressivePromise newProgressivePromise() {
875 return new DefaultChannelProgressivePromise(channel(), executor());
876 }
877
878 @Override
879 public ChannelFuture newSucceededFuture() {
880 ChannelFuture succeededFuture = this.succeededFuture;
881 if (succeededFuture == null) {
882 this.succeededFuture = succeededFuture = new SucceededChannelFuture(channel(), executor());
883 }
884 return succeededFuture;
885 }
886
887 @Override
888 public ChannelFuture newFailedFuture(Throwable cause) {
889 return new FailedChannelFuture(channel(), executor(), cause);
890 }
891
892 private boolean isNotValidPromise(ChannelPromise promise, boolean allowVoidPromise) {
893 ObjectUtil.checkNotNull(promise, "promise");
894
895 if (promise.isDone()) {
896
897
898
899
900 if (promise.isCancelled()) {
901 return true;
902 }
903 throw new IllegalArgumentException("promise already done: " + promise);
904 }
905
906 if (promise.channel() != channel()) {
907 throw new IllegalArgumentException(String.format(
908 "promise.channel does not match: %s (expected: %s)", promise.channel(), channel()));
909 }
910
911 if (promise.getClass() == DefaultChannelPromise.class) {
912 return false;
913 }
914
915 if (!allowVoidPromise && promise instanceof VoidChannelPromise) {
916 throw new IllegalArgumentException(
917 StringUtil.simpleClassName(VoidChannelPromise.class) + " not allowed for this operation");
918 }
919
920 if (promise instanceof AbstractChannel.CloseFuture) {
921 throw new IllegalArgumentException(
922 StringUtil.simpleClassName(AbstractChannel.CloseFuture.class) + " not allowed in a pipeline");
923 }
924 return false;
925 }
926
927 private AbstractChannelHandlerContext findContextInbound(int mask) {
928 AbstractChannelHandlerContext ctx = this;
929 EventExecutor currentExecutor = executor();
930 do {
931 ctx = ctx.next;
932 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
933 return ctx;
934 }
935
936 private AbstractChannelHandlerContext findContextOutbound(int mask) {
937 AbstractChannelHandlerContext ctx = this;
938 EventExecutor currentExecutor = executor();
939 do {
940 ctx = ctx.prev;
941 } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
942 return ctx;
943 }
944
945 private static boolean skipContext(
946 AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
947
948 return (ctx.executionMask & (onlyMask | mask)) == 0 ||
949
950
951
952
953 (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
954 }
955
956 @Override
957 public ChannelPromise voidPromise() {
958 return channel().voidPromise();
959 }
960
961 final void setRemoved() {
962 handlerState = REMOVE_COMPLETE;
963 }
964
965 final boolean setAddComplete() {
966 for (;;) {
967 int oldState = handlerState;
968 if (oldState == REMOVE_COMPLETE) {
969 return false;
970 }
971
972
973
974 if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
975 return true;
976 }
977 }
978 }
979
980 final void setAddPending() {
981 boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
982 assert updated;
983 }
984
985 final void callHandlerAdded() throws Exception {
986
987
988 if (setAddComplete()) {
989 handler().handlerAdded(this);
990 }
991 }
992
993 final void callHandlerRemoved() throws Exception {
994 try {
995
996 if (handlerState == ADD_COMPLETE) {
997 handler().handlerRemoved(this);
998 }
999 } finally {
1000
1001 setRemoved();
1002 }
1003 }
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013 boolean invokeHandler() {
1014
1015 int handlerState = this.handlerState;
1016 return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
1017 }
1018
1019 @Override
1020 public boolean isRemoved() {
1021 return handlerState == REMOVE_COMPLETE;
1022 }
1023
1024 @Override
1025 public <T> Attribute<T> attr(AttributeKey<T> key) {
1026 return channel().attr(key);
1027 }
1028
1029 @Override
1030 public <T> boolean hasAttr(AttributeKey<T> key) {
1031 return channel().hasAttr(key);
1032 }
1033
1034 private static boolean safeExecute(EventExecutor executor, Runnable runnable,
1035 ChannelPromise promise, Object msg, boolean lazy) {
1036 try {
1037 if (lazy && executor instanceof AbstractEventExecutor) {
1038 ((AbstractEventExecutor) executor).lazyExecute(runnable);
1039 } else {
1040 executor.execute(runnable);
1041 }
1042 return true;
1043 } catch (Throwable cause) {
1044 try {
1045 if (msg != null) {
1046 ReferenceCountUtil.release(msg);
1047 }
1048 } finally {
1049 promise.setFailure(cause);
1050 }
1051 return false;
1052 }
1053 }
1054
1055 @Override
1056 public String toHintString() {
1057 return '\'' + name + "' will handle the message from this point.";
1058 }
1059
1060 @Override
1061 public String toString() {
1062 return StringUtil.simpleClassName(ChannelHandlerContext.class) + '(' + name + ", " + channel() + ')';
1063 }
1064
1065 Tasks getInvokeTasks() {
1066 Tasks tasks = invokeTasks;
1067 if (tasks == null) {
1068 invokeTasks = tasks = new Tasks(this);
1069 }
1070 return tasks;
1071 }
1072
1073 static final class WriteTask implements Runnable {
1074 private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
1075 @Override
1076 protected WriteTask newObject(Handle<WriteTask> handle) {
1077 return new WriteTask(handle);
1078 }
1079 };
1080
1081 static WriteTask newInstance(AbstractChannelHandlerContext ctx,
1082 Object msg, ChannelPromise promise, boolean flush) {
1083 WriteTask task = RECYCLER.get();
1084 init(task, ctx, msg, promise, flush);
1085 return task;
1086 }
1087
1088 private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
1089 SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);
1090
1091
1092 private static final int WRITE_TASK_OVERHEAD =
1093 SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 32);
1094
1095 private final Handle<WriteTask> handle;
1096 private AbstractChannelHandlerContext ctx;
1097 private Object msg;
1098 private ChannelPromise promise;
1099 private int size;
1100
1101 private WriteTask(Handle<WriteTask> handle) {
1102 this.handle = handle;
1103 }
1104
1105 static void init(WriteTask task, AbstractChannelHandlerContext ctx,
1106 Object msg, ChannelPromise promise, boolean flush) {
1107 task.ctx = ctx;
1108 task.msg = msg;
1109 task.promise = promise;
1110
1111 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1112 task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
1113 ctx.pipeline.incrementPendingOutboundBytes(task.size);
1114 } else {
1115 task.size = 0;
1116 }
1117 if (flush) {
1118 task.size |= Integer.MIN_VALUE;
1119 }
1120 }
1121
1122 @Override
1123 public void run() {
1124 try {
1125 decrementPendingOutboundBytes();
1126 ctx.write(msg, size < 0, promise);
1127 } finally {
1128 recycle();
1129 }
1130 }
1131
1132 void cancel() {
1133 try {
1134 decrementPendingOutboundBytes();
1135 } finally {
1136 recycle();
1137 }
1138 }
1139
1140 private void decrementPendingOutboundBytes() {
1141 if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
1142 ctx.pipeline.decrementPendingOutboundBytes(size & Integer.MAX_VALUE);
1143 }
1144 }
1145
1146 private void recycle() {
1147
1148 ctx = null;
1149 msg = null;
1150 promise = null;
1151 handle.recycle(this);
1152 }
1153 }
1154
1155 static final class Tasks {
1156 final Runnable fireChannelReadCompleteTask;
1157 private final Runnable readTask;
1158 private final Runnable fireChannelWritabilityChangedTask;
1159 private final Runnable flushTask;
1160
1161 Tasks(AbstractChannelHandlerContext ctx) {
1162 fireChannelReadCompleteTask = ctx::fireChannelReadComplete;
1163 readTask = ctx::read;
1164 fireChannelWritabilityChangedTask = ctx::fireChannelWritabilityChanged;
1165 flushTask = ctx::flush;
1166 }
1167 }
1168 }