1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.util.Resource;
19 import io.netty5.util.ResourceLeakDetector;
20 import io.netty5.util.concurrent.EventExecutor;
21 import io.netty5.util.concurrent.FastThreadLocal;
22 import io.netty5.util.concurrent.Future;
23 import io.netty5.util.concurrent.Promise;
24 import io.netty5.util.internal.StringUtil;
25 import io.netty5.util.internal.logging.InternalLogger;
26 import io.netty5.util.internal.logging.InternalLoggerFactory;
27
28 import java.net.SocketAddress;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.Iterator;
32 import java.util.LinkedHashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.NoSuchElementException;
36 import java.util.WeakHashMap;
37 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
38 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
39 import java.util.function.IntSupplier;
40 import java.util.function.Predicate;
41
42 import static java.util.Objects.requireNonNull;
43 import static io.netty5.channel.DefaultChannelHandlerContext.safeExecute;
44
45
46
47
48
49 public abstract class DefaultChannelPipeline implements ChannelPipeline {
50
/** Logger used by this pipeline for diagnostics. */
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
/** Context name of the head handler, derived from its class via {@link #generateName0(Class)}. */
private static final String HEAD_NAME = generateName0(HeadHandler.class);
/** Context name of the tail handler, derived from its class via {@link #generateName0(Class)}. */
private static final String TAIL_NAME = generateName0(TailHandler.class);

/** Single handler instance used by the head context of every pipeline. */
private static final ChannelHandler HEAD_HANDLER = new HeadHandler();
/** Single handler instance used by the tail context of every pipeline. */
private static final ChannelHandler TAIL_HANDLER = new TailHandler();

/** Cache of generated default names keyed by handler class; read and filled by {@code generateName}. */
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<>() {
    @Override
    protected Map<Class<?>, String> initialValue() {
        return new WeakHashMap<>();
    }
};

/** Updater used to install {@link #estimatorHandle} with a compare-and-set. */
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
        AtomicReferenceFieldUpdater.newUpdater(
                DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
/** First context of the linked context chain; inbound events fired on the pipeline start here. */
private final DefaultChannelHandlerContext head;
/** Last context of the linked context chain; outbound operations invoked on the pipeline start here. */
private final DefaultChannelHandlerContext tail;

/** The channel this pipeline belongs to. */
private final Channel channel;
/** Pre-built succeeded future returned by {@link #newSucceededFuture()}. */
private final Future<Void> succeededFuture;
/** Whether {@link #touch(Object, DefaultChannelHandlerContext)} records anything; fixed at construction. */
private final boolean touch = ResourceLeakDetector.isEnabled();
/** User-added contexts in pipeline order; the visible code accesses it while holding its monitor. */
private final List<DefaultChannelHandlerContext> handlers = new ArrayList<>(4);

/** Lazily created by {@link #estimatorHandle()}. */
private volatile MessageSizeEstimator.Handle estimatorHandle;

/** Updater used for atomic arithmetic on {@link #pendingOutboundBytes}. */
private static final AtomicLongFieldUpdater<DefaultChannelPipeline> TOTAL_PENDING_OUTBOUND_BYTES_UPDATER =
        AtomicLongFieldUpdater.newUpdater(DefaultChannelPipeline.class, "pendingOutboundBytes");

/** Running total maintained by the increment/decrement methods below. */
private volatile long pendingOutboundBytes;
83
/**
 * Creates a pipeline for {@code channel} that initially contains only the head and tail contexts,
 * linked directly to each other.
 *
 * @param channel the channel this pipeline belongs to
 * @throws NullPointerException if {@code channel} is {@code null}
 */
protected DefaultChannelPipeline(Channel channel) {
    this.channel = requireNonNull(channel, "channel");
    this.succeededFuture = channel.executor().newSucceededFuture(null);

    this.tail = new DefaultChannelHandlerContext(this, TAIL_NAME, TAIL_HANDLER);
    this.head = new DefaultChannelHandlerContext(this, HEAD_NAME, HEAD_HANDLER);

    // Wire the two sentinel contexts together, then mark both as fully added.
    this.head.next = this.tail;
    this.tail.prev = this.head;
    this.head.setAddComplete();
    this.tail.setAddComplete();
}
96
97 final MessageSizeEstimator.Handle estimatorHandle() {
98 MessageSizeEstimator.Handle handle = estimatorHandle;
99 if (handle == null) {
100 handle = channel.getOption(ChannelOption.MESSAGE_SIZE_ESTIMATOR).newHandle();
101 if (!ESTIMATOR.compareAndSet(this, null, handle)) {
102 handle = estimatorHandle;
103 }
104 }
105 return handle;
106 }
107
108 final Object touch(Object msg, DefaultChannelHandlerContext next) {
109 if (touch) {
110 Resource.touch(msg, next);
111 }
112 return msg;
113 }
114
115 private DefaultChannelHandlerContext newContext(String name, ChannelHandler handler) {
116 checkMultiplicity(handler);
117 if (name == null) {
118 name = generateName(handler);
119 }
120 return new DefaultChannelHandlerContext(this, name, handler);
121 }
122
123 private void checkDuplicateName(String name) {
124 if (context(name) != null) {
125 throw new IllegalArgumentException("Duplicate handler name: " + name);
126 }
127 }
128
129 private static int checkNoSuchElement(int idx, String name) {
130 if (idx == -1) {
131 throw new NoSuchElementException(name);
132 }
133 return idx;
134 }
135
/** Returns the channel passed to the constructor. */
@Override
public final Channel channel() {
    return channel;
}
140
/** Returns the executor of this pipeline's channel. */
@Override
public final EventExecutor executor() {
    return channel().executor();
}
145
146 @Override
147 public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
148 DefaultChannelHandlerContext newCtx = newContext(name, handler);
149 EventExecutor executor = executor();
150 boolean inEventLoop = executor.inEventLoop();
151 synchronized (handlers) {
152 checkDuplicateName(newCtx.name());
153
154 handlers.add(0, newCtx);
155 if (!inEventLoop) {
156 try {
157 executor.execute(() -> addFirst0(newCtx));
158 return this;
159 } catch (Throwable cause) {
160 handlers.remove(0);
161 throw cause;
162 }
163 }
164 }
165
166 addFirst0(newCtx);
167 return this;
168 }
169
/**
 * Links {@code newCtx} directly after the head context and then runs {@code callHandlerAdded0} for it.
 *
 * @param newCtx the context to link
 */
private void addFirst0(DefaultChannelHandlerContext newCtx) {
    DefaultChannelHandlerContext nextCtx = head.next;
    // Point the new context at its neighbours before making it reachable from them.
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
    callHandlerAdded0(newCtx);
}
178
179 @Override
180 public final ChannelPipeline addLast(String name, ChannelHandler handler) {
181 DefaultChannelHandlerContext newCtx = newContext(name, handler);
182 EventExecutor executor = executor();
183 boolean inEventLoop = executor.inEventLoop();
184 synchronized (handlers) {
185 checkDuplicateName(newCtx.name());
186
187 handlers.add(newCtx);
188 if (!inEventLoop) {
189 try {
190 executor.execute(() -> addLast0(newCtx));
191 return this;
192 } catch (Throwable cause) {
193 handlers.remove(handlers.size() - 1);
194 throw cause;
195 }
196 }
197 }
198
199 addLast0(newCtx);
200 return this;
201 }
202
/**
 * Links {@code newCtx} directly before the tail context and then runs {@code callHandlerAdded0} for it.
 *
 * @param newCtx the context to link
 */
private void addLast0(DefaultChannelHandlerContext newCtx) {
    DefaultChannelHandlerContext prev = tail.prev;
    // Point the new context at its neighbours before making it reachable from them.
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
    callHandlerAdded0(newCtx);
}
211
212 @Override
213 public final ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler) {
214 final DefaultChannelHandlerContext ctx;
215
216 DefaultChannelHandlerContext newCtx = newContext(name, handler);
217 EventExecutor executor = executor();
218 boolean inEventLoop = executor.inEventLoop();
219 synchronized (handlers) {
220 int i = checkNoSuchElement(findCtxIdx(context -> context.name().equals(baseName)), baseName);
221 checkDuplicateName(newCtx.name());
222
223 ctx = handlers.get(i);
224 handlers.add(i, newCtx);
225 if (!inEventLoop) {
226 try {
227 executor.execute(() -> addBefore0(ctx, newCtx));
228 return this;
229 } catch (Throwable cause) {
230 handlers.remove(i);
231 throw cause;
232 }
233 }
234 }
235
236 addBefore0(ctx, newCtx);
237 return this;
238 }
239
/**
 * Links {@code newCtx} directly before {@code ctx} and then runs {@code callHandlerAdded0} for it.
 *
 * @param ctx    the existing context that will follow the new one
 * @param newCtx the context to link
 */
private void addBefore0(DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
    newCtx.prev = ctx.prev;
    newCtx.next = ctx;
    // ctx.prev still refers to the old predecessor here; it is redirected on the next line.
    ctx.prev.next = newCtx;
    ctx.prev = newCtx;
    callHandlerAdded0(newCtx);
}
247
248 @Override
249 public final ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler) {
250 final DefaultChannelHandlerContext ctx;
251
252 if (name == null) {
253 name = generateName(handler);
254 }
255
256 DefaultChannelHandlerContext newCtx = newContext(name, handler);
257 EventExecutor executor = executor();
258 boolean inEventLoop = executor.inEventLoop();
259 synchronized (handlers) {
260 int i = checkNoSuchElement(findCtxIdx(context -> context.name().equals(baseName)), baseName);
261
262 checkDuplicateName(newCtx.name());
263
264 ctx = handlers.get(i);
265 handlers.add(i + 1, newCtx);
266 if (!inEventLoop) {
267 try {
268 executor.execute(() -> addAfter0(ctx, newCtx));
269 return this;
270 } catch (Throwable cause) {
271 handlers.remove(i + 1);
272 throw cause;
273 }
274 }
275 }
276
277 addAfter0(ctx, newCtx);
278 return this;
279 }
280
/**
 * Links {@code newCtx} directly after {@code ctx} and then runs {@code callHandlerAdded0} for it.
 *
 * @param ctx    the existing context that will precede the new one
 * @param newCtx the context to link
 */
private void addAfter0(DefaultChannelHandlerContext ctx, DefaultChannelHandlerContext newCtx) {
    newCtx.prev = ctx;
    newCtx.next = ctx.next;
    // ctx.next still refers to the old successor here; it is redirected on the next line.
    ctx.next.prev = newCtx;
    ctx.next = newCtx;
    callHandlerAdded0(newCtx);
}
288
/**
 * Inserts {@code handler} at the front of the pipeline under a generated name.
 *
 * @param handler the handler to add
 * @return this pipeline
 */
public final ChannelPipeline addFirst(ChannelHandler handler) {
    return addFirst(null, handler);
}
292
293 @Override
294 public final ChannelPipeline addFirst(ChannelHandler... handlers) {
295 requireNonNull(handlers, "handlers");
296
297 for (int i = handlers.length - 1; i >= 0; i--) {
298 ChannelHandler h = handlers[i];
299 if (h != null) {
300 addFirst(null, h);
301 }
302 }
303
304 return this;
305 }
306
/**
 * Appends {@code handler} at the end of the pipeline under a generated name.
 *
 * @param handler the handler to add
 * @return this pipeline
 */
public final ChannelPipeline addLast(ChannelHandler handler) {
    return addLast(null, handler);
}
310
311 @Override
312 public final ChannelPipeline addLast(ChannelHandler... handlers) {
313 requireNonNull(handlers, "handlers");
314
315 for (ChannelHandler h : handlers) {
316 if (h != null) {
317 addLast(null, h);
318 }
319 }
320
321 return this;
322 }
323
324 private String generateName(ChannelHandler handler) {
325 Map<Class<?>, String> cache = nameCaches.get();
326 Class<?> handlerType = handler.getClass();
327 String name = cache.get(handlerType);
328 if (name == null) {
329 name = generateName0(handlerType);
330 cache.put(handlerType, name);
331 }
332
333 synchronized (handlers) {
334
335
336 if (context(name) != null) {
337 String baseName = name.substring(0, name.length() - 1);
338 for (int i = 1;; i ++) {
339 String newName = baseName + i;
340 if (context(newName) == null) {
341 name = newName;
342 break;
343 }
344 }
345 }
346 }
347
348 return name;
349 }
350
/**
 * Builds the default context name for a handler class: its simple class name followed by {@code "#0"}.
 *
 * @param handlerType the handler class
 * @return the default name
 */
private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}
354
355 private int findCtxIdx(Predicate<DefaultChannelHandlerContext> predicate) {
356 for (int i = 0; i < handlers.size(); i++) {
357 if (predicate.test(handlers.get(i))) {
358 return i;
359 }
360 }
361 return -1;
362 }
363
364 @Override
365 public final ChannelPipeline remove(ChannelHandler handler) {
366 final DefaultChannelHandlerContext ctx;
367 EventExecutor executor = executor();
368 boolean inEventLoop = executor.inEventLoop();
369 synchronized (handlers) {
370 int idx = checkNoSuchElement(findCtxIdx(context -> context.handler() == handler), null);
371 ctx = handlers.remove(idx);
372 assert ctx != null;
373
374 if (!inEventLoop) {
375 scheduleRemove(idx, ctx);
376 return this;
377 }
378 }
379
380 remove0(ctx);
381 return this;
382 }
383
384 @Override
385 public final ChannelHandler remove(String name) {
386 final DefaultChannelHandlerContext ctx;
387 EventExecutor executor = executor();
388 boolean inEventLoop = executor.inEventLoop();
389 synchronized (handlers) {
390 int idx = checkNoSuchElement(findCtxIdx(context -> context.name().equals(name)), name);
391 ctx = handlers.remove(idx);
392 assert ctx != null;
393
394 if (!inEventLoop) {
395 return scheduleRemove(idx, ctx);
396 }
397 }
398
399 remove0(ctx);
400 return ctx.handler();
401 }
402
403 @SuppressWarnings("unchecked")
404 @Override
405 public final <T extends ChannelHandler> T remove(Class<T> handlerType) {
406 final DefaultChannelHandlerContext ctx;
407 EventExecutor executor = executor();
408 boolean inEventLoop = executor.inEventLoop();
409 synchronized (handlers) {
410 int idx = checkNoSuchElement(findCtxIdx(
411 context -> handlerType.isAssignableFrom(context.handler().getClass())), null);
412 ctx = handlers.remove(idx);
413 assert ctx != null;
414
415 if (!inEventLoop) {
416 return scheduleRemove(idx, ctx);
417 }
418 }
419
420 remove0(ctx);
421 return (T) ctx.handler();
422 }
423
/**
 * Removes the context with the given name if there is one.
 *
 * @param name the context name to look for
 * @return the removed handler, or {@code null} if no context has that name
 */
@Override
public final <T extends ChannelHandler> T removeIfExists(String name) {
    return removeIfExists(() -> findCtxIdx(context -> name.equals(context.name())));
}
428
/**
 * Removes the first context whose handler is an instance of {@code handlerType}, if there is one.
 *
 * @param handlerType the handler class (or a supertype) to look for
 * @return the removed handler, or {@code null} if none matched
 */
@Override
public final <T extends ChannelHandler> T removeIfExists(Class<T> handlerType) {
    return removeIfExists(() -> findCtxIdx(
            context -> handlerType.isAssignableFrom(context.handler().getClass())));
}
434
/**
 * Removes the context whose handler is the given instance (compared by identity), if there is one.
 *
 * @param handler the handler instance to look for
 * @return the removed handler, or {@code null} if it is not in this pipeline
 */
@Override
public final <T extends ChannelHandler> T removeIfExists(ChannelHandler handler) {
    return removeIfExists(() -> findCtxIdx(context -> handler == context.handler()));
}
439
440 @SuppressWarnings("unchecked")
441 private <T extends ChannelHandler> T removeIfExists(IntSupplier idxSupplier) {
442 final DefaultChannelHandlerContext ctx;
443 EventExecutor executor = executor();
444 boolean inEventLoop = executor.inEventLoop();
445 synchronized (handlers) {
446 int idx = idxSupplier.getAsInt();
447 if (idx == -1) {
448 return null;
449 }
450 ctx = handlers.remove(idx);
451 assert ctx != null;
452
453 if (!inEventLoop) {
454 return scheduleRemove(idx, ctx);
455 }
456 }
457 remove0(ctx);
458 return (T) ctx.handler();
459 }
460
/**
 * Runs {@code callHandlerRemoved0} for {@code ctx} and then calls {@code ctx.remove(true)}, which
 * happens even if the first call throws.
 *
 * @param ctx the context being removed
 */
private void remove0(DefaultChannelHandlerContext ctx) {
    try {
        callHandlerRemoved0(ctx);
    } finally {
        ctx.remove(true);
    }
}
468
/**
 * Replaces the context whose handler is {@code oldHandler} (compared by identity) with a new
 * context for {@code newHandler}.
 *
 * @param oldHandler the handler instance to replace
 * @param newName    the name of the new context, or {@code null} to generate one
 * @param newHandler the replacement handler
 * @return this pipeline
 * @throws NoSuchElementException if {@code oldHandler} is not in this pipeline
 */
@Override
public final ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) {
    replace(ctx -> ctx.handler() == oldHandler, newName, newHandler);
    return this;
}
474
/**
 * Replaces the context named {@code oldName} with a new context for {@code newHandler}.
 *
 * @param oldName    the name of the context to replace
 * @param newName    the name of the new context, or {@code null} to generate one
 * @param newHandler the replacement handler
 * @return the handler that was replaced
 * @throws NoSuchElementException if no context is named {@code oldName}
 */
@Override
public final ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler) {
    return replace(ctx -> ctx.name().equals(oldName), newName, newHandler);
}
479
/**
 * Replaces the first context whose handler is an instance of {@code oldHandlerType} with a new
 * context for {@code newHandler}.
 *
 * @param oldHandlerType the handler class (or a supertype) to look for
 * @param newName        the name of the new context, or {@code null} to generate one
 * @param newHandler     the replacement handler
 * @return the handler that was replaced
 * @throws NoSuchElementException if no handler of that type is in this pipeline
 */
@Override
@SuppressWarnings("unchecked")
public final <T extends ChannelHandler> T replace(
        Class<T> oldHandlerType, String newName, ChannelHandler newHandler) {
    return (T) replace(ctx -> oldHandlerType.isAssignableFrom(ctx.handler().getClass()), newName, newHandler);
}
486
487 private ChannelHandler replace(
488 Predicate<DefaultChannelHandlerContext> predicate, String newName, ChannelHandler newHandler) {
489 DefaultChannelHandlerContext oldCtx;
490 DefaultChannelHandlerContext newCtx = newContext(newName, newHandler);
491 EventExecutor executor = executor();
492 boolean inEventLoop = executor.inEventLoop();
493 synchronized (handlers) {
494 int idx = checkNoSuchElement(findCtxIdx(predicate), null);
495 oldCtx = handlers.get(idx);
496 assert oldCtx != head && oldCtx != tail && oldCtx != null;
497
498 if (!oldCtx.name().equals(newCtx.name())) {
499 checkDuplicateName(newCtx.name());
500 }
501 DefaultChannelHandlerContext removed = handlers.set(idx, newCtx);
502 assert removed != null;
503
504 if (!inEventLoop) {
505 try {
506 executor.execute(() -> replace0(oldCtx, newCtx));
507 return oldCtx.handler();
508 } catch (Throwable cause) {
509 handlers.set(idx, oldCtx);
510 throw cause;
511 }
512 }
513 }
514
515 replace0(oldCtx, newCtx);
516 return oldCtx.handler();
517 }
518
/**
 * Swaps {@code newCtx} into the position of {@code oldCtx} in the linked context chain and runs the
 * added/removed callbacks.
 *
 * @param oldCtx the context being replaced
 * @param newCtx the context taking its place
 */
private void replace0(DefaultChannelHandlerContext oldCtx, DefaultChannelHandlerContext newCtx) {
    DefaultChannelHandlerContext prev = oldCtx.prev;
    DefaultChannelHandlerContext next = oldCtx.next;
    newCtx.prev = prev;
    newCtx.next = next;

    // The new context is fully wired to its neighbours above before the neighbours are pointed
    // at it below.
    // NOTE(review): presumably this ordering keeps events that are already travelling through the
    // chain from reaching a half-linked context -- confirm.
    prev.next = newCtx;
    next.prev = newCtx;

    // Point the old context at its replacement in both directions.
    // NOTE(review): presumably so anything the old handler still forwards reaches the new one -- confirm.
    oldCtx.prev = newCtx;
    oldCtx.next = newCtx;

    try {
        // The added callback for the new context runs before the removed callback for the old one.
        // NOTE(review): presumably because the old handler's removal may forward data to the new
        // handler, which must already have seen its added callback -- confirm.
        callHandlerAdded0(newCtx);
        callHandlerRemoved0(oldCtx);
    } finally {
        oldCtx.remove(false);
    }
}
546
547 private static void checkMultiplicity(ChannelHandler handler) {
548 if (handler instanceof ChannelHandlerAdapter) {
549 ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
550 if (!h.isSharable() && h.added) {
551 throw new ChannelPipelineException(
552 h.getClass().getName() +
553 " is not a @Sharable handler, so can't be added or removed multiple times.");
554 }
555 h.added = true;
556 }
557 }
558
/**
 * Invokes {@code ctx.callHandlerAdded()} and, if that throws, backs the context out of the pipeline
 * and reports the failure through {@code fireChannelExceptionCaught}.
 *
 * @param ctx the context that has just been linked into the chain
 */
private void callHandlerAdded0(final DefaultChannelHandlerContext ctx) {
    try {
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        // Set only when both the list removal and the removed callback completed without throwing.
        boolean removed = false;
        try {
            synchronized (handlers) {
                handlers.remove(ctx);
            }

            ctx.callHandlerRemoved();

            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        } finally {
            // Runs whether or not the rollback above succeeded.
            ctx.remove(true);
        }

        // Both branches propagate the original failure t as the cause; only the message differs.
        if (removed) {
            fireChannelExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireChannelExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}
591
/**
 * Invokes {@code ctx.callHandlerRemoved()}; anything it throws is wrapped in a
 * {@code ChannelPipelineException} and reported through {@code fireChannelExceptionCaught}
 * instead of being propagated to the caller.
 *
 * @param ctx the context being removed
 */
private void callHandlerRemoved0(final DefaultChannelHandlerContext ctx) {
    // Notify the handler that it is being removed.
    try {
        ctx.callHandlerRemoved();
    } catch (Throwable t) {
        fireChannelExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
    }
}
601
602 @Override
603 public final ChannelHandler get(String name) {
604 ChannelHandlerContext ctx = context(name);
605 return ctx == null ? null : ctx.handler();
606 }
607
608 @SuppressWarnings("unchecked")
609 @Override
610 public final <T extends ChannelHandler> T get(Class<T> handlerType) {
611 ChannelHandlerContext ctx = context(handlerType);
612 return ctx == null ? null : (T) ctx.handler();
613 }
614
615 private DefaultChannelHandlerContext findCtx(Predicate<DefaultChannelHandlerContext> predicate) {
616 for (int i = 0; i < handlers.size(); i++) {
617 DefaultChannelHandlerContext ctx = handlers.get(i);
618 if (predicate.test(ctx)) {
619 return ctx;
620 }
621 }
622 return null;
623 }
624
/**
 * Returns the context with the given name.
 *
 * @param name the context name
 * @return the context, or {@code null} if no context has that name
 * @throws NullPointerException if {@code name} is {@code null}
 */
@Override
public final ChannelHandlerContext context(String name) {
    requireNonNull(name, "name");

    synchronized (handlers) {
        return findCtx(ctx -> ctx.name().equals(name));
    }
}
633
/**
 * Returns the context whose handler is the given instance (compared by identity).
 *
 * @param handler the handler instance
 * @return the context, or {@code null} if the handler is not in this pipeline
 * @throws NullPointerException if {@code handler} is {@code null}
 */
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    requireNonNull(handler, "handler");

    synchronized (handlers) {
        return findCtx(ctx -> ctx.handler() == handler);
    }
}
642
/**
 * Returns the first context whose handler is an instance of {@code handlerType}.
 *
 * @param handlerType the handler class (or a supertype) to look for
 * @return the context, or {@code null} if none matches
 * @throws NullPointerException if {@code handlerType} is {@code null}
 */
@Override
public final ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType) {
    requireNonNull(handlerType, "handlerType");

    synchronized (handlers) {
        return findCtx(ctx -> handlerType.isAssignableFrom(ctx.handler().getClass()));
    }
}
651
652 @Override
653 public final List<String> names() {
654 synchronized (handlers) {
655 List<String> names = new ArrayList<>(handlers.size());
656 for (int i = 0; i < handlers.size(); i++) {
657 names.add(handlers.get(i).name());
658 }
659 return names;
660 }
661 }
662
663
664
665
666 @Override
667 public final String toString() {
668 StringBuilder buf = new StringBuilder()
669 .append(StringUtil.simpleClassName(this))
670 .append('{');
671 synchronized (handlers) {
672 if (!handlers.isEmpty()) {
673 for (int i = 0; i < handlers.size(); i++) {
674 DefaultChannelHandlerContext ctx = handlers.get(i);
675
676 buf.append('(')
677 .append(ctx.name())
678 .append(" = ")
679 .append(ctx.handler().getClass().getName())
680 .append("), ");
681 }
682 buf.setLength(buf.length() - 2);
683 }
684 }
685 buf.append('}');
686 return buf.toString();
687 }
688
689 @Override
690 public ChannelHandler removeFirst() {
691 final DefaultChannelHandlerContext ctx;
692 EventExecutor executor = executor();
693 boolean inEventLoop = executor.inEventLoop();
694 synchronized (handlers) {
695 if (handlers.isEmpty()) {
696 throw new NoSuchElementException();
697 }
698 int idx = 0;
699
700 ctx = handlers.remove(idx);
701 assert ctx != null;
702
703 if (!inEventLoop) {
704 return scheduleRemove(idx, ctx);
705 }
706 }
707 remove0(ctx);
708 return ctx.handler();
709 }
710
711 @Override
712 public ChannelHandler removeLast() {
713 final DefaultChannelHandlerContext ctx;
714 EventExecutor executor = executor();
715 boolean inEventLoop = executor.inEventLoop();
716 synchronized (handlers) {
717 if (handlers.isEmpty()) {
718 return null;
719 }
720 int idx = handlers.size() - 1;
721
722 ctx = handlers.remove(idx);
723 assert ctx != null;
724
725 if (!inEventLoop) {
726 return scheduleRemove(idx, ctx);
727 }
728 }
729 remove0(ctx);
730 return ctx.handler();
731 }
732
/**
 * Submits {@code remove0(ctx)} to the context's executor.
 *
 * <p>If anything inside the try block throws, the context is put back into the handler list at
 * {@code idx} and the failure is rethrown. Every caller visible in this class invokes this method
 * while holding the handler-list lock.
 *
 * @param idx the index the context was removed from, used to restore it on failure
 * @param ctx the context already taken out of the handler list
 * @param <T> the handler type expected by the caller
 * @return the handler of {@code ctx}
 */
@SuppressWarnings("unchecked")
private <T extends ChannelHandler> T scheduleRemove(int idx, DefaultChannelHandlerContext ctx) {
    try {
        ctx.executor().execute(() -> remove0(ctx));
        return (T) ctx.handler();
    } catch (Throwable cause) {
        handlers.add(idx, ctx);
        throw cause;
    }
}
743
744 @Override
745 public ChannelHandler first() {
746 ChannelHandlerContext ctx = firstContext();
747 return ctx == null ? null : ctx.handler();
748 }
749
750 @Override
751 public ChannelHandlerContext firstContext() {
752 synchronized (handlers) {
753 return handlers.isEmpty() ? null : handlers.get(0);
754 }
755 }
756
757 @Override
758 public ChannelHandler last() {
759 ChannelHandlerContext ctx = lastContext();
760 return ctx == null ? null : ctx.handler();
761 }
762
763 @Override
764 public ChannelHandlerContext lastContext() {
765 synchronized (handlers) {
766 return handlers.isEmpty() ? null : handlers.get(handlers.size() - 1);
767 }
768 }
769
770 @Override
771 public Map<String, ChannelHandler> toMap() {
772 Map<String, ChannelHandler> map;
773 synchronized (handlers) {
774 if (handlers.isEmpty()) {
775 return Collections.emptyMap();
776 }
777 map = new LinkedHashMap<>(handlers.size());
778 for (int i = 0; i < handlers.size(); i++) {
779 ChannelHandlerContext ctx = handlers.get(i);
780 map.put(ctx.name(), ctx.handler());
781 }
782 return map;
783 }
784 }
785
/**
 * Iterates over the entries of a {@link #toMap()} snapshot taken at the time of the call.
 *
 * @return an iterator over name/handler entries
 */
@Override
public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
    return toMap().entrySet().iterator();
}
790
/**
 * Fires a channel-registered event by invoking it on the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelRegistered() {
    head.invokeChannelRegistered();
    return this;
}
796
/**
 * Fires a channel-unregistered event by invoking it on the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelUnregistered() {
    head.invokeChannelUnregistered();
    return this;
}
802
/**
 * Fires a channel-active event by invoking it on the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelActive() {
    head.invokeChannelActive();
    return this;
}
808
/**
 * Fires a channel-inactive event by invoking it on the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelInactive() {
    head.invokeChannelInactive();
    return this;
}
814
/**
 * Fires a channel-shutdown event for {@code direction} by invoking it on the head context.
 *
 * @param direction the direction that was shut down
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelShutdown(ChannelShutdownDirection direction) {
    head.invokeChannelShutdown(direction);
    return this;
}
820
/**
 * Fires an exception-caught event carrying {@code cause} by invoking it on the head context.
 *
 * @param cause the failure to propagate
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelExceptionCaught(Throwable cause) {
    head.invokeChannelExceptionCaught(cause);
    return this;
}
826
/**
 * Fires a custom inbound event by invoking it on the head context.
 *
 * @param event the event object to propagate
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelInboundEvent(Object event) {
    head.invokeChannelInboundEvent(event);
    return this;
}
832
/**
 * Fires a channel-read event carrying {@code msg} by invoking it on the head context.
 *
 * @param msg the inbound message
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    head.invokeChannelRead(msg);
    return this;
}
838
/**
 * Fires a channel-read-complete event by invoking it on the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelReadComplete() {
    head.invokeChannelReadComplete();
    return this;
}
844
/**
 * Fires a writability-changed event by invoking it on the head context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    head.invokeChannelWritabilityChanged();
    return this;
}
850
/**
 * Requests a bind to {@code localAddress} by delegating to the tail context.
 *
 * @param localAddress the address to bind to
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}
855
/**
 * Requests a connect to {@code remoteAddress} by delegating to the tail context.
 *
 * @param remoteAddress the address to connect to
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}
860
/**
 * Requests a connect to {@code remoteAddress} from {@code localAddress} by delegating to the tail
 * context.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the local address to use
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}
865
/**
 * Requests a disconnect by delegating to the tail context.
 *
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> disconnect() {
    return tail.disconnect();
}
870
/**
 * Requests a close by delegating to the tail context.
 *
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> close() {
    return tail.close();
}
875
/**
 * Requests a shutdown of {@code direction} by delegating to the tail context.
 *
 * @param direction the direction to shut down
 * @return the future returned by the tail context
 */
@Override
public Future<Void> shutdown(ChannelShutdownDirection direction) {
    return tail.shutdown(direction);
}
880
/**
 * Requests registration by delegating to the tail context.
 *
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> register() {
    return tail.register();
}
885
/**
 * Requests deregistration by delegating to the tail context.
 *
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> deregister() {
    return tail.deregister();
}
890
/**
 * Requests a flush by delegating to the tail context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline flush() {
    tail.flush();
    return this;
}
896
/**
 * Requests a read by delegating to the tail context.
 *
 * @return this pipeline
 */
@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}
902
/**
 * Requests a write of {@code msg} by delegating to the tail context.
 *
 * @param msg the message to write
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> write(Object msg) {
    return tail.write(msg);
}
907
/**
 * Requests a write of {@code msg} followed by a flush by delegating to the tail context.
 *
 * @param msg the message to write
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}
912
/**
 * Sends a custom outbound event by delegating to the tail context.
 *
 * @param event the event object to send
 * @return the future returned by the tail context
 */
@Override
public final Future<Void> sendOutboundEvent(Object event) {
    return tail.sendOutboundEvent(event);
}
917
/** Delegates to the {@code ChannelPipeline} default implementation; declared here to make it final. */
@Override
public final <V> Promise<V> newPromise() {
    return ChannelPipeline.super.newPromise();
}
922
/** Returns the succeeded future created once in the constructor, rather than a new one per call. */
@Override
public final Future<Void> newSucceededFuture() {
    return succeededFuture;
}
/** Delegates to the {@code ChannelPipeline} default implementation; declared here to make it final. */
@Override
public final <V> Future<V> newSucceededFuture(V value) {
    return ChannelPipeline.super.newSucceededFuture(value);
}
931
/** Delegates to the {@code ChannelPipeline} default implementation; declared here to make it final. */
@Override
public final <V> Future<V> newFailedFuture(Throwable cause) {
    return ChannelPipeline.super.newFailedFuture(cause);
}
936
937
938
939
940
/**
 * Invoked by the tail handler when an exception-caught event reaches it. Logs a warning and then
 * passes {@code cause} to {@code Resource.dispose}, even if logging throws.
 *
 * @param cause the exception that reached the tail
 */
protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        Resource.dispose(cause);
    }
}
951
952
953
954
955
/** Invoked by the tail handler when a channel-active event reaches it. Does nothing by default. */
protected void onUnhandledInboundChannelActive() {
}
958
959
960
961
962
/** Invoked by the tail handler when a channel-inactive event reaches it. Does nothing by default. */
protected void onUnhandledInboundChannelInactive() {
}
965
966
967
968
969
/**
 * Invoked by the tail handler when a channel-shutdown event reaches it. Does nothing by default.
 *
 * @param direction the direction that was shut down
 */
protected void onUnhandledInboundChannelShutdown(ChannelShutdownDirection direction) {
}
972
973
974
975
976
977
/**
 * Invoked by the tail handler when a channel-read event reaches it. Logs the discarded message at
 * debug level and then passes {@code msg} to {@code Resource.dispose}, even if logging throws.
 *
 * @param ctx the tail context that received the message
 * @param msg the message that reached the tail
 */
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration. Discarded message pipeline : {}. Channel : {}.",
                msg, ctx.pipeline().names(), ctx.channel());
    } finally {
        Resource.dispose(msg);
    }
}
988
989
990
991
992
/** Invoked by the tail handler when a channel-read-complete event reaches it. Does nothing by default. */
protected void onUnhandledInboundChannelReadComplete() {
}
995
996
997
998
999
1000
/**
 * Invoked by the tail handler when a custom inbound event reaches it.
 *
 * @param evt the event that reached the tail
 */
protected void onUnhandledChannelInboundEvent(Object evt) {
    // Nothing downstream will consume the event, so hand it to Resource.dispose here.
    // NOTE(review): presumably this releases events that hold resources -- confirm against Resource.dispose.
    Resource.dispose(evt);
}
1006
1007
1008
1009
1010
/** Invoked by the tail handler when a writability-changed event reaches it. Does nothing by default. */
protected void onUnhandledChannelWritabilityChanged() {
}
1013
1014
1015
1016
1017
1018
/**
 * Called by {@code incrementPendingOutboundBytes} and {@code decrementPendingOutboundBytes} after
 * the counter has been changed to a non-negative value.
 *
 * @param pendingOutboundBytes the counter value after the change
 */
protected abstract void pendingOutboundBytesUpdated(long pendingOutboundBytes);
1020
/** Returns the current value of the pending-outbound-bytes counter. */
@Override
public final long pendingOutboundBytes() {
    return TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.get(this);
}
1025
1026
1027
1028
1029
1030
1031 final void incrementPendingOutboundBytes(long delta) {
1032 assert delta > 0;
1033 long pending = TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.addAndGet(this, delta);
1034 if (pending < 0) {
1035 forceCloseTransport();
1036 throw new IllegalStateException("pendingOutboundBytes overflowed, force closed transport.");
1037 }
1038 pendingOutboundBytesUpdated(pending);
1039 }
1040
1041
1042
1043
1044
1045
1046 final void decrementPendingOutboundBytes(long delta) {
1047 assert delta > 0;
1048 long pending = TOTAL_PENDING_OUTBOUND_BYTES_UPDATER.addAndGet(this, -delta);
1049 if (pending < 0) {
1050 forceCloseTransport();
1051 throw new IllegalStateException("pendingOutboundBytes underflowed, force closed transport.");
1052 }
1053 pendingOutboundBytesUpdated(pending);
1054 }
1055
/**
 * Returns the pipeline of {@code ctx} cast to {@code DefaultChannelPipeline}.
 *
 * @param ctx a context of a {@code DefaultChannelPipeline}
 * @return the owning pipeline
 * @throws ClassCastException if the context belongs to another pipeline implementation
 */
private static DefaultChannelPipeline defaultChannelPipeline(ChannelHandlerContext ctx) {
    return (DefaultChannelPipeline) ctx.pipeline();
}
1059
1060
1061 private static final class TailHandler implements ChannelHandler {
1062
1063 @Override
1064 public void channelRegistered(ChannelHandlerContext ctx) {
1065
1066 }
1067
1068 @Override
1069 public void channelUnregistered(ChannelHandlerContext ctx) {
1070
1071 }
1072
1073 @Override
1074 public void channelActive(ChannelHandlerContext ctx) {
1075 defaultChannelPipeline(ctx).onUnhandledInboundChannelActive();
1076 }
1077
1078 @Override
1079 public void channelInactive(ChannelHandlerContext ctx) {
1080 defaultChannelPipeline(ctx).onUnhandledInboundChannelInactive();
1081 }
1082
1083 @Override
1084 public void channelShutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) {
1085 defaultChannelPipeline(ctx).onUnhandledInboundChannelShutdown(direction);
1086 }
1087
1088 @Override
1089 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
1090 defaultChannelPipeline(ctx).onUnhandledChannelWritabilityChanged();
1091 }
1092
1093 @Override
1094 public void channelInboundEvent(ChannelHandlerContext ctx, Object evt) {
1095 ((DefaultChannelPipeline) ctx.pipeline()).onUnhandledChannelInboundEvent(evt);
1096 }
1097
1098 @Override
1099 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
1100 defaultChannelPipeline(ctx).onUnhandledInboundException(cause);
1101 }
1102
1103 @Override
1104 public void channelRead(ChannelHandlerContext ctx, Object msg) {
1105 defaultChannelPipeline(ctx).onUnhandledInboundMessage(ctx, msg);
1106 }
1107
1108 @Override
1109 public void channelReadComplete(ChannelHandlerContext ctx) {
1110 defaultChannelPipeline(ctx).onUnhandledInboundChannelReadComplete();
1111 }
1112 }
1113
1114 private static final class HeadHandler implements ChannelHandler {
1115
1116 @Override
1117 public Future<Void> bind(
1118 ChannelHandlerContext ctx, SocketAddress localAddress) {
1119 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1120 EventExecutor executor = pipeline.transportExecutor();
1121 Promise<Void> promise = executor.newPromise();
1122 if (executor.inEventLoop()) {
1123 pipeline.bindTransport(localAddress, promise);
1124 } else {
1125 safeExecute(executor, () -> pipeline.bindTransport(localAddress, promise), promise, null);
1126 }
1127 return promise.asFuture();
1128 }
1129
1130 @Override
1131 public Future<Void> connect(
1132 ChannelHandlerContext ctx,
1133 SocketAddress remoteAddress, SocketAddress localAddress) {
1134 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1135 EventExecutor executor = pipeline.transportExecutor();
1136 Promise<Void> promise = executor.newPromise();
1137 if (executor.inEventLoop()) {
1138 pipeline.connectTransport(remoteAddress, localAddress, promise);
1139 } else {
1140 safeExecute(executor, () ->
1141 pipeline.connectTransport(remoteAddress, localAddress, promise), promise, null);
1142 }
1143 return promise.asFuture();
1144 }
1145
1146 @Override
1147 public Future<Void> disconnect(ChannelHandlerContext ctx) {
1148 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1149 EventExecutor executor = pipeline.transportExecutor();
1150 Promise<Void> promise = executor.newPromise();
1151 if (executor.inEventLoop()) {
1152 pipeline.disconnectTransport(promise);
1153 } else {
1154 safeExecute(executor, () -> pipeline.disconnectTransport(promise), promise, null);
1155 }
1156 return promise.asFuture();
1157 }
1158
1159 @Override
1160 public Future<Void> close(ChannelHandlerContext ctx) {
1161 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1162 EventExecutor executor = pipeline.transportExecutor();
1163 Promise<Void> promise = executor.newPromise();
1164 if (executor.inEventLoop()) {
1165 pipeline.closeTransport(promise);
1166 } else {
1167 safeExecute(executor, () -> pipeline.closeTransport(promise), promise, null);
1168 }
1169 return promise.asFuture();
1170 }
1171
1172 @Override
1173 public Future<Void> shutdown(ChannelHandlerContext ctx, ChannelShutdownDirection direction) {
1174 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1175 EventExecutor executor = pipeline.transportExecutor();
1176 Promise<Void> promise = executor.newPromise();
1177 if (executor.inEventLoop()) {
1178 pipeline.shutdownTransport(direction, promise);
1179 } else {
1180 safeExecute(executor, () -> pipeline.shutdownTransport(direction, promise), promise, null);
1181 }
1182 return promise.asFuture();
1183 }
1184
1185 @Override
1186 public Future<Void> register(ChannelHandlerContext ctx) {
1187 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1188 EventExecutor executor = pipeline.transportExecutor();
1189 Promise<Void> promise = executor.newPromise();
1190 if (executor.inEventLoop()) {
1191 pipeline.registerTransport(promise);
1192 } else {
1193 safeExecute(executor, () -> pipeline.registerTransport(promise), promise, null);
1194 }
1195 return promise.asFuture();
1196 }
1197
1198 @Override
1199 public Future<Void> deregister(ChannelHandlerContext ctx) {
1200 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1201 EventExecutor executor = pipeline.transportExecutor();
1202 Promise<Void> promise = executor.newPromise();
1203 if (executor.inEventLoop()) {
1204 pipeline.deregisterTransport(promise);
1205 } else {
1206 safeExecute(executor, () -> pipeline.deregisterTransport(promise), promise, null);
1207 }
1208 return promise.asFuture();
1209 }
1210
1211 @Override
1212 public void read(ChannelHandlerContext ctx) {
1213 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1214 EventExecutor executor = pipeline.transportExecutor();
1215 if (executor.inEventLoop()) {
1216 pipeline.readTransport();
1217 } else {
1218 safeExecute(executor, pipeline::readTransport, null, null);
1219 }
1220 }
1221
1222 @Override
1223 public Future<Void> write(ChannelHandlerContext ctx, Object msg) {
1224 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1225 EventExecutor executor = pipeline.transportExecutor();
1226 Promise<Void> promise = executor.newPromise();
1227 if (executor.inEventLoop()) {
1228 pipeline.writeTransport(msg, promise);
1229 } else {
1230 safeExecute(executor, () -> pipeline.writeTransport(msg, promise), promise, msg);
1231 }
1232 return promise.asFuture();
1233 }
1234
1235 @Override
1236 public void flush(ChannelHandlerContext ctx) {
1237 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1238 EventExecutor executor = pipeline.transportExecutor();
1239 if (executor.inEventLoop()) {
1240 pipeline.flushTransport();
1241 } else {
1242 safeExecute(executor, pipeline::flushTransport, null, null);
1243 }
1244 }
1245
1246 @Override
1247 public Future<Void> sendOutboundEvent(ChannelHandlerContext ctx, Object event) {
1248 DefaultChannelPipeline pipeline = defaultChannelPipeline(ctx);
1249 EventExecutor executor = pipeline.transportExecutor();
1250 Promise<Void> promise = executor.newPromise();
1251 if (executor.inEventLoop()) {
1252 pipeline.sendOutboundEventTransport(event, promise);
1253 } else {
1254 safeExecute(executor, () -> pipeline.sendOutboundEventTransport(event, promise), promise, event);
1255 }
1256 return promise.asFuture();
1257 }
1258 }
1259
1260 final void forceCloseTransport() {
1261 EventExecutor executor = transportExecutor();
1262 Promise<Void> promise = executor.newPromise();
1263 if (executor.inEventLoop()) {
1264 closeTransport(promise);
1265 } else {
1266 safeExecute(executor, () -> closeTransport(promise), promise, null);
1267 }
1268 }
1269
1270
1271
1272
1273
1274
/**
 * Supplies the {@link EventExecutor} used for the transport hooks of this pipeline.
 * <p>
 * Within this file the returned executor creates the promises passed to the {@code *Transport}
 * methods (via {@code newPromise()}) and decides how those methods are invoked: inline when
 * {@code inEventLoop()} is true, otherwise through {@code safeExecute(executor, ...)}.
 *
 * @return the executor for transport operations; callers in this file dereference it without a
 *         null check
 */
1275 protected abstract EventExecutor transportExecutor();
1276
1277
1278
1279
1280
/**
 * Transport hook behind {@code register(ChannelHandlerContext)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor.
 *
 * @param promise created by the caller from {@code transportExecutor().newPromise()}; the caller
 *                returns its {@code asFuture()}. NOTE(review): implementations are presumably
 *                expected to complete it; confirm against the concrete subclasses.
 */
1281 protected abstract void registerTransport(Promise<Void> promise);
1282
1283
1284
1285
1286
/**
 * Transport hook behind the handler's bind operation.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor.
 *
 * @param localAddress the address forwarded unchanged from the bind caller
 * @param promise      the promise whose {@code asFuture()} the caller returns. NOTE(review):
 *                     implementations are presumably expected to complete it; confirm.
 */
1287 protected abstract void bindTransport(SocketAddress localAddress, Promise<Void> promise);
1288
1289
1290
1291
1292
1293
1294
1295
/**
 * Transport hook behind {@code connect(ChannelHandlerContext, SocketAddress, SocketAddress)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor.
 *
 * @param remoteAddress the remote address forwarded unchanged from the connect caller
 * @param localAddress  the local address forwarded unchanged from the connect caller
 * @param promise       created by the caller from {@code transportExecutor().newPromise()}; the
 *                      caller returns its {@code asFuture()}. NOTE(review): implementations are
 *                      presumably expected to complete it; confirm.
 */
1296 protected abstract void connectTransport(
1297 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise);
1298
1299
1300
1301
1302
/**
 * Transport hook behind {@code disconnect(ChannelHandlerContext)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor.
 *
 * @param promise created by the caller from {@code transportExecutor().newPromise()}; the caller
 *                returns its {@code asFuture()}. NOTE(review): implementations are presumably
 *                expected to complete it; confirm.
 */
1303 protected abstract void disconnectTransport(Promise<Void> promise);
1304
1305
1306
1307
1308
/**
 * Transport hook behind {@code close(ChannelHandlerContext)} and {@code forceCloseTransport()}.
 * <p>
 * Both callers invoke it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submit it through {@code safeExecute} with that executor.
 *
 * @param promise created by the caller from {@code transportExecutor().newPromise()}.
 *                {@code close} returns its {@code asFuture()}; {@code forceCloseTransport}
 *                discards it. NOTE(review): implementations are presumably expected to complete
 *                it; confirm.
 */
1309 protected abstract void closeTransport(Promise<Void> promise);
1310
1311
1312
1313
1314
/**
 * Transport hook behind {@code shutdown(ChannelHandlerContext, ChannelShutdownDirection)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor.
 *
 * @param direction the direction forwarded unchanged from the shutdown caller
 * @param promise   created by the caller from {@code transportExecutor().newPromise()}; the
 *                  caller returns its {@code asFuture()}. NOTE(review): implementations are
 *                  presumably expected to complete it; confirm.
 */
1315 protected abstract void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise);
1316
1317
1318
1319
1320
1321
/**
 * Transport hook behind {@code deregister(ChannelHandlerContext)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor.
 *
 * @param promise created by the caller from {@code transportExecutor().newPromise()}; the caller
 *                returns its {@code asFuture()}. NOTE(review): implementations are presumably
 *                expected to complete it; confirm.
 */
1322 protected abstract void deregisterTransport(Promise<Void> promise);
1323
1324
1325
1326
1327
1328
/**
 * Transport hook behind {@code read(ChannelHandlerContext)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor. No promise is passed,
 * so the caller receives no completion signal from this hook.
 */
1329 protected abstract void readTransport();
1330
1331
1332
1333
1334
1335
/**
 * Transport hook behind {@code write(ChannelHandlerContext, Object)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor. On that second path
 * {@code msg} is also passed to {@code safeExecute} itself.
 *
 * @param msg     the message forwarded unchanged from the write caller
 * @param promise created by the caller from {@code transportExecutor().newPromise()}; the caller
 *                returns its {@code asFuture()}. NOTE(review): implementations are presumably
 *                expected to complete it; confirm.
 */
1336 protected abstract void writeTransport(Object msg, Promise<Void> promise);
1337
1338
1339
1340
1341
/**
 * Transport hook behind {@code flush(ChannelHandlerContext)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor. No promise is passed,
 * so the caller receives no completion signal from this hook.
 */
1342 protected abstract void flushTransport();
1343
1344
1345
1346
1347
/**
 * Transport hook behind {@code sendOutboundEvent(ChannelHandlerContext, Object)}.
 * <p>
 * The caller invokes it inline when {@link #transportExecutor()} reports {@code inEventLoop()},
 * and otherwise submits it through {@code safeExecute} with that executor. On that second path
 * {@code event} is also passed to {@code safeExecute} itself.
 *
 * @param event   the outbound event forwarded unchanged from the caller
 * @param promise created by the caller from {@code transportExecutor().newPromise()}; the caller
 *                returns its {@code asFuture()}. NOTE(review): implementations are presumably
 *                expected to complete it; confirm.
 */
1348 protected abstract void sendOutboundEventTransport(Object event, Promise<Void> promise);
1349 }