1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.DefaultChannelConfig;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.concurrent.Ticker;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.RecyclableArrayList;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.net.SocketAddress;
43 import java.nio.channels.ClosedChannelException;
44 import java.util.ArrayDeque;
45 import java.util.Objects;
46 import java.util.Queue;
47 import java.util.concurrent.TimeUnit;
48
49
50
51
52 public class EmbeddedChannel extends AbstractChannel {
53
54 private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
55 private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
56
57 private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
58 private enum State { OPEN, ACTIVE, CLOSED }
59
60 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
61
62 private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
63 private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
64
65 private final EmbeddedEventLoop loop;
66 private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
67 @Override
68 public void operationComplete(ChannelFuture future) throws Exception {
69 recordException(future);
70 }
71 };
72
73 private final ChannelMetadata metadata;
74 private final ChannelConfig config;
75
76 private Queue<Object> inboundMessages;
77 private Queue<Object> outboundMessages;
78 private Throwable lastException;
79 private State state;
80 private int executingStackCnt;
81 private boolean cancelRemainingScheduledTasks;
82
83
84
85
86 public EmbeddedChannel() {
87 this(builder());
88 }
89
90
91
92
93
94
95 public EmbeddedChannel(ChannelId channelId) {
96 this(builder().channelId(channelId));
97 }
98
99
100
101
102
103
104 public EmbeddedChannel(ChannelHandler... handlers) {
105 this(builder().handlers(handlers));
106 }
107
108
109
110
111
112
113
114
115 public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
116 this(builder().hasDisconnect(hasDisconnect).handlers(handlers));
117 }
118
119
120
121
122
123
124
125
126
127
128 public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
129 this(builder().register(register).hasDisconnect(hasDisconnect).handlers(handlers));
130 }
131
132
133
134
135
136
137
138
139 public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
140 this(builder().channelId(channelId).handlers(handlers));
141 }
142
143
144
145
146
147
148
149
150
151
152 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
153 this(builder().channelId(channelId).hasDisconnect(hasDisconnect).handlers(handlers));
154 }
155
156
157
158
159
160
161
162
163
164
165
166
167 public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
168 ChannelHandler... handlers) {
169 this(builder().channelId(channelId).register(register).hasDisconnect(hasDisconnect).handlers(handlers));
170 }
171
172
173
174
175
176
177
178
179
180
181
182
183
184 public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
185 final ChannelHandler... handlers) {
186 this(builder()
187 .parent(parent)
188 .channelId(channelId)
189 .register(register)
190 .hasDisconnect(hasDisconnect)
191 .handlers(handlers));
192 }
193
194
195
196
197
198
199
200
201
202
203
204 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
205 final ChannelHandler... handlers) {
206 this(builder().channelId(channelId).hasDisconnect(hasDisconnect).config(config).handlers(handlers));
207 }
208
209
210
211
212
213
214
215 protected EmbeddedChannel(Builder builder) {
216 super(builder.parent, builder.channelId);
217 loop = new EmbeddedEventLoop(builder.ticker == null ? new EmbeddedEventLoop.FreezableTicker() : builder.ticker);
218 metadata = metadata(builder.hasDisconnect);
219 config = builder.config == null ? new DefaultChannelConfig(this) : builder.config;
220 if (builder.handler == null) {
221 setup(builder.register, builder.handlers);
222 } else {
223 setup(builder.register, builder.handler);
224 }
225 }
226
227 private static ChannelMetadata metadata(boolean hasDisconnect) {
228 return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
229 }
230
231 private void setup(boolean register, final ChannelHandler... handlers) {
232 ChannelPipeline p = pipeline();
233 p.addLast(new ChannelInitializer<Channel>() {
234 @Override
235 protected void initChannel(Channel ch) {
236 ChannelPipeline pipeline = ch.pipeline();
237 for (ChannelHandler h: handlers) {
238 if (h == null) {
239 break;
240 }
241 pipeline.addLast(h);
242 }
243 }
244 });
245 if (register) {
246 ChannelFuture future = loop.register(this);
247 assert future.isDone();
248 }
249 }
250
251 private void setup(boolean register, final ChannelHandler handler) {
252 ChannelPipeline p = pipeline();
253 p.addLast(handler);
254 if (register) {
255 ChannelFuture future = loop.register(this);
256 assert future.isDone();
257 }
258 }
259
260
261
262
263 public void register() throws Exception {
264 ChannelFuture future = loop.register(this);
265 assert future.isDone();
266 Throwable cause = future.cause();
267 if (cause != null) {
268 PlatformDependent.throwException(cause);
269 }
270 }
271
272 @Override
273 protected final DefaultChannelPipeline newChannelPipeline() {
274 return new EmbeddedChannelPipeline(this);
275 }
276
277 @Override
278 public ChannelMetadata metadata() {
279 return metadata;
280 }
281
282 @Override
283 public ChannelConfig config() {
284 return config;
285 }
286
287 @Override
288 public boolean isOpen() {
289 return state != State.CLOSED;
290 }
291
292 @Override
293 public boolean isActive() {
294 return state == State.ACTIVE;
295 }
296
297
298
299
300 public Queue<Object> inboundMessages() {
301 if (inboundMessages == null) {
302 inboundMessages = new ArrayDeque<Object>();
303 }
304 return inboundMessages;
305 }
306
307
308
309
310 @Deprecated
311 public Queue<Object> lastInboundBuffer() {
312 return inboundMessages();
313 }
314
315
316
317
318 public Queue<Object> outboundMessages() {
319 if (outboundMessages == null) {
320 outboundMessages = new ArrayDeque<Object>();
321 }
322 return outboundMessages;
323 }
324
325
326
327
328 @Deprecated
329 public Queue<Object> lastOutboundBuffer() {
330 return outboundMessages();
331 }
332
333
334
335
336 @SuppressWarnings("unchecked")
337 public <T> T readInbound() {
338 T message = (T) poll(inboundMessages);
339 if (message != null) {
340 ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
341 }
342 return message;
343 }
344
345
346
347
348 @SuppressWarnings("unchecked")
349 public <T> T readOutbound() {
350 T message = (T) poll(outboundMessages);
351 if (message != null) {
352 ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
353 }
354 return message;
355 }
356
357
358
359
360
361
362
363
364 public boolean writeInbound(Object... msgs) {
365 ensureOpen();
366 if (msgs.length == 0) {
367 return isNotEmpty(inboundMessages);
368 }
369
370 executingStackCnt++;
371 try {
372 ChannelPipeline p = pipeline();
373 for (Object m : msgs) {
374 p.fireChannelRead(m);
375 }
376
377 flushInbound(false, voidPromise());
378 } finally {
379 executingStackCnt--;
380 maybeRunPendingTasks();
381 }
382 return isNotEmpty(inboundMessages);
383 }
384
385
386
387
388
389
390
391 public ChannelFuture writeOneInbound(Object msg) {
392 return writeOneInbound(msg, newPromise());
393 }
394
395
396
397
398
399
400
401 public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
402 executingStackCnt++;
403 try {
404 if (checkOpen(true)) {
405 pipeline().fireChannelRead(msg);
406 }
407 } finally {
408 executingStackCnt--;
409 maybeRunPendingTasks();
410 }
411 return checkException(promise);
412 }
413
414
415
416
417
418
419 public EmbeddedChannel flushInbound() {
420 flushInbound(true, voidPromise());
421 return this;
422 }
423
424 private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
425 executingStackCnt++;
426 try {
427 if (checkOpen(recordException)) {
428 pipeline().fireChannelReadComplete();
429 runPendingTasks();
430 }
431 } finally {
432 executingStackCnt--;
433 maybeRunPendingTasks();
434 }
435
436 return checkException(promise);
437 }
438
439
440
441
442
443
444
445 public boolean writeOutbound(Object... msgs) {
446 ensureOpen();
447 if (msgs.length == 0) {
448 return isNotEmpty(outboundMessages);
449 }
450
451 executingStackCnt++;
452 RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
453 try {
454 try {
455 for (Object m : msgs) {
456 if (m == null) {
457 break;
458 }
459 futures.add(write(m));
460 }
461
462 flushOutbound0();
463
464 int size = futures.size();
465 for (int i = 0; i < size; i++) {
466 ChannelFuture future = (ChannelFuture) futures.get(i);
467 if (future.isDone()) {
468 recordException(future);
469 } else {
470
471 future.addListener(recordExceptionListener);
472 }
473 }
474 } finally {
475 executingStackCnt--;
476 maybeRunPendingTasks();
477 }
478 checkException();
479 return isNotEmpty(outboundMessages);
480 } finally {
481 futures.recycle();
482 }
483 }
484
485
486
487
488
489
490
491 public ChannelFuture writeOneOutbound(Object msg) {
492 return writeOneOutbound(msg, newPromise());
493 }
494
495
496
497
498
499
500
501 public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
502 executingStackCnt++;
503 try {
504 if (checkOpen(true)) {
505 return write(msg, promise);
506 }
507 } finally {
508 executingStackCnt--;
509 maybeRunPendingTasks();
510 }
511
512 return checkException(promise);
513 }
514
515
516
517
518
519
520 public EmbeddedChannel flushOutbound() {
521 executingStackCnt++;
522 try {
523 if (checkOpen(true)) {
524 flushOutbound0();
525 }
526 } finally {
527 executingStackCnt--;
528 maybeRunPendingTasks();
529 }
530 checkException(voidPromise());
531 return this;
532 }
533
534 private void flushOutbound0() {
535
536
537 runPendingTasks();
538
539 flush();
540 }
541
542
543
544
545
546
547 public boolean finish() {
548 return finish(false);
549 }
550
551
552
553
554
555
556
557 public boolean finishAndReleaseAll() {
558 return finish(true);
559 }
560
561
562
563
564
565
566
567 private boolean finish(boolean releaseAll) {
568 executingStackCnt++;
569 try {
570 close();
571 } finally {
572 executingStackCnt--;
573 maybeRunPendingTasks();
574 }
575 try {
576 checkException();
577 return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
578 } finally {
579 if (releaseAll) {
580 releaseAll(inboundMessages);
581 releaseAll(outboundMessages);
582 }
583 }
584 }
585
586
587
588
589
590 public boolean releaseInbound() {
591 return releaseAll(inboundMessages);
592 }
593
594
595
596
597
598 public boolean releaseOutbound() {
599 return releaseAll(outboundMessages);
600 }
601
602 private static boolean releaseAll(Queue<Object> queue) {
603 if (isNotEmpty(queue)) {
604 for (;;) {
605 Object msg = queue.poll();
606 if (msg == null) {
607 break;
608 }
609 ReferenceCountUtil.release(msg);
610 }
611 return true;
612 }
613 return false;
614 }
615
616 @Override
617 public final ChannelFuture close() {
618 return close(newPromise());
619 }
620
621 @Override
622 public final ChannelFuture disconnect() {
623 return disconnect(newPromise());
624 }
625
626 @Override
627 public final ChannelFuture close(ChannelPromise promise) {
628
629
630 executingStackCnt++;
631 ChannelFuture future;
632 try {
633 runPendingTasks();
634 future = super.close(promise);
635
636 cancelRemainingScheduledTasks = true;
637 } finally {
638 executingStackCnt--;
639 maybeRunPendingTasks();
640 }
641 return future;
642 }
643
644 @Override
645 public final ChannelFuture disconnect(ChannelPromise promise) {
646 executingStackCnt++;
647 ChannelFuture future;
648 try {
649 future = super.disconnect(promise);
650
651 if (!metadata.hasDisconnect()) {
652 cancelRemainingScheduledTasks = true;
653 }
654 } finally {
655 executingStackCnt--;
656 maybeRunPendingTasks();
657 }
658 return future;
659 }
660
661 @Override
662 public ChannelFuture bind(SocketAddress localAddress) {
663 executingStackCnt++;
664 try {
665 return super.bind(localAddress);
666 } finally {
667 executingStackCnt--;
668 maybeRunPendingTasks();
669 }
670 }
671
672 @Override
673 public ChannelFuture connect(SocketAddress remoteAddress) {
674 executingStackCnt++;
675 try {
676 return super.connect(remoteAddress);
677 } finally {
678 executingStackCnt--;
679 maybeRunPendingTasks();
680 }
681 }
682
683 @Override
684 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
685 executingStackCnt++;
686 try {
687 return super.connect(remoteAddress, localAddress);
688 } finally {
689 executingStackCnt--;
690 maybeRunPendingTasks();
691 }
692 }
693
694 @Override
695 public ChannelFuture deregister() {
696 executingStackCnt++;
697 try {
698 return super.deregister();
699 } finally {
700 executingStackCnt--;
701 maybeRunPendingTasks();
702 }
703 }
704
705 @Override
706 public Channel flush() {
707 executingStackCnt++;
708 try {
709 return super.flush();
710 } finally {
711 executingStackCnt--;
712 maybeRunPendingTasks();
713 }
714 }
715
716 @Override
717 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
718 executingStackCnt++;
719 try {
720 return super.bind(localAddress, promise);
721 } finally {
722 executingStackCnt--;
723 maybeRunPendingTasks();
724 }
725 }
726
727 @Override
728 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
729 executingStackCnt++;
730 try {
731 return super.connect(remoteAddress, promise);
732 } finally {
733 executingStackCnt--;
734 maybeRunPendingTasks();
735 }
736 }
737
738 @Override
739 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
740 executingStackCnt++;
741 try {
742 return super.connect(remoteAddress, localAddress, promise);
743 } finally {
744 executingStackCnt--;
745 maybeRunPendingTasks();
746 }
747 }
748
749 @Override
750 public ChannelFuture deregister(ChannelPromise promise) {
751 executingStackCnt++;
752 try {
753 return super.deregister(promise);
754 } finally {
755 executingStackCnt--;
756 maybeRunPendingTasks();
757 }
758 }
759
760 @Override
761 public Channel read() {
762 executingStackCnt++;
763 try {
764 return super.read();
765 } finally {
766 executingStackCnt--;
767 maybeRunPendingTasks();
768 }
769 }
770
771 @Override
772 public ChannelFuture write(Object msg) {
773 executingStackCnt++;
774 try {
775 return super.write(msg);
776 } finally {
777 executingStackCnt--;
778 maybeRunPendingTasks();
779 }
780 }
781
782 @Override
783 public ChannelFuture write(Object msg, ChannelPromise promise) {
784 executingStackCnt++;
785 try {
786 return super.write(msg, promise);
787 } finally {
788 executingStackCnt--;
789 maybeRunPendingTasks();
790 }
791 }
792
793 @Override
794 public ChannelFuture writeAndFlush(Object msg) {
795 executingStackCnt++;
796 try {
797 return super.writeAndFlush(msg);
798 } finally {
799 executingStackCnt--;
800 maybeRunPendingTasks();
801 }
802 }
803
804 @Override
805 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
806 executingStackCnt++;
807 try {
808 return super.writeAndFlush(msg, promise);
809 } finally {
810 executingStackCnt--;
811 maybeRunPendingTasks();
812 }
813 }
814
815 private static boolean isNotEmpty(Queue<Object> queue) {
816 return queue != null && !queue.isEmpty();
817 }
818
819 private static Object poll(Queue<Object> queue) {
820 return queue != null ? queue.poll() : null;
821 }
822
823 private void maybeRunPendingTasks() {
824 if (executingStackCnt == 0) {
825 runPendingTasks();
826
827 if (cancelRemainingScheduledTasks) {
828
829 embeddedEventLoop().cancelScheduledTasks();
830 }
831 }
832 }
833
834
835
836
837
838 public void runPendingTasks() {
839 try {
840 embeddedEventLoop().runTasks();
841 } catch (Exception e) {
842 recordException(e);
843 }
844
845 try {
846 embeddedEventLoop().runScheduledTasks();
847 } catch (Exception e) {
848 recordException(e);
849 }
850 }
851
852
853
854
855
856
857
858
859 public boolean hasPendingTasks() {
860 return embeddedEventLoop().hasPendingNormalTasks() ||
861 embeddedEventLoop().nextScheduledTask() == 0;
862 }
863
864
865
866
867
868
869 public long runScheduledPendingTasks() {
870 try {
871 return embeddedEventLoop().runScheduledTasks();
872 } catch (Exception e) {
873 recordException(e);
874 return embeddedEventLoop().nextScheduledTask();
875 }
876 }
877
878 private void recordException(ChannelFuture future) {
879 if (!future.isSuccess()) {
880 recordException(future.cause());
881 }
882 }
883
884 private void recordException(Throwable cause) {
885 if (lastException == null) {
886 lastException = cause;
887 } else {
888 logger.warn(
889 "More than one exception was raised. " +
890 "Will report only the first one and log others.", cause);
891 }
892 }
893
894 private EmbeddedEventLoop.FreezableTicker freezableTicker() {
895 Ticker ticker = eventLoop().ticker();
896 if (ticker instanceof EmbeddedEventLoop.FreezableTicker) {
897 return (EmbeddedEventLoop.FreezableTicker) ticker;
898 } else {
899 throw new IllegalStateException(
900 "EmbeddedChannel constructed with custom ticker, time manipulation methods are unavailable.");
901 }
902 }
903
904
905
906
907
908 public void advanceTimeBy(long duration, TimeUnit unit) {
909 freezableTicker().advance(duration, unit);
910 }
911
912
913
914
915
916
917 public void freezeTime() {
918 freezableTicker().freezeTime();
919 }
920
921
922
923
924
925
926
927
928 public void unfreezeTime() {
929 freezableTicker().unfreezeTime();
930 }
931
932
933
934
935 private ChannelFuture checkException(ChannelPromise promise) {
936 Throwable t = lastException;
937 if (t != null) {
938 lastException = null;
939
940 if (promise.isVoid()) {
941 PlatformDependent.throwException(t);
942 }
943
944 return promise.setFailure(t);
945 }
946
947 return promise.setSuccess();
948 }
949
950
951
952
953 public void checkException() {
954 checkException(voidPromise());
955 }
956
957
958
959
960
961 private boolean checkOpen(boolean recordException) {
962 if (!isOpen()) {
963 if (recordException) {
964 recordException(new ClosedChannelException());
965 }
966 return false;
967 }
968
969 return true;
970 }
971
972 private EmbeddedEventLoop embeddedEventLoop() {
973 if (isRegistered()) {
974 return (EmbeddedEventLoop) super.eventLoop();
975 }
976
977 return loop;
978 }
979
980
981
982
983 protected final void ensureOpen() {
984 if (!checkOpen(true)) {
985 checkException();
986 }
987 }
988
989 @Override
990 protected boolean isCompatible(EventLoop loop) {
991 return loop instanceof EmbeddedEventLoop;
992 }
993
994 @Override
995 protected SocketAddress localAddress0() {
996 return isActive()? LOCAL_ADDRESS : null;
997 }
998
999 @Override
1000 protected SocketAddress remoteAddress0() {
1001 return isActive()? REMOTE_ADDRESS : null;
1002 }
1003
1004 @Override
1005 protected void doRegister() throws Exception {
1006 state = State.ACTIVE;
1007 }
1008
1009 @Override
1010 protected void doBind(SocketAddress localAddress) throws Exception {
1011
1012 }
1013
1014 @Override
1015 protected void doDisconnect() throws Exception {
1016 if (!metadata.hasDisconnect()) {
1017 doClose();
1018 }
1019 }
1020
1021 @Override
1022 protected void doClose() throws Exception {
1023 state = State.CLOSED;
1024 }
1025
1026 @Override
1027 protected void doBeginRead() throws Exception {
1028
1029 }
1030
1031 @Override
1032 protected AbstractUnsafe newUnsafe() {
1033 return new EmbeddedUnsafe();
1034 }
1035
1036 @Override
1037 public Unsafe unsafe() {
1038 return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1039 }
1040
1041 @Override
1042 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1043 for (;;) {
1044 Object msg = in.current();
1045 if (msg == null) {
1046 break;
1047 }
1048
1049 ReferenceCountUtil.retain(msg);
1050 handleOutboundMessage(msg);
1051 in.remove();
1052 }
1053 }
1054
1055
1056
1057
1058
1059
1060 protected void handleOutboundMessage(Object msg) {
1061 outboundMessages().add(msg);
1062 }
1063
1064
1065
1066
1067 protected void handleInboundMessage(Object msg) {
1068 inboundMessages().add(msg);
1069 }
1070
1071 public static Builder builder() {
1072 return new Builder();
1073 }
1074
1075 public static final class Builder {
1076 Channel parent;
1077 ChannelId channelId = EmbeddedChannelId.INSTANCE;
1078 boolean register = true;
1079 boolean hasDisconnect;
1080
1081 ChannelHandler[] handlers = EMPTY_HANDLERS;
1082 ChannelHandler handler;
1083 ChannelConfig config;
1084 Ticker ticker;
1085
1086 private Builder() {
1087 }
1088
1089
1090
1091
1092
1093
1094
1095 public Builder parent(Channel parent) {
1096 this.parent = parent;
1097 return this;
1098 }
1099
1100
1101
1102
1103
1104
1105
1106 public Builder channelId(ChannelId channelId) {
1107 this.channelId = Objects.requireNonNull(channelId, "channelId");
1108 return this;
1109 }
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119 public Builder register(boolean register) {
1120 this.register = register;
1121 return this;
1122 }
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132 public Builder hasDisconnect(boolean hasDisconnect) {
1133 this.hasDisconnect = hasDisconnect;
1134 return this;
1135 }
1136
1137
1138
1139
1140
1141
1142
1143 public Builder handlers(ChannelHandler... handlers) {
1144 this.handlers = Objects.requireNonNull(handlers, "handlers");
1145 this.handler = null;
1146 return this;
1147 }
1148
1149
1150
1151
1152
1153
1154
1155 public Builder handlers(ChannelHandler handler) {
1156 this.handler = Objects.requireNonNull(handler, "handler");
1157 this.handlers = null;
1158 return this;
1159 }
1160
1161
1162
1163
1164
1165
1166
1167 public Builder config(ChannelConfig config) {
1168 this.config = Objects.requireNonNull(config, "config");
1169 return this;
1170 }
1171
1172
1173
1174
1175
1176
1177
1178 public Builder ticker(Ticker ticker) {
1179 this.ticker = ticker;
1180 return this;
1181 }
1182
1183
1184
1185
1186
1187
1188
1189 public EmbeddedChannel build() {
1190 return new EmbeddedChannel(this);
1191 }
1192 }
1193
1194 private final class EmbeddedUnsafe extends AbstractUnsafe {
1195
1196
1197
1198 final Unsafe wrapped = new Unsafe() {
1199 @Override
1200 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1201 return EmbeddedUnsafe.this.recvBufAllocHandle();
1202 }
1203
1204 @Override
1205 public SocketAddress localAddress() {
1206 return EmbeddedUnsafe.this.localAddress();
1207 }
1208
1209 @Override
1210 public SocketAddress remoteAddress() {
1211 return EmbeddedUnsafe.this.remoteAddress();
1212 }
1213
1214 @Override
1215 public void register(EventLoop eventLoop, ChannelPromise promise) {
1216 executingStackCnt++;
1217 try {
1218 EmbeddedUnsafe.this.register(eventLoop, promise);
1219 } finally {
1220 executingStackCnt--;
1221 maybeRunPendingTasks();
1222 }
1223 }
1224
1225 @Override
1226 public void bind(SocketAddress localAddress, ChannelPromise promise) {
1227 executingStackCnt++;
1228 try {
1229 EmbeddedUnsafe.this.bind(localAddress, promise);
1230 } finally {
1231 executingStackCnt--;
1232 maybeRunPendingTasks();
1233 }
1234 }
1235
1236 @Override
1237 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1238 executingStackCnt++;
1239 try {
1240 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1241 } finally {
1242 executingStackCnt--;
1243 maybeRunPendingTasks();
1244 }
1245 }
1246
1247 @Override
1248 public void disconnect(ChannelPromise promise) {
1249 executingStackCnt++;
1250 try {
1251 EmbeddedUnsafe.this.disconnect(promise);
1252 } finally {
1253 executingStackCnt--;
1254 maybeRunPendingTasks();
1255 }
1256 }
1257
1258 @Override
1259 public void close(ChannelPromise promise) {
1260 executingStackCnt++;
1261 try {
1262 EmbeddedUnsafe.this.close(promise);
1263 } finally {
1264 executingStackCnt--;
1265 maybeRunPendingTasks();
1266 }
1267 }
1268
1269 @Override
1270 public void closeForcibly() {
1271 executingStackCnt++;
1272 try {
1273 EmbeddedUnsafe.this.closeForcibly();
1274 } finally {
1275 executingStackCnt--;
1276 maybeRunPendingTasks();
1277 }
1278 }
1279
1280 @Override
1281 public void deregister(ChannelPromise promise) {
1282 executingStackCnt++;
1283 try {
1284 EmbeddedUnsafe.this.deregister(promise);
1285 } finally {
1286 executingStackCnt--;
1287 maybeRunPendingTasks();
1288 }
1289 }
1290
1291 @Override
1292 public void beginRead() {
1293 executingStackCnt++;
1294 try {
1295 EmbeddedUnsafe.this.beginRead();
1296 } finally {
1297 executingStackCnt--;
1298 maybeRunPendingTasks();
1299 }
1300 }
1301
1302 @Override
1303 public void write(Object msg, ChannelPromise promise) {
1304 executingStackCnt++;
1305 try {
1306 EmbeddedUnsafe.this.write(msg, promise);
1307 } finally {
1308 executingStackCnt--;
1309 maybeRunPendingTasks();
1310 }
1311 }
1312
1313 @Override
1314 public void flush() {
1315 executingStackCnt++;
1316 try {
1317 EmbeddedUnsafe.this.flush();
1318 } finally {
1319 executingStackCnt--;
1320 maybeRunPendingTasks();
1321 }
1322 }
1323
1324 @Override
1325 public ChannelPromise voidPromise() {
1326 return EmbeddedUnsafe.this.voidPromise();
1327 }
1328
1329 @Override
1330 public ChannelOutboundBuffer outboundBuffer() {
1331 return EmbeddedUnsafe.this.outboundBuffer();
1332 }
1333 };
1334
1335 @Override
1336 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1337 safeSetSuccess(promise);
1338 }
1339 }
1340
1341 private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1342 EmbeddedChannelPipeline(EmbeddedChannel channel) {
1343 super(channel);
1344 }
1345
1346 @Override
1347 protected void onUnhandledInboundException(Throwable cause) {
1348 recordException(cause);
1349 }
1350
1351 @Override
1352 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1353 handleInboundMessage(msg);
1354 }
1355 }
1356 }