1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.DefaultChannelConfig;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.concurrent.Ticker;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.PlatformDependent;
39 import io.netty.util.internal.RecyclableArrayList;
40 import io.netty.util.internal.logging.InternalLogger;
41 import io.netty.util.internal.logging.InternalLoggerFactory;
42
43 import java.net.SocketAddress;
44 import java.nio.channels.ClosedChannelException;
45 import java.util.ArrayDeque;
46 import java.util.Objects;
47 import java.util.Queue;
48 import java.util.concurrent.TimeUnit;
49
50
51
52
53 public class EmbeddedChannel extends AbstractChannel {
54
55 private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
56 private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
57
58 private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
59 private enum State { OPEN, ACTIVE, CLOSED }
60
61 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
62
63 private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
64 private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
65
66 private final EmbeddedEventLoop loop;
67 private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
68 @Override
69 public void operationComplete(ChannelFuture future) throws Exception {
70 recordException(future);
71 }
72 };
73
74 private final ChannelMetadata metadata;
75 private final ChannelConfig config;
76
77 private Queue<Object> inboundMessages;
78 private Queue<Object> outboundMessages;
79 private Throwable lastException;
80 private State state;
81 private int executingStackCnt;
82 private boolean cancelRemainingScheduledTasks;
83
84
85
86
87 public EmbeddedChannel() {
88 this(builder());
89 }
90
91
92
93
94
95
96 public EmbeddedChannel(ChannelId channelId) {
97 this(builder().channelId(channelId));
98 }
99
100
101
102
103
104
105 public EmbeddedChannel(ChannelHandler... handlers) {
106 this(builder().handlers(handlers));
107 }
108
109
110
111
112
113
114
115
116 public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
117 this(builder().hasDisconnect(hasDisconnect).handlers(handlers));
118 }
119
120
121
122
123
124
125
126
127
128
129 public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
130 this(builder().register(register).hasDisconnect(hasDisconnect).handlers(handlers));
131 }
132
133
134
135
136
137
138
139
140 public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
141 this(builder().channelId(channelId).handlers(handlers));
142 }
143
144
145
146
147
148
149
150
151
152
153 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
154 this(builder().channelId(channelId).hasDisconnect(hasDisconnect).handlers(handlers));
155 }
156
157
158
159
160
161
162
163
164
165
166
167
168 public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
169 ChannelHandler... handlers) {
170 this(builder().channelId(channelId).register(register).hasDisconnect(hasDisconnect).handlers(handlers));
171 }
172
173
174
175
176
177
178
179
180
181
182
183
184
185 public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
186 final ChannelHandler... handlers) {
187 this(builder()
188 .parent(parent)
189 .channelId(channelId)
190 .register(register)
191 .hasDisconnect(hasDisconnect)
192 .handlers(handlers));
193 }
194
195
196
197
198
199
200
201
202
203
204
205 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
206 final ChannelHandler... handlers) {
207 this(builder().channelId(channelId).hasDisconnect(hasDisconnect).config(config).handlers(handlers));
208 }
209
210
211
212
213
214
215
216 protected EmbeddedChannel(Builder builder) {
217 super(builder.parent, builder.channelId);
218 loop = new EmbeddedEventLoop(builder.ticker == null ? new EmbeddedEventLoop.FreezableTicker() : builder.ticker);
219 metadata = metadata(builder.hasDisconnect);
220 config = builder.config == null ? new DefaultChannelConfig(this) : builder.config;
221 setup(builder.register, builder.handlers);
222 }
223
224 private static ChannelMetadata metadata(boolean hasDisconnect) {
225 return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
226 }
227
228 private void setup(boolean register, final ChannelHandler... handlers) {
229 ObjectUtil.checkNotNull(handlers, "handlers");
230 ChannelPipeline p = pipeline();
231 p.addLast(new ChannelInitializer<Channel>() {
232 @Override
233 protected void initChannel(Channel ch) throws Exception {
234 ChannelPipeline pipeline = ch.pipeline();
235 for (ChannelHandler h: handlers) {
236 if (h == null) {
237 break;
238 }
239 pipeline.addLast(h);
240 }
241 }
242 });
243 if (register) {
244 ChannelFuture future = loop.register(this);
245 assert future.isDone();
246 }
247 }
248
249
250
251
252 public void register() throws Exception {
253 ChannelFuture future = loop.register(this);
254 assert future.isDone();
255 Throwable cause = future.cause();
256 if (cause != null) {
257 PlatformDependent.throwException(cause);
258 }
259 }
260
261 @Override
262 protected final DefaultChannelPipeline newChannelPipeline() {
263 return new EmbeddedChannelPipeline(this);
264 }
265
266 @Override
267 public ChannelMetadata metadata() {
268 return metadata;
269 }
270
271 @Override
272 public ChannelConfig config() {
273 return config;
274 }
275
276 @Override
277 public boolean isOpen() {
278 return state != State.CLOSED;
279 }
280
281 @Override
282 public boolean isActive() {
283 return state == State.ACTIVE;
284 }
285
286
287
288
289 public Queue<Object> inboundMessages() {
290 if (inboundMessages == null) {
291 inboundMessages = new ArrayDeque<Object>();
292 }
293 return inboundMessages;
294 }
295
296
297
298
299 @Deprecated
300 public Queue<Object> lastInboundBuffer() {
301 return inboundMessages();
302 }
303
304
305
306
307 public Queue<Object> outboundMessages() {
308 if (outboundMessages == null) {
309 outboundMessages = new ArrayDeque<Object>();
310 }
311 return outboundMessages;
312 }
313
314
315
316
317 @Deprecated
318 public Queue<Object> lastOutboundBuffer() {
319 return outboundMessages();
320 }
321
322
323
324
325 @SuppressWarnings("unchecked")
326 public <T> T readInbound() {
327 T message = (T) poll(inboundMessages);
328 if (message != null) {
329 ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
330 }
331 return message;
332 }
333
334
335
336
337 @SuppressWarnings("unchecked")
338 public <T> T readOutbound() {
339 T message = (T) poll(outboundMessages);
340 if (message != null) {
341 ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
342 }
343 return message;
344 }
345
346
347
348
349
350
351
352
353 public boolean writeInbound(Object... msgs) {
354 ensureOpen();
355 if (msgs.length == 0) {
356 return isNotEmpty(inboundMessages);
357 }
358
359 executingStackCnt++;
360 try {
361 ChannelPipeline p = pipeline();
362 for (Object m : msgs) {
363 p.fireChannelRead(m);
364 }
365
366 flushInbound(false, voidPromise());
367 } finally {
368 executingStackCnt--;
369 maybeRunPendingTasks();
370 }
371 return isNotEmpty(inboundMessages);
372 }
373
374
375
376
377
378
379
380 public ChannelFuture writeOneInbound(Object msg) {
381 return writeOneInbound(msg, newPromise());
382 }
383
384
385
386
387
388
389
390 public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
391 executingStackCnt++;
392 try {
393 if (checkOpen(true)) {
394 pipeline().fireChannelRead(msg);
395 }
396 } finally {
397 executingStackCnt--;
398 maybeRunPendingTasks();
399 }
400 return checkException(promise);
401 }
402
403
404
405
406
407
408 public EmbeddedChannel flushInbound() {
409 flushInbound(true, voidPromise());
410 return this;
411 }
412
413 private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
414 executingStackCnt++;
415 try {
416 if (checkOpen(recordException)) {
417 pipeline().fireChannelReadComplete();
418 runPendingTasks();
419 }
420 } finally {
421 executingStackCnt--;
422 maybeRunPendingTasks();
423 }
424
425 return checkException(promise);
426 }
427
428
429
430
431
432
433
434 public boolean writeOutbound(Object... msgs) {
435 ensureOpen();
436 if (msgs.length == 0) {
437 return isNotEmpty(outboundMessages);
438 }
439
440 executingStackCnt++;
441 RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
442 try {
443 try {
444 for (Object m : msgs) {
445 if (m == null) {
446 break;
447 }
448 futures.add(write(m));
449 }
450
451 flushOutbound0();
452
453 int size = futures.size();
454 for (int i = 0; i < size; i++) {
455 ChannelFuture future = (ChannelFuture) futures.get(i);
456 if (future.isDone()) {
457 recordException(future);
458 } else {
459
460 future.addListener(recordExceptionListener);
461 }
462 }
463 } finally {
464 executingStackCnt--;
465 maybeRunPendingTasks();
466 }
467 checkException();
468 return isNotEmpty(outboundMessages);
469 } finally {
470 futures.recycle();
471 }
472 }
473
474
475
476
477
478
479
480 public ChannelFuture writeOneOutbound(Object msg) {
481 return writeOneOutbound(msg, newPromise());
482 }
483
484
485
486
487
488
489
490 public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
491 executingStackCnt++;
492 try {
493 if (checkOpen(true)) {
494 return write(msg, promise);
495 }
496 } finally {
497 executingStackCnt--;
498 maybeRunPendingTasks();
499 }
500
501 return checkException(promise);
502 }
503
504
505
506
507
508
509 public EmbeddedChannel flushOutbound() {
510 executingStackCnt++;
511 try {
512 if (checkOpen(true)) {
513 flushOutbound0();
514 }
515 } finally {
516 executingStackCnt--;
517 maybeRunPendingTasks();
518 }
519 checkException(voidPromise());
520 return this;
521 }
522
523 private void flushOutbound0() {
524
525
526 runPendingTasks();
527
528 flush();
529 }
530
531
532
533
534
535
536 public boolean finish() {
537 return finish(false);
538 }
539
540
541
542
543
544
545
546 public boolean finishAndReleaseAll() {
547 return finish(true);
548 }
549
550
551
552
553
554
555
556 private boolean finish(boolean releaseAll) {
557 executingStackCnt++;
558 try {
559 close();
560 } finally {
561 executingStackCnt--;
562 maybeRunPendingTasks();
563 }
564 try {
565 checkException();
566 return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
567 } finally {
568 if (releaseAll) {
569 releaseAll(inboundMessages);
570 releaseAll(outboundMessages);
571 }
572 }
573 }
574
575
576
577
578
579 public boolean releaseInbound() {
580 return releaseAll(inboundMessages);
581 }
582
583
584
585
586
587 public boolean releaseOutbound() {
588 return releaseAll(outboundMessages);
589 }
590
591 private static boolean releaseAll(Queue<Object> queue) {
592 if (isNotEmpty(queue)) {
593 for (;;) {
594 Object msg = queue.poll();
595 if (msg == null) {
596 break;
597 }
598 ReferenceCountUtil.release(msg);
599 }
600 return true;
601 }
602 return false;
603 }
604
605 @Override
606 public final ChannelFuture close() {
607 return close(newPromise());
608 }
609
610 @Override
611 public final ChannelFuture disconnect() {
612 return disconnect(newPromise());
613 }
614
615 @Override
616 public final ChannelFuture close(ChannelPromise promise) {
617
618
619 executingStackCnt++;
620 ChannelFuture future;
621 try {
622 runPendingTasks();
623 future = super.close(promise);
624
625 cancelRemainingScheduledTasks = true;
626 } finally {
627 executingStackCnt--;
628 maybeRunPendingTasks();
629 }
630 return future;
631 }
632
633 @Override
634 public final ChannelFuture disconnect(ChannelPromise promise) {
635 executingStackCnt++;
636 ChannelFuture future;
637 try {
638 future = super.disconnect(promise);
639
640 if (!metadata.hasDisconnect()) {
641 cancelRemainingScheduledTasks = true;
642 }
643 } finally {
644 executingStackCnt--;
645 maybeRunPendingTasks();
646 }
647 return future;
648 }
649
650 @Override
651 public ChannelFuture bind(SocketAddress localAddress) {
652 executingStackCnt++;
653 try {
654 return super.bind(localAddress);
655 } finally {
656 executingStackCnt--;
657 maybeRunPendingTasks();
658 }
659 }
660
661 @Override
662 public ChannelFuture connect(SocketAddress remoteAddress) {
663 executingStackCnt++;
664 try {
665 return super.connect(remoteAddress);
666 } finally {
667 executingStackCnt--;
668 maybeRunPendingTasks();
669 }
670 }
671
672 @Override
673 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
674 executingStackCnt++;
675 try {
676 return super.connect(remoteAddress, localAddress);
677 } finally {
678 executingStackCnt--;
679 maybeRunPendingTasks();
680 }
681 }
682
683 @Override
684 public ChannelFuture deregister() {
685 executingStackCnt++;
686 try {
687 return super.deregister();
688 } finally {
689 executingStackCnt--;
690 maybeRunPendingTasks();
691 }
692 }
693
694 @Override
695 public Channel flush() {
696 executingStackCnt++;
697 try {
698 return super.flush();
699 } finally {
700 executingStackCnt--;
701 maybeRunPendingTasks();
702 }
703 }
704
705 @Override
706 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
707 executingStackCnt++;
708 try {
709 return super.bind(localAddress, promise);
710 } finally {
711 executingStackCnt--;
712 maybeRunPendingTasks();
713 }
714 }
715
716 @Override
717 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
718 executingStackCnt++;
719 try {
720 return super.connect(remoteAddress, promise);
721 } finally {
722 executingStackCnt--;
723 maybeRunPendingTasks();
724 }
725 }
726
727 @Override
728 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
729 executingStackCnt++;
730 try {
731 return super.connect(remoteAddress, localAddress, promise);
732 } finally {
733 executingStackCnt--;
734 maybeRunPendingTasks();
735 }
736 }
737
738 @Override
739 public ChannelFuture deregister(ChannelPromise promise) {
740 executingStackCnt++;
741 try {
742 return super.deregister(promise);
743 } finally {
744 executingStackCnt--;
745 maybeRunPendingTasks();
746 }
747 }
748
749 @Override
750 public Channel read() {
751 executingStackCnt++;
752 try {
753 return super.read();
754 } finally {
755 executingStackCnt--;
756 maybeRunPendingTasks();
757 }
758 }
759
760 @Override
761 public ChannelFuture write(Object msg) {
762 executingStackCnt++;
763 try {
764 return super.write(msg);
765 } finally {
766 executingStackCnt--;
767 maybeRunPendingTasks();
768 }
769 }
770
771 @Override
772 public ChannelFuture write(Object msg, ChannelPromise promise) {
773 executingStackCnt++;
774 try {
775 return super.write(msg, promise);
776 } finally {
777 executingStackCnt--;
778 maybeRunPendingTasks();
779 }
780 }
781
782 @Override
783 public ChannelFuture writeAndFlush(Object msg) {
784 executingStackCnt++;
785 try {
786 return super.writeAndFlush(msg);
787 } finally {
788 executingStackCnt--;
789 maybeRunPendingTasks();
790 }
791 }
792
793 @Override
794 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
795 executingStackCnt++;
796 try {
797 return super.writeAndFlush(msg, promise);
798 } finally {
799 executingStackCnt--;
800 maybeRunPendingTasks();
801 }
802 }
803
804 private static boolean isNotEmpty(Queue<Object> queue) {
805 return queue != null && !queue.isEmpty();
806 }
807
808 private static Object poll(Queue<Object> queue) {
809 return queue != null ? queue.poll() : null;
810 }
811
812 private void maybeRunPendingTasks() {
813 if (executingStackCnt == 0) {
814 runPendingTasks();
815
816 if (cancelRemainingScheduledTasks) {
817
818 embeddedEventLoop().cancelScheduledTasks();
819 }
820 }
821 }
822
823
824
825
826
827 public void runPendingTasks() {
828 try {
829 embeddedEventLoop().runTasks();
830 } catch (Exception e) {
831 recordException(e);
832 }
833
834 try {
835 embeddedEventLoop().runScheduledTasks();
836 } catch (Exception e) {
837 recordException(e);
838 }
839 }
840
841
842
843
844
845
846
847
848 public boolean hasPendingTasks() {
849 return embeddedEventLoop().hasPendingNormalTasks() ||
850 embeddedEventLoop().nextScheduledTask() == 0;
851 }
852
853
854
855
856
857
858 public long runScheduledPendingTasks() {
859 try {
860 return embeddedEventLoop().runScheduledTasks();
861 } catch (Exception e) {
862 recordException(e);
863 return embeddedEventLoop().nextScheduledTask();
864 }
865 }
866
867 private void recordException(ChannelFuture future) {
868 if (!future.isSuccess()) {
869 recordException(future.cause());
870 }
871 }
872
873 private void recordException(Throwable cause) {
874 if (lastException == null) {
875 lastException = cause;
876 } else {
877 logger.warn(
878 "More than one exception was raised. " +
879 "Will report only the first one and log others.", cause);
880 }
881 }
882
883 private EmbeddedEventLoop.FreezableTicker freezableTicker() {
884 Ticker ticker = eventLoop().ticker();
885 if (ticker instanceof EmbeddedEventLoop.FreezableTicker) {
886 return (EmbeddedEventLoop.FreezableTicker) ticker;
887 } else {
888 throw new IllegalStateException(
889 "EmbeddedChannel constructed with custom ticker, time manipulation methods are unavailable.");
890 }
891 }
892
893
894
895
896
897 public void advanceTimeBy(long duration, TimeUnit unit) {
898 freezableTicker().advance(duration, unit);
899 }
900
901
902
903
904
905
906 public void freezeTime() {
907 freezableTicker().freezeTime();
908 }
909
910
911
912
913
914
915
916
917 public void unfreezeTime() {
918 freezableTicker().unfreezeTime();
919 }
920
921
922
923
924 private ChannelFuture checkException(ChannelPromise promise) {
925 Throwable t = lastException;
926 if (t != null) {
927 lastException = null;
928
929 if (promise.isVoid()) {
930 PlatformDependent.throwException(t);
931 }
932
933 return promise.setFailure(t);
934 }
935
936 return promise.setSuccess();
937 }
938
939
940
941
942 public void checkException() {
943 checkException(voidPromise());
944 }
945
946
947
948
949
950 private boolean checkOpen(boolean recordException) {
951 if (!isOpen()) {
952 if (recordException) {
953 recordException(new ClosedChannelException());
954 }
955 return false;
956 }
957
958 return true;
959 }
960
961 private EmbeddedEventLoop embeddedEventLoop() {
962 if (isRegistered()) {
963 return (EmbeddedEventLoop) super.eventLoop();
964 }
965
966 return loop;
967 }
968
969
970
971
972 protected final void ensureOpen() {
973 if (!checkOpen(true)) {
974 checkException();
975 }
976 }
977
978 @Override
979 protected boolean isCompatible(EventLoop loop) {
980 return loop instanceof EmbeddedEventLoop;
981 }
982
983 @Override
984 protected SocketAddress localAddress0() {
985 return isActive()? LOCAL_ADDRESS : null;
986 }
987
988 @Override
989 protected SocketAddress remoteAddress0() {
990 return isActive()? REMOTE_ADDRESS : null;
991 }
992
993 @Override
994 protected void doRegister() throws Exception {
995 state = State.ACTIVE;
996 }
997
998 @Override
999 protected void doBind(SocketAddress localAddress) throws Exception {
1000
1001 }
1002
1003 @Override
1004 protected void doDisconnect() throws Exception {
1005 if (!metadata.hasDisconnect()) {
1006 doClose();
1007 }
1008 }
1009
1010 @Override
1011 protected void doClose() throws Exception {
1012 state = State.CLOSED;
1013 }
1014
1015 @Override
1016 protected void doBeginRead() throws Exception {
1017
1018 }
1019
1020 @Override
1021 protected AbstractUnsafe newUnsafe() {
1022 return new EmbeddedUnsafe();
1023 }
1024
1025 @Override
1026 public Unsafe unsafe() {
1027 return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1028 }
1029
1030 @Override
1031 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1032 for (;;) {
1033 Object msg = in.current();
1034 if (msg == null) {
1035 break;
1036 }
1037
1038 ReferenceCountUtil.retain(msg);
1039 handleOutboundMessage(msg);
1040 in.remove();
1041 }
1042 }
1043
1044
1045
1046
1047
1048
1049 protected void handleOutboundMessage(Object msg) {
1050 outboundMessages().add(msg);
1051 }
1052
1053
1054
1055
1056 protected void handleInboundMessage(Object msg) {
1057 inboundMessages().add(msg);
1058 }
1059
1060 public static Builder builder() {
1061 return new Builder();
1062 }
1063
1064 public static final class Builder {
1065 Channel parent;
1066 ChannelId channelId = EmbeddedChannelId.INSTANCE;
1067 boolean register = true;
1068 boolean hasDisconnect;
1069 ChannelHandler[] handlers = EMPTY_HANDLERS;
1070 ChannelConfig config;
1071 Ticker ticker;
1072
1073 private Builder() {
1074 }
1075
1076
1077
1078
1079
1080
1081
1082 public Builder parent(Channel parent) {
1083 this.parent = parent;
1084 return this;
1085 }
1086
1087
1088
1089
1090
1091
1092
1093 public Builder channelId(ChannelId channelId) {
1094 this.channelId = Objects.requireNonNull(channelId, "channelId");
1095 return this;
1096 }
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106 public Builder register(boolean register) {
1107 this.register = register;
1108 return this;
1109 }
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119 public Builder hasDisconnect(boolean hasDisconnect) {
1120 this.hasDisconnect = hasDisconnect;
1121 return this;
1122 }
1123
1124
1125
1126
1127
1128
1129
1130 public Builder handlers(ChannelHandler... handlers) {
1131 this.handlers = Objects.requireNonNull(handlers, "handlers");
1132 return this;
1133 }
1134
1135
1136
1137
1138
1139
1140
1141 public Builder config(ChannelConfig config) {
1142 this.config = Objects.requireNonNull(config, "config");
1143 return this;
1144 }
1145
1146
1147
1148
1149
1150
1151
1152 public Builder ticker(Ticker ticker) {
1153 this.ticker = ticker;
1154 return this;
1155 }
1156
1157
1158
1159
1160
1161
1162
1163 public EmbeddedChannel build() {
1164 return new EmbeddedChannel(this);
1165 }
1166 }
1167
1168 private final class EmbeddedUnsafe extends AbstractUnsafe {
1169
1170
1171
1172 final Unsafe wrapped = new Unsafe() {
1173 @Override
1174 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1175 return EmbeddedUnsafe.this.recvBufAllocHandle();
1176 }
1177
1178 @Override
1179 public SocketAddress localAddress() {
1180 return EmbeddedUnsafe.this.localAddress();
1181 }
1182
1183 @Override
1184 public SocketAddress remoteAddress() {
1185 return EmbeddedUnsafe.this.remoteAddress();
1186 }
1187
1188 @Override
1189 public void register(EventLoop eventLoop, ChannelPromise promise) {
1190 executingStackCnt++;
1191 try {
1192 EmbeddedUnsafe.this.register(eventLoop, promise);
1193 } finally {
1194 executingStackCnt--;
1195 maybeRunPendingTasks();
1196 }
1197 }
1198
1199 @Override
1200 public void bind(SocketAddress localAddress, ChannelPromise promise) {
1201 executingStackCnt++;
1202 try {
1203 EmbeddedUnsafe.this.bind(localAddress, promise);
1204 } finally {
1205 executingStackCnt--;
1206 maybeRunPendingTasks();
1207 }
1208 }
1209
1210 @Override
1211 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1212 executingStackCnt++;
1213 try {
1214 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1215 } finally {
1216 executingStackCnt--;
1217 maybeRunPendingTasks();
1218 }
1219 }
1220
1221 @Override
1222 public void disconnect(ChannelPromise promise) {
1223 executingStackCnt++;
1224 try {
1225 EmbeddedUnsafe.this.disconnect(promise);
1226 } finally {
1227 executingStackCnt--;
1228 maybeRunPendingTasks();
1229 }
1230 }
1231
1232 @Override
1233 public void close(ChannelPromise promise) {
1234 executingStackCnt++;
1235 try {
1236 EmbeddedUnsafe.this.close(promise);
1237 } finally {
1238 executingStackCnt--;
1239 maybeRunPendingTasks();
1240 }
1241 }
1242
1243 @Override
1244 public void closeForcibly() {
1245 executingStackCnt++;
1246 try {
1247 EmbeddedUnsafe.this.closeForcibly();
1248 } finally {
1249 executingStackCnt--;
1250 maybeRunPendingTasks();
1251 }
1252 }
1253
1254 @Override
1255 public void deregister(ChannelPromise promise) {
1256 executingStackCnt++;
1257 try {
1258 EmbeddedUnsafe.this.deregister(promise);
1259 } finally {
1260 executingStackCnt--;
1261 maybeRunPendingTasks();
1262 }
1263 }
1264
1265 @Override
1266 public void beginRead() {
1267 executingStackCnt++;
1268 try {
1269 EmbeddedUnsafe.this.beginRead();
1270 } finally {
1271 executingStackCnt--;
1272 maybeRunPendingTasks();
1273 }
1274 }
1275
1276 @Override
1277 public void write(Object msg, ChannelPromise promise) {
1278 executingStackCnt++;
1279 try {
1280 EmbeddedUnsafe.this.write(msg, promise);
1281 } finally {
1282 executingStackCnt--;
1283 maybeRunPendingTasks();
1284 }
1285 }
1286
1287 @Override
1288 public void flush() {
1289 executingStackCnt++;
1290 try {
1291 EmbeddedUnsafe.this.flush();
1292 } finally {
1293 executingStackCnt--;
1294 maybeRunPendingTasks();
1295 }
1296 }
1297
1298 @Override
1299 public ChannelPromise voidPromise() {
1300 return EmbeddedUnsafe.this.voidPromise();
1301 }
1302
1303 @Override
1304 public ChannelOutboundBuffer outboundBuffer() {
1305 return EmbeddedUnsafe.this.outboundBuffer();
1306 }
1307 };
1308
1309 @Override
1310 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1311 safeSetSuccess(promise);
1312 }
1313 }
1314
1315 private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1316 EmbeddedChannelPipeline(EmbeddedChannel channel) {
1317 super(channel);
1318 }
1319
1320 @Override
1321 protected void onUnhandledInboundException(Throwable cause) {
1322 recordException(cause);
1323 }
1324
1325 @Override
1326 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1327 handleInboundMessage(msg);
1328 }
1329 }
1330 }