1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.embedded;
17
18 import io.netty5.buffer.api.internal.ResourceSupport;
19 import io.netty5.buffer.api.internal.Statics;
20 import io.netty5.channel.AdaptiveRecvBufferAllocator;
21 import io.netty5.channel.ChannelShutdownDirection;
22 import io.netty5.util.Resource;
23 import io.netty5.channel.AbstractChannel;
24 import io.netty5.channel.Channel;
25 import io.netty5.channel.ChannelHandler;
26 import io.netty5.channel.ChannelHandlerContext;
27 import io.netty5.channel.ChannelId;
28 import io.netty5.channel.ChannelInitializer;
29 import io.netty5.channel.ChannelMetadata;
30 import io.netty5.channel.ChannelOutboundBuffer;
31 import io.netty5.channel.ChannelPipeline;
32 import io.netty5.channel.DefaultChannelPipeline;
33 import io.netty5.channel.EventLoop;
34 import io.netty5.util.ReferenceCountUtil;
35 import io.netty5.util.concurrent.Future;
36 import io.netty5.util.concurrent.FutureListener;
37 import io.netty5.util.internal.RecyclableArrayList;
38 import io.netty5.util.internal.logging.InternalLogger;
39 import io.netty5.util.internal.logging.InternalLoggerFactory;
40
41 import java.net.SocketAddress;
42 import java.nio.channels.ClosedChannelException;
43 import java.util.ArrayDeque;
44 import java.util.Queue;
45 import java.util.concurrent.TimeUnit;
46
47 import static io.netty5.util.internal.PlatformDependent.throwException;
48 import static java.util.Objects.requireNonNull;
49
50
51
52
53 public class EmbeddedChannel extends AbstractChannel<Channel, SocketAddress, SocketAddress> {
54
55 private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
56 private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
57
58 private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
59 private enum State { OPEN, ACTIVE, CLOSED }
60
61 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
62
63 private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
64 private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
65
66 private final FutureListener<Void> recordExceptionListener = this::recordException;
67
68 private Queue<Object> inboundMessages;
69 private Queue<Object> outboundMessages;
70 private Throwable lastException;
71 private State state;
72 private boolean inputShutdown;
73 private boolean outputShutdown;
74
75
76
77
78 public EmbeddedChannel() {
79 this(EMPTY_HANDLERS);
80 }
81
82
83
84
85
86
87 public EmbeddedChannel(ChannelId channelId) {
88 this(channelId, EMPTY_HANDLERS);
89 }
90
91
92
93
94
95
96 public EmbeddedChannel(ChannelHandler... handlers) {
97 this(EmbeddedChannelId.INSTANCE, handlers);
98 }
99
100
101
102
103
104
105
106
107 public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
108 this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
109 }
110
111
112
113
114
115
116
117
118
119
120 public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
121 this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
122 }
123
124
125
126
127
128
129
130
131 public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
132 this(channelId, false, handlers);
133 }
134
135
136
137
138
139
140
141
142
143
144 public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
145 this(channelId, true, hasDisconnect, handlers);
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159 public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
160 ChannelHandler... handlers) {
161 this(null, channelId, register, hasDisconnect, handlers);
162 }
163
164
165
166
167
168
169
170
171
172
173
174
175
176 public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
177 final ChannelHandler... handlers) {
178 super(parent, new EmbeddedEventLoop(), metadata(hasDisconnect), new AdaptiveRecvBufferAllocator(), channelId);
179 setup(register, handlers);
180 }
181
182 private static ChannelMetadata metadata(boolean hasDisconnect) {
183 return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
184 }
185
186 private void setup(boolean register, final ChannelHandler... handlers) {
187 requireNonNull(handlers, "handlers");
188 ChannelPipeline p = pipeline();
189 p.addLast(new ChannelInitializer<>() {
190 @Override
191 protected void initChannel(Channel ch) throws Exception {
192 ChannelPipeline pipeline = ch.pipeline();
193 for (ChannelHandler h : handlers) {
194 if (h == null) {
195 break;
196 }
197 pipeline.addLast(h);
198 }
199 }
200 });
201 if (register) {
202 Future<Void> future = register();
203 assert future.isDone();
204 }
205 }
206
207 @Override
208 public Future<Void> register() {
209 Future<Void> future = super.register();
210 assert future.isDone();
211 Throwable cause = future.cause();
212 if (cause != null) {
213 throwException(cause);
214 }
215 return future;
216 }
217
218 @Override
219 protected final DefaultChannelPipeline newChannelPipeline() {
220 return new EmbeddedChannelPipeline(this);
221 }
222
223 @Override
224 public boolean isOpen() {
225 return state != State.CLOSED;
226 }
227
228 @Override
229 public boolean isActive() {
230 return state == State.ACTIVE;
231 }
232
233
234
235
236 public Queue<Object> inboundMessages() {
237 if (inboundMessages == null) {
238 inboundMessages = new ArrayDeque<>();
239 }
240 return inboundMessages;
241 }
242
243
244
245
246 @Deprecated
247 public Queue<Object> lastInboundBuffer() {
248 return inboundMessages();
249 }
250
251
252
253
254 public Queue<Object> outboundMessages() {
255 if (outboundMessages == null) {
256 outboundMessages = new ArrayDeque<>();
257 }
258 return outboundMessages;
259 }
260
261
262
263
264 @Deprecated
265 public Queue<Object> lastOutboundBuffer() {
266 return outboundMessages();
267 }
268
269
270
271
272 @SuppressWarnings("unchecked")
273 public <T> T readInbound() {
274 T message = (T) poll(inboundMessages);
275 if (message != null) {
276 Resource.touch(message, "Caller of readInbound() will handle the message from this point");
277 }
278 return message;
279 }
280
281
282
283
284 @SuppressWarnings("unchecked")
285 public <T> T readOutbound() {
286 T message = (T) poll(outboundMessages);
287 if (message != null) {
288 Resource.touch(message, "Caller of readOutbound() will handle the message from this point.");
289 }
290 return message;
291 }
292
293
294
295
296
297
298
299
300 public boolean writeInbound(Object... msgs) {
301 ensureOpen();
302 if (msgs.length == 0) {
303 return isNotEmpty(inboundMessages);
304 }
305
306 ChannelPipeline p = pipeline();
307 for (Object m: msgs) {
308 p.fireChannelRead(m);
309 }
310
311 flushInbound(false);
312 return isNotEmpty(inboundMessages);
313 }
314
315
316
317
318
319
320
321 public Future<Void> writeOneInbound(Object msg) {
322 if (checkOpen(true)) {
323 pipeline().fireChannelRead(msg);
324 }
325 return checkException0();
326 }
327
328
329
330
331
332
333 public EmbeddedChannel flushInbound() {
334 flushInbound(true);
335 return this;
336 }
337
338 private void flushInbound(boolean recordException) {
339 if (checkOpen(recordException)) {
340 pipeline().fireChannelReadComplete();
341 embeddedEventLoop().execute(this::readIfIsAutoRead);
342 runPendingTasks();
343 }
344 checkException();
345 }
346
347
348
349
350
351
352
353 public boolean writeOutbound(Object... msgs) {
354 ensureOpen();
355 if (msgs.length == 0) {
356 return isNotEmpty(outboundMessages);
357 }
358
359 RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
360 try {
361 for (Object m: msgs) {
362 if (m == null) {
363 break;
364 }
365 futures.add(write(m));
366 }
367
368 flushOutbound0();
369
370 int size = futures.size();
371 for (int i = 0; i < size; i++) {
372 Future<Void> future = (Future<Void>) futures.get(i);
373 if (future.isDone()) {
374 recordException(future);
375 } else {
376
377 future.addListener(recordExceptionListener);
378 }
379 }
380
381 checkException();
382 return isNotEmpty(outboundMessages);
383 } finally {
384 futures.recycle();
385 }
386 }
387
388
389
390
391
392
393
394 public Future<Void> writeOneOutbound(Object msg) {
395 if (checkOpen(true)) {
396 return write(msg);
397 }
398 return checkException0();
399 }
400
401
402
403
404
405
406 public EmbeddedChannel flushOutbound() {
407 if (checkOpen(true)) {
408 flushOutbound0();
409 }
410 checkException();
411 return this;
412 }
413
414 private void flushOutbound0() {
415
416
417 runPendingTasks();
418
419 flush();
420 }
421
422
423
424
425
426
427 public boolean finish() {
428 return finish(false);
429 }
430
431
432
433
434
435
436
437 public boolean finishAndReleaseAll() {
438 return finish(true);
439 }
440
441
442
443
444
445
446
447 private boolean finish(boolean releaseAll) {
448 close();
449 try {
450 checkException();
451 return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
452 } finally {
453 if (releaseAll) {
454 releaseAll(inboundMessages);
455 releaseAll(outboundMessages);
456 }
457 }
458 }
459
460
461
462
463
464 public boolean releaseInbound() {
465 return releaseAll(inboundMessages);
466 }
467
468
469
470
471
472 public boolean releaseOutbound() {
473 return releaseAll(outboundMessages);
474 }
475
476 private static boolean releaseAll(Queue<Object> queue) {
477 Exception closeFailed = null;
478 if (isNotEmpty(queue)) {
479 for (;;) {
480 Object msg = queue.poll();
481 if (msg == null) {
482 break;
483 }
484 try {
485 Resource.dispose(msg);
486 } catch (Exception e) {
487 if (closeFailed == null) {
488 closeFailed = e;
489 } else {
490 closeFailed.addSuppressed(e);
491 }
492 }
493 }
494 if (closeFailed != null) {
495 throwException(closeFailed);
496 }
497 return true;
498 }
499 return false;
500 }
501
502 private void finishPendingTasks(boolean cancel) {
503 runPendingTasks();
504 if (cancel) {
505
506 ((EmbeddedEventLoop) executor()).cancelScheduled();
507 }
508 }
509
510 @Override
511 public final Future<Void> close() {
512
513
514 runPendingTasks();
515 Future<Void> future = super.close();
516
517
518 finishPendingTasks(true);
519 return future;
520 }
521
522 @Override
523 public final Future<Void> disconnect() {
524 Future<Void> future = super.disconnect();
525 finishPendingTasks(!metadata().hasDisconnect());
526 return future;
527 }
528
529 private static boolean isNotEmpty(Queue<Object> queue) {
530 return queue != null && !queue.isEmpty();
531 }
532
533 private static Object poll(Queue<Object> queue) {
534 return queue != null ? queue.poll() : null;
535 }
536
537
538
539
540
541 public void runPendingTasks() {
542 EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) executor();
543 try {
544 embeddedEventLoop.runTasks();
545 } catch (Exception e) {
546 recordException(e);
547 }
548
549 runScheduledPendingTasks();
550 }
551
552
553
554
555
556
557 public long runScheduledPendingTasks() {
558 EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) executor();
559
560 try {
561 return embeddedEventLoop.runScheduledTasks();
562 } catch (Exception e) {
563 recordException(e);
564 return embeddedEventLoop.nextScheduledTask();
565 } finally {
566
567 embeddedEventLoop.runTasks();
568 }
569 }
570
571
572
573
574
575
576
577
578 public boolean hasPendingTasks() {
579 return embeddedEventLoop().hasPendingNormalTasks() ||
580 embeddedEventLoop().nextScheduledTask() == 0;
581 }
582
583 private void recordException(Future<?> future) {
584 if (future.isFailed()) {
585 recordException(future.cause());
586 }
587 }
588
589 private void recordException(Throwable cause) {
590 if (lastException == null) {
591 lastException = cause;
592 } else {
593 logger.warn(
594 "More than one exception was raised. " +
595 "Will report only the first one and log others.", cause);
596 }
597 }
598
599 private EmbeddedEventLoop embeddedEventLoop() {
600 return (EmbeddedEventLoop) executor();
601 }
602
603
604
605
606 public void advanceTimeBy(long duration, TimeUnit unit) {
607 embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
608 }
609
610
611
612
613
614
615 public void freezeTime() {
616 embeddedEventLoop().freezeTime();
617 }
618
619
620
621
622
623
624
625
626 public void unfreezeTime() {
627 embeddedEventLoop().unfreezeTime();
628 }
629
630
631
632
633 private Future<Void> checkException0() {
634 try {
635 checkException();
636 } catch (Throwable cause) {
637 return newFailedFuture(cause);
638 }
639 return newSucceededFuture();
640 }
641
642
643
644
645 public void checkException() {
646 Throwable t = lastException;
647 if (t != null) {
648 lastException = null;
649
650 throwException(t);
651 }
652 }
653
654
655
656
657
658 private boolean checkOpen(boolean recordException) {
659 if (!isOpen()) {
660 if (recordException) {
661 recordException(new ClosedChannelException());
662 }
663 return false;
664 }
665
666 return true;
667 }
668
669
670
671
672 protected final void ensureOpen() {
673 if (!checkOpen(true)) {
674 checkException();
675 }
676 }
677
678 @Override
679 protected SocketAddress localAddress0() {
680 return isActive()? LOCAL_ADDRESS : null;
681 }
682
683 @Override
684 protected SocketAddress remoteAddress0() {
685 return isActive()? REMOTE_ADDRESS : null;
686 }
687
688 void setActive() {
689 state = State.ACTIVE;
690 }
691
692 @Override
693 protected void doBind(SocketAddress localAddress) throws Exception {
694
695 }
696
697 @Override
698 protected void doShutdown(ChannelShutdownDirection direction) {
699 switch (direction) {
700 case Inbound:
701 inputShutdown = true;
702 break;
703 case Outbound:
704 outputShutdown = true;
705 break;
706 default:
707 throw new AssertionError();
708 }
709 }
710
711 @Override
712 public boolean isShutdown(ChannelShutdownDirection direction) {
713 if (!isActive()) {
714 return true;
715 }
716 switch (direction) {
717 case Inbound:
718 return inputShutdown;
719 case Outbound:
720 return outputShutdown;
721 default:
722 throw new AssertionError();
723 }
724 }
725
726 @Override
727 protected void doDisconnect() throws Exception {
728 if (!metadata().hasDisconnect()) {
729 doClose();
730 }
731 }
732
733 @Override
734 protected void doClose() throws Exception {
735 state = State.CLOSED;
736 }
737
738 @Override
739 protected void doBeginRead() throws Exception {
740
741 }
742
743 @Override
744 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
745 for (;;) {
746 Object msg = in.current();
747 if (msg == null) {
748 break;
749 }
750
751 if (msg instanceof ResourceSupport<?, ?>) {
752
753
754 handleOutboundMessage(Statics.acquire((ResourceSupport<?, ?>) msg));
755 } else if (msg instanceof Resource<?>) {
756
757
758
759 handleOutboundMessage(((Resource<?>) msg).send().receive());
760 } else {
761 handleOutboundMessage(ReferenceCountUtil.retain(msg));
762 }
763 in.remove();
764 }
765 }
766
767
768
769
770
771
772 protected void handleOutboundMessage(Object msg) {
773 outboundMessages().add(msg);
774 }
775
776
777
778
779 protected void handleInboundMessage(Object msg) {
780 inboundMessages().add(msg);
781 }
782
783 @Override
784 protected void runAfterTransportAction() {
785 super.runAfterTransportAction();
786 if (!((EmbeddedEventLoop) executor()).running) {
787 runPendingTasks();
788 }
789 }
790
791 @Override
792 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
793 return true;
794 }
795
796 @Override
797 protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) {
798 return true;
799 }
800
801 private final class EmbeddedChannelPipeline extends DefaultAbstractChannelPipeline {
802 EmbeddedChannelPipeline(EmbeddedChannel channel) {
803 super(channel);
804 }
805
806 @Override
807 protected void onUnhandledInboundException(Throwable cause) {
808 recordException(cause);
809 }
810
811 @Override
812 protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
813 handleInboundMessage(msg);
814 }
815 }
816 }