View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.RecyclableArrayList;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.net.SocketAddress;
43  import java.nio.channels.ClosedChannelException;
44  import java.util.ArrayDeque;
45  import java.util.Queue;
46  import java.util.concurrent.TimeUnit;
47  
48  /**
49   * Base class for {@link Channel} implementations that are used in an embedded fashion.
50   */
51  public class EmbeddedChannel extends AbstractChannel {
52  
53      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
54      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
55  
56      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
57      private enum State { OPEN, ACTIVE, CLOSED }
58  
59      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
60  
61      private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
62      private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
63  
64      private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
65      private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
66          @Override
67          public void operationComplete(ChannelFuture future) throws Exception {
68              recordException(future);
69          }
70      };
71  
72      private final ChannelMetadata metadata;
73      private final ChannelConfig config;
74  
75      private Queue<Object> inboundMessages;
76      private Queue<Object> outboundMessages;
77      private Throwable lastException;
78      private State state;
79      private int executingStackCnt;
80      private boolean cancelRemainingScheduledTasks;
81  
82      /**
83       * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
84       */
85      public EmbeddedChannel() {
86          this(EMPTY_HANDLERS);
87      }
88  
89      /**
90       * Create a new instance with the specified ID and an empty pipeline.
91       *
92       * @param channelId the {@link ChannelId} that will be used to identify this channel
93       */
94      public EmbeddedChannel(ChannelId channelId) {
95          this(channelId, EMPTY_HANDLERS);
96      }
97  
98      /**
99       * Create a new instance with the pipeline initialized with the specified handlers.
100      *
101      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
102      */
103     public EmbeddedChannel(ChannelHandler... handlers) {
104         this(EmbeddedChannelId.INSTANCE, handlers);
105     }
106 
107     /**
108      * Create a new instance with the pipeline initialized with the specified handlers.
109      *
110      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
111      *                      to {@link #close()}, {@code true} otherwise.
112      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
113      */
114     public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
115         this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
116     }
117 
118     /**
119      * Create a new instance with the pipeline initialized with the specified handlers.
120      *
121      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
122      *                 constructor. If {@code false} the user will need to call {@link #register()}.
123      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
124      *                      to {@link #close()}, {@code true} otherwise.
125      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
126      */
127     public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
128         this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
129     }
130 
131     /**
132      * Create a new instance with the channel ID set to the given ID and the pipeline
133      * initialized with the specified handlers.
134      *
135      * @param channelId the {@link ChannelId} that will be used to identify this channel
136      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
137      */
138     public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
139         this(channelId, false, handlers);
140     }
141 
142     /**
143      * Create a new instance with the channel ID set to the given ID and the pipeline
144      * initialized with the specified handlers.
145      *
146      * @param channelId the {@link ChannelId} that will be used to identify this channel
147      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
148      *                      to {@link #close()}, {@code true} otherwise.
149      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
150      */
151     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
152         this(channelId, true, hasDisconnect, handlers);
153     }
154 
155     /**
156      * Create a new instance with the channel ID set to the given ID and the pipeline
157      * initialized with the specified handlers.
158      *
159      * @param channelId the {@link ChannelId} that will be used to identify this channel
160      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
161      *                 constructor. If {@code false} the user will need to call {@link #register()}.
162      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
163      *                      to {@link #close()}, {@code true} otherwise.
164      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
165      */
166     public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
167                            ChannelHandler... handlers) {
168         this(null, channelId, register, hasDisconnect, handlers);
169     }
170 
171     /**
172      * Create a new instance with the channel ID set to the given ID and the pipeline
173      * initialized with the specified handlers.
174      *
175      * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
176      * @param channelId the {@link ChannelId} that will be used to identify this channel
177      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
178      *                 constructor. If {@code false} the user will need to call {@link #register()}.
179      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
180      *                      to {@link #close()}, {@code true} otherwise.
181      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
182      */
183     public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
184                            final ChannelHandler... handlers) {
185         super(parent, channelId);
186         metadata = metadata(hasDisconnect);
187         config = new DefaultChannelConfig(this);
188         setup(register, handlers);
189     }
190 
191     /**
192      * Create a new instance with the channel ID set to the given ID and the pipeline
193      * initialized with the specified handlers.
194      *
195      * @param channelId the {@link ChannelId} that will be used to identify this channel
196      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
197      *                      to {@link #close()}, {@code true} otherwise.
198      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
199      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
200      */
201     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
202                            final ChannelHandler... handlers) {
203         super(null, channelId);
204         metadata = metadata(hasDisconnect);
205         this.config = ObjectUtil.checkNotNull(config, "config");
206         setup(true, handlers);
207     }
208 
209     private static ChannelMetadata metadata(boolean hasDisconnect) {
210         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
211     }
212 
213     private void setup(boolean register, final ChannelHandler... handlers) {
214         ObjectUtil.checkNotNull(handlers, "handlers");
215         ChannelPipeline p = pipeline();
216         p.addLast(new ChannelInitializer<Channel>() {
217             @Override
218             protected void initChannel(Channel ch) throws Exception {
219                 ChannelPipeline pipeline = ch.pipeline();
220                 for (ChannelHandler h: handlers) {
221                     if (h == null) {
222                         break;
223                     }
224                     pipeline.addLast(h);
225                 }
226             }
227         });
228         if (register) {
229             ChannelFuture future = loop.register(this);
230             assert future.isDone();
231         }
232     }
233 
234     /**
235      * Register this {@code Channel} on its {@link EventLoop}.
236      */
237     public void register() throws Exception {
238         ChannelFuture future = loop.register(this);
239         assert future.isDone();
240         Throwable cause = future.cause();
241         if (cause != null) {
242             PlatformDependent.throwException(cause);
243         }
244     }
245 
246     @Override
247     protected final DefaultChannelPipeline newChannelPipeline() {
248         return new EmbeddedChannelPipeline(this);
249     }
250 
251     @Override
252     public ChannelMetadata metadata() {
253         return metadata;
254     }
255 
256     @Override
257     public ChannelConfig config() {
258         return config;
259     }
260 
261     @Override
262     public boolean isOpen() {
263         return state != State.CLOSED;
264     }
265 
266     @Override
267     public boolean isActive() {
268         return state == State.ACTIVE;
269     }
270 
271     /**
272      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
273      */
274     public Queue<Object> inboundMessages() {
275         if (inboundMessages == null) {
276             inboundMessages = new ArrayDeque<Object>();
277         }
278         return inboundMessages;
279     }
280 
281     /**
282      * @deprecated use {@link #inboundMessages()}
283      */
284     @Deprecated
285     public Queue<Object> lastInboundBuffer() {
286         return inboundMessages();
287     }
288 
289     /**
290      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
291      */
292     public Queue<Object> outboundMessages() {
293         if (outboundMessages == null) {
294             outboundMessages = new ArrayDeque<Object>();
295         }
296         return outboundMessages;
297     }
298 
299     /**
300      * @deprecated use {@link #outboundMessages()}
301      */
302     @Deprecated
303     public Queue<Object> lastOutboundBuffer() {
304         return outboundMessages();
305     }
306 
307     /**
308      * Return received data from this {@link Channel}
309      */
310     @SuppressWarnings("unchecked")
311     public <T> T readInbound() {
312         T message = (T) poll(inboundMessages);
313         if (message != null) {
314             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
315         }
316         return message;
317     }
318 
319     /**
320      * Read data from the outbound. This may return {@code null} if nothing is readable.
321      */
322     @SuppressWarnings("unchecked")
323     public <T> T readOutbound() {
324         T message =  (T) poll(outboundMessages);
325         if (message != null) {
326             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
327         }
328         return message;
329     }
330 
331     /**
332      * Write messages to the inbound of this {@link Channel}.
333      *
334      * @param msgs the messages to be written
335      *
336      * @return {@code true} if the write operation did add something to the inbound buffer
337      */
338     public boolean writeInbound(Object... msgs) {
339         ensureOpen();
340         if (msgs.length == 0) {
341             return isNotEmpty(inboundMessages);
342         }
343 
344         executingStackCnt++;
345         try {
346             ChannelPipeline p = pipeline();
347             for (Object m : msgs) {
348                 p.fireChannelRead(m);
349             }
350 
351             flushInbound(false, voidPromise());
352         } finally {
353             executingStackCnt--;
354             maybeRunPendingTasks();
355         }
356         return isNotEmpty(inboundMessages);
357     }
358 
359     /**
360      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
361      * method is conceptually equivalent to {@link #write(Object)}.
362      *
363      * @see #writeOneOutbound(Object)
364      */
365     public ChannelFuture writeOneInbound(Object msg) {
366         return writeOneInbound(msg, newPromise());
367     }
368 
369     /**
370      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
371      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
372      *
373      * @see #writeOneOutbound(Object, ChannelPromise)
374      */
375     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
376         executingStackCnt++;
377         try {
378             if (checkOpen(true)) {
379                 pipeline().fireChannelRead(msg);
380             }
381         } finally {
382             executingStackCnt--;
383             maybeRunPendingTasks();
384         }
385         return checkException(promise);
386     }
387 
388     /**
389      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
390      *
391      * @see #flushOutbound()
392      */
393     public EmbeddedChannel flushInbound() {
394         flushInbound(true, voidPromise());
395         return this;
396     }
397 
398     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
399         executingStackCnt++;
400         try {
401             if (checkOpen(recordException)) {
402                 pipeline().fireChannelReadComplete();
403                 runPendingTasks();
404             }
405         } finally {
406             executingStackCnt--;
407             maybeRunPendingTasks();
408         }
409 
410       return checkException(promise);
411     }
412 
413     /**
414      * Write messages to the outbound of this {@link Channel}.
415      *
416      * @param msgs              the messages to be written
417      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
418      */
419     public boolean writeOutbound(Object... msgs) {
420         ensureOpen();
421         if (msgs.length == 0) {
422             return isNotEmpty(outboundMessages);
423         }
424 
425         executingStackCnt++;
426         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
427         try {
428             try {
429                 for (Object m : msgs) {
430                     if (m == null) {
431                         break;
432                     }
433                     futures.add(write(m));
434                 }
435 
436                 flushOutbound0();
437 
438                 int size = futures.size();
439                 for (int i = 0; i < size; i++) {
440                     ChannelFuture future = (ChannelFuture) futures.get(i);
441                     if (future.isDone()) {
442                         recordException(future);
443                     } else {
444                         // The write may be delayed to run later by runPendingTasks()
445                         future.addListener(recordExceptionListener);
446                     }
447                 }
448             } finally {
449                 executingStackCnt--;
450                 maybeRunPendingTasks();
451             }
452             checkException();
453             return isNotEmpty(outboundMessages);
454         } finally {
455             futures.recycle();
456         }
457     }
458 
459     /**
460      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
461      * method is conceptually equivalent to {@link #write(Object)}.
462      *
463      * @see #writeOneInbound(Object)
464      */
465     public ChannelFuture writeOneOutbound(Object msg) {
466         return writeOneOutbound(msg, newPromise());
467     }
468 
469     /**
470      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
471      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
472      *
473      * @see #writeOneInbound(Object, ChannelPromise)
474      */
475     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
476         executingStackCnt++;
477         try {
478             if (checkOpen(true)) {
479                 return write(msg, promise);
480             }
481         } finally {
482             executingStackCnt--;
483             maybeRunPendingTasks();
484         }
485 
486         return checkException(promise);
487     }
488 
489     /**
490      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
491      *
492      * @see #flushInbound()
493      */
494     public EmbeddedChannel flushOutbound() {
495         executingStackCnt++;
496         try {
497             if (checkOpen(true)) {
498                 flushOutbound0();
499             }
500         } finally {
501             executingStackCnt--;
502             maybeRunPendingTasks();
503         }
504         checkException(voidPromise());
505         return this;
506     }
507 
508     private void flushOutbound0() {
509         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
510         // delay the write on the next eventloop run.
511         runPendingTasks();
512 
513         flush();
514     }
515 
516     /**
517      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
518      *
519      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
520      */
521     public boolean finish() {
522         return finish(false);
523     }
524 
525     /**
526      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
527      * Any further try to write data to it will fail.
528      *
529      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
530      */
531     public boolean finishAndReleaseAll() {
532         return finish(true);
533     }
534 
535     /**
536      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
537      *
538      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
539      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
540      */
541     private boolean finish(boolean releaseAll) {
542         executingStackCnt++;
543         try {
544             close();
545         } finally {
546             executingStackCnt--;
547             maybeRunPendingTasks();
548         }
549         try {
550             checkException();
551             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
552         } finally {
553             if (releaseAll) {
554                 releaseAll(inboundMessages);
555                 releaseAll(outboundMessages);
556             }
557         }
558     }
559 
560     /**
561      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
562      * otherwise.
563      */
564     public boolean releaseInbound() {
565         return releaseAll(inboundMessages);
566     }
567 
568     /**
569      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
570      * otherwise.
571      */
572     public boolean releaseOutbound() {
573         return releaseAll(outboundMessages);
574     }
575 
576     private static boolean releaseAll(Queue<Object> queue) {
577         if (isNotEmpty(queue)) {
578             for (;;) {
579                 Object msg = queue.poll();
580                 if (msg == null) {
581                     break;
582                 }
583                 ReferenceCountUtil.release(msg);
584             }
585             return true;
586         }
587         return false;
588     }
589 
590     @Override
591     public final ChannelFuture close() {
592         return close(newPromise());
593     }
594 
595     @Override
596     public final ChannelFuture disconnect() {
597         return disconnect(newPromise());
598     }
599 
600     @Override
601     public final ChannelFuture close(ChannelPromise promise) {
602         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
603         // that needs to be run before the actual close takes place.
604         executingStackCnt++;
605         ChannelFuture future;
606         try {
607             runPendingTasks();
608             future = super.close(promise);
609 
610             cancelRemainingScheduledTasks = true;
611         } finally {
612             executingStackCnt--;
613             maybeRunPendingTasks();
614         }
615         return future;
616     }
617 
618     @Override
619     public final ChannelFuture disconnect(ChannelPromise promise) {
620         executingStackCnt++;
621         ChannelFuture future;
622         try {
623             future = super.disconnect(promise);
624 
625             if (!metadata.hasDisconnect()) {
626                 cancelRemainingScheduledTasks = true;
627             }
628         } finally {
629             executingStackCnt--;
630             maybeRunPendingTasks();
631         }
632         return future;
633     }
634 
635     @Override
636     public ChannelFuture bind(SocketAddress localAddress) {
637         executingStackCnt++;
638         try {
639             return super.bind(localAddress);
640         } finally {
641             executingStackCnt--;
642             maybeRunPendingTasks();
643         }
644     }
645 
646     @Override
647     public ChannelFuture connect(SocketAddress remoteAddress) {
648         executingStackCnt++;
649         try {
650             return super.connect(remoteAddress);
651         } finally {
652             executingStackCnt--;
653             maybeRunPendingTasks();
654         }
655     }
656 
657     @Override
658     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
659         executingStackCnt++;
660         try {
661             return super.connect(remoteAddress, localAddress);
662         } finally {
663             executingStackCnt--;
664             maybeRunPendingTasks();
665         }
666     }
667 
668     @Override
669     public ChannelFuture deregister() {
670         executingStackCnt++;
671         try {
672             return super.deregister();
673         } finally {
674             executingStackCnt--;
675             maybeRunPendingTasks();
676         }
677     }
678 
679     @Override
680     public Channel flush() {
681         executingStackCnt++;
682         try {
683             return super.flush();
684         } finally {
685             executingStackCnt--;
686             maybeRunPendingTasks();
687         }
688     }
689 
690     @Override
691     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
692         executingStackCnt++;
693         try {
694             return super.bind(localAddress, promise);
695         } finally {
696             executingStackCnt--;
697             maybeRunPendingTasks();
698         }
699     }
700 
701     @Override
702     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
703         executingStackCnt++;
704         try {
705             return super.connect(remoteAddress, promise);
706         } finally {
707             executingStackCnt--;
708             maybeRunPendingTasks();
709         }
710     }
711 
712     @Override
713     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
714         executingStackCnt++;
715         try {
716             return super.connect(remoteAddress, localAddress, promise);
717         } finally {
718             executingStackCnt--;
719             maybeRunPendingTasks();
720         }
721     }
722 
723     @Override
724     public ChannelFuture deregister(ChannelPromise promise) {
725         executingStackCnt++;
726         try {
727             return super.deregister(promise);
728         } finally {
729             executingStackCnt--;
730             maybeRunPendingTasks();
731         }
732     }
733 
734     @Override
735     public Channel read() {
736         executingStackCnt++;
737         try {
738             return super.read();
739         } finally {
740             executingStackCnt--;
741             maybeRunPendingTasks();
742         }
743     }
744 
745     @Override
746     public ChannelFuture write(Object msg) {
747         executingStackCnt++;
748         try {
749             return super.write(msg);
750         } finally {
751             executingStackCnt--;
752             maybeRunPendingTasks();
753         }
754     }
755 
756     @Override
757     public ChannelFuture write(Object msg, ChannelPromise promise) {
758         executingStackCnt++;
759         try {
760             return super.write(msg, promise);
761         } finally {
762             executingStackCnt--;
763             maybeRunPendingTasks();
764         }
765     }
766 
767     @Override
768     public ChannelFuture writeAndFlush(Object msg) {
769         executingStackCnt++;
770         try {
771             return super.writeAndFlush(msg);
772         } finally {
773             executingStackCnt--;
774             maybeRunPendingTasks();
775         }
776     }
777 
778     @Override
779     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
780         executingStackCnt++;
781         try {
782             return super.writeAndFlush(msg, promise);
783         } finally {
784             executingStackCnt--;
785             maybeRunPendingTasks();
786         }
787     }
788 
789     private static boolean isNotEmpty(Queue<Object> queue) {
790         return queue != null && !queue.isEmpty();
791     }
792 
793     private static Object poll(Queue<Object> queue) {
794         return queue != null ? queue.poll() : null;
795     }
796 
797     private void maybeRunPendingTasks() {
798         if (executingStackCnt == 0) {
799             runPendingTasks();
800 
801             if (cancelRemainingScheduledTasks) {
802                 // Cancel all scheduled tasks that are left.
803                 embeddedEventLoop().cancelScheduledTasks();
804             }
805         }
806     }
807 
808     /**
809      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
810      * for this {@link Channel}
811      */
812     public void runPendingTasks() {
813         try {
814             embeddedEventLoop().runTasks();
815         } catch (Exception e) {
816             recordException(e);
817         }
818 
819         try {
820             embeddedEventLoop().runScheduledTasks();
821         } catch (Exception e) {
822             recordException(e);
823         }
824     }
825 
826     /**
827      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
828      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
829      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
830      *
831      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
832      */
833     public boolean hasPendingTasks() {
834         return embeddedEventLoop().hasPendingNormalTasks() ||
835                 embeddedEventLoop().nextScheduledTask() == 0;
836     }
837 
838     /**
839      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
840      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
841      * {@code -1}.
842      */
843     public long runScheduledPendingTasks() {
844         try {
845             return embeddedEventLoop().runScheduledTasks();
846         } catch (Exception e) {
847             recordException(e);
848             return embeddedEventLoop().nextScheduledTask();
849         }
850     }
851 
852     private void recordException(ChannelFuture future) {
853         if (!future.isSuccess()) {
854             recordException(future.cause());
855         }
856     }
857 
858     private void recordException(Throwable cause) {
859         if (lastException == null) {
860             lastException = cause;
861         } else {
862             logger.warn(
863                     "More than one exception was raised. " +
864                             "Will report only the first one and log others.", cause);
865         }
866     }
867 
868     /**
869      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
870      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
871      */
872     public void advanceTimeBy(long duration, TimeUnit unit) {
873         embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
874     }
875 
876     /**
877      * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
878      * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
879      * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
880      */
881     public void freezeTime() {
882         embeddedEventLoop().freezeTime();
883     }
884 
885     /**
886      * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
887      * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
888      * was called, it will run ten minutes after this method is called again (assuming no
889      * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
890      * {@link #runScheduledPendingTasks()}).
891      */
892     public void unfreezeTime() {
893         embeddedEventLoop().unfreezeTime();
894     }
895 
896     /**
897      * Checks for the presence of an {@link Exception}.
898      */
899     private ChannelFuture checkException(ChannelPromise promise) {
900       Throwable t = lastException;
901       if (t != null) {
902         lastException = null;
903 
904         if (promise.isVoid()) {
905             PlatformDependent.throwException(t);
906         }
907 
908         return promise.setFailure(t);
909       }
910 
911       return promise.setSuccess();
912     }
913 
914     /**
915      * Check if there was any {@link Throwable} received and if so rethrow it.
916      */
917     public void checkException() {
918       checkException(voidPromise());
919     }
920 
921     /**
922      * Returns {@code true} if the {@link Channel} is open and records optionally
923      * an {@link Exception} if it isn't.
924      */
925     private boolean checkOpen(boolean recordException) {
926         if (!isOpen()) {
927           if (recordException) {
928               recordException(new ClosedChannelException());
929           }
930           return false;
931       }
932 
933       return true;
934     }
935 
936     private EmbeddedEventLoop embeddedEventLoop() {
937         if (isRegistered()) {
938             return (EmbeddedEventLoop) super.eventLoop();
939         }
940 
941         return loop;
942     }
943 
944     /**
945      * Ensure the {@link Channel} is open and if not throw an exception.
946      */
947     protected final void ensureOpen() {
948         if (!checkOpen(true)) {
949             checkException();
950         }
951     }
952 
953     @Override
954     protected boolean isCompatible(EventLoop loop) {
955         return loop instanceof EmbeddedEventLoop;
956     }
957 
958     @Override
959     protected SocketAddress localAddress0() {
960         return isActive()? LOCAL_ADDRESS : null;
961     }
962 
963     @Override
964     protected SocketAddress remoteAddress0() {
965         return isActive()? REMOTE_ADDRESS : null;
966     }
967 
968     @Override
969     protected void doRegister() throws Exception {
970         state = State.ACTIVE;
971     }
972 
973     @Override
974     protected void doBind(SocketAddress localAddress) throws Exception {
975         // NOOP
976     }
977 
978     @Override
979     protected void doDisconnect() throws Exception {
980         if (!metadata.hasDisconnect()) {
981             doClose();
982         }
983     }
984 
985     @Override
986     protected void doClose() throws Exception {
987         state = State.CLOSED;
988     }
989 
990     @Override
991     protected void doBeginRead() throws Exception {
992         // NOOP
993     }
994 
995     @Override
996     protected AbstractUnsafe newUnsafe() {
997         return new EmbeddedUnsafe();
998     }
999 
1000     @Override
1001     public Unsafe unsafe() {
1002         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1003     }
1004 
1005     @Override
1006     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1007         for (;;) {
1008             Object msg = in.current();
1009             if (msg == null) {
1010                 break;
1011             }
1012 
1013             ReferenceCountUtil.retain(msg);
1014             handleOutboundMessage(msg);
1015             in.remove();
1016         }
1017     }
1018 
1019     /**
1020      * Called for each outbound message.
1021      *
1022      * @see #doWrite(ChannelOutboundBuffer)
1023      */
1024     protected void handleOutboundMessage(Object msg) {
1025         outboundMessages().add(msg);
1026     }
1027 
1028     /**
1029      * Called for each inbound message.
1030      */
1031     protected void handleInboundMessage(Object msg) {
1032         inboundMessages().add(msg);
1033     }
1034 
1035     private final class EmbeddedUnsafe extends AbstractUnsafe {
1036 
1037         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
1038         // that may change the state of the Channel and may schedule tasks for later execution.
1039         final Unsafe wrapped = new Unsafe() {
1040             @Override
1041             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1042                 return EmbeddedUnsafe.this.recvBufAllocHandle();
1043             }
1044 
1045             @Override
1046             public SocketAddress localAddress() {
1047                 return EmbeddedUnsafe.this.localAddress();
1048             }
1049 
1050             @Override
1051             public SocketAddress remoteAddress() {
1052                 return EmbeddedUnsafe.this.remoteAddress();
1053             }
1054 
1055             @Override
1056             public void register(EventLoop eventLoop, ChannelPromise promise) {
1057                 executingStackCnt++;
1058                 try {
1059                     EmbeddedUnsafe.this.register(eventLoop, promise);
1060                 } finally {
1061                     executingStackCnt--;
1062                     maybeRunPendingTasks();
1063                 }
1064             }
1065 
1066             @Override
1067             public void bind(SocketAddress localAddress, ChannelPromise promise) {
1068                 executingStackCnt++;
1069                 try {
1070                     EmbeddedUnsafe.this.bind(localAddress, promise);
1071                 } finally {
1072                     executingStackCnt--;
1073                     maybeRunPendingTasks();
1074                 }
1075             }
1076 
1077             @Override
1078             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1079                 executingStackCnt++;
1080                 try {
1081                     EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1082                 } finally {
1083                     executingStackCnt--;
1084                     maybeRunPendingTasks();
1085                 }
1086             }
1087 
1088             @Override
1089             public void disconnect(ChannelPromise promise) {
1090                 executingStackCnt++;
1091                 try {
1092                     EmbeddedUnsafe.this.disconnect(promise);
1093                 } finally {
1094                     executingStackCnt--;
1095                     maybeRunPendingTasks();
1096                 }
1097             }
1098 
1099             @Override
1100             public void close(ChannelPromise promise) {
1101                 executingStackCnt++;
1102                 try {
1103                     EmbeddedUnsafe.this.close(promise);
1104                 } finally {
1105                     executingStackCnt--;
1106                     maybeRunPendingTasks();
1107                 }
1108             }
1109 
1110             @Override
1111             public void closeForcibly() {
1112                 executingStackCnt++;
1113                 try {
1114                     EmbeddedUnsafe.this.closeForcibly();
1115                 } finally {
1116                     executingStackCnt--;
1117                     maybeRunPendingTasks();
1118                 }
1119             }
1120 
1121             @Override
1122             public void deregister(ChannelPromise promise) {
1123                 executingStackCnt++;
1124                 try {
1125                     EmbeddedUnsafe.this.deregister(promise);
1126                 } finally {
1127                     executingStackCnt--;
1128                     maybeRunPendingTasks();
1129                 }
1130             }
1131 
1132             @Override
1133             public void beginRead() {
1134                 executingStackCnt++;
1135                 try {
1136                     EmbeddedUnsafe.this.beginRead();
1137                 } finally {
1138                     executingStackCnt--;
1139                     maybeRunPendingTasks();
1140                 }
1141             }
1142 
1143             @Override
1144             public void write(Object msg, ChannelPromise promise) {
1145                 executingStackCnt++;
1146                 try {
1147                     EmbeddedUnsafe.this.write(msg, promise);
1148                 } finally {
1149                     executingStackCnt--;
1150                     maybeRunPendingTasks();
1151                 }
1152             }
1153 
1154             @Override
1155             public void flush() {
1156                 executingStackCnt++;
1157                 try {
1158                     EmbeddedUnsafe.this.flush();
1159                 } finally {
1160                     executingStackCnt--;
1161                     maybeRunPendingTasks();
1162                 }
1163             }
1164 
1165             @Override
1166             public ChannelPromise voidPromise() {
1167                 return EmbeddedUnsafe.this.voidPromise();
1168             }
1169 
1170             @Override
1171             public ChannelOutboundBuffer outboundBuffer() {
1172                 return EmbeddedUnsafe.this.outboundBuffer();
1173             }
1174         };
1175 
1176         @Override
1177         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1178             safeSetSuccess(promise);
1179         }
1180     }
1181 
1182     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1183         EmbeddedChannelPipeline(EmbeddedChannel channel) {
1184             super(channel);
1185         }
1186 
1187         @Override
1188         protected void onUnhandledInboundException(Throwable cause) {
1189             recordException(cause);
1190         }
1191 
1192         @Override
1193         protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1194             handleInboundMessage(msg);
1195         }
1196     }
1197 }