View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.concurrent.Ticker;
37  import io.netty.util.internal.ObjectUtil;
38  import io.netty.util.internal.PlatformDependent;
39  import io.netty.util.internal.RecyclableArrayList;
40  import io.netty.util.internal.logging.InternalLogger;
41  import io.netty.util.internal.logging.InternalLoggerFactory;
42  
43  import java.net.SocketAddress;
44  import java.nio.channels.ClosedChannelException;
45  import java.util.ArrayDeque;
46  import java.util.Objects;
47  import java.util.Queue;
48  import java.util.concurrent.TimeUnit;
49  
50  /**
51   * Base class for {@link Channel} implementations that are used in an embedded fashion.
52   */
53  public class EmbeddedChannel extends AbstractChannel {
54  
55      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
56      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
57  
58      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
59      private enum State { OPEN, ACTIVE, CLOSED }
60  
61      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
62  
63      private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
64      private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
65  
66      private final EmbeddedEventLoop loop;
67      private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
68          @Override
69          public void operationComplete(ChannelFuture future) throws Exception {
70              recordException(future);
71          }
72      };
73  
74      private final ChannelMetadata metadata;
75      private final ChannelConfig config;
76  
77      private Queue<Object> inboundMessages;
78      private Queue<Object> outboundMessages;
79      private Throwable lastException;
80      private State state;
81      private int executingStackCnt;
82      private boolean cancelRemainingScheduledTasks;
83  
/**
 * Creates a channel with an {@link EmbeddedChannelId} and an empty pipeline,
 * using the defaults of a fresh builder.
 */
public EmbeddedChannel() {
    this(builder());
}
90  
/**
 * Creates a channel that has the given ID and an empty pipeline.
 *
 * @param channelId the {@link ChannelId} identifying the new channel
 */
public EmbeddedChannel(ChannelId channelId) {
    this(
            builder()
                    .channelId(channelId));
}
99  
/**
 * Creates a channel whose pipeline is initialized with the given handlers.
 *
 * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(ChannelHandler... handlers) {
    this(
            builder()
                    .handlers(handlers));
}
108 
/**
 * Creates a channel whose pipeline is initialized with the given handlers.
 *
 * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
 *                      to {@link #close()}, {@code true} otherwise
 * @param handlers      the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
    this(
            builder()
                    .hasDisconnect(hasDisconnect)
                    .handlers(handlers));
}
119 
/**
 * Creates a channel whose pipeline is initialized with the given handlers.
 *
 * @param register      {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
 *                      constructor; if {@code false} the user needs to call {@link #register()}
 * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
 *                      to {@link #close()}, {@code true} otherwise
 * @param handlers      the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
    this(
            builder()
                    .register(register)
                    .hasDisconnect(hasDisconnect)
                    .handlers(handlers));
}
132 
/**
 * Creates a channel that has the given ID and whose pipeline is initialized
 * with the given handlers.
 *
 * @param channelId the {@link ChannelId} identifying the new channel
 * @param handlers  the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
    this(
            builder()
                    .channelId(channelId)
                    .handlers(handlers));
}
143 
/**
 * Creates a channel that has the given ID and whose pipeline is initialized
 * with the given handlers.
 *
 * @param channelId     the {@link ChannelId} identifying the new channel
 * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
 *                      to {@link #close()}, {@code true} otherwise
 * @param handlers      the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
    this(
            builder()
                    .channelId(channelId)
                    .hasDisconnect(hasDisconnect)
                    .handlers(handlers));
}
156 
/**
 * Creates a channel that has the given ID and whose pipeline is initialized
 * with the given handlers.
 *
 * @param channelId     the {@link ChannelId} identifying the new channel
 * @param register      {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
 *                      constructor; if {@code false} the user needs to call {@link #register()}
 * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
 *                      to {@link #close()}, {@code true} otherwise
 * @param handlers      the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
                       ChannelHandler... handlers) {
    this(
            builder()
                    .channelId(channelId)
                    .register(register)
                    .hasDisconnect(hasDisconnect)
                    .handlers(handlers));
}
172 
/**
 * Creates a child channel that has the given ID and whose pipeline is initialized
 * with the given handlers.
 *
 * @param parent        the parent {@link Channel} of this {@link EmbeddedChannel}
 * @param channelId     the {@link ChannelId} identifying the new channel
 * @param register      {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
 *                      constructor; if {@code false} the user needs to call {@link #register()}
 * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
 *                      to {@link #close()}, {@code true} otherwise
 * @param handlers      the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
                       final ChannelHandler... handlers) {
    this(
            builder().parent(parent).channelId(channelId)
                    .register(register).hasDisconnect(hasDisconnect)
                    .handlers(handlers));
}
194 
/**
 * Creates a channel that has the given ID and configuration and whose pipeline
 * is initialized with the given handlers.
 *
 * @param channelId     the {@link ChannelId} identifying the new channel
 * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
 *                      to {@link #close()}, {@code true} otherwise
 * @param config        the {@link ChannelConfig} which will be returned by {@link #config()}
 * @param handlers      the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
 */
public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
                       final ChannelHandler... handlers) {
    this(
            builder()
                    .channelId(channelId)
                    .hasDisconnect(hasDisconnect)
                    .config(config)
                    .handlers(handlers));
}
209 
/**
 * Creates a channel from the settings held by the given builder. This constructor is
 * {@code protected} so that subclasses can call it; all other code should go through
 * {@link Builder#build()}.
 *
 * @param builder the builder supplying parent, ID, ticker, config, registration flag and handlers
 */
protected EmbeddedChannel(Builder builder) {
    super(builder.parent, builder.channelId);

    // Without a custom ticker the event loop gets a freezable one.
    if (builder.ticker == null) {
        loop = new EmbeddedEventLoop(new EmbeddedEventLoop.FreezableTicker());
    } else {
        loop = new EmbeddedEventLoop(builder.ticker);
    }

    metadata = metadata(builder.hasDisconnect);

    // Without a custom config a DefaultChannelConfig bound to this channel is used.
    if (builder.config == null) {
        config = new DefaultChannelConfig(this);
    } else {
        config = builder.config;
    }

    setup(builder.register, builder.handlers);
}
223 
224     private static ChannelMetadata metadata(boolean hasDisconnect) {
225         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
226     }
227 
228     private void setup(boolean register, final ChannelHandler... handlers) {
229         ObjectUtil.checkNotNull(handlers, "handlers");
230         ChannelPipeline p = pipeline();
231         p.addLast(new ChannelInitializer<Channel>() {
232             @Override
233             protected void initChannel(Channel ch) throws Exception {
234                 ChannelPipeline pipeline = ch.pipeline();
235                 for (ChannelHandler h: handlers) {
236                     if (h == null) {
237                         break;
238                     }
239                     pipeline.addLast(h);
240                 }
241             }
242         });
243         if (register) {
244             ChannelFuture future = loop.register(this);
245             assert future.isDone();
246         }
247     }
248 
249     /**
250      * Register this {@code Channel} on its {@link EventLoop}.
251      */
252     public void register() throws Exception {
253         ChannelFuture future = loop.register(this);
254         assert future.isDone();
255         Throwable cause = future.cause();
256         if (cause != null) {
257             PlatformDependent.throwException(cause);
258         }
259     }
260 
261     @Override
262     protected final DefaultChannelPipeline newChannelPipeline() {
263         return new EmbeddedChannelPipeline(this);
264     }
265 
266     @Override
267     public ChannelMetadata metadata() {
268         return metadata;
269     }
270 
271     @Override
272     public ChannelConfig config() {
273         return config;
274     }
275 
276     @Override
277     public boolean isOpen() {
278         return state != State.CLOSED;
279     }
280 
281     @Override
282     public boolean isActive() {
283         return state == State.ACTIVE;
284     }
285 
286     /**
287      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
288      */
289     public Queue<Object> inboundMessages() {
290         if (inboundMessages == null) {
291             inboundMessages = new ArrayDeque<Object>();
292         }
293         return inboundMessages;
294     }
295 
296     /**
297      * @deprecated use {@link #inboundMessages()}
298      */
299     @Deprecated
300     public Queue<Object> lastInboundBuffer() {
301         return inboundMessages();
302     }
303 
304     /**
305      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
306      */
307     public Queue<Object> outboundMessages() {
308         if (outboundMessages == null) {
309             outboundMessages = new ArrayDeque<Object>();
310         }
311         return outboundMessages;
312     }
313 
314     /**
315      * @deprecated use {@link #outboundMessages()}
316      */
317     @Deprecated
318     public Queue<Object> lastOutboundBuffer() {
319         return outboundMessages();
320     }
321 
322     /**
323      * Return received data from this {@link Channel}
324      */
325     @SuppressWarnings("unchecked")
326     public <T> T readInbound() {
327         T message = (T) poll(inboundMessages);
328         if (message != null) {
329             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
330         }
331         return message;
332     }
333 
334     /**
335      * Read data from the outbound. This may return {@code null} if nothing is readable.
336      */
337     @SuppressWarnings("unchecked")
338     public <T> T readOutbound() {
339         T message =  (T) poll(outboundMessages);
340         if (message != null) {
341             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
342         }
343         return message;
344     }
345 
346     /**
347      * Write messages to the inbound of this {@link Channel}.
348      *
349      * @param msgs the messages to be written
350      *
351      * @return {@code true} if the write operation did add something to the inbound buffer
352      */
353     public boolean writeInbound(Object... msgs) {
354         ensureOpen();
355         if (msgs.length == 0) {
356             return isNotEmpty(inboundMessages);
357         }
358 
359         executingStackCnt++;
360         try {
361             ChannelPipeline p = pipeline();
362             for (Object m : msgs) {
363                 p.fireChannelRead(m);
364             }
365 
366             flushInbound(false, voidPromise());
367         } finally {
368             executingStackCnt--;
369             maybeRunPendingTasks();
370         }
371         return isNotEmpty(inboundMessages);
372     }
373 
374     /**
375      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
376      * method is conceptually equivalent to {@link #write(Object)}.
377      *
378      * @see #writeOneOutbound(Object)
379      */
380     public ChannelFuture writeOneInbound(Object msg) {
381         return writeOneInbound(msg, newPromise());
382     }
383 
384     /**
385      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
386      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
387      *
388      * @see #writeOneOutbound(Object, ChannelPromise)
389      */
390     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
391         executingStackCnt++;
392         try {
393             if (checkOpen(true)) {
394                 pipeline().fireChannelRead(msg);
395             }
396         } finally {
397             executingStackCnt--;
398             maybeRunPendingTasks();
399         }
400         return checkException(promise);
401     }
402 
403     /**
404      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
405      *
406      * @see #flushOutbound()
407      */
408     public EmbeddedChannel flushInbound() {
409         flushInbound(true, voidPromise());
410         return this;
411     }
412 
413     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
414         executingStackCnt++;
415         try {
416             if (checkOpen(recordException)) {
417                 pipeline().fireChannelReadComplete();
418                 runPendingTasks();
419             }
420         } finally {
421             executingStackCnt--;
422             maybeRunPendingTasks();
423         }
424 
425       return checkException(promise);
426     }
427 
428     /**
429      * Write messages to the outbound of this {@link Channel}.
430      *
431      * @param msgs              the messages to be written
432      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
433      */
434     public boolean writeOutbound(Object... msgs) {
435         ensureOpen();
436         if (msgs.length == 0) {
437             return isNotEmpty(outboundMessages);
438         }
439 
440         executingStackCnt++;
441         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
442         try {
443             try {
444                 for (Object m : msgs) {
445                     if (m == null) {
446                         break;
447                     }
448                     futures.add(write(m));
449                 }
450 
451                 flushOutbound0();
452 
453                 int size = futures.size();
454                 for (int i = 0; i < size; i++) {
455                     ChannelFuture future = (ChannelFuture) futures.get(i);
456                     if (future.isDone()) {
457                         recordException(future);
458                     } else {
459                         // The write may be delayed to run later by runPendingTasks()
460                         future.addListener(recordExceptionListener);
461                     }
462                 }
463             } finally {
464                 executingStackCnt--;
465                 maybeRunPendingTasks();
466             }
467             checkException();
468             return isNotEmpty(outboundMessages);
469         } finally {
470             futures.recycle();
471         }
472     }
473 
474     /**
475      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
476      * method is conceptually equivalent to {@link #write(Object)}.
477      *
478      * @see #writeOneInbound(Object)
479      */
480     public ChannelFuture writeOneOutbound(Object msg) {
481         return writeOneOutbound(msg, newPromise());
482     }
483 
484     /**
485      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
486      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
487      *
488      * @see #writeOneInbound(Object, ChannelPromise)
489      */
490     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
491         executingStackCnt++;
492         try {
493             if (checkOpen(true)) {
494                 return write(msg, promise);
495             }
496         } finally {
497             executingStackCnt--;
498             maybeRunPendingTasks();
499         }
500 
501         return checkException(promise);
502     }
503 
504     /**
505      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
506      *
507      * @see #flushInbound()
508      */
509     public EmbeddedChannel flushOutbound() {
510         executingStackCnt++;
511         try {
512             if (checkOpen(true)) {
513                 flushOutbound0();
514             }
515         } finally {
516             executingStackCnt--;
517             maybeRunPendingTasks();
518         }
519         checkException(voidPromise());
520         return this;
521     }
522 
523     private void flushOutbound0() {
524         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
525         // delay the write on the next eventloop run.
526         runPendingTasks();
527 
528         flush();
529     }
530 
531     /**
532      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
533      *
534      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
535      */
536     public boolean finish() {
537         return finish(false);
538     }
539 
540     /**
541      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
542      * Any further try to write data to it will fail.
543      *
544      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
545      */
546     public boolean finishAndReleaseAll() {
547         return finish(true);
548     }
549 
550     /**
551      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
552      *
553      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
554      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
555      */
556     private boolean finish(boolean releaseAll) {
557         executingStackCnt++;
558         try {
559             close();
560         } finally {
561             executingStackCnt--;
562             maybeRunPendingTasks();
563         }
564         try {
565             checkException();
566             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
567         } finally {
568             if (releaseAll) {
569                 releaseAll(inboundMessages);
570                 releaseAll(outboundMessages);
571             }
572         }
573     }
574 
575     /**
576      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
577      * otherwise.
578      */
579     public boolean releaseInbound() {
580         return releaseAll(inboundMessages);
581     }
582 
583     /**
584      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
585      * otherwise.
586      */
587     public boolean releaseOutbound() {
588         return releaseAll(outboundMessages);
589     }
590 
591     private static boolean releaseAll(Queue<Object> queue) {
592         if (isNotEmpty(queue)) {
593             for (;;) {
594                 Object msg = queue.poll();
595                 if (msg == null) {
596                     break;
597                 }
598                 ReferenceCountUtil.release(msg);
599             }
600             return true;
601         }
602         return false;
603     }
604 
605     @Override
606     public final ChannelFuture close() {
607         return close(newPromise());
608     }
609 
610     @Override
611     public final ChannelFuture disconnect() {
612         return disconnect(newPromise());
613     }
614 
615     @Override
616     public final ChannelFuture close(ChannelPromise promise) {
617         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
618         // that needs to be run before the actual close takes place.
619         executingStackCnt++;
620         ChannelFuture future;
621         try {
622             runPendingTasks();
623             future = super.close(promise);
624 
625             cancelRemainingScheduledTasks = true;
626         } finally {
627             executingStackCnt--;
628             maybeRunPendingTasks();
629         }
630         return future;
631     }
632 
633     @Override
634     public final ChannelFuture disconnect(ChannelPromise promise) {
635         executingStackCnt++;
636         ChannelFuture future;
637         try {
638             future = super.disconnect(promise);
639 
640             if (!metadata.hasDisconnect()) {
641                 cancelRemainingScheduledTasks = true;
642             }
643         } finally {
644             executingStackCnt--;
645             maybeRunPendingTasks();
646         }
647         return future;
648     }
649 
650     @Override
651     public ChannelFuture bind(SocketAddress localAddress) {
652         executingStackCnt++;
653         try {
654             return super.bind(localAddress);
655         } finally {
656             executingStackCnt--;
657             maybeRunPendingTasks();
658         }
659     }
660 
661     @Override
662     public ChannelFuture connect(SocketAddress remoteAddress) {
663         executingStackCnt++;
664         try {
665             return super.connect(remoteAddress);
666         } finally {
667             executingStackCnt--;
668             maybeRunPendingTasks();
669         }
670     }
671 
672     @Override
673     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
674         executingStackCnt++;
675         try {
676             return super.connect(remoteAddress, localAddress);
677         } finally {
678             executingStackCnt--;
679             maybeRunPendingTasks();
680         }
681     }
682 
683     @Override
684     public ChannelFuture deregister() {
685         executingStackCnt++;
686         try {
687             return super.deregister();
688         } finally {
689             executingStackCnt--;
690             maybeRunPendingTasks();
691         }
692     }
693 
694     @Override
695     public Channel flush() {
696         executingStackCnt++;
697         try {
698             return super.flush();
699         } finally {
700             executingStackCnt--;
701             maybeRunPendingTasks();
702         }
703     }
704 
705     @Override
706     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
707         executingStackCnt++;
708         try {
709             return super.bind(localAddress, promise);
710         } finally {
711             executingStackCnt--;
712             maybeRunPendingTasks();
713         }
714     }
715 
716     @Override
717     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
718         executingStackCnt++;
719         try {
720             return super.connect(remoteAddress, promise);
721         } finally {
722             executingStackCnt--;
723             maybeRunPendingTasks();
724         }
725     }
726 
727     @Override
728     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
729         executingStackCnt++;
730         try {
731             return super.connect(remoteAddress, localAddress, promise);
732         } finally {
733             executingStackCnt--;
734             maybeRunPendingTasks();
735         }
736     }
737 
738     @Override
739     public ChannelFuture deregister(ChannelPromise promise) {
740         executingStackCnt++;
741         try {
742             return super.deregister(promise);
743         } finally {
744             executingStackCnt--;
745             maybeRunPendingTasks();
746         }
747     }
748 
749     @Override
750     public Channel read() {
751         executingStackCnt++;
752         try {
753             return super.read();
754         } finally {
755             executingStackCnt--;
756             maybeRunPendingTasks();
757         }
758     }
759 
760     @Override
761     public ChannelFuture write(Object msg) {
762         executingStackCnt++;
763         try {
764             return super.write(msg);
765         } finally {
766             executingStackCnt--;
767             maybeRunPendingTasks();
768         }
769     }
770 
771     @Override
772     public ChannelFuture write(Object msg, ChannelPromise promise) {
773         executingStackCnt++;
774         try {
775             return super.write(msg, promise);
776         } finally {
777             executingStackCnt--;
778             maybeRunPendingTasks();
779         }
780     }
781 
782     @Override
783     public ChannelFuture writeAndFlush(Object msg) {
784         executingStackCnt++;
785         try {
786             return super.writeAndFlush(msg);
787         } finally {
788             executingStackCnt--;
789             maybeRunPendingTasks();
790         }
791     }
792 
793     @Override
794     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
795         executingStackCnt++;
796         try {
797             return super.writeAndFlush(msg, promise);
798         } finally {
799             executingStackCnt--;
800             maybeRunPendingTasks();
801         }
802     }
803 
804     private static boolean isNotEmpty(Queue<Object> queue) {
805         return queue != null && !queue.isEmpty();
806     }
807 
808     private static Object poll(Queue<Object> queue) {
809         return queue != null ? queue.poll() : null;
810     }
811 
812     private void maybeRunPendingTasks() {
813         if (executingStackCnt == 0) {
814             runPendingTasks();
815 
816             if (cancelRemainingScheduledTasks) {
817                 // Cancel all scheduled tasks that are left.
818                 embeddedEventLoop().cancelScheduledTasks();
819             }
820         }
821     }
822 
823     /**
824      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
825      * for this {@link Channel}
826      */
827     public void runPendingTasks() {
828         try {
829             embeddedEventLoop().runTasks();
830         } catch (Exception e) {
831             recordException(e);
832         }
833 
834         try {
835             embeddedEventLoop().runScheduledTasks();
836         } catch (Exception e) {
837             recordException(e);
838         }
839     }
840 
841     /**
842      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
843      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
844      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
845      *
846      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
847      */
848     public boolean hasPendingTasks() {
849         return embeddedEventLoop().hasPendingNormalTasks() ||
850                 embeddedEventLoop().nextScheduledTask() == 0;
851     }
852 
853     /**
854      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
855      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
856      * {@code -1}.
857      */
858     public long runScheduledPendingTasks() {
859         try {
860             return embeddedEventLoop().runScheduledTasks();
861         } catch (Exception e) {
862             recordException(e);
863             return embeddedEventLoop().nextScheduledTask();
864         }
865     }
866 
867     private void recordException(ChannelFuture future) {
868         if (!future.isSuccess()) {
869             recordException(future.cause());
870         }
871     }
872 
873     private void recordException(Throwable cause) {
874         if (lastException == null) {
875             lastException = cause;
876         } else {
877             logger.warn(
878                     "More than one exception was raised. " +
879                             "Will report only the first one and log others.", cause);
880         }
881     }
882 
883     private EmbeddedEventLoop.FreezableTicker freezableTicker() {
884         Ticker ticker = eventLoop().ticker();
885         if (ticker instanceof EmbeddedEventLoop.FreezableTicker) {
886             return (EmbeddedEventLoop.FreezableTicker) ticker;
887         } else {
888             throw new IllegalStateException(
889                     "EmbeddedChannel constructed with custom ticker, time manipulation methods are unavailable.");
890         }
891     }
892 
893     /**
894      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
895      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
896      */
897     public void advanceTimeBy(long duration, TimeUnit unit) {
898         freezableTicker().advance(duration, unit);
899     }
900 
901     /**
902      * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
903      * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
904      * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
905      */
906     public void freezeTime() {
907         freezableTicker().freezeTime();
908     }
909 
910     /**
911      * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
912      * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
913      * was called, it will run ten minutes after this method is called again (assuming no
914      * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
915      * {@link #runScheduledPendingTasks()}).
916      */
917     public void unfreezeTime() {
918         freezableTicker().unfreezeTime();
919     }
920 
921     /**
922      * Checks for the presence of an {@link Exception}.
923      */
924     private ChannelFuture checkException(ChannelPromise promise) {
925       Throwable t = lastException;
926       if (t != null) {
927         lastException = null;
928 
929         if (promise.isVoid()) {
930             PlatformDependent.throwException(t);
931         }
932 
933         return promise.setFailure(t);
934       }
935 
936       return promise.setSuccess();
937     }
938 
939     /**
940      * Check if there was any {@link Throwable} received and if so rethrow it.
941      */
942     public void checkException() {
943       checkException(voidPromise());
944     }
945 
946     /**
947      * Returns {@code true} if the {@link Channel} is open and records optionally
948      * an {@link Exception} if it isn't.
949      */
950     private boolean checkOpen(boolean recordException) {
951         if (!isOpen()) {
952           if (recordException) {
953               recordException(new ClosedChannelException());
954           }
955           return false;
956       }
957 
958       return true;
959     }
960 
961     private EmbeddedEventLoop embeddedEventLoop() {
962         if (isRegistered()) {
963             return (EmbeddedEventLoop) super.eventLoop();
964         }
965 
966         return loop;
967     }
968 
969     /**
970      * Ensure the {@link Channel} is open and if not throw an exception.
971      */
972     protected final void ensureOpen() {
973         if (!checkOpen(true)) {
974             checkException();
975         }
976     }
977 
978     @Override
979     protected boolean isCompatible(EventLoop loop) {
980         return loop instanceof EmbeddedEventLoop;
981     }
982 
983     @Override
984     protected SocketAddress localAddress0() {
985         return isActive()? LOCAL_ADDRESS : null;
986     }
987 
988     @Override
989     protected SocketAddress remoteAddress0() {
990         return isActive()? REMOTE_ADDRESS : null;
991     }
992 
993     @Override
994     protected void doRegister() throws Exception {
995         state = State.ACTIVE;
996     }
997 
998     @Override
999     protected void doBind(SocketAddress localAddress) throws Exception {
1000         // NOOP
1001     }
1002 
1003     @Override
1004     protected void doDisconnect() throws Exception {
1005         if (!metadata.hasDisconnect()) {
1006             doClose();
1007         }
1008     }
1009 
1010     @Override
1011     protected void doClose() throws Exception {
1012         state = State.CLOSED;
1013     }
1014 
1015     @Override
1016     protected void doBeginRead() throws Exception {
1017         // NOOP
1018     }
1019 
1020     @Override
1021     protected AbstractUnsafe newUnsafe() {
1022         return new EmbeddedUnsafe();
1023     }
1024 
1025     @Override
1026     public Unsafe unsafe() {
1027         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1028     }
1029 
1030     @Override
1031     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1032         for (;;) {
1033             Object msg = in.current();
1034             if (msg == null) {
1035                 break;
1036             }
1037 
1038             ReferenceCountUtil.retain(msg);
1039             handleOutboundMessage(msg);
1040             in.remove();
1041         }
1042     }
1043 
1044     /**
1045      * Called for each outbound message.
1046      *
1047      * @see #doWrite(ChannelOutboundBuffer)
1048      */
1049     protected void handleOutboundMessage(Object msg) {
1050         outboundMessages().add(msg);
1051     }
1052 
1053     /**
1054      * Called for each inbound message.
1055      */
1056     protected void handleInboundMessage(Object msg) {
1057         inboundMessages().add(msg);
1058     }
1059 
1060     public static Builder builder() {
1061         return new Builder();
1062     }
1063 
1064     public static final class Builder {
1065         Channel parent;
1066         ChannelId channelId = EmbeddedChannelId.INSTANCE;
1067         boolean register = true;
1068         boolean hasDisconnect;
1069         ChannelHandler[] handlers = EMPTY_HANDLERS;
1070         ChannelConfig config;
1071         Ticker ticker;
1072 
1073         private Builder() {
1074         }
1075 
1076         /**
1077          * The parent {@link Channel} of this {@link EmbeddedChannel}.
1078          *
1079          * @param parent the parent {@link Channel} of this {@link EmbeddedChannel}.
1080          * @return This builder
1081          */
1082         public Builder parent(Channel parent) {
1083             this.parent = parent;
1084             return this;
1085         }
1086 
1087         /**
1088          * The {@link ChannelId} that will be used to identify this channel.
1089          *
1090          * @param channelId the {@link ChannelId} that will be used to identify this channel
1091          * @return This builder
1092          */
1093         public Builder channelId(ChannelId channelId) {
1094             this.channelId = Objects.requireNonNull(channelId, "channelId");
1095             return this;
1096         }
1097 
1098         /**
1099          * {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the constructor. If
1100          * {@code false} the user will need to call {@link #register()}.
1101          *
1102          * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
1103          *                 constructor. If {@code false} the user will need to call {@link #register()}.
1104          * @return This builder
1105          */
1106         public Builder register(boolean register) {
1107             this.register = register;
1108             return this;
1109         }
1110 
1111         /**
1112          * {@code false} if this {@link Channel} will delegate {@link #disconnect()} to {@link #close()}, {@code true}
1113          * otherwise.
1114          *
1115          * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()} to
1116          *                      {@link #close()}, {@code true} otherwise
1117          * @return This builder
1118          */
1119         public Builder hasDisconnect(boolean hasDisconnect) {
1120             this.hasDisconnect = hasDisconnect;
1121             return this;
1122         }
1123 
1124         /**
1125          * The {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}.
1126          *
1127          * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
1128          * @return This builder
1129          */
1130         public Builder handlers(ChannelHandler... handlers) {
1131             this.handlers = Objects.requireNonNull(handlers, "handlers");
1132             return this;
1133         }
1134 
1135         /**
1136          * The {@link ChannelConfig} which will be returned by {@link #config()}.
1137          *
1138          * @param config the {@link ChannelConfig} which will be returned by {@link #config()}
1139          * @return This builder
1140          */
1141         public Builder config(ChannelConfig config) {
1142             this.config = Objects.requireNonNull(config, "config");
1143             return this;
1144         }
1145 
1146         /**
1147          * Configure a custom ticker for this event loop.
1148          *
1149          * @param ticker The custom ticker
1150          * @return This builder
1151          */
1152         public Builder ticker(Ticker ticker) {
1153             this.ticker = ticker;
1154             return this;
1155         }
1156 
1157         /**
1158          * Create the channel. If you wish to extend {@link EmbeddedChannel}, please use the
1159          * {@link #EmbeddedChannel(Builder)} constructor instead.
1160          *
1161          * @return The channel
1162          */
1163         public EmbeddedChannel build() {
1164             return new EmbeddedChannel(this);
1165         }
1166     }
1167 
1168     private final class EmbeddedUnsafe extends AbstractUnsafe {
1169 
1170         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
1171         // that may change the state of the Channel and may schedule tasks for later execution.
1172         final Unsafe wrapped = new Unsafe() {
1173             @Override
1174             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1175                 return EmbeddedUnsafe.this.recvBufAllocHandle();
1176             }
1177 
1178             @Override
1179             public SocketAddress localAddress() {
1180                 return EmbeddedUnsafe.this.localAddress();
1181             }
1182 
1183             @Override
1184             public SocketAddress remoteAddress() {
1185                 return EmbeddedUnsafe.this.remoteAddress();
1186             }
1187 
1188             @Override
1189             public void register(EventLoop eventLoop, ChannelPromise promise) {
1190                 executingStackCnt++;
1191                 try {
1192                     EmbeddedUnsafe.this.register(eventLoop, promise);
1193                 } finally {
1194                     executingStackCnt--;
1195                     maybeRunPendingTasks();
1196                 }
1197             }
1198 
1199             @Override
1200             public void bind(SocketAddress localAddress, ChannelPromise promise) {
1201                 executingStackCnt++;
1202                 try {
1203                     EmbeddedUnsafe.this.bind(localAddress, promise);
1204                 } finally {
1205                     executingStackCnt--;
1206                     maybeRunPendingTasks();
1207                 }
1208             }
1209 
1210             @Override
1211             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1212                 executingStackCnt++;
1213                 try {
1214                     EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1215                 } finally {
1216                     executingStackCnt--;
1217                     maybeRunPendingTasks();
1218                 }
1219             }
1220 
1221             @Override
1222             public void disconnect(ChannelPromise promise) {
1223                 executingStackCnt++;
1224                 try {
1225                     EmbeddedUnsafe.this.disconnect(promise);
1226                 } finally {
1227                     executingStackCnt--;
1228                     maybeRunPendingTasks();
1229                 }
1230             }
1231 
1232             @Override
1233             public void close(ChannelPromise promise) {
1234                 executingStackCnt++;
1235                 try {
1236                     EmbeddedUnsafe.this.close(promise);
1237                 } finally {
1238                     executingStackCnt--;
1239                     maybeRunPendingTasks();
1240                 }
1241             }
1242 
1243             @Override
1244             public void closeForcibly() {
1245                 executingStackCnt++;
1246                 try {
1247                     EmbeddedUnsafe.this.closeForcibly();
1248                 } finally {
1249                     executingStackCnt--;
1250                     maybeRunPendingTasks();
1251                 }
1252             }
1253 
1254             @Override
1255             public void deregister(ChannelPromise promise) {
1256                 executingStackCnt++;
1257                 try {
1258                     EmbeddedUnsafe.this.deregister(promise);
1259                 } finally {
1260                     executingStackCnt--;
1261                     maybeRunPendingTasks();
1262                 }
1263             }
1264 
1265             @Override
1266             public void beginRead() {
1267                 executingStackCnt++;
1268                 try {
1269                     EmbeddedUnsafe.this.beginRead();
1270                 } finally {
1271                     executingStackCnt--;
1272                     maybeRunPendingTasks();
1273                 }
1274             }
1275 
1276             @Override
1277             public void write(Object msg, ChannelPromise promise) {
1278                 executingStackCnt++;
1279                 try {
1280                     EmbeddedUnsafe.this.write(msg, promise);
1281                 } finally {
1282                     executingStackCnt--;
1283                     maybeRunPendingTasks();
1284                 }
1285             }
1286 
1287             @Override
1288             public void flush() {
1289                 executingStackCnt++;
1290                 try {
1291                     EmbeddedUnsafe.this.flush();
1292                 } finally {
1293                     executingStackCnt--;
1294                     maybeRunPendingTasks();
1295                 }
1296             }
1297 
1298             @Override
1299             public ChannelPromise voidPromise() {
1300                 return EmbeddedUnsafe.this.voidPromise();
1301             }
1302 
1303             @Override
1304             public ChannelOutboundBuffer outboundBuffer() {
1305                 return EmbeddedUnsafe.this.outboundBuffer();
1306             }
1307         };
1308 
1309         @Override
1310         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1311             safeSetSuccess(promise);
1312         }
1313     }
1314 
1315     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1316         EmbeddedChannelPipeline(EmbeddedChannel channel) {
1317             super(channel);
1318         }
1319 
1320         @Override
1321         protected void onUnhandledInboundException(Throwable cause) {
1322             recordException(cause);
1323         }
1324 
1325         @Override
1326         protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1327             handleInboundMessage(msg);
1328         }
1329     }
1330 }