1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.embedded;
17
18 import io.netty.channel.AbstractChannel;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelConfig;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelFutureListener;
23 import io.netty.channel.ChannelHandler;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelId;
26 import io.netty.channel.ChannelInitializer;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOutboundBuffer;
29 import io.netty.channel.ChannelPipeline;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.DefaultChannelConfig;
32 import io.netty.channel.DefaultChannelPipeline;
33 import io.netty.channel.EventLoop;
34 import io.netty.channel.RecvByteBufAllocator;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.concurrent.Ticker;
37 import io.netty.util.internal.PlatformDependent;
38 import io.netty.util.internal.RecyclableArrayList;
39 import io.netty.util.internal.logging.InternalLogger;
40 import io.netty.util.internal.logging.InternalLoggerFactory;
41
42 import java.net.SocketAddress;
43 import java.nio.channels.ClosedChannelException;
44 import java.util.ArrayDeque;
45 import java.util.Objects;
46 import java.util.Queue;
47 import java.util.concurrent.TimeUnit;
48
49
50
51
52 public class EmbeddedChannel extends AbstractChannel {
53
/** Address returned by {@link #localAddress0()} while the channel is active. */
private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
/** Address returned by {@link #remoteAddress0()} while the channel is active. */
private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();

/** Shared empty array used as the default handler list of a {@link Builder}. */
private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
/**
 * Lifecycle marker stored in {@link #state}. Within this class only {@code ACTIVE} (by
 * {@link #doRegister()}) and {@code CLOSED} (by {@link #doClose()}) are ever assigned.
 */
private enum State { OPEN, ACTIVE, CLOSED }

private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);

/** Metadata selected when the channel is configured with {@code hasDisconnect == false}. */
private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
/** Metadata selected when the channel is configured with {@code hasDisconnect == true}. */
private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);

/** Event loop created by the constructor; used to register and as the fallback while unregistered. */
private final EmbeddedEventLoop loop;
/** Listener attached to write futures that were not done yet when {@link #writeOutbound(Object...)} inspected them. */
private final ChannelFutureListener recordExceptionListener = this::recordException;

private final ChannelMetadata metadata;
private final ChannelConfig config;

/** Created lazily by {@link #inboundMessages()}; {@code null} until then. */
private Queue<Object> inboundMessages;
/** Created lazily by {@link #outboundMessages()}; {@code null} until then. */
private Queue<Object> outboundMessages;
/** First recorded failure that has not been reported yet; reset by {@link #checkException(ChannelPromise)}. */
private Throwable lastException;
/** Current lifecycle state; stays {@code null} until {@link #doRegister()} or {@link #doClose()} runs. */
private State state;
/** Nesting depth of tracked operations; {@link #maybeRunPendingTasks()} only acts when it is zero. */
private int executingStackCnt;
/** Once set, {@link #maybeRunPendingTasks()} cancels the remaining scheduled tasks after running pending ones. */
private boolean cancelRemainingScheduledTasks;
77
78
79
80
/**
 * Creates a channel from an unmodified {@link Builder}: the default {@link EmbeddedChannelId}, no handlers,
 * and registration enabled.
 */
public EmbeddedChannel() {
    this(builder());
}
84
85
86
87
88
89
/**
 * Creates a channel with the given id and otherwise default {@link Builder} settings.
 *
 * @param channelId the id to assign to the channel; must not be {@code null}
 */
public EmbeddedChannel(ChannelId channelId) {
    this(builder().channelId(channelId));
}
93
94
95
96
97
98
/**
 * Creates a channel whose pipeline is populated with the given handlers.
 *
 * @param handlers the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(ChannelHandler... handlers) {
    this(builder().handlers(handlers));
}
102
103
104
105
106
107
108
109
/**
 * Creates a channel with the given handlers and disconnect capability.
 *
 * @param hasDisconnect selects the {@link ChannelMetadata} reported by {@link #metadata()}; when
 *                      {@code false}, {@link #doDisconnect()} closes the channel
 * @param handlers      the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
    this(builder().hasDisconnect(hasDisconnect).handlers(handlers));
}
113
114
115
116
117
118
119
120
121
122
/**
 * Creates a channel with the given handlers, optionally skipping registration.
 *
 * @param register      {@code true} to register with the embedded event loop during construction;
 *                      {@code false} leaves that to a later {@link #register()} call
 * @param hasDisconnect selects the {@link ChannelMetadata} reported by {@link #metadata()}
 * @param handlers      the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
    this(builder().register(register).hasDisconnect(hasDisconnect).handlers(handlers));
}
126
127
128
129
130
131
132
133
/**
 * Creates a channel with the given id and handlers.
 *
 * @param channelId the id to assign to the channel; must not be {@code null}
 * @param handlers  the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
    this(builder().channelId(channelId).handlers(handlers));
}
137
138
139
140
141
142
143
144
145
146
/**
 * Creates a channel with the given id, disconnect capability and handlers.
 *
 * @param channelId     the id to assign to the channel; must not be {@code null}
 * @param hasDisconnect selects the {@link ChannelMetadata} reported by {@link #metadata()}
 * @param handlers      the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
    this(builder().channelId(channelId).hasDisconnect(hasDisconnect).handlers(handlers));
}
150
151
152
153
154
155
156
157
158
159
160
161
/**
 * Creates a channel with the given id, registration mode, disconnect capability and handlers.
 *
 * @param channelId     the id to assign to the channel; must not be {@code null}
 * @param register      {@code true} to register with the embedded event loop during construction
 * @param hasDisconnect selects the {@link ChannelMetadata} reported by {@link #metadata()}
 * @param handlers      the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
        ChannelHandler... handlers) {
    this(builder().channelId(channelId).register(register).hasDisconnect(hasDisconnect).handlers(handlers));
}
166
167
168
169
170
171
172
173
174
175
176
177
178
/**
 * Creates a channel with an explicit parent in addition to id, registration mode, disconnect capability
 * and handlers.
 *
 * @param parent        the parent channel passed to the superclass constructor
 * @param channelId     the id to assign to the channel; must not be {@code null}
 * @param register      {@code true} to register with the embedded event loop during construction
 * @param hasDisconnect selects the {@link ChannelMetadata} reported by {@link #metadata()}
 * @param handlers      the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
        final ChannelHandler... handlers) {
    this(builder()
            .parent(parent)
            .channelId(channelId)
            .register(register)
            .hasDisconnect(hasDisconnect)
            .handlers(handlers));
}
188
189
190
191
192
193
194
195
196
197
198
/**
 * Creates a channel that uses a caller-supplied {@link ChannelConfig}.
 *
 * @param channelId     the id to assign to the channel; must not be {@code null}
 * @param hasDisconnect selects the {@link ChannelMetadata} reported by {@link #metadata()}
 * @param config        the configuration returned by {@link #config()}; must not be {@code null}
 * @param handlers      the handlers to add, in order; must not be {@code null}
 */
public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
        final ChannelHandler... handlers) {
    this(builder().channelId(channelId).hasDisconnect(hasDisconnect).config(config).handlers(handlers));
}
203
204
205
206
207
208
209
210 protected EmbeddedChannel(Builder builder) {
211 super(builder.parent, builder.channelId);
212 loop = new EmbeddedEventLoop(builder.ticker == null ? new EmbeddedEventLoop.FreezableTicker() : builder.ticker);
213 metadata = metadata(builder.hasDisconnect);
214 config = builder.config == null ? new DefaultChannelConfig(this) : builder.config;
215 if (builder.handler == null) {
216 setup(builder.register, builder.handlers);
217 } else {
218 setup(builder.register, builder.handler);
219 }
220 }
221
222 private static ChannelMetadata metadata(boolean hasDisconnect) {
223 return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
224 }
225
226 private void setup(boolean register, final ChannelHandler... handlers) {
227 ChannelPipeline p = pipeline();
228 p.addLast(new ChannelInitializer<Channel>() {
229 @Override
230 protected void initChannel(Channel ch) {
231 ChannelPipeline pipeline = ch.pipeline();
232 for (ChannelHandler h: handlers) {
233 if (h == null) {
234 break;
235 }
236 pipeline.addLast(h);
237 }
238 }
239 });
240 if (register) {
241 ChannelFuture future = loop.register(this);
242 assert future.isDone();
243 }
244 }
245
246 private void setup(boolean register, final ChannelHandler handler) {
247 ChannelPipeline p = pipeline();
248 p.addLast(handler);
249 if (register) {
250 ChannelFuture future = loop.register(this);
251 assert future.isDone();
252 }
253 }
254
255
256
257
258 public void register() throws Exception {
259 ChannelFuture future = loop.register(this);
260 assert future.isDone();
261 Throwable cause = future.cause();
262 if (cause != null) {
263 PlatformDependent.throwException(cause);
264 }
265 }
266
/**
 * Supplies the pipeline implementation that routes unhandled inbound messages and exceptions back into
 * this channel.
 *
 * @return a new {@link EmbeddedChannelPipeline} bound to this channel
 */
@Override
protected final DefaultChannelPipeline newChannelPipeline() {
    return new EmbeddedChannelPipeline(this);
}
271
/**
 * @return the metadata chosen at construction time from the {@code hasDisconnect} setting
 */
@Override
public ChannelMetadata metadata() {
    return metadata;
}
276
/**
 * @return the configuration supplied through the builder, or the {@link DefaultChannelConfig} created by
 *         the constructor when none was supplied
 */
@Override
public ChannelConfig config() {
    return config;
}
281
/**
 * @return {@code true} unless the channel has been closed; a channel that was never registered
 *         (state still {@code null}) also counts as open
 */
@Override
public boolean isOpen() {
    return state != State.CLOSED;
}
286
/**
 * @return {@code true} only between registration ({@link #doRegister()}) and close ({@link #doClose()})
 */
@Override
public boolean isActive() {
    return state == State.ACTIVE;
}
291
292
293
294
295 public Queue<Object> inboundMessages() {
296 if (inboundMessages == null) {
297 inboundMessages = new ArrayDeque<Object>();
298 }
299 return inboundMessages;
300 }
301
302
303
304
/**
 * @return the same queue as {@link #inboundMessages()}
 * @deprecated use {@link #inboundMessages()}
 */
@Deprecated
public Queue<Object> lastInboundBuffer() {
    return inboundMessages();
}
309
310
311
312
313 public Queue<Object> outboundMessages() {
314 if (outboundMessages == null) {
315 outboundMessages = new ArrayDeque<Object>();
316 }
317 return outboundMessages;
318 }
319
320
321
322
/**
 * @return the same queue as {@link #outboundMessages()}
 * @deprecated use {@link #outboundMessages()}
 */
@Deprecated
public Queue<Object> lastOutboundBuffer() {
    return outboundMessages();
}
327
328
329
330
331 @SuppressWarnings("unchecked")
332 public <T> T readInbound() {
333 T message = (T) poll(inboundMessages);
334 if (message != null) {
335 ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
336 }
337 return message;
338 }
339
340
341
342
343 @SuppressWarnings("unchecked")
344 public <T> T readOutbound() {
345 T message = (T) poll(outboundMessages);
346 if (message != null) {
347 ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
348 }
349 return message;
350 }
351
352
353
354
355
356
357
358
359 public boolean writeInbound(Object... msgs) {
360 ensureOpen();
361 if (msgs.length == 0) {
362 return isNotEmpty(inboundMessages);
363 }
364
365 executingStackCnt++;
366 try {
367 ChannelPipeline p = pipeline();
368 for (Object m : msgs) {
369 p.fireChannelRead(m);
370 }
371
372 flushInbound(false, voidPromise());
373 } finally {
374 executingStackCnt--;
375 maybeRunPendingTasks();
376 }
377 return isNotEmpty(inboundMessages);
378 }
379
380
381
382
383
384
385
/**
 * Fires a single inbound message using a freshly created promise.
 *
 * @param msg the message to fire through the pipeline
 * @return the future produced by {@link #writeOneInbound(Object, ChannelPromise)}
 */
public ChannelFuture writeOneInbound(Object msg) {
    return writeOneInbound(msg, newPromise());
}
389
390
391
392
393
394
395
396 public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
397 executingStackCnt++;
398 try {
399 if (checkOpen(true)) {
400 pipeline().fireChannelRead(msg);
401 }
402 } finally {
403 executingStackCnt--;
404 maybeRunPendingTasks();
405 }
406 return checkException(promise);
407 }
408
409
410
411
412
413
/**
 * Flushes the inbound side, recording a failure if the channel is closed and throwing any recorded
 * exception (a void promise is used).
 *
 * @return this channel, for call chaining
 */
public EmbeddedChannel flushInbound() {
    flushInbound(true, voidPromise());
    return this;
}
418
419 private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
420 executingStackCnt++;
421 try {
422 if (checkOpen(recordException)) {
423 pipeline().fireChannelReadComplete();
424 runPendingTasks();
425 }
426 } finally {
427 executingStackCnt--;
428 maybeRunPendingTasks();
429 }
430
431 return checkException(promise);
432 }
433
434
435
436
437
438
439
440 public boolean writeOutbound(Object... msgs) {
441 ensureOpen();
442 if (msgs.length == 0) {
443 return isNotEmpty(outboundMessages);
444 }
445
446 executingStackCnt++;
447 RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
448 try {
449 try {
450 for (Object m : msgs) {
451 if (m == null) {
452 break;
453 }
454 futures.add(write(m));
455 }
456
457 flushOutbound0();
458
459 int size = futures.size();
460 for (int i = 0; i < size; i++) {
461 ChannelFuture future = (ChannelFuture) futures.get(i);
462 if (future.isDone()) {
463 recordException(future);
464 } else {
465
466 future.addListener(recordExceptionListener);
467 }
468 }
469 } finally {
470 executingStackCnt--;
471 maybeRunPendingTasks();
472 }
473 checkException();
474 return isNotEmpty(outboundMessages);
475 } finally {
476 futures.recycle();
477 }
478 }
479
480
481
482
483
484
485
/**
 * Writes a single outbound message using a freshly created promise.
 *
 * @param msg the message to write
 * @return the future produced by {@link #writeOneOutbound(Object, ChannelPromise)}
 */
public ChannelFuture writeOneOutbound(Object msg) {
    return writeOneOutbound(msg, newPromise());
}
489
490
491
492
493
494
495
496 public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
497 executingStackCnt++;
498 try {
499 if (checkOpen(true)) {
500 return write(msg, promise);
501 }
502 } finally {
503 executingStackCnt--;
504 maybeRunPendingTasks();
505 }
506
507 return checkException(promise);
508 }
509
510
511
512
513
514
515 public EmbeddedChannel flushOutbound() {
516 executingStackCnt++;
517 try {
518 if (checkOpen(true)) {
519 flushOutbound0();
520 }
521 } finally {
522 executingStackCnt--;
523 maybeRunPendingTasks();
524 }
525 checkException(voidPromise());
526 return this;
527 }
528
/**
 * Runs pending tasks and then flushes the channel.
 */
private void flushOutbound0() {
    // Tasks run before the flush; presumably so writes deferred onto the event loop are
    // enqueued before flush() drains the outbound buffer -- NOTE(review): confirm intent.
    runPendingTasks();

    flush();
}
536
537
538
539
540
541
/**
 * Closes the channel and reports whether any message is still queued. Queued messages are left
 * untouched.
 *
 * @return {@code true} if the inbound or outbound queue is non-empty after closing
 */
public boolean finish() {
    return finish(false);
}
545
546
547
548
549
550
551
/**
 * Closes the channel, reports whether any message was still queued, and releases every queued
 * inbound and outbound message.
 *
 * @return {@code true} if the inbound or outbound queue was non-empty after closing
 */
public boolean finishAndReleaseAll() {
    return finish(true);
}
555
556
557
558
559
560
561
562 private boolean finish(boolean releaseAll) {
563 executingStackCnt++;
564 try {
565 close();
566 } finally {
567 executingStackCnt--;
568 maybeRunPendingTasks();
569 }
570 try {
571 checkException();
572 return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
573 } finally {
574 if (releaseAll) {
575 releaseAll(inboundMessages);
576 releaseAll(outboundMessages);
577 }
578 }
579 }
580
581
582
583
584
/**
 * Drains the inbound queue and releases every message in it.
 *
 * @return {@code true} if the queue contained at least one message
 */
public boolean releaseInbound() {
    return releaseAll(inboundMessages);
}
588
589
590
591
592
/**
 * Drains the outbound queue and releases every message in it.
 *
 * @return {@code true} if the queue contained at least one message
 */
public boolean releaseOutbound() {
    return releaseAll(outboundMessages);
}
596
597 private static boolean releaseAll(Queue<Object> queue) {
598 if (isNotEmpty(queue)) {
599 for (;;) {
600 Object msg = queue.poll();
601 if (msg == null) {
602 break;
603 }
604 ReferenceCountUtil.release(msg);
605 }
606 return true;
607 }
608 return false;
609 }
610
/**
 * Closes the channel using a freshly created promise.
 *
 * @return the future produced by {@link #close(ChannelPromise)}
 */
@Override
public final ChannelFuture close() {
    return close(newPromise());
}
615
/**
 * Disconnects the channel using a freshly created promise.
 *
 * @return the future produced by {@link #disconnect(ChannelPromise)}
 */
@Override
public final ChannelFuture disconnect() {
    return disconnect(newPromise());
}
620
621 @Override
622 public final ChannelFuture close(ChannelPromise promise) {
623
624
625 executingStackCnt++;
626 ChannelFuture future;
627 try {
628 runPendingTasks();
629 future = super.close(promise);
630
631 cancelRemainingScheduledTasks = true;
632 } finally {
633 executingStackCnt--;
634 maybeRunPendingTasks();
635 }
636 return future;
637 }
638
639 @Override
640 public final ChannelFuture disconnect(ChannelPromise promise) {
641 executingStackCnt++;
642 ChannelFuture future;
643 try {
644 future = super.disconnect(promise);
645
646 if (!metadata.hasDisconnect()) {
647 cancelRemainingScheduledTasks = true;
648 }
649 } finally {
650 executingStackCnt--;
651 maybeRunPendingTasks();
652 }
653 return future;
654 }
655
656 @Override
657 public ChannelFuture bind(SocketAddress localAddress) {
658 executingStackCnt++;
659 try {
660 return super.bind(localAddress);
661 } finally {
662 executingStackCnt--;
663 maybeRunPendingTasks();
664 }
665 }
666
667 @Override
668 public ChannelFuture connect(SocketAddress remoteAddress) {
669 executingStackCnt++;
670 try {
671 return super.connect(remoteAddress);
672 } finally {
673 executingStackCnt--;
674 maybeRunPendingTasks();
675 }
676 }
677
678 @Override
679 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
680 executingStackCnt++;
681 try {
682 return super.connect(remoteAddress, localAddress);
683 } finally {
684 executingStackCnt--;
685 maybeRunPendingTasks();
686 }
687 }
688
689 @Override
690 public ChannelFuture deregister() {
691 executingStackCnt++;
692 try {
693 return super.deregister();
694 } finally {
695 executingStackCnt--;
696 maybeRunPendingTasks();
697 }
698 }
699
700 @Override
701 public Channel flush() {
702 executingStackCnt++;
703 try {
704 return super.flush();
705 } finally {
706 executingStackCnt--;
707 maybeRunPendingTasks();
708 }
709 }
710
711 @Override
712 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
713 executingStackCnt++;
714 try {
715 return super.bind(localAddress, promise);
716 } finally {
717 executingStackCnt--;
718 maybeRunPendingTasks();
719 }
720 }
721
722 @Override
723 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
724 executingStackCnt++;
725 try {
726 return super.connect(remoteAddress, promise);
727 } finally {
728 executingStackCnt--;
729 maybeRunPendingTasks();
730 }
731 }
732
733 @Override
734 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
735 executingStackCnt++;
736 try {
737 return super.connect(remoteAddress, localAddress, promise);
738 } finally {
739 executingStackCnt--;
740 maybeRunPendingTasks();
741 }
742 }
743
744 @Override
745 public ChannelFuture deregister(ChannelPromise promise) {
746 executingStackCnt++;
747 try {
748 return super.deregister(promise);
749 } finally {
750 executingStackCnt--;
751 maybeRunPendingTasks();
752 }
753 }
754
755 @Override
756 public Channel read() {
757 executingStackCnt++;
758 try {
759 return super.read();
760 } finally {
761 executingStackCnt--;
762 maybeRunPendingTasks();
763 }
764 }
765
766 @Override
767 public ChannelFuture write(Object msg) {
768 executingStackCnt++;
769 try {
770 return super.write(msg);
771 } finally {
772 executingStackCnt--;
773 maybeRunPendingTasks();
774 }
775 }
776
777 @Override
778 public ChannelFuture write(Object msg, ChannelPromise promise) {
779 executingStackCnt++;
780 try {
781 return super.write(msg, promise);
782 } finally {
783 executingStackCnt--;
784 maybeRunPendingTasks();
785 }
786 }
787
788 @Override
789 public ChannelFuture writeAndFlush(Object msg) {
790 executingStackCnt++;
791 try {
792 return super.writeAndFlush(msg);
793 } finally {
794 executingStackCnt--;
795 maybeRunPendingTasks();
796 }
797 }
798
799 @Override
800 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
801 executingStackCnt++;
802 try {
803 return super.writeAndFlush(msg, promise);
804 } finally {
805 executingStackCnt--;
806 maybeRunPendingTasks();
807 }
808 }
809
810 private static boolean isNotEmpty(Queue<Object> queue) {
811 return queue != null && !queue.isEmpty();
812 }
813
814 private static Object poll(Queue<Object> queue) {
815 return queue != null ? queue.poll() : null;
816 }
817
818 private void maybeRunPendingTasks() {
819 if (executingStackCnt == 0) {
820 runPendingTasks();
821
822 if (cancelRemainingScheduledTasks) {
823
824 embeddedEventLoop().cancelScheduledTasks();
825 }
826 }
827 }
828
829
830
831
832
/**
 * Runs the event loop's queued tasks and then its scheduled tasks. An {@link Exception} thrown by
 * either phase is recorded via {@link #recordException(Throwable)} rather than propagated, and a
 * failure in the first phase does not prevent the second from running.
 */
public void runPendingTasks() {
    try {
        // The loop is resolved inside the try, so a failure to obtain it is recorded as well.
        embeddedEventLoop().runTasks();
    } catch (Exception e) {
        recordException(e);
    }

    try {
        // Resolved again rather than cached from the first phase.
        embeddedEventLoop().runScheduledTasks();
    } catch (Exception e) {
        recordException(e);
    }
}
846
847
848
849
850
851
852
853
854 public boolean hasPendingTasks() {
855 return embeddedEventLoop().hasPendingNormalTasks() ||
856 embeddedEventLoop().nextScheduledTask() == 0;
857 }
858
859
860
861
862
863
/**
 * Runs the event loop's scheduled tasks only.
 *
 * @return the value returned by the loop's {@code runScheduledTasks()}; if that call throws an
 *         {@link Exception}, the exception is recorded and the loop's {@code nextScheduledTask()} value is
 *         returned instead (presumably the time until the next scheduled task -- confirm against
 *         {@code EmbeddedEventLoop})
 */
public long runScheduledPendingTasks() {
    try {
        return embeddedEventLoop().runScheduledTasks();
    } catch (Exception e) {
        recordException(e);
        return embeddedEventLoop().nextScheduledTask();
    }
}
872
873 private void recordException(ChannelFuture future) {
874 if (!future.isSuccess()) {
875 recordException(future.cause());
876 }
877 }
878
879 private void recordException(Throwable cause) {
880 if (lastException == null) {
881 lastException = cause;
882 } else {
883 logger.warn(
884 "More than one exception was raised. " +
885 "Will report only the first one and log others.", cause);
886 }
887 }
888
889 private EmbeddedEventLoop.FreezableTicker freezableTicker() {
890 Ticker ticker = eventLoop().ticker();
891 if (ticker instanceof EmbeddedEventLoop.FreezableTicker) {
892 return (EmbeddedEventLoop.FreezableTicker) ticker;
893 } else {
894 throw new IllegalStateException(
895 "EmbeddedChannel constructed with custom ticker, time manipulation methods are unavailable.");
896 }
897 }
898
899
900
901
902
/**
 * Advances the channel's freezable ticker by the given amount.
 *
 * @param duration the amount to advance by
 * @param unit     the unit of {@code duration}
 * @throws IllegalStateException if the channel was constructed with a custom ticker
 */
public void advanceTimeBy(long duration, TimeUnit unit) {
    freezableTicker().advance(duration, unit);
}
906
907
908
909
910
911
/**
 * Calls {@code freezeTime()} on the channel's freezable ticker.
 *
 * @throws IllegalStateException if the channel was constructed with a custom ticker
 */
public void freezeTime() {
    freezableTicker().freezeTime();
}
915
916
917
918
919
920
921
922
/**
 * Calls {@code unfreezeTime()} on the channel's freezable ticker.
 *
 * @throws IllegalStateException if the channel was constructed with a custom ticker
 */
public void unfreezeTime() {
    freezableTicker().unfreezeTime();
}
926
927
928
929
930 private ChannelFuture checkException(ChannelPromise promise) {
931 Throwable t = lastException;
932 if (t != null) {
933 lastException = null;
934
935 if (promise.isVoid()) {
936 PlatformDependent.throwException(t);
937 }
938
939 return promise.setFailure(t);
940 }
941
942 return promise.setSuccess();
943 }
944
945
946
947
/**
 * Throws the pending recorded exception, if any, and clears it. A void promise is used, so the
 * exception is thrown rather than reported through a future.
 */
public void checkException() {
    checkException(voidPromise());
}
951
952
953
954
955
956 private boolean checkOpen(boolean recordException) {
957 if (!isOpen()) {
958 if (recordException) {
959 recordException(new ClosedChannelException());
960 }
961 return false;
962 }
963
964 return true;
965 }
966
967 private EmbeddedEventLoop embeddedEventLoop() {
968 if (isRegistered()) {
969 return (EmbeddedEventLoop) super.eventLoop();
970 }
971
972 return loop;
973 }
974
975
976
977
978 protected final void ensureOpen() {
979 if (!checkOpen(true)) {
980 checkException();
981 }
982 }
983
/**
 * @param loop the event loop to check
 * @return {@code true} only for an {@link EmbeddedEventLoop}
 */
@Override
protected boolean isCompatible(EventLoop loop) {
    return loop instanceof EmbeddedEventLoop;
}
988
989 @Override
990 protected SocketAddress localAddress0() {
991 return isActive()? LOCAL_ADDRESS : null;
992 }
993
994 @Override
995 protected SocketAddress remoteAddress0() {
996 return isActive()? REMOTE_ADDRESS : null;
997 }
998
/**
 * Marks the channel as active; registration is what makes it report {@link #isActive()}.
 */
@Override
protected void doRegister() throws Exception {
    state = State.ACTIVE;
}
1003
/**
 * Intentionally a no-op: this override performs no work for a bind.
 *
 * @param localAddress ignored
 */
@Override
protected void doBind(SocketAddress localAddress) throws Exception {

}
1008
1009 @Override
1010 protected void doDisconnect() throws Exception {
1011 if (!metadata.hasDisconnect()) {
1012 doClose();
1013 }
1014 }
1015
/**
 * Marks the channel as closed, after which {@link #isOpen()} and {@link #isActive()} return
 * {@code false}.
 */
@Override
protected void doClose() throws Exception {
    state = State.CLOSED;
}
1020
/**
 * Intentionally a no-op: this override performs no work when a read is requested.
 */
@Override
protected void doBeginRead() throws Exception {

}
1025
/**
 * @return a new {@link EmbeddedUnsafe} for this channel
 */
@Override
protected AbstractUnsafe newUnsafe() {
    return new EmbeddedUnsafe();
}
1030
/**
 * Exposes the tracking wrapper rather than the raw unsafe, so operations invoked through it take part
 * in the {@code executingStackCnt} bookkeeping.
 *
 * @return the {@code wrapped} view of this channel's {@link EmbeddedUnsafe}
 */
@Override
public Unsafe unsafe() {
    return ((EmbeddedUnsafe) super.unsafe()).wrapped;
}
1035
1036 @Override
1037 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1038 for (;;) {
1039 Object msg = in.current();
1040 if (msg == null) {
1041 break;
1042 }
1043
1044 ReferenceCountUtil.retain(msg);
1045 handleOutboundMessage(msg);
1046 in.remove();
1047 }
1048 }
1049
1050
1051
1052
1053
1054
/**
 * Called by {@link #doWrite(ChannelOutboundBuffer)} for each flushed message. This implementation
 * appends the message to the outbound queue; subclasses may override it.
 *
 * @param msg the flushed outbound message
 */
protected void handleOutboundMessage(Object msg) {
    outboundMessages().add(msg);
}
1058
1059
1060
1061
/**
 * Called by the pipeline for each inbound message that no handler consumed. This implementation appends
 * the message to the inbound queue; subclasses may override it.
 *
 * @param msg the unhandled inbound message
 */
protected void handleInboundMessage(Object msg) {
    inboundMessages().add(msg);
}
1065
/**
 * @return a new {@link Builder} with default settings
 */
public static Builder builder() {
    return new Builder();
}
1069
/**
 * Collects the optional settings of an {@link EmbeddedChannel}. Obtain an instance through
 * {@link EmbeddedChannel#builder()} and finish with {@link #build()}.
 */
public static final class Builder {
    /** Parent channel passed to the superclass constructor; {@code null} by default. */
    Channel parent;
    ChannelId channelId = EmbeddedChannelId.INSTANCE;
    /** Whether the channel registers itself during construction; enabled by default. */
    boolean register = true;
    boolean hasDisconnect;
    // Exactly one of handlers/handler is non-null at any time: each setter clears the other field.
    ChannelHandler[] handlers = EMPTY_HANDLERS;
    ChannelHandler handler;
    /** Custom config; when {@code null} the channel creates a {@link DefaultChannelConfig}. */
    ChannelConfig config;
    /** Custom ticker; when {@code null} the channel creates a freezable ticker. */
    Ticker ticker;

    private Builder() {
    }

    /**
     * Sets the parent channel.
     *
     * @param parent the parent channel; not null-checked
     * @return this builder
     */
    public Builder parent(Channel parent) {
        this.parent = parent;
        return this;
    }

    /**
     * Sets the channel id.
     *
     * @param channelId the id to use
     * @return this builder
     * @throws NullPointerException if {@code channelId} is {@code null}
     */
    public Builder channelId(ChannelId channelId) {
        this.channelId = Objects.requireNonNull(channelId, "channelId");
        return this;
    }

    /**
     * Chooses whether the channel is registered with its event loop during construction.
     *
     * @param register {@code true} (the default) to register during construction
     * @return this builder
     */
    public Builder register(boolean register) {
        this.register = register;
        return this;
    }

    /**
     * Chooses which {@link ChannelMetadata} the channel reports.
     *
     * @param hasDisconnect {@code false} (the default) makes a disconnect close the channel
     * @return this builder
     */
    public Builder hasDisconnect(boolean hasDisconnect) {
        this.hasDisconnect = hasDisconnect;
        return this;
    }

    /**
     * Sets the handlers to add to the pipeline, replacing any previously configured handler(s).
     *
     * @param handlers the handlers, in pipeline order
     * @return this builder
     * @throws NullPointerException if {@code handlers} is {@code null}
     */
    public Builder handlers(ChannelHandler... handlers) {
        this.handlers = Objects.requireNonNull(handlers, "handlers");
        this.handler = null;
        return this;
    }

    /**
     * Sets a single handler to add to the pipeline, replacing any previously configured handler(s).
     * Unlike the array variant, this handler is added to the pipeline directly instead of through a
     * {@link ChannelInitializer}.
     *
     * @param handler the handler to add
     * @return this builder
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    public Builder handlers(ChannelHandler handler) {
        this.handler = Objects.requireNonNull(handler, "handler");
        this.handlers = null;
        return this;
    }

    /**
     * Sets the channel configuration.
     *
     * @param config the configuration the channel should return from {@code config()}
     * @return this builder
     * @throws NullPointerException if {@code config} is {@code null}
     */
    public Builder config(ChannelConfig config) {
        this.config = Objects.requireNonNull(config, "config");
        return this;
    }

    /**
     * Sets the ticker handed to the channel's event loop. With a custom ticker the channel's
     * time-manipulation methods throw {@link IllegalStateException}, unless the ticker is itself an
     * {@link EmbeddedEventLoop.FreezableTicker}.
     *
     * @param ticker the ticker to use; {@code null} selects the default freezable ticker
     * @return this builder
     */
    public Builder ticker(Ticker ticker) {
        this.ticker = ticker;
        return this;
    }

    /**
     * Creates the channel from the current settings.
     *
     * @return a new {@link EmbeddedChannel}
     */
    public EmbeddedChannel build() {
        return new EmbeddedChannel(this);
    }
}
1188
1189 private final class EmbeddedUnsafe extends AbstractUnsafe {
1190
1191
1192
1193 final Unsafe wrapped = new Unsafe() {
1194 @Override
1195 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1196 return EmbeddedUnsafe.this.recvBufAllocHandle();
1197 }
1198
1199 @Override
1200 public SocketAddress localAddress() {
1201 return EmbeddedUnsafe.this.localAddress();
1202 }
1203
1204 @Override
1205 public SocketAddress remoteAddress() {
1206 return EmbeddedUnsafe.this.remoteAddress();
1207 }
1208
1209 @Override
1210 public void register(EventLoop eventLoop, ChannelPromise promise) {
1211 executingStackCnt++;
1212 try {
1213 EmbeddedUnsafe.this.register(eventLoop, promise);
1214 } finally {
1215 executingStackCnt--;
1216 maybeRunPendingTasks();
1217 }
1218 }
1219
1220 @Override
1221 public void bind(SocketAddress localAddress, ChannelPromise promise) {
1222 executingStackCnt++;
1223 try {
1224 EmbeddedUnsafe.this.bind(localAddress, promise);
1225 } finally {
1226 executingStackCnt--;
1227 maybeRunPendingTasks();
1228 }
1229 }
1230
1231 @Override
1232 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1233 executingStackCnt++;
1234 try {
1235 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1236 } finally {
1237 executingStackCnt--;
1238 maybeRunPendingTasks();
1239 }
1240 }
1241
1242 @Override
1243 public void disconnect(ChannelPromise promise) {
1244 executingStackCnt++;
1245 try {
1246 EmbeddedUnsafe.this.disconnect(promise);
1247 } finally {
1248 executingStackCnt--;
1249 maybeRunPendingTasks();
1250 }
1251 }
1252
1253 @Override
1254 public void close(ChannelPromise promise) {
1255 executingStackCnt++;
1256 try {
1257 EmbeddedUnsafe.this.close(promise);
1258 } finally {
1259 executingStackCnt--;
1260 maybeRunPendingTasks();
1261 }
1262 }
1263
1264 @Override
1265 public void closeForcibly() {
1266 executingStackCnt++;
1267 try {
1268 EmbeddedUnsafe.this.closeForcibly();
1269 } finally {
1270 executingStackCnt--;
1271 maybeRunPendingTasks();
1272 }
1273 }
1274
1275 @Override
1276 public void deregister(ChannelPromise promise) {
1277 executingStackCnt++;
1278 try {
1279 EmbeddedUnsafe.this.deregister(promise);
1280 } finally {
1281 executingStackCnt--;
1282 maybeRunPendingTasks();
1283 }
1284 }
1285
1286 @Override
1287 public void beginRead() {
1288 executingStackCnt++;
1289 try {
1290 EmbeddedUnsafe.this.beginRead();
1291 } finally {
1292 executingStackCnt--;
1293 maybeRunPendingTasks();
1294 }
1295 }
1296
1297 @Override
1298 public void write(Object msg, ChannelPromise promise) {
1299 executingStackCnt++;
1300 try {
1301 EmbeddedUnsafe.this.write(msg, promise);
1302 } finally {
1303 executingStackCnt--;
1304 maybeRunPendingTasks();
1305 }
1306 }
1307
1308 @Override
1309 public void flush() {
1310 executingStackCnt++;
1311 try {
1312 EmbeddedUnsafe.this.flush();
1313 } finally {
1314 executingStackCnt--;
1315 maybeRunPendingTasks();
1316 }
1317 }
1318
1319 @Override
1320 public ChannelPromise voidPromise() {
1321 return EmbeddedUnsafe.this.voidPromise();
1322 }
1323
1324 @Override
1325 public ChannelOutboundBuffer outboundBuffer() {
1326 return EmbeddedUnsafe.this.outboundBuffer();
1327 }
1328 };
1329
1330 @Override
1331 public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1332 safeSetSuccess(promise);
1333 }
1334 }
1335
/**
 * Pipeline that hands inbound messages and exceptions no handler consumed back to the enclosing
 * {@link EmbeddedChannel}.
 */
private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
    EmbeddedChannelPipeline(EmbeddedChannel channel) {
        super(channel);
    }

    /** Records the exception on the channel so a later exception check can report it. */
    @Override
    protected void onUnhandledInboundException(Throwable cause) {
        recordException(cause);
    }

    /** Passes the message to the channel's {@code handleInboundMessage(Object)} hook. */
    @Override
    protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
        handleInboundMessage(msg);
    }
}
1351 }