View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import java.net.SocketAddress;
19  import java.nio.channels.ClosedChannelException;
20  import java.util.ArrayDeque;
21  import java.util.Queue;
22  import java.util.concurrent.TimeUnit;
23  
24  import io.netty.channel.AbstractChannel;
25  import io.netty.channel.Channel;
26  import io.netty.channel.ChannelConfig;
27  import io.netty.channel.ChannelFuture;
28  import io.netty.channel.ChannelFutureListener;
29  import io.netty.channel.ChannelHandler;
30  import io.netty.channel.ChannelHandlerContext;
31  import io.netty.channel.ChannelId;
32  import io.netty.channel.ChannelInitializer;
33  import io.netty.channel.ChannelMetadata;
34  import io.netty.channel.ChannelOutboundBuffer;
35  import io.netty.channel.ChannelPipeline;
36  import io.netty.channel.ChannelPromise;
37  import io.netty.channel.DefaultChannelConfig;
38  import io.netty.channel.DefaultChannelPipeline;
39  import io.netty.channel.EventLoop;
40  import io.netty.channel.RecvByteBufAllocator;
41  import io.netty.util.ReferenceCountUtil;
42  import io.netty.util.internal.ObjectUtil;
43  import io.netty.util.internal.PlatformDependent;
44  import io.netty.util.internal.RecyclableArrayList;
45  import io.netty.util.internal.logging.InternalLogger;
46  import io.netty.util.internal.logging.InternalLoggerFactory;
47  
48  /**
49   * Base class for {@link Channel} implementations that are used in an embedded fashion.
50   */
51  public class EmbeddedChannel extends AbstractChannel {
52  
    // Shared address instances, one pair per class.
    private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
    private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();

    // Passed along by the constructors that do not take any handlers.
    private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
    // Lifecycle state; isOpen() and isActive() are derived from it.
    private enum State { OPEN, ACTIVE, CLOSED }

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);

    // Pre-built metadata instances; which one is used depends on the hasDisconnect constructor flag.
    private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
    private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);

    // The event loop this channel registers itself with.
    private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
    // Attached to write futures that are not done yet, so that a later failure is still recorded.
    private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            recordException(future);
        }
    };

    private final ChannelMetadata metadata;
    private final ChannelConfig config;

    // Both queues are created lazily by inboundMessages() / outboundMessages() and may therefore be null.
    private Queue<Object> inboundMessages;
    private Queue<Object> outboundMessages;
    // The first recorded failure; later failures are only logged by recordException(Throwable).
    private Throwable lastException;
    private State state;
    // Nesting depth of channel operations in progress; maybeRunPendingTasks() only runs tasks when it is zero.
    private int executingStackCnt;
80  
    /**
     * Create a new instance with no handlers in its pipeline. Delegates to
     * {@link #EmbeddedChannel(ChannelHandler...)} with an empty handler array, which in turn uses
     * {@code EmbeddedChannelId.INSTANCE} as the channel ID.
     */
    public EmbeddedChannel() {
        this(EMPTY_HANDLERS);
    }
87  
    /**
     * Create a new instance with the specified ID and no handlers in its pipeline.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     */
    public EmbeddedChannel(ChannelId channelId) {
        this(channelId, EMPTY_HANDLERS);
    }
96  
    /**
     * Create a new instance that uses {@code EmbeddedChannelId.INSTANCE} as its ID and whose pipeline is
     * initialized with the specified handlers.
     *
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelHandler... handlers) {
        this(EmbeddedChannelId.INSTANCE, handlers);
    }
105 
    /**
     * Create a new instance that uses {@code EmbeddedChannelId.INSTANCE} as its ID and whose pipeline is
     * initialized with the specified handlers.
     *
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
        this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
    }
116 
    /**
     * Create a new instance that uses {@code EmbeddedChannelId.INSTANCE} as its ID and whose pipeline is
     * initialized with the specified handlers, optionally deferring registration.
     *
     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
     *                 constructor. If {@code false} the user will need to call {@link #register()}.
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
        this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
    }
129 
    /**
     * Create a new instance with the channel ID set to the given ID and the pipeline
     * initialized with the specified handlers. The {@code hasDisconnect} flag is passed on as
     * {@code false}, so {@link #disconnect()} is delegated to {@link #close()}.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
        this(channelId, false, handlers);
    }
140 
    /**
     * Create a new instance with the channel ID set to the given ID and the pipeline
     * initialized with the specified handlers. The {@code register} flag is passed on as {@code true},
     * so the channel is registered to its {@link EventLoop} in the constructor.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
        this(channelId, true, hasDisconnect, handlers);
    }
153 
    /**
     * Create a new instance without a parent channel, with the channel ID set to the given ID and the
     * pipeline initialized with the specified handlers.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
     *                 constructor. If {@code false} the user will need to call {@link #register()}.
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
                           ChannelHandler... handlers) {
        this(null, channelId, register, hasDisconnect, handlers);
    }
169 
170     /**
171      * Create a new instance with the channel ID set to the given ID and the pipeline
172      * initialized with the specified handlers.
173      *
174      * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
175      * @param channelId the {@link ChannelId} that will be used to identify this channel
176      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
177      *                 constructor. If {@code false} the user will need to call {@link #register()}.
178      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
179      *                      to {@link #close()}, {@code true} otherwise.
180      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
181      */
182     public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
183                            final ChannelHandler... handlers) {
184         super(parent, channelId);
185         metadata = metadata(hasDisconnect);
186         config = new DefaultChannelConfig(this);
187         setup(register, handlers);
188     }
189 
190     /**
191      * Create a new instance with the channel ID set to the given ID and the pipeline
192      * initialized with the specified handlers.
193      *
194      * @param channelId the {@link ChannelId} that will be used to identify this channel
195      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
196      *                      to {@link #close()}, {@code true} otherwise.
197      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
198      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
199      */
200     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
201                            final ChannelHandler... handlers) {
202         super(null, channelId);
203         metadata = metadata(hasDisconnect);
204         this.config = ObjectUtil.checkNotNull(config, "config");
205         setup(true, handlers);
206     }
207 
208     private static ChannelMetadata metadata(boolean hasDisconnect) {
209         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
210     }
211 
212     private void setup(boolean register, final ChannelHandler... handlers) {
213         ObjectUtil.checkNotNull(handlers, "handlers");
214         ChannelPipeline p = pipeline();
215         p.addLast(new ChannelInitializer<Channel>() {
216             @Override
217             protected void initChannel(Channel ch) throws Exception {
218                 ChannelPipeline pipeline = ch.pipeline();
219                 for (ChannelHandler h: handlers) {
220                     if (h == null) {
221                         break;
222                     }
223                     pipeline.addLast(h);
224                 }
225             }
226         });
227         if (register) {
228             ChannelFuture future = loop.register(this);
229             assert future.isDone();
230         }
231     }
232 
233     /**
234      * Register this {@code Channel} on its {@link EventLoop}.
235      */
236     public void register() throws Exception {
237         ChannelFuture future = loop.register(this);
238         assert future.isDone();
239         Throwable cause = future.cause();
240         if (cause != null) {
241             PlatformDependent.throwException(cause);
242         }
243     }
244 
    /**
     * Creates the pipeline for this channel, which is always an {@code EmbeddedChannelPipeline}.
     */
    @Override
    protected final DefaultChannelPipeline newChannelPipeline() {
        return new EmbeddedChannelPipeline(this);
    }
249 
    /**
     * Returns the {@link ChannelMetadata} chosen at construction time from the {@code hasDisconnect} flag.
     */
    @Override
    public ChannelMetadata metadata() {
        return metadata;
    }
254 
    /**
     * Returns the {@link ChannelConfig} of this channel: either the one supplied to the constructor or the
     * {@link DefaultChannelConfig} created by it.
     */
    @Override
    public ChannelConfig config() {
        return config;
    }
259 
    /**
     * Returns {@code true} unless the channel state is {@code CLOSED}.
     */
    @Override
    public boolean isOpen() {
        return state != State.CLOSED;
    }
264 
    /**
     * Returns {@code true} only while the channel state is {@code ACTIVE}.
     */
    @Override
    public boolean isActive() {
        return state == State.ACTIVE;
    }
269 
270     /**
271      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
272      */
273     public Queue<Object> inboundMessages() {
274         if (inboundMessages == null) {
275             inboundMessages = new ArrayDeque<Object>();
276         }
277         return inboundMessages;
278     }
279 
    /**
     * Returns the same queue as {@link #inboundMessages()}.
     *
     * @deprecated use {@link #inboundMessages()}
     */
    @Deprecated
    public Queue<Object> lastInboundBuffer() {
        return inboundMessages();
    }
287 
288     /**
289      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
290      */
291     public Queue<Object> outboundMessages() {
292         if (outboundMessages == null) {
293             outboundMessages = new ArrayDeque<Object>();
294         }
295         return outboundMessages;
296     }
297 
    /**
     * Returns the same queue as {@link #outboundMessages()}.
     *
     * @deprecated use {@link #outboundMessages()}
     */
    @Deprecated
    public Queue<Object> lastOutboundBuffer() {
        return outboundMessages();
    }
305 
306     /**
307      * Return received data from this {@link Channel}
308      */
309     @SuppressWarnings("unchecked")
310     public <T> T readInbound() {
311         T message = (T) poll(inboundMessages);
312         if (message != null) {
313             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
314         }
315         return message;
316     }
317 
318     /**
319      * Read data from the outbound. This may return {@code null} if nothing is readable.
320      */
321     @SuppressWarnings("unchecked")
322     public <T> T readOutbound() {
323         T message =  (T) poll(outboundMessages);
324         if (message != null) {
325             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
326         }
327         return message;
328     }
329 
330     /**
331      * Write messages to the inbound of this {@link Channel}.
332      *
333      * @param msgs the messages to be written
334      *
335      * @return {@code true} if the write operation did add something to the inbound buffer
336      */
337     public boolean writeInbound(Object... msgs) {
338         ensureOpen();
339         if (msgs.length == 0) {
340             return isNotEmpty(inboundMessages);
341         }
342 
343         executingStackCnt++;
344         try {
345             ChannelPipeline p = pipeline();
346             for (Object m : msgs) {
347                 p.fireChannelRead(m);
348             }
349 
350             flushInbound(false, voidPromise());
351         } finally {
352             executingStackCnt--;
353             maybeRunPendingTasks();
354         }
355         return isNotEmpty(inboundMessages);
356     }
357 
    /**
     * Writes one message to the inbound of this {@link Channel} and does not flush it. This
     * method is conceptually equivalent to {@link #write(Object)}.
     *
     * @param msg the message to fire through the pipeline
     * @return the result of {@link #writeOneInbound(Object, ChannelPromise)} called with a new promise
     * @see #writeOneOutbound(Object)
     */
    public ChannelFuture writeOneInbound(Object msg) {
        return writeOneInbound(msg, newPromise());
    }
367 
368     /**
369      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
370      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
371      *
372      * @see #writeOneOutbound(Object, ChannelPromise)
373      */
374     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
375         executingStackCnt++;
376         try {
377             if (checkOpen(true)) {
378                 pipeline().fireChannelRead(msg);
379             }
380         } finally {
381             executingStackCnt--;
382             maybeRunPendingTasks();
383         }
384         return checkException(promise);
385     }
386 
    /**
     * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
     *
     * @return this channel, to allow call chaining
     * @see #flushOutbound()
     */
    public EmbeddedChannel flushInbound() {
        // 'true' is forwarded to checkOpen(...) as the recordException flag.
        flushInbound(true, voidPromise());
        return this;
    }
396 
397     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
398         executingStackCnt++;
399         try {
400             if (checkOpen(recordException)) {
401                 pipeline().fireChannelReadComplete();
402                 runPendingTasks();
403             }
404         } finally {
405             executingStackCnt--;
406             maybeRunPendingTasks();
407         }
408 
409       return checkException(promise);
410     }
411 
412     /**
413      * Write messages to the outbound of this {@link Channel}.
414      *
415      * @param msgs              the messages to be written
416      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
417      */
418     public boolean writeOutbound(Object... msgs) {
419         ensureOpen();
420         if (msgs.length == 0) {
421             return isNotEmpty(outboundMessages);
422         }
423 
424         executingStackCnt++;
425         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
426         try {
427             try {
428                 for (Object m : msgs) {
429                     if (m == null) {
430                         break;
431                     }
432                     futures.add(write(m));
433                 }
434 
435                 flushOutbound0();
436 
437                 int size = futures.size();
438                 for (int i = 0; i < size; i++) {
439                     ChannelFuture future = (ChannelFuture) futures.get(i);
440                     if (future.isDone()) {
441                         recordException(future);
442                     } else {
443                         // The write may be delayed to run later by runPendingTasks()
444                         future.addListener(recordExceptionListener);
445                     }
446                 }
447             } finally {
448                 executingStackCnt--;
449                 maybeRunPendingTasks();
450             }
451             checkException();
452             return isNotEmpty(outboundMessages);
453         } finally {
454             futures.recycle();
455         }
456     }
457 
    /**
     * Writes one message to the outbound of this {@link Channel} and does not flush it. This
     * method is conceptually equivalent to {@link #write(Object)}.
     *
     * @param msg the message to write
     * @return the result of {@link #writeOneOutbound(Object, ChannelPromise)} called with a new promise
     * @see #writeOneInbound(Object)
     */
    public ChannelFuture writeOneOutbound(Object msg) {
        return writeOneOutbound(msg, newPromise());
    }
467 
468     /**
469      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
470      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
471      *
472      * @see #writeOneInbound(Object, ChannelPromise)
473      */
474     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
475         executingStackCnt++;
476         try {
477             if (checkOpen(true)) {
478                 return write(msg, promise);
479             }
480         } finally {
481             executingStackCnt--;
482             maybeRunPendingTasks();
483         }
484 
485         return checkException(promise);
486     }
487 
488     /**
489      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
490      *
491      * @see #flushInbound()
492      */
493     public EmbeddedChannel flushOutbound() {
494         executingStackCnt++;
495         try {
496             if (checkOpen(true)) {
497                 flushOutbound0();
498             }
499         } finally {
500             executingStackCnt--;
501             maybeRunPendingTasks();
502         }
503         checkException(voidPromise());
504         return this;
505     }
506 
    /**
     * Runs all pending tasks and then flushes the channel.
     */
    private void flushOutbound0() {
        // We need to call runPendingTasks() first, as a ChannelOutboundHandler may have used
        // eventloop.execute(...) to delay the write until the next event loop run.
        runPendingTasks();

        flush();
    }
514 
    /**
     * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
     * Messages still held in the inbound and outbound queues are left in place and are not released.
     *
     * @return {@code true} if the inbound or the outbound queue still holds at least one message
     */
    public boolean finish() {
        return finish(false);
    }
523 
    /**
     * Mark this {@link Channel} as finished and release all pending messages in the inbound and outbound
     * queues. Any further try to write data to it will fail.
     *
     * @return {@code true} if the inbound or the outbound queue held at least one message before the release
     */
    public boolean finishAndReleaseAll() {
        return finish(true);
    }
533 
534     /**
535      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
536      *
537      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
538      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
539      */
540     private boolean finish(boolean releaseAll) {
541         executingStackCnt++;
542         try {
543             close();
544         } finally {
545             executingStackCnt--;
546             maybeRunPendingTasks();
547         }
548         try {
549             checkException();
550             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
551         } finally {
552             if (releaseAll) {
553                 releaseAll(inboundMessages);
554                 releaseAll(outboundMessages);
555             }
556         }
557     }
558 
    /**
     * Release all buffered inbound messages.
     *
     * @return {@code true} if any messages were in the inbound queue, {@code false} otherwise
     */
    public boolean releaseInbound() {
        return releaseAll(inboundMessages);
    }
566 
    /**
     * Release all buffered outbound messages.
     *
     * @return {@code true} if any messages were in the outbound queue, {@code false} otherwise
     */
    public boolean releaseOutbound() {
        return releaseAll(outboundMessages);
    }
574 
575     private static boolean releaseAll(Queue<Object> queue) {
576         if (isNotEmpty(queue)) {
577             for (;;) {
578                 Object msg = queue.poll();
579                 if (msg == null) {
580                     break;
581                 }
582                 ReferenceCountUtil.release(msg);
583             }
584             return true;
585         }
586         return false;
587     }
588 
    /**
     * Runs all pending tasks and optionally cancels whatever scheduled tasks remain afterwards.
     *
     * @param cancel {@code true} to cancel the scheduled tasks that are still left after the run
     */
    private void finishPendingTasks(boolean cancel) {
        runPendingTasks();
        if (cancel) {
            // Cancel all scheduled tasks that are left.
            embeddedEventLoop().cancelScheduledTasks();
        }
    }
596 
    /**
     * Closes this channel using a newly created promise; see {@link #close(ChannelPromise)}.
     */
    @Override
    public final ChannelFuture close() {
        return close(newPromise());
    }
601 
    /**
     * Disconnects this channel using a newly created promise; see {@link #disconnect(ChannelPromise)}.
     */
    @Override
    public final ChannelFuture disconnect() {
        return disconnect(newPromise());
    }
606 
607     @Override
608     public final ChannelFuture close(ChannelPromise promise) {
609         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
610         // that needs to be run before the actual close takes place.
611         executingStackCnt++;
612         ChannelFuture future;
613         try {
614             runPendingTasks();
615             future = super.close(promise);
616         } finally {
617             executingStackCnt--;
618             maybeRunPendingTasks();
619         }
620 
621         // Now finish everything else and cancel all scheduled tasks that were not ready set.
622         finishPendingTasks(true);
623         return future;
624     }
625 
626     @Override
627     public final ChannelFuture disconnect(ChannelPromise promise) {
628         executingStackCnt++;
629         ChannelFuture future;
630         try {
631             future = super.disconnect(promise);
632         } finally {
633             executingStackCnt--;
634             maybeRunPendingTasks();
635         }
636         finishPendingTasks(!metadata.hasDisconnect());
637         return future;
638     }
639 
640     @Override
641     public ChannelFuture bind(SocketAddress localAddress) {
642         executingStackCnt++;
643         try {
644             return super.bind(localAddress);
645         } finally {
646             executingStackCnt--;
647             maybeRunPendingTasks();
648         }
649     }
650 
651     @Override
652     public ChannelFuture connect(SocketAddress remoteAddress) {
653         executingStackCnt++;
654         try {
655             return super.connect(remoteAddress);
656         } finally {
657             executingStackCnt--;
658             maybeRunPendingTasks();
659         }
660     }
661 
662     @Override
663     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
664         executingStackCnt++;
665         try {
666             return super.connect(remoteAddress, localAddress);
667         } finally {
668             executingStackCnt--;
669             maybeRunPendingTasks();
670         }
671     }
672 
673     @Override
674     public ChannelFuture deregister() {
675         executingStackCnt++;
676         try {
677             return super.deregister();
678         } finally {
679             executingStackCnt--;
680             maybeRunPendingTasks();
681         }
682     }
683 
684     @Override
685     public Channel flush() {
686         executingStackCnt++;
687         try {
688             return super.flush();
689         } finally {
690             executingStackCnt--;
691             maybeRunPendingTasks();
692         }
693     }
694 
695     @Override
696     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
697         executingStackCnt++;
698         try {
699             return super.bind(localAddress, promise);
700         } finally {
701             executingStackCnt--;
702             maybeRunPendingTasks();
703         }
704     }
705 
706     @Override
707     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
708         executingStackCnt++;
709         try {
710             return super.connect(remoteAddress, promise);
711         } finally {
712             executingStackCnt--;
713             maybeRunPendingTasks();
714         }
715     }
716 
717     @Override
718     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
719         executingStackCnt++;
720         try {
721             return super.connect(remoteAddress, localAddress, promise);
722         } finally {
723             executingStackCnt--;
724             maybeRunPendingTasks();
725         }
726     }
727 
728     @Override
729     public ChannelFuture deregister(ChannelPromise promise) {
730         executingStackCnt++;
731         try {
732             return super.deregister(promise);
733         } finally {
734             executingStackCnt--;
735             maybeRunPendingTasks();
736         }
737     }
738 
739     @Override
740     public Channel read() {
741         executingStackCnt++;
742         try {
743             return super.read();
744         } finally {
745             executingStackCnt--;
746             maybeRunPendingTasks();
747         }
748     }
749 
750     @Override
751     public ChannelFuture write(Object msg) {
752         executingStackCnt++;
753         try {
754             return super.write(msg);
755         } finally {
756             executingStackCnt--;
757             maybeRunPendingTasks();
758         }
759     }
760 
761     @Override
762     public ChannelFuture write(Object msg, ChannelPromise promise) {
763         executingStackCnt++;
764         try {
765             return super.write(msg, promise);
766         } finally {
767             executingStackCnt--;
768             maybeRunPendingTasks();
769         }
770     }
771 
772     @Override
773     public ChannelFuture writeAndFlush(Object msg) {
774         executingStackCnt++;
775         try {
776             return super.writeAndFlush(msg);
777         } finally {
778             executingStackCnt--;
779             maybeRunPendingTasks();
780         }
781     }
782 
783     @Override
784     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
785         executingStackCnt++;
786         try {
787             return super.writeAndFlush(msg, promise);
788         } finally {
789             executingStackCnt--;
790             maybeRunPendingTasks();
791         }
792     }
793 
794     private static boolean isNotEmpty(Queue<Object> queue) {
795         return queue != null && !queue.isEmpty();
796     }
797 
798     private static Object poll(Queue<Object> queue) {
799         return queue != null ? queue.poll() : null;
800     }
801 
802     private void maybeRunPendingTasks() {
803         if (executingStackCnt == 0) {
804             runPendingTasks();
805         }
806     }
807 
    /**
     * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
     * for this {@link Channel}. An {@link Exception} thrown while running either kind of task is
     * recorded instead of being propagated to the caller.
     */
    public void runPendingTasks() {
        try {
            embeddedEventLoop().runTasks();
        } catch (Exception e) {
            // Record the failure and carry on, so the scheduled tasks below still get their turn.
            recordException(e);
        }

        try {
            embeddedEventLoop().runScheduledTasks();
        } catch (Exception e) {
            recordException(e);
        }
    }
825 
826     /**
827      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
828      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
829      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
830      *
831      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
832      */
833     public boolean hasPendingTasks() {
834         return embeddedEventLoop().hasPendingNormalTasks() ||
835                 embeddedEventLoop().nextScheduledTask() == 0;
836     }
837 
    /**
     * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
     * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
     * {@code -1}. An {@link Exception} thrown by a scheduled task is recorded instead of being propagated.
     */
    public long runScheduledPendingTasks() {
        try {
            return embeddedEventLoop().runScheduledTasks();
        } catch (Exception e) {
            recordException(e);
            // After a failure, fall back to asking the event loop for the next scheduled task.
            return embeddedEventLoop().nextScheduledTask();
        }
    }
851 
852     private void recordException(ChannelFuture future) {
853         if (!future.isSuccess()) {
854             recordException(future.cause());
855         }
856     }
857 
858     private void recordException(Throwable cause) {
859         if (lastException == null) {
860             lastException = cause;
861         } else {
862             logger.warn(
863                     "More than one exception was raised. " +
864                             "Will report only the first one and log others.", cause);
865         }
866     }
867 
868     /**
869      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
870      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
871      */
872     public void advanceTimeBy(long duration, TimeUnit unit) {
873         embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
874     }
875 
876     /**
877      * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
878      * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
879      * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
880      */
881     public void freezeTime() {
882         embeddedEventLoop().freezeTime();
883     }
884 
885     /**
886      * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
887      * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
888      * was called, it will run ten minutes after this method is called again (assuming no
889      * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
890      * {@link #runScheduledPendingTasks()}).
891      */
892     public void unfreezeTime() {
893         embeddedEventLoop().unfreezeTime();
894     }
895 
896     /**
897      * Checks for the presence of an {@link Exception}.
898      */
899     private ChannelFuture checkException(ChannelPromise promise) {
900       Throwable t = lastException;
901       if (t != null) {
902         lastException = null;
903 
904         if (promise.isVoid()) {
905             PlatformDependent.throwException(t);
906         }
907 
908         return promise.setFailure(t);
909       }
910 
911       return promise.setSuccess();
912     }
913 
914     /**
915      * Check if there was any {@link Throwable} received and if so rethrow it.
916      */
917     public void checkException() {
918       checkException(voidPromise());
919     }
920 
921     /**
922      * Returns {@code true} if the {@link Channel} is open and records optionally
923      * an {@link Exception} if it isn't.
924      */
925     private boolean checkOpen(boolean recordException) {
926         if (!isOpen()) {
927           if (recordException) {
928               recordException(new ClosedChannelException());
929           }
930           return false;
931       }
932 
933       return true;
934     }
935 
936     private EmbeddedEventLoop embeddedEventLoop() {
937         if (isRegistered()) {
938             return (EmbeddedEventLoop) super.eventLoop();
939         }
940 
941         return loop;
942     }
943 
944     /**
945      * Ensure the {@link Channel} is open and if not throw an exception.
946      */
947     protected final void ensureOpen() {
948         if (!checkOpen(true)) {
949             checkException();
950         }
951     }
952 
953     @Override
954     protected boolean isCompatible(EventLoop loop) {
955         return loop instanceof EmbeddedEventLoop;
956     }
957 
958     @Override
959     protected SocketAddress localAddress0() {
960         return isActive()? LOCAL_ADDRESS : null;
961     }
962 
963     @Override
964     protected SocketAddress remoteAddress0() {
965         return isActive()? REMOTE_ADDRESS : null;
966     }
967 
968     @Override
969     protected void doRegister() throws Exception {
970         state = State.ACTIVE;
971     }
972 
973     @Override
974     protected void doBind(SocketAddress localAddress) throws Exception {
975         // NOOP
976     }
977 
978     @Override
979     protected void doDisconnect() throws Exception {
980         if (!metadata.hasDisconnect()) {
981             doClose();
982         }
983     }
984 
985     @Override
986     protected void doClose() throws Exception {
987         state = State.CLOSED;
988     }
989 
990     @Override
991     protected void doBeginRead() throws Exception {
992         // NOOP
993     }
994 
995     @Override
996     protected AbstractUnsafe newUnsafe() {
997         return new EmbeddedUnsafe();
998     }
999 
1000     @Override
1001     public Unsafe unsafe() {
1002         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1003     }
1004 
1005     @Override
1006     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1007         for (;;) {
1008             Object msg = in.current();
1009             if (msg == null) {
1010                 break;
1011             }
1012 
1013             ReferenceCountUtil.retain(msg);
1014             handleOutboundMessage(msg);
1015             in.remove();
1016         }
1017     }
1018 
1019     /**
1020      * Called for each outbound message.
1021      *
1022      * @see #doWrite(ChannelOutboundBuffer)
1023      */
1024     protected void handleOutboundMessage(Object msg) {
1025         outboundMessages().add(msg);
1026     }
1027 
1028     /**
1029      * Called for each inbound message.
1030      */
1031     protected void handleInboundMessage(Object msg) {
1032         inboundMessages().add(msg);
1033     }
1034 
1035     private final class EmbeddedUnsafe extends AbstractUnsafe {
1036 
1037         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
1038         // that may change the state of the Channel and may schedule tasks for later execution.
1039         final Unsafe wrapped = new Unsafe() {
1040             @Override
1041             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1042                 return EmbeddedUnsafe.this.recvBufAllocHandle();
1043             }
1044 
1045             @Override
1046             public SocketAddress localAddress() {
1047                 return EmbeddedUnsafe.this.localAddress();
1048             }
1049 
1050             @Override
1051             public SocketAddress remoteAddress() {
1052                 return EmbeddedUnsafe.this.remoteAddress();
1053             }
1054 
1055             @Override
1056             public void register(EventLoop eventLoop, ChannelPromise promise) {
1057                 executingStackCnt++;
1058                 try {
1059                     EmbeddedUnsafe.this.register(eventLoop, promise);
1060                 } finally {
1061                     executingStackCnt--;
1062                     maybeRunPendingTasks();
1063                 }
1064             }
1065 
1066             @Override
1067             public void bind(SocketAddress localAddress, ChannelPromise promise) {
1068                 executingStackCnt++;
1069                 try {
1070                     EmbeddedUnsafe.this.bind(localAddress, promise);
1071                 } finally {
1072                     executingStackCnt--;
1073                     maybeRunPendingTasks();
1074                 }
1075             }
1076 
1077             @Override
1078             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1079                 executingStackCnt++;
1080                 try {
1081                     EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1082                 } finally {
1083                     executingStackCnt--;
1084                     maybeRunPendingTasks();
1085                 }
1086             }
1087 
1088             @Override
1089             public void disconnect(ChannelPromise promise) {
1090                 executingStackCnt++;
1091                 try {
1092                     EmbeddedUnsafe.this.disconnect(promise);
1093                 } finally {
1094                     executingStackCnt--;
1095                     maybeRunPendingTasks();
1096                 }
1097             }
1098 
1099             @Override
1100             public void close(ChannelPromise promise) {
1101                 executingStackCnt++;
1102                 try {
1103                     EmbeddedUnsafe.this.close(promise);
1104                 } finally {
1105                     executingStackCnt--;
1106                     maybeRunPendingTasks();
1107                 }
1108             }
1109 
1110             @Override
1111             public void closeForcibly() {
1112                 executingStackCnt++;
1113                 try {
1114                     EmbeddedUnsafe.this.closeForcibly();
1115                 } finally {
1116                     executingStackCnt--;
1117                     maybeRunPendingTasks();
1118                 }
1119             }
1120 
1121             @Override
1122             public void deregister(ChannelPromise promise) {
1123                 executingStackCnt++;
1124                 try {
1125                     EmbeddedUnsafe.this.deregister(promise);
1126                 } finally {
1127                     executingStackCnt--;
1128                     maybeRunPendingTasks();
1129                 }
1130             }
1131 
1132             @Override
1133             public void beginRead() {
1134                 executingStackCnt++;
1135                 try {
1136                     EmbeddedUnsafe.this.beginRead();
1137                 } finally {
1138                     executingStackCnt--;
1139                     maybeRunPendingTasks();
1140                 }
1141             }
1142 
1143             @Override
1144             public void write(Object msg, ChannelPromise promise) {
1145                 executingStackCnt++;
1146                 try {
1147                     EmbeddedUnsafe.this.write(msg, promise);
1148                 } finally {
1149                     executingStackCnt--;
1150                     maybeRunPendingTasks();
1151                 }
1152             }
1153 
1154             @Override
1155             public void flush() {
1156                 executingStackCnt++;
1157                 try {
1158                     EmbeddedUnsafe.this.flush();
1159                 } finally {
1160                     executingStackCnt--;
1161                     maybeRunPendingTasks();
1162                 }
1163             }
1164 
1165             @Override
1166             public ChannelPromise voidPromise() {
1167                 return EmbeddedUnsafe.this.voidPromise();
1168             }
1169 
1170             @Override
1171             public ChannelOutboundBuffer outboundBuffer() {
1172                 return EmbeddedUnsafe.this.outboundBuffer();
1173             }
1174         };
1175 
1176         @Override
1177         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1178             safeSetSuccess(promise);
1179         }
1180     }
1181 
1182     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1183         EmbeddedChannelPipeline(EmbeddedChannel channel) {
1184             super(channel);
1185         }
1186 
1187         @Override
1188         protected void onUnhandledInboundException(Throwable cause) {
1189             recordException(cause);
1190         }
1191 
1192         @Override
1193         protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1194             handleInboundMessage(msg);
1195         }
1196     }
1197 }