1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.DefaultChannelConfig;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.RecyclableArrayList;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.net.SocketAddress;
43 import java.nio.channels.ClosedChannelException;
44 import java.util.ArrayDeque;
45 import java.util.Queue;
46 import java.util.concurrent.TimeUnit;
47
48
49
50
51 public class EmbeddedChannel extends AbstractChannel {
52
53 private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
54 private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
55
56 private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
57 private enum State { OPEN, ACTIVE, CLOSED }
58
59 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
60
61 private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
62 private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
63
64 private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
65 private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
66 @Override
67 public void operationComplete(ChannelFuture future) throws Exception {
68 recordException(future);
69 }
70 };
71
72 private final ChannelMetadata metadata;
73 private final ChannelConfig config;
74
75 private Queue<Object> inboundMessages;
76 private Queue<Object> outboundMessages;
77 private Throwable lastException;
78 private State state;
79 private int executingStackCnt;
80 private boolean cancelRemainingScheduledTasks;
81
82
83
84
85 public EmbeddedChannel() {
86 this(EMPTY_HANDLERS);
87 }
88
89
90
91
92
93
94 public EmbeddedChannel(ChannelId channelId) {
95 this(channelId, EMPTY_HANDLERS);
96 }
97
98
99
100
101
102
103 public EmbeddedChannel(ChannelHandler... handlers) {
104 this(EmbeddedChannelId.INSTANCE, handlers);
105 }
106
107
108
109
110
111
112
113
114 public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
115 this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
116 }
117
118
119
120
121
122
123
124
125
126
127 public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
128 this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
129 }
130
131
132
133
134
135
136
137
138 public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
139 this(channelId, false, handlers);
140 }
141
142
143
144
145
146
147
148
149
150
151 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
152 this(channelId, true, hasDisconnect, handlers);
153 }
154
155
156
157
158
159
160
161
162
163
164
165
166 public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
167 ChannelHandler... handlers) {
168 this(null, channelId, register, hasDisconnect, handlers);
169 }
170
171
172
173
174
175
176
177
178
179
180
181
182
183 public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
184 final ChannelHandler... handlers) {
185 super(parent, channelId);
186 metadata = metadata(hasDisconnect);
187 config = new DefaultChannelConfig(this);
188 setup(register, handlers);
189 }
190
191
192
193
194
195
196
197
198
199
200
201 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
202 final ChannelHandler... handlers) {
203 super(null, channelId);
204 metadata = metadata(hasDisconnect);
205 this.config = ObjectUtil.checkNotNull(config, "config");
206 setup(true, handlers);
207 }
208
209 private static ChannelMetadata metadata(boolean hasDisconnect) {
210 return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
211 }
212
213 private void setup(boolean register, final ChannelHandler... handlers) {
214 ObjectUtil.checkNotNull(handlers, "handlers");
215 ChannelPipeline p = pipeline();
216 p.addLast(new ChannelInitializer<Channel>() {
217 @Override
218 protected void initChannel(Channel ch) throws Exception {
219 ChannelPipeline pipeline = ch.pipeline();
220 for (ChannelHandler h: handlers) {
221 if (h == null) {
222 break;
223 }
224 pipeline.addLast(h);
225 }
226 }
227 });
228 if (register) {
229 ChannelFuture future = loop.register(this);
230 assert future.isDone();
231 }
232 }
233
234
235
236
237 public void register() throws Exception {
238 ChannelFuture future = loop.register(this);
239 assert future.isDone();
240 Throwable cause = future.cause();
241 if (cause != null) {
242 PlatformDependent.throwException(cause);
243 }
244 }
245
246 @Override
247 protected final DefaultChannelPipeline newChannelPipeline() {
248 return new EmbeddedChannelPipeline(this);
249 }
250
251 @Override
252 public ChannelMetadata metadata() {
253 return metadata;
254 }
255
256 @Override
257 public ChannelConfig config() {
258 return config;
259 }
260
261 @Override
262 public boolean isOpen() {
263 return state != State.CLOSED;
264 }
265
266 @Override
267 public boolean isActive() {
268 return state == State.ACTIVE;
269 }
270
271
272
273
274 public Queue<Object> inboundMessages() {
275 if (inboundMessages == null) {
276 inboundMessages = new ArrayDeque<Object>();
277 }
278 return inboundMessages;
279 }
280
281
282
283
284 @Deprecated
285 public Queue<Object> lastInboundBuffer() {
286 return inboundMessages();
287 }
288
289
290
291
292 public Queue<Object> outboundMessages() {
293 if (outboundMessages == null) {
294 outboundMessages = new ArrayDeque<Object>();
295 }
296 return outboundMessages;
297 }
298
299
300
301
302 @Deprecated
303 public Queue<Object> lastOutboundBuffer() {
304 return outboundMessages();
305 }
306
307
308
309
310 @SuppressWarnings("unchecked")
311 public <T> T readInbound() {
312 T message = (T) poll(inboundMessages);
313 if (message != null) {
314 ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
315 }
316 return message;
317 }
318
319
320
321
322 @SuppressWarnings("unchecked")
323 public <T> T readOutbound() {
324 T message = (T) poll(outboundMessages);
325 if (message != null) {
326 ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
327 }
328 return message;
329 }
330
331
332
333
334
335
336
337
338 public boolean writeInbound(Object... msgs) {
339 ensureOpen();
340 if (msgs.length == 0) {
341 return isNotEmpty(inboundMessages);
342 }
343
344 executingStackCnt++;
345 try {
346 ChannelPipeline p = pipeline();
347 for (Object m : msgs) {
348 p.fireChannelRead(m);
349 }
350
351 flushInbound(false, voidPromise());
352 } finally {
353 executingStackCnt--;
354 maybeRunPendingTasks();
355 }
356 return isNotEmpty(inboundMessages);
357 }
358
359
360
361
362
363
364
365 public ChannelFuture writeOneInbound(Object msg) {
366 return writeOneInbound(msg, newPromise());
367 }
368
369
370
371
372
373
374
375 public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
376 executingStackCnt++;
377 try {
378 if (checkOpen(true)) {
379 pipeline().fireChannelRead(msg);
380 }
381 } finally {
382 executingStackCnt--;
383 maybeRunPendingTasks();
384 }
385 return checkException(promise);
386 }
387
388
389
390
391
392
393 public EmbeddedChannel flushInbound() {
394 flushInbound(true, voidPromise());
395 return this;
396 }
397
398 private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
399 executingStackCnt++;
400 try {
401 if (checkOpen(recordException)) {
402 pipeline().fireChannelReadComplete();
403 runPendingTasks();
404 }
405 } finally {
406 executingStackCnt--;
407 maybeRunPendingTasks();
408 }
409
410 return checkException(promise);
411 }
412
413
414
415
416
417
418
419 public boolean writeOutbound(Object... msgs) {
420 ensureOpen();
421 if (msgs.length == 0) {
422 return isNotEmpty(outboundMessages);
423 }
424
425 executingStackCnt++;
426 RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
427 try {
428 try {
429 for (Object m : msgs) {
430 if (m == null) {
431 break;
432 }
433 futures.add(write(m));
434 }
435
436 flushOutbound0();
437
438 int size = futures.size();
439 for (int i = 0; i < size; i++) {
440 ChannelFuture future = (ChannelFuture) futures.get(i);
441 if (future.isDone()) {
442 recordException(future);
443 } else {
444
445 future.addListener(recordExceptionListener);
446 }
447 }
448 } finally {
449 executingStackCnt--;
450 maybeRunPendingTasks();
451 }
452 checkException();
453 return isNotEmpty(outboundMessages);
454 } finally {
455 futures.recycle();
456 }
457 }
458
459
460
461
462
463
464
465 public ChannelFuture writeOneOutbound(Object msg) {
466 return writeOneOutbound(msg, newPromise());
467 }
468
469
470
471
472
473
474
475 public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
476 executingStackCnt++;
477 try {
478 if (checkOpen(true)) {
479 return write(msg, promise);
480 }
481 } finally {
482 executingStackCnt--;
483 maybeRunPendingTasks();
484 }
485
486 return checkException(promise);
487 }
488
489
490
491
492
493
494 public EmbeddedChannel flushOutbound() {
495 executingStackCnt++;
496 try {
497 if (checkOpen(true)) {
498 flushOutbound0();
499 }
500 } finally {
501 executingStackCnt--;
502 maybeRunPendingTasks();
503 }
504 checkException(voidPromise());
505 return this;
506 }
507
508 private void flushOutbound0() {
509
510
511 runPendingTasks();
512
513 flush();
514 }
515
516
517
518
519
520
521 public boolean finish() {
522 return finish(false);
523 }
524
525
526
527
528
529
530
531 public boolean finishAndReleaseAll() {
532 return finish(true);
533 }
534
535
536
537
538
539
540
541 private boolean finish(boolean releaseAll) {
542 executingStackCnt++;
543 try {
544 close();
545 } finally {
546 executingStackCnt--;
547 maybeRunPendingTasks();
548 }
549 try {
550 checkException();
551 return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
552 } finally {
553 if (releaseAll) {
554 releaseAll(inboundMessages);
555 releaseAll(outboundMessages);
556 }
557 }
558 }
559
560
561
562
563
564 public boolean releaseInbound() {
565 return releaseAll(inboundMessages);
566 }
567
568
569
570
571
572 public boolean releaseOutbound() {
573 return releaseAll(outboundMessages);
574 }
575
576 private static boolean releaseAll(Queue<Object> queue) {
577 if (isNotEmpty(queue)) {
578 for (;;) {
579 Object msg = queue.poll();
580 if (msg == null) {
581 break;
582 }
583 ReferenceCountUtil.release(msg);
584 }
585 return true;
586 }
587 return false;
588 }
589
590 @Override
591 public final ChannelFuture close() {
592 return close(newPromise());
593 }
594
595 @Override
596 public final ChannelFuture disconnect() {
597 return disconnect(newPromise());
598 }
599
600 @Override
601 public final ChannelFuture close(ChannelPromise promise) {
602
603
604 executingStackCnt++;
605 ChannelFuture future;
606 try {
607 runPendingTasks();
608 future = super.close(promise);
609
610 cancelRemainingScheduledTasks = true;
611 } finally {
612 executingStackCnt--;
613 maybeRunPendingTasks();
614 }
615 return future;
616 }
617
618 @Override
619 public final ChannelFuture disconnect(ChannelPromise promise) {
620 executingStackCnt++;
621 ChannelFuture future;
622 try {
623 future = super.disconnect(promise);
624
625 if (!metadata.hasDisconnect()) {
626 cancelRemainingScheduledTasks = true;
627 }
628 } finally {
629 executingStackCnt--;
630 maybeRunPendingTasks();
631 }
632 return future;
633 }
634
635 @Override
636 public ChannelFuture bind(SocketAddress localAddress) {
637 executingStackCnt++;
638 try {
639 return super.bind(localAddress);
640 } finally {
641 executingStackCnt--;
642 maybeRunPendingTasks();
643 }
644 }
645
646 @Override
647 public ChannelFuture connect(SocketAddress remoteAddress) {
648 executingStackCnt++;
649 try {
650 return super.connect(remoteAddress);
651 } finally {
652 executingStackCnt--;
653 maybeRunPendingTasks();
654 }
655 }
656
657 @Override
658 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
659 executingStackCnt++;
660 try {
661 return super.connect(remoteAddress, localAddress);
662 } finally {
663 executingStackCnt--;
664 maybeRunPendingTasks();
665 }
666 }
667
668 @Override
669 public ChannelFuture deregister() {
670 executingStackCnt++;
671 try {
672 return super.deregister();
673 } finally {
674 executingStackCnt--;
675 maybeRunPendingTasks();
676 }
677 }
678
679 @Override
680 public Channel flush() {
681 executingStackCnt++;
682 try {
683 return super.flush();
684 } finally {
685 executingStackCnt--;
686 maybeRunPendingTasks();
687 }
688 }
689
690 @Override
691 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
692 executingStackCnt++;
693 try {
694 return super.bind(localAddress, promise);
695 } finally {
696 executingStackCnt--;
697 maybeRunPendingTasks();
698 }
699 }
700
701 @Override
702 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
703 executingStackCnt++;
704 try {
705 return super.connect(remoteAddress, promise);
706 } finally {
707 executingStackCnt--;
708 maybeRunPendingTasks();
709 }
710 }
711
712 @Override
713 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
714 executingStackCnt++;
715 try {
716 return super.connect(remoteAddress, localAddress, promise);
717 } finally {
718 executingStackCnt--;
719 maybeRunPendingTasks();
720 }
721 }
722
723 @Override
724 public ChannelFuture deregister(ChannelPromise promise) {
725 executingStackCnt++;
726 try {
727 return super.deregister(promise);
728 } finally {
729 executingStackCnt--;
730 maybeRunPendingTasks();
731 }
732 }
733
734 @Override
735 public Channel read() {
736 executingStackCnt++;
737 try {
738 return super.read();
739 } finally {
740 executingStackCnt--;
741 maybeRunPendingTasks();
742 }
743 }
744
745 @Override
746 public ChannelFuture write(Object msg) {
747 executingStackCnt++;
748 try {
749 return super.write(msg);
750 } finally {
751 executingStackCnt--;
752 maybeRunPendingTasks();
753 }
754 }
755
756 @Override
757 public ChannelFuture write(Object msg, ChannelPromise promise) {
758 executingStackCnt++;
759 try {
760 return super.write(msg, promise);
761 } finally {
762 executingStackCnt--;
763 maybeRunPendingTasks();
764 }
765 }
766
767 @Override
768 public ChannelFuture writeAndFlush(Object msg) {
769 executingStackCnt++;
770 try {
771 return super.writeAndFlush(msg);
772 } finally {
773 executingStackCnt--;
774 maybeRunPendingTasks();
775 }
776 }
777
778 @Override
779 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
780 executingStackCnt++;
781 try {
782 return super.writeAndFlush(msg, promise);
783 } finally {
784 executingStackCnt--;
785 maybeRunPendingTasks();
786 }
787 }
788
789 private static boolean isNotEmpty(Queue<Object> queue) {
790 return queue != null && !queue.isEmpty();
791 }
792
793 private static Object poll(Queue<Object> queue) {
794 return queue != null ? queue.poll() : null;
795 }
796
797 private void maybeRunPendingTasks() {
798 if (executingStackCnt == 0) {
799 runPendingTasks();
800
801 if (cancelRemainingScheduledTasks) {
802
803 embeddedEventLoop().cancelScheduledTasks();
804 }
805 }
806 }
807
808
809
810
811
812 public void runPendingTasks() {
813 try {
814 embeddedEventLoop().runTasks();
815 } catch (Exception e) {
816 recordException(e);
817 }
818
819 try {
820 embeddedEventLoop().runScheduledTasks();
821 } catch (Exception e) {
822 recordException(e);
823 }
824 }
825
826
827
828
829
830
831
832
833 public boolean hasPendingTasks() {
834 return embeddedEventLoop().hasPendingNormalTasks() ||
835 embeddedEventLoop().nextScheduledTask() == 0;
836 }
837
838
839
840
841
842
843 public long runScheduledPendingTasks() {
844 try {
845 return embeddedEventLoop().runScheduledTasks();
846 } catch (Exception e) {
847 recordException(e);
848 return embeddedEventLoop().nextScheduledTask();
849 }
850 }
851
852 private void recordException(ChannelFuture future) {
853 if (!future.isSuccess()) {
854 recordException(future.cause());
855 }
856 }
857
858 private void recordException(Throwable cause) {
859 if (lastException == null) {
860 lastException = cause;
861 } else {
862 logger.warn(
863 "More than one exception was raised. " +
864 "Will report only the first one and log others.", cause);
865 }
866 }
867
868
869
870
871
872 public void advanceTimeBy(long duration, TimeUnit unit) {
873 embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
874 }
875
876
877
878
879
880
881 public void freezeTime() {
882 embeddedEventLoop().freezeTime();
883 }
884
885
886
887
888
889
890
891
892 public void unfreezeTime() {
893 embeddedEventLoop().unfreezeTime();
894 }
895
896
897
898
899 private ChannelFuture checkException(ChannelPromise promise) {
900 Throwable t = lastException;
901 if (t != null) {
902 lastException = null;
903
904 if (promise.isVoid()) {
905 PlatformDependent.throwException(t);
906 }
907
908 return promise.setFailure(t);
909 }
910
911 return promise.setSuccess();
912 }
913
914
915
916
917 public void checkException() {
918 checkException(voidPromise());
919 }
920
921
922
923
924
925 private boolean checkOpen(boolean recordException) {
926 if (!isOpen()) {
927 if (recordException) {
928 recordException(new ClosedChannelException());
929 }
930 return false;
931 }
932
933 return true;
934 }
935
936 private EmbeddedEventLoop embeddedEventLoop() {
937 if (isRegistered()) {
938 return (EmbeddedEventLoop) super.eventLoop();
939 }
940
941 return loop;
942 }
943
944
945
946
947 protected final void ensureOpen() {
948 if (!checkOpen(true)) {
949 checkException();
950 }
951 }
952
953 @Override
954 protected boolean isCompatible(EventLoop loop) {
955 return loop instanceof EmbeddedEventLoop;
956 }
957
958 @Override
959 protected SocketAddress localAddress0() {
960 return isActive()? LOCAL_ADDRESS : null;
961 }
962
963 @Override
964 protected SocketAddress remoteAddress0() {
965 return isActive()? REMOTE_ADDRESS : null;
966 }
967
968 @Override
969 protected void doRegister() throws Exception {
970 state = State.ACTIVE;
971 }
972
973 @Override
974 protected void doBind(SocketAddress localAddress) throws Exception {
975
976 }
977
978 @Override
979 protected void doDisconnect() throws Exception {
980 if (!metadata.hasDisconnect()) {
981 doClose();
982 }
983 }
984
985 @Override
986 protected void doClose() throws Exception {
987 state = State.CLOSED;
988 }
989
990 @Override
991 protected void doBeginRead() throws Exception {
992
993 }
994
995 @Override
996 protected AbstractUnsafe newUnsafe() {
997 return new EmbeddedUnsafe();
998 }
999
1000 @Override
1001 public Unsafe unsafe() {
1002 return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1003 }
1004
1005 @Override
1006 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1007 for (;;) {
1008 Object msg = in.current();
1009 if (msg == null) {
1010 break;
1011 }
1012
1013 ReferenceCountUtil.retain(msg);
1014 handleOutboundMessage(msg);
1015 in.remove();
1016 }
1017 }
1018
1019
1020
1021
1022
1023
1024 protected void handleOutboundMessage(Object msg) {
1025 outboundMessages().add(msg);
1026 }
1027
1028
1029
1030
1031 protected void handleInboundMessage(Object msg) {
1032 inboundMessages().add(msg);
1033 }
1034
1035 private final class EmbeddedUnsafe extends AbstractUnsafe {
1036
1037
1038
1039 final Unsafe wrapped = new Unsafe() {
1040 @Override
1041 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1042 return EmbeddedUnsafe.this.recvBufAllocHandle();
1043 }
1044
1045 @Override
1046 public SocketAddress localAddress() {
1047 return EmbeddedUnsafe.this.localAddress();
1048 }
1049
1050 @Override
1051 public SocketAddress remoteAddress() {
1052 return EmbeddedUnsafe.this.remoteAddress();
1053 }
1054
1055 @Override
1056 public void register(EventLoop eventLoop, ChannelPromise promise) {
1057 executingStackCnt++;
1058 try {
1059 EmbeddedUnsafe.this.register(eventLoop, promise);
1060 } finally {
1061 executingStackCnt--;
1062 maybeRunPendingTasks();
1063 }
1064 }
1065
1066 @Override
1067 public void bind(SocketAddress localAddress, ChannelPromise promise) {
1068 executingStackCnt++;
1069 try {
1070 EmbeddedUnsafe.this.bind(localAddress, promise);
1071 } finally {
1072 executingStackCnt--;
1073 maybeRunPendingTasks();
1074 }
1075 }
1076
1077 @Override
1078 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1079 executingStackCnt++;
1080 try {
1081 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1082 } finally {
1083 executingStackCnt--;
1084 maybeRunPendingTasks();
1085 }
1086 }
1087
1088 @Override
1089 public void disconnect(ChannelPromise promise) {
1090 executingStackCnt++;
1091 try {
1092 EmbeddedUnsafe.this.disconnect(promise);
1093 } finally {
1094 executingStackCnt--;
1095 maybeRunPendingTasks();
1096 }
1097 }
1098
1099 @Override
1100 public void close(ChannelPromise promise) {
1101 executingStackCnt++;
1102 try {
1103 EmbeddedUnsafe.this.close(promise);
1104 } finally {
1105 executingStackCnt--;
1106 maybeRunPendingTasks();
1107 }
1108 }
1109
1110 @Override
1111 public void closeForcibly() {
1112 executingStackCnt++;
1113 try {
1114 EmbeddedUnsafe.this.closeForcibly();
1115 } finally {
1116 executingStackCnt--;
1117 maybeRunPendingTasks();
1118 }
1119 }
1120
1121 @Override
1122 public void deregister(ChannelPromise promise) {
1123 executingStackCnt++;
1124 try {
1125 EmbeddedUnsafe.this.deregister(promise);
1126 } finally {
1127 executingStackCnt--;
1128 maybeRunPendingTasks();
1129 }
1130 }
1131
1132 @Override
1133 public void beginRead() {
1134 executingStackCnt++;
1135 try {
1136 EmbeddedUnsafe.this.beginRead();
1137 } finally {
1138 executingStackCnt--;
1139 maybeRunPendingTasks();
1140 }
1141 }
1142
1143 @Override
1144 public void write(Object msg, ChannelPromise promise) {
1145 executingStackCnt++;
1146 try {
1147 EmbeddedUnsafe.this.write(msg, promise);
1148 } finally {
1149 executingStackCnt--;
1150 maybeRunPendingTasks();
1151 }
1152 }
1153
1154 @Override
1155 public void flush() {
1156 executingStackCnt++;
1157 try {
1158 EmbeddedUnsafe.this.flush();
1159 } finally {
1160 executingStackCnt--;
1161 maybeRunPendingTasks();
1162 }
1163 }
1164
1165 @Override
1166 public ChannelPromise voidPromise() {
1167 return EmbeddedUnsafe.this.voidPromise();
1168 }
1169
1170 @Override
1171 public ChannelOutboundBuffer outboundBuffer() {
1172 return EmbeddedUnsafe.this.outboundBuffer();
1173 }
1174 };
1175
1176 @Override
1177 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1178 safeSetSuccess(promise);
1179 }
1180 }
1181
1182 private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1183 EmbeddedChannelPipeline(EmbeddedChannel channel) {
1184 super(channel);
1185 }
1186
1187 @Override
1188 protected void onUnhandledInboundException(Throwable cause) {
1189 recordException(cause);
1190 }
1191
1192 @Override
1193 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1194 handleInboundMessage(msg);
1195 }
1196 }
1197 }