1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import java.net.SocketAddress;
19 import java.nio.channels.ClosedChannelException;
20 import java.util.ArrayDeque;
21 import java.util.Queue;
22 import java.util.concurrent.TimeUnit;
23
24 import io.netty.channel.AbstractChannel;
25 import io.netty.channel.Channel;
26 import io.netty.channel.ChannelConfig;
27 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelFutureListener;
29 import io.netty.channel.ChannelHandler;
30 import io.netty.channel.ChannelHandlerContext;
31 import io.netty.channel.ChannelId;
32 import io.netty.channel.ChannelInitializer;
33 import io.netty.channel.ChannelMetadata;
34 import io.netty.channel.ChannelOutboundBuffer;
35 import io.netty.channel.ChannelPipeline;
36 import io.netty.channel.ChannelPromise;
37 import io.netty.channel.DefaultChannelConfig;
38 import io.netty.channel.DefaultChannelPipeline;
39 import io.netty.channel.EventLoop;
40 import io.netty.channel.RecvByteBufAllocator;
41 import io.netty.util.ReferenceCountUtil;
42 import io.netty.util.internal.ObjectUtil;
43 import io.netty.util.internal.PlatformDependent;
44 import io.netty.util.internal.RecyclableArrayList;
45 import io.netty.util.internal.logging.InternalLogger;
46 import io.netty.util.internal.logging.InternalLoggerFactory;
47
48
49
50
51 public class EmbeddedChannel extends AbstractChannel {
52
53 private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
54 private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
55
56 private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
57 private enum State { OPEN, ACTIVE, CLOSED }
58
59 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
60
61 private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
62 private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
63
64 private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
65 private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
66 @Override
67 public void operationComplete(ChannelFuture future) throws Exception {
68 recordException(future);
69 }
70 };
71
72 private final ChannelMetadata metadata;
73 private final ChannelConfig config;
74
75 private Queue<Object> inboundMessages;
76 private Queue<Object> outboundMessages;
77 private Throwable lastException;
78 private State state;
79 private int executingStackCnt;
80
81
82
83
84 public EmbeddedChannel() {
85 this(EMPTY_HANDLERS);
86 }
87
88
89
90
91
92
93 public EmbeddedChannel(ChannelId channelId) {
94 this(channelId, EMPTY_HANDLERS);
95 }
96
97
98
99
100
101
102 public EmbeddedChannel(ChannelHandler... handlers) {
103 this(EmbeddedChannelId.INSTANCE, handlers);
104 }
105
106
107
108
109
110
111
112
113 public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
114 this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
115 }
116
117
118
119
120
121
122
123
124
125
126 public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
127 this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
128 }
129
130
131
132
133
134
135
136
137 public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
138 this(channelId, false, handlers);
139 }
140
141
142
143
144
145
146
147
148
149
150 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
151 this(channelId, true, hasDisconnect, handlers);
152 }
153
154
155
156
157
158
159
160
161
162
163
164
165 public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
166 ChannelHandler... handlers) {
167 this(null, channelId, register, hasDisconnect, handlers);
168 }
169
170
171
172
173
174
175
176
177
178
179
180
181
182 public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
183 final ChannelHandler... handlers) {
184 super(parent, channelId);
185 metadata = metadata(hasDisconnect);
186 config = new DefaultChannelConfig(this);
187 setup(register, handlers);
188 }
189
190
191
192
193
194
195
196
197
198
199
200 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
201 final ChannelHandler... handlers) {
202 super(null, channelId);
203 metadata = metadata(hasDisconnect);
204 this.config = ObjectUtil.checkNotNull(config, "config");
205 setup(true, handlers);
206 }
207
208 private static ChannelMetadata metadata(boolean hasDisconnect) {
209 return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
210 }
211
212 private void setup(boolean register, final ChannelHandler... handlers) {
213 ObjectUtil.checkNotNull(handlers, "handlers");
214 ChannelPipeline p = pipeline();
215 p.addLast(new ChannelInitializer<Channel>() {
216 @Override
217 protected void initChannel(Channel ch) throws Exception {
218 ChannelPipeline pipeline = ch.pipeline();
219 for (ChannelHandler h: handlers) {
220 if (h == null) {
221 break;
222 }
223 pipeline.addLast(h);
224 }
225 }
226 });
227 if (register) {
228 ChannelFuture future = loop.register(this);
229 assert future.isDone();
230 }
231 }
232
233
234
235
236 public void register() throws Exception {
237 ChannelFuture future = loop.register(this);
238 assert future.isDone();
239 Throwable cause = future.cause();
240 if (cause != null) {
241 PlatformDependent.throwException(cause);
242 }
243 }
244
245 @Override
246 protected final DefaultChannelPipeline newChannelPipeline() {
247 return new EmbeddedChannelPipeline(this);
248 }
249
250 @Override
251 public ChannelMetadata metadata() {
252 return metadata;
253 }
254
255 @Override
256 public ChannelConfig config() {
257 return config;
258 }
259
260 @Override
261 public boolean isOpen() {
262 return state != State.CLOSED;
263 }
264
265 @Override
266 public boolean isActive() {
267 return state == State.ACTIVE;
268 }
269
270
271
272
273 public Queue<Object> inboundMessages() {
274 if (inboundMessages == null) {
275 inboundMessages = new ArrayDeque<Object>();
276 }
277 return inboundMessages;
278 }
279
280
281
282
283 @Deprecated
284 public Queue<Object> lastInboundBuffer() {
285 return inboundMessages();
286 }
287
288
289
290
291 public Queue<Object> outboundMessages() {
292 if (outboundMessages == null) {
293 outboundMessages = new ArrayDeque<Object>();
294 }
295 return outboundMessages;
296 }
297
298
299
300
301 @Deprecated
302 public Queue<Object> lastOutboundBuffer() {
303 return outboundMessages();
304 }
305
306
307
308
309 @SuppressWarnings("unchecked")
310 public <T> T readInbound() {
311 T message = (T) poll(inboundMessages);
312 if (message != null) {
313 ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
314 }
315 return message;
316 }
317
318
319
320
321 @SuppressWarnings("unchecked")
322 public <T> T readOutbound() {
323 T message = (T) poll(outboundMessages);
324 if (message != null) {
325 ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
326 }
327 return message;
328 }
329
330
331
332
333
334
335
336
337 public boolean writeInbound(Object... msgs) {
338 ensureOpen();
339 if (msgs.length == 0) {
340 return isNotEmpty(inboundMessages);
341 }
342
343 executingStackCnt++;
344 try {
345 ChannelPipeline p = pipeline();
346 for (Object m : msgs) {
347 p.fireChannelRead(m);
348 }
349
350 flushInbound(false, voidPromise());
351 } finally {
352 executingStackCnt--;
353 maybeRunPendingTasks();
354 }
355 return isNotEmpty(inboundMessages);
356 }
357
358
359
360
361
362
363
364 public ChannelFuture writeOneInbound(Object msg) {
365 return writeOneInbound(msg, newPromise());
366 }
367
368
369
370
371
372
373
374 public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
375 executingStackCnt++;
376 try {
377 if (checkOpen(true)) {
378 pipeline().fireChannelRead(msg);
379 }
380 } finally {
381 executingStackCnt--;
382 maybeRunPendingTasks();
383 }
384 return checkException(promise);
385 }
386
387
388
389
390
391
392 public EmbeddedChannel flushInbound() {
393 flushInbound(true, voidPromise());
394 return this;
395 }
396
397 private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
398 executingStackCnt++;
399 try {
400 if (checkOpen(recordException)) {
401 pipeline().fireChannelReadComplete();
402 runPendingTasks();
403 }
404 } finally {
405 executingStackCnt--;
406 maybeRunPendingTasks();
407 }
408
409 return checkException(promise);
410 }
411
412
413
414
415
416
417
418 public boolean writeOutbound(Object... msgs) {
419 ensureOpen();
420 if (msgs.length == 0) {
421 return isNotEmpty(outboundMessages);
422 }
423
424 executingStackCnt++;
425 RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
426 try {
427 try {
428 for (Object m : msgs) {
429 if (m == null) {
430 break;
431 }
432 futures.add(write(m));
433 }
434
435 flushOutbound0();
436
437 int size = futures.size();
438 for (int i = 0; i < size; i++) {
439 ChannelFuture future = (ChannelFuture) futures.get(i);
440 if (future.isDone()) {
441 recordException(future);
442 } else {
443
444 future.addListener(recordExceptionListener);
445 }
446 }
447 } finally {
448 executingStackCnt--;
449 maybeRunPendingTasks();
450 }
451 checkException();
452 return isNotEmpty(outboundMessages);
453 } finally {
454 futures.recycle();
455 }
456 }
457
458
459
460
461
462
463
464 public ChannelFuture writeOneOutbound(Object msg) {
465 return writeOneOutbound(msg, newPromise());
466 }
467
468
469
470
471
472
473
474 public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
475 executingStackCnt++;
476 try {
477 if (checkOpen(true)) {
478 return write(msg, promise);
479 }
480 } finally {
481 executingStackCnt--;
482 maybeRunPendingTasks();
483 }
484
485 return checkException(promise);
486 }
487
488
489
490
491
492
493 public EmbeddedChannel flushOutbound() {
494 executingStackCnt++;
495 try {
496 if (checkOpen(true)) {
497 flushOutbound0();
498 }
499 } finally {
500 executingStackCnt--;
501 maybeRunPendingTasks();
502 }
503 checkException(voidPromise());
504 return this;
505 }
506
507 private void flushOutbound0() {
508
509
510 runPendingTasks();
511
512 flush();
513 }
514
515
516
517
518
519
520 public boolean finish() {
521 return finish(false);
522 }
523
524
525
526
527
528
529
530 public boolean finishAndReleaseAll() {
531 return finish(true);
532 }
533
534
535
536
537
538
539
540 private boolean finish(boolean releaseAll) {
541 executingStackCnt++;
542 try {
543 close();
544 } finally {
545 executingStackCnt--;
546 maybeRunPendingTasks();
547 }
548 try {
549 checkException();
550 return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
551 } finally {
552 if (releaseAll) {
553 releaseAll(inboundMessages);
554 releaseAll(outboundMessages);
555 }
556 }
557 }
558
559
560
561
562
563 public boolean releaseInbound() {
564 return releaseAll(inboundMessages);
565 }
566
567
568
569
570
571 public boolean releaseOutbound() {
572 return releaseAll(outboundMessages);
573 }
574
575 private static boolean releaseAll(Queue<Object> queue) {
576 if (isNotEmpty(queue)) {
577 for (;;) {
578 Object msg = queue.poll();
579 if (msg == null) {
580 break;
581 }
582 ReferenceCountUtil.release(msg);
583 }
584 return true;
585 }
586 return false;
587 }
588
589 private void finishPendingTasks(boolean cancel) {
590 runPendingTasks();
591 if (cancel) {
592
593 embeddedEventLoop().cancelScheduledTasks();
594 }
595 }
596
597 @Override
598 public final ChannelFuture close() {
599 return close(newPromise());
600 }
601
602 @Override
603 public final ChannelFuture disconnect() {
604 return disconnect(newPromise());
605 }
606
607 @Override
608 public final ChannelFuture close(ChannelPromise promise) {
609
610
611 executingStackCnt++;
612 ChannelFuture future;
613 try {
614 runPendingTasks();
615 future = super.close(promise);
616 } finally {
617 executingStackCnt--;
618 maybeRunPendingTasks();
619 }
620
621
622 finishPendingTasks(true);
623 return future;
624 }
625
626 @Override
627 public final ChannelFuture disconnect(ChannelPromise promise) {
628 executingStackCnt++;
629 ChannelFuture future;
630 try {
631 future = super.disconnect(promise);
632 } finally {
633 executingStackCnt--;
634 maybeRunPendingTasks();
635 }
636 finishPendingTasks(!metadata.hasDisconnect());
637 return future;
638 }
639
640 @Override
641 public ChannelFuture bind(SocketAddress localAddress) {
642 executingStackCnt++;
643 try {
644 return super.bind(localAddress);
645 } finally {
646 executingStackCnt--;
647 maybeRunPendingTasks();
648 }
649 }
650
651 @Override
652 public ChannelFuture connect(SocketAddress remoteAddress) {
653 executingStackCnt++;
654 try {
655 return super.connect(remoteAddress);
656 } finally {
657 executingStackCnt--;
658 maybeRunPendingTasks();
659 }
660 }
661
662 @Override
663 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
664 executingStackCnt++;
665 try {
666 return super.connect(remoteAddress, localAddress);
667 } finally {
668 executingStackCnt--;
669 maybeRunPendingTasks();
670 }
671 }
672
673 @Override
674 public ChannelFuture deregister() {
675 executingStackCnt++;
676 try {
677 return super.deregister();
678 } finally {
679 executingStackCnt--;
680 maybeRunPendingTasks();
681 }
682 }
683
684 @Override
685 public Channel flush() {
686 executingStackCnt++;
687 try {
688 return super.flush();
689 } finally {
690 executingStackCnt--;
691 maybeRunPendingTasks();
692 }
693 }
694
695 @Override
696 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
697 executingStackCnt++;
698 try {
699 return super.bind(localAddress, promise);
700 } finally {
701 executingStackCnt--;
702 maybeRunPendingTasks();
703 }
704 }
705
706 @Override
707 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
708 executingStackCnt++;
709 try {
710 return super.connect(remoteAddress, promise);
711 } finally {
712 executingStackCnt--;
713 maybeRunPendingTasks();
714 }
715 }
716
717 @Override
718 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
719 executingStackCnt++;
720 try {
721 return super.connect(remoteAddress, localAddress, promise);
722 } finally {
723 executingStackCnt--;
724 maybeRunPendingTasks();
725 }
726 }
727
728 @Override
729 public ChannelFuture deregister(ChannelPromise promise) {
730 executingStackCnt++;
731 try {
732 return super.deregister(promise);
733 } finally {
734 executingStackCnt--;
735 maybeRunPendingTasks();
736 }
737 }
738
739 @Override
740 public Channel read() {
741 executingStackCnt++;
742 try {
743 return super.read();
744 } finally {
745 executingStackCnt--;
746 maybeRunPendingTasks();
747 }
748 }
749
750 @Override
751 public ChannelFuture write(Object msg) {
752 executingStackCnt++;
753 try {
754 return super.write(msg);
755 } finally {
756 executingStackCnt--;
757 maybeRunPendingTasks();
758 }
759 }
760
761 @Override
762 public ChannelFuture write(Object msg, ChannelPromise promise) {
763 executingStackCnt++;
764 try {
765 return super.write(msg, promise);
766 } finally {
767 executingStackCnt--;
768 maybeRunPendingTasks();
769 }
770 }
771
772 @Override
773 public ChannelFuture writeAndFlush(Object msg) {
774 executingStackCnt++;
775 try {
776 return super.writeAndFlush(msg);
777 } finally {
778 executingStackCnt--;
779 maybeRunPendingTasks();
780 }
781 }
782
783 @Override
784 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
785 executingStackCnt++;
786 try {
787 return super.writeAndFlush(msg, promise);
788 } finally {
789 executingStackCnt--;
790 maybeRunPendingTasks();
791 }
792 }
793
794 private static boolean isNotEmpty(Queue<Object> queue) {
795 return queue != null && !queue.isEmpty();
796 }
797
798 private static Object poll(Queue<Object> queue) {
799 return queue != null ? queue.poll() : null;
800 }
801
802 private void maybeRunPendingTasks() {
803 if (executingStackCnt == 0) {
804 runPendingTasks();
805 }
806 }
807
808
809
810
811
812 public void runPendingTasks() {
813 try {
814 embeddedEventLoop().runTasks();
815 } catch (Exception e) {
816 recordException(e);
817 }
818
819 try {
820 embeddedEventLoop().runScheduledTasks();
821 } catch (Exception e) {
822 recordException(e);
823 }
824 }
825
826
827
828
829
830
831
832
833 public boolean hasPendingTasks() {
834 return embeddedEventLoop().hasPendingNormalTasks() ||
835 embeddedEventLoop().nextScheduledTask() == 0;
836 }
837
838
839
840
841
842
843 public long runScheduledPendingTasks() {
844 try {
845 return embeddedEventLoop().runScheduledTasks();
846 } catch (Exception e) {
847 recordException(e);
848 return embeddedEventLoop().nextScheduledTask();
849 }
850 }
851
852 private void recordException(ChannelFuture future) {
853 if (!future.isSuccess()) {
854 recordException(future.cause());
855 }
856 }
857
858 private void recordException(Throwable cause) {
859 if (lastException == null) {
860 lastException = cause;
861 } else {
862 logger.warn(
863 "More than one exception was raised. " +
864 "Will report only the first one and log others.", cause);
865 }
866 }
867
868
869
870
871
872 public void advanceTimeBy(long duration, TimeUnit unit) {
873 embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
874 }
875
876
877
878
879
880
881 public void freezeTime() {
882 embeddedEventLoop().freezeTime();
883 }
884
885
886
887
888
889
890
891
892 public void unfreezeTime() {
893 embeddedEventLoop().unfreezeTime();
894 }
895
896
897
898
899 private ChannelFuture checkException(ChannelPromise promise) {
900 Throwable t = lastException;
901 if (t != null) {
902 lastException = null;
903
904 if (promise.isVoid()) {
905 PlatformDependent.throwException(t);
906 }
907
908 return promise.setFailure(t);
909 }
910
911 return promise.setSuccess();
912 }
913
914
915
916
917 public void checkException() {
918 checkException(voidPromise());
919 }
920
921
922
923
924
925 private boolean checkOpen(boolean recordException) {
926 if (!isOpen()) {
927 if (recordException) {
928 recordException(new ClosedChannelException());
929 }
930 return false;
931 }
932
933 return true;
934 }
935
936 private EmbeddedEventLoop embeddedEventLoop() {
937 if (isRegistered()) {
938 return (EmbeddedEventLoop) super.eventLoop();
939 }
940
941 return loop;
942 }
943
944
945
946
947 protected final void ensureOpen() {
948 if (!checkOpen(true)) {
949 checkException();
950 }
951 }
952
953 @Override
954 protected boolean isCompatible(EventLoop loop) {
955 return loop instanceof EmbeddedEventLoop;
956 }
957
958 @Override
959 protected SocketAddress localAddress0() {
960 return isActive()? LOCAL_ADDRESS : null;
961 }
962
963 @Override
964 protected SocketAddress remoteAddress0() {
965 return isActive()? REMOTE_ADDRESS : null;
966 }
967
968 @Override
969 protected void doRegister() throws Exception {
970 state = State.ACTIVE;
971 }
972
973 @Override
974 protected void doBind(SocketAddress localAddress) throws Exception {
975
976 }
977
978 @Override
979 protected void doDisconnect() throws Exception {
980 if (!metadata.hasDisconnect()) {
981 doClose();
982 }
983 }
984
985 @Override
986 protected void doClose() throws Exception {
987 state = State.CLOSED;
988 }
989
990 @Override
991 protected void doBeginRead() throws Exception {
992
993 }
994
995 @Override
996 protected AbstractUnsafe newUnsafe() {
997 return new EmbeddedUnsafe();
998 }
999
1000 @Override
1001 public Unsafe unsafe() {
1002 return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1003 }
1004
1005 @Override
1006 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1007 for (;;) {
1008 Object msg = in.current();
1009 if (msg == null) {
1010 break;
1011 }
1012
1013 ReferenceCountUtil.retain(msg);
1014 handleOutboundMessage(msg);
1015 in.remove();
1016 }
1017 }
1018
1019
1020
1021
1022
1023
1024 protected void handleOutboundMessage(Object msg) {
1025 outboundMessages().add(msg);
1026 }
1027
1028
1029
1030
1031 protected void handleInboundMessage(Object msg) {
1032 inboundMessages().add(msg);
1033 }
1034
1035 private final class EmbeddedUnsafe extends AbstractUnsafe {
1036
1037
1038
1039 final Unsafe wrapped = new Unsafe() {
1040 @Override
1041 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1042 return EmbeddedUnsafe.this.recvBufAllocHandle();
1043 }
1044
1045 @Override
1046 public SocketAddress localAddress() {
1047 return EmbeddedUnsafe.this.localAddress();
1048 }
1049
1050 @Override
1051 public SocketAddress remoteAddress() {
1052 return EmbeddedUnsafe.this.remoteAddress();
1053 }
1054
1055 @Override
1056 public void register(EventLoop eventLoop, ChannelPromise promise) {
1057 executingStackCnt++;
1058 try {
1059 EmbeddedUnsafe.this.register(eventLoop, promise);
1060 } finally {
1061 executingStackCnt--;
1062 maybeRunPendingTasks();
1063 }
1064 }
1065
1066 @Override
1067 public void bind(SocketAddress localAddress, ChannelPromise promise) {
1068 executingStackCnt++;
1069 try {
1070 EmbeddedUnsafe.this.bind(localAddress, promise);
1071 } finally {
1072 executingStackCnt--;
1073 maybeRunPendingTasks();
1074 }
1075 }
1076
1077 @Override
1078 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1079 executingStackCnt++;
1080 try {
1081 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1082 } finally {
1083 executingStackCnt--;
1084 maybeRunPendingTasks();
1085 }
1086 }
1087
1088 @Override
1089 public void disconnect(ChannelPromise promise) {
1090 executingStackCnt++;
1091 try {
1092 EmbeddedUnsafe.this.disconnect(promise);
1093 } finally {
1094 executingStackCnt--;
1095 maybeRunPendingTasks();
1096 }
1097 }
1098
1099 @Override
1100 public void close(ChannelPromise promise) {
1101 executingStackCnt++;
1102 try {
1103 EmbeddedUnsafe.this.close(promise);
1104 } finally {
1105 executingStackCnt--;
1106 maybeRunPendingTasks();
1107 }
1108 }
1109
1110 @Override
1111 public void closeForcibly() {
1112 executingStackCnt++;
1113 try {
1114 EmbeddedUnsafe.this.closeForcibly();
1115 } finally {
1116 executingStackCnt--;
1117 maybeRunPendingTasks();
1118 }
1119 }
1120
1121 @Override
1122 public void deregister(ChannelPromise promise) {
1123 executingStackCnt++;
1124 try {
1125 EmbeddedUnsafe.this.deregister(promise);
1126 } finally {
1127 executingStackCnt--;
1128 maybeRunPendingTasks();
1129 }
1130 }
1131
1132 @Override
1133 public void beginRead() {
1134 executingStackCnt++;
1135 try {
1136 EmbeddedUnsafe.this.beginRead();
1137 } finally {
1138 executingStackCnt--;
1139 maybeRunPendingTasks();
1140 }
1141 }
1142
1143 @Override
1144 public void write(Object msg, ChannelPromise promise) {
1145 executingStackCnt++;
1146 try {
1147 EmbeddedUnsafe.this.write(msg, promise);
1148 } finally {
1149 executingStackCnt--;
1150 maybeRunPendingTasks();
1151 }
1152 }
1153
1154 @Override
1155 public void flush() {
1156 executingStackCnt++;
1157 try {
1158 EmbeddedUnsafe.this.flush();
1159 } finally {
1160 executingStackCnt--;
1161 maybeRunPendingTasks();
1162 }
1163 }
1164
1165 @Override
1166 public ChannelPromise voidPromise() {
1167 return EmbeddedUnsafe.this.voidPromise();
1168 }
1169
1170 @Override
1171 public ChannelOutboundBuffer outboundBuffer() {
1172 return EmbeddedUnsafe.this.outboundBuffer();
1173 }
1174 };
1175
1176 @Override
1177 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1178 safeSetSuccess(promise);
1179 }
1180 }
1181
1182 private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1183 EmbeddedChannelPipeline(EmbeddedChannel channel) {
1184 super(channel);
1185 }
1186
1187 @Override
1188 protected void onUnhandledInboundException(Throwable cause) {
1189 recordException(cause);
1190 }
1191
1192 @Override
1193 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1194 handleInboundMessage(msg);
1195 }
1196 }
1197 }