View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.concurrent.Ticker;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.RecyclableArrayList;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.net.SocketAddress;
43  import java.nio.channels.ClosedChannelException;
44  import java.util.ArrayDeque;
45  import java.util.Objects;
46  import java.util.Queue;
47  import java.util.concurrent.TimeUnit;
48  
49  /**
50   * Base class for {@link Channel} implementations that are used in an embedded fashion.
51   */
52  public class EmbeddedChannel extends AbstractChannel {
53  
54      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
55      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
56  
57      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
58      private enum State { OPEN, ACTIVE, CLOSED }
59  
60      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
61  
62      private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
63      private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
64  
65      private final EmbeddedEventLoop loop;
66      private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
67          @Override
68          public void operationComplete(ChannelFuture future) throws Exception {
69              recordException(future);
70          }
71      };
72  
73      private final ChannelMetadata metadata;
74      private final ChannelConfig config;
75  
76      private Queue<Object> inboundMessages;
77      private Queue<Object> outboundMessages;
78      private Throwable lastException;
79      private State state;
80      private int executingStackCnt;
81      private boolean cancelRemainingScheduledTasks;
82  
83      /**
84       * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
85       */
86      public EmbeddedChannel() {
87          this(builder());
88      }
89  
90      /**
91       * Create a new instance with the specified ID and an empty pipeline.
92       *
93       * @param channelId the {@link ChannelId} that will be used to identify this channel
94       */
95      public EmbeddedChannel(ChannelId channelId) {
96          this(builder().channelId(channelId));
97      }
98  
99      /**
100      * Create a new instance with the pipeline initialized with the specified handlers.
101      *
102      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
103      */
104     public EmbeddedChannel(ChannelHandler... handlers) {
105         this(builder().handlers(handlers));
106     }
107 
108     /**
109      * Create a new instance with the pipeline initialized with the specified handlers.
110      *
111      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
112      *                      to {@link #close()}, {@code true} otherwise.
113      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
114      */
115     public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
116         this(builder().hasDisconnect(hasDisconnect).handlers(handlers));
117     }
118 
119     /**
120      * Create a new instance with the pipeline initialized with the specified handlers.
121      *
122      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
123      *                 constructor. If {@code false} the user will need to call {@link #register()}.
124      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
125      *                      to {@link #close()}, {@code true} otherwise.
126      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
127      */
128     public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
129         this(builder().register(register).hasDisconnect(hasDisconnect).handlers(handlers));
130     }
131 
132     /**
133      * Create a new instance with the channel ID set to the given ID and the pipeline
134      * initialized with the specified handlers.
135      *
136      * @param channelId the {@link ChannelId} that will be used to identify this channel
137      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
138      */
139     public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
140         this(builder().channelId(channelId).handlers(handlers));
141     }
142 
143     /**
144      * Create a new instance with the channel ID set to the given ID and the pipeline
145      * initialized with the specified handlers.
146      *
147      * @param channelId the {@link ChannelId} that will be used to identify this channel
148      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
149      *                      to {@link #close()}, {@code true} otherwise.
150      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
151      */
152     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
153         this(builder().channelId(channelId).hasDisconnect(hasDisconnect).handlers(handlers));
154     }
155 
156     /**
157      * Create a new instance with the channel ID set to the given ID and the pipeline
158      * initialized with the specified handlers.
159      *
160      * @param channelId the {@link ChannelId} that will be used to identify this channel
161      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
162      *                 constructor. If {@code false} the user will need to call {@link #register()}.
163      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
164      *                      to {@link #close()}, {@code true} otherwise.
165      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
166      */
167     public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
168                            ChannelHandler... handlers) {
169         this(builder().channelId(channelId).register(register).hasDisconnect(hasDisconnect).handlers(handlers));
170     }
171 
172     /**
173      * Create a new instance with the channel ID set to the given ID and the pipeline
174      * initialized with the specified handlers.
175      *
176      * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
177      * @param channelId the {@link ChannelId} that will be used to identify this channel
178      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
179      *                 constructor. If {@code false} the user will need to call {@link #register()}.
180      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
181      *                      to {@link #close()}, {@code true} otherwise.
182      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
183      */
184     public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
185                            final ChannelHandler... handlers) {
186         this(builder()
187                 .parent(parent)
188                 .channelId(channelId)
189                 .register(register)
190                 .hasDisconnect(hasDisconnect)
191                 .handlers(handlers));
192     }
193 
194     /**
195      * Create a new instance with the channel ID set to the given ID and the pipeline
196      * initialized with the specified handlers.
197      *
198      * @param channelId the {@link ChannelId} that will be used to identify this channel
199      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
200      *                      to {@link #close()}, {@code true} otherwise.
201      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
202      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
203      */
204     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
205                            final ChannelHandler... handlers) {
206         this(builder().channelId(channelId).hasDisconnect(hasDisconnect).config(config).handlers(handlers));
207     }
208 
209     /**
210      * Create a new instance with the configuration from the given builder. This method is {@code protected} for use by
211      * subclasses; Otherwise, please use {@link Builder#build()}.
212      *
213      * @param builder The builder
214      */
215     protected EmbeddedChannel(Builder builder) {
216         super(builder.parent, builder.channelId);
217         loop = new EmbeddedEventLoop(builder.ticker == null ? new EmbeddedEventLoop.FreezableTicker() : builder.ticker);
218         metadata = metadata(builder.hasDisconnect);
219         config = builder.config == null ? new DefaultChannelConfig(this) : builder.config;
220         if (builder.handler == null) {
221             setup(builder.register, builder.handlers);
222         } else {
223             setup(builder.register, builder.handler);
224         }
225     }
226 
227     private static ChannelMetadata metadata(boolean hasDisconnect) {
228         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
229     }
230 
231     private void setup(boolean register, final ChannelHandler... handlers) {
232         ChannelPipeline p = pipeline();
233         p.addLast(new ChannelInitializer<Channel>() {
234             @Override
235             protected void initChannel(Channel ch) {
236                 ChannelPipeline pipeline = ch.pipeline();
237                 for (ChannelHandler h: handlers) {
238                     if (h == null) {
239                         break;
240                     }
241                     pipeline.addLast(h);
242                 }
243             }
244         });
245         if (register) {
246             ChannelFuture future = loop.register(this);
247             assert future.isDone();
248         }
249     }
250 
251     private void setup(boolean register, final ChannelHandler handler) {
252         ChannelPipeline p = pipeline();
253         p.addLast(handler);
254         if (register) {
255             ChannelFuture future = loop.register(this);
256             assert future.isDone();
257         }
258     }
259 
260     /**
261      * Register this {@code Channel} on its {@link EventLoop}.
262      */
263     public void register() throws Exception {
264         ChannelFuture future = loop.register(this);
265         assert future.isDone();
266         Throwable cause = future.cause();
267         if (cause != null) {
268             PlatformDependent.throwException(cause);
269         }
270     }
271 
272     @Override
273     protected final DefaultChannelPipeline newChannelPipeline() {
274         return new EmbeddedChannelPipeline(this);
275     }
276 
277     @Override
278     public ChannelMetadata metadata() {
279         return metadata;
280     }
281 
282     @Override
283     public ChannelConfig config() {
284         return config;
285     }
286 
287     @Override
288     public boolean isOpen() {
289         return state != State.CLOSED;
290     }
291 
292     @Override
293     public boolean isActive() {
294         return state == State.ACTIVE;
295     }
296 
297     /**
298      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
299      */
300     public Queue<Object> inboundMessages() {
301         if (inboundMessages == null) {
302             inboundMessages = new ArrayDeque<Object>();
303         }
304         return inboundMessages;
305     }
306 
307     /**
308      * @deprecated use {@link #inboundMessages()}
309      */
310     @Deprecated
311     public Queue<Object> lastInboundBuffer() {
312         return inboundMessages();
313     }
314 
315     /**
316      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
317      */
318     public Queue<Object> outboundMessages() {
319         if (outboundMessages == null) {
320             outboundMessages = new ArrayDeque<Object>();
321         }
322         return outboundMessages;
323     }
324 
325     /**
326      * @deprecated use {@link #outboundMessages()}
327      */
328     @Deprecated
329     public Queue<Object> lastOutboundBuffer() {
330         return outboundMessages();
331     }
332 
333     /**
334      * Return received data from this {@link Channel}
335      */
336     @SuppressWarnings("unchecked")
337     public <T> T readInbound() {
338         T message = (T) poll(inboundMessages);
339         if (message != null) {
340             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
341         }
342         return message;
343     }
344 
345     /**
346      * Read data from the outbound. This may return {@code null} if nothing is readable.
347      */
348     @SuppressWarnings("unchecked")
349     public <T> T readOutbound() {
350         T message =  (T) poll(outboundMessages);
351         if (message != null) {
352             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
353         }
354         return message;
355     }
356 
357     /**
358      * Write messages to the inbound of this {@link Channel}.
359      *
360      * @param msgs the messages to be written
361      *
362      * @return {@code true} if the write operation did add something to the inbound buffer
363      */
364     public boolean writeInbound(Object... msgs) {
365         ensureOpen();
366         if (msgs.length == 0) {
367             return isNotEmpty(inboundMessages);
368         }
369 
370         executingStackCnt++;
371         try {
372             ChannelPipeline p = pipeline();
373             for (Object m : msgs) {
374                 p.fireChannelRead(m);
375             }
376 
377             flushInbound(false, voidPromise());
378         } finally {
379             executingStackCnt--;
380             maybeRunPendingTasks();
381         }
382         return isNotEmpty(inboundMessages);
383     }
384 
385     /**
386      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
387      * method is conceptually equivalent to {@link #write(Object)}.
388      *
389      * @see #writeOneOutbound(Object)
390      */
391     public ChannelFuture writeOneInbound(Object msg) {
392         return writeOneInbound(msg, newPromise());
393     }
394 
395     /**
396      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
397      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
398      *
399      * @see #writeOneOutbound(Object, ChannelPromise)
400      */
401     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
402         executingStackCnt++;
403         try {
404             if (checkOpen(true)) {
405                 pipeline().fireChannelRead(msg);
406             }
407         } finally {
408             executingStackCnt--;
409             maybeRunPendingTasks();
410         }
411         return checkException(promise);
412     }
413 
414     /**
415      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
416      *
417      * @see #flushOutbound()
418      */
419     public EmbeddedChannel flushInbound() {
420         flushInbound(true, voidPromise());
421         return this;
422     }
423 
424     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
425         executingStackCnt++;
426         try {
427             if (checkOpen(recordException)) {
428                 pipeline().fireChannelReadComplete();
429                 runPendingTasks();
430             }
431         } finally {
432             executingStackCnt--;
433             maybeRunPendingTasks();
434         }
435 
436       return checkException(promise);
437     }
438 
439     /**
440      * Write messages to the outbound of this {@link Channel}.
441      *
442      * @param msgs              the messages to be written
443      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
444      */
445     public boolean writeOutbound(Object... msgs) {
446         ensureOpen();
447         if (msgs.length == 0) {
448             return isNotEmpty(outboundMessages);
449         }
450 
451         executingStackCnt++;
452         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
453         try {
454             try {
455                 for (Object m : msgs) {
456                     if (m == null) {
457                         break;
458                     }
459                     futures.add(write(m));
460                 }
461 
462                 flushOutbound0();
463 
464                 int size = futures.size();
465                 for (int i = 0; i < size; i++) {
466                     ChannelFuture future = (ChannelFuture) futures.get(i);
467                     if (future.isDone()) {
468                         recordException(future);
469                     } else {
470                         // The write may be delayed to run later by runPendingTasks()
471                         future.addListener(recordExceptionListener);
472                     }
473                 }
474             } finally {
475                 executingStackCnt--;
476                 maybeRunPendingTasks();
477             }
478             checkException();
479             return isNotEmpty(outboundMessages);
480         } finally {
481             futures.recycle();
482         }
483     }
484 
485     /**
486      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
487      * method is conceptually equivalent to {@link #write(Object)}.
488      *
489      * @see #writeOneInbound(Object)
490      */
491     public ChannelFuture writeOneOutbound(Object msg) {
492         return writeOneOutbound(msg, newPromise());
493     }
494 
495     /**
496      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
497      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
498      *
499      * @see #writeOneInbound(Object, ChannelPromise)
500      */
501     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
502         executingStackCnt++;
503         try {
504             if (checkOpen(true)) {
505                 return write(msg, promise);
506             }
507         } finally {
508             executingStackCnt--;
509             maybeRunPendingTasks();
510         }
511 
512         return checkException(promise);
513     }
514 
515     /**
516      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
517      *
518      * @see #flushInbound()
519      */
520     public EmbeddedChannel flushOutbound() {
521         executingStackCnt++;
522         try {
523             if (checkOpen(true)) {
524                 flushOutbound0();
525             }
526         } finally {
527             executingStackCnt--;
528             maybeRunPendingTasks();
529         }
530         checkException(voidPromise());
531         return this;
532     }
533 
534     private void flushOutbound0() {
535         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
536         // delay the write on the next eventloop run.
537         runPendingTasks();
538 
539         flush();
540     }
541 
542     /**
543      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
544      *
545      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
546      */
547     public boolean finish() {
548         return finish(false);
549     }
550 
551     /**
552      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
553      * Any further try to write data to it will fail.
554      *
555      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
556      */
557     public boolean finishAndReleaseAll() {
558         return finish(true);
559     }
560 
561     /**
562      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
563      *
564      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
565      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
566      */
567     private boolean finish(boolean releaseAll) {
568         executingStackCnt++;
569         try {
570             close();
571         } finally {
572             executingStackCnt--;
573             maybeRunPendingTasks();
574         }
575         try {
576             checkException();
577             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
578         } finally {
579             if (releaseAll) {
580                 releaseAll(inboundMessages);
581                 releaseAll(outboundMessages);
582             }
583         }
584     }
585 
586     /**
587      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
588      * otherwise.
589      */
590     public boolean releaseInbound() {
591         return releaseAll(inboundMessages);
592     }
593 
594     /**
595      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
596      * otherwise.
597      */
598     public boolean releaseOutbound() {
599         return releaseAll(outboundMessages);
600     }
601 
602     private static boolean releaseAll(Queue<Object> queue) {
603         if (isNotEmpty(queue)) {
604             for (;;) {
605                 Object msg = queue.poll();
606                 if (msg == null) {
607                     break;
608                 }
609                 ReferenceCountUtil.release(msg);
610             }
611             return true;
612         }
613         return false;
614     }
615 
616     @Override
617     public final ChannelFuture close() {
618         return close(newPromise());
619     }
620 
621     @Override
622     public final ChannelFuture disconnect() {
623         return disconnect(newPromise());
624     }
625 
626     @Override
627     public final ChannelFuture close(ChannelPromise promise) {
628         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
629         // that needs to be run before the actual close takes place.
630         executingStackCnt++;
631         ChannelFuture future;
632         try {
633             runPendingTasks();
634             future = super.close(promise);
635 
636             cancelRemainingScheduledTasks = true;
637         } finally {
638             executingStackCnt--;
639             maybeRunPendingTasks();
640         }
641         return future;
642     }
643 
644     @Override
645     public final ChannelFuture disconnect(ChannelPromise promise) {
646         executingStackCnt++;
647         ChannelFuture future;
648         try {
649             future = super.disconnect(promise);
650 
651             if (!metadata.hasDisconnect()) {
652                 cancelRemainingScheduledTasks = true;
653             }
654         } finally {
655             executingStackCnt--;
656             maybeRunPendingTasks();
657         }
658         return future;
659     }
660 
661     @Override
662     public ChannelFuture bind(SocketAddress localAddress) {
663         executingStackCnt++;
664         try {
665             return super.bind(localAddress);
666         } finally {
667             executingStackCnt--;
668             maybeRunPendingTasks();
669         }
670     }
671 
672     @Override
673     public ChannelFuture connect(SocketAddress remoteAddress) {
674         executingStackCnt++;
675         try {
676             return super.connect(remoteAddress);
677         } finally {
678             executingStackCnt--;
679             maybeRunPendingTasks();
680         }
681     }
682 
683     @Override
684     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
685         executingStackCnt++;
686         try {
687             return super.connect(remoteAddress, localAddress);
688         } finally {
689             executingStackCnt--;
690             maybeRunPendingTasks();
691         }
692     }
693 
694     @Override
695     public ChannelFuture deregister() {
696         executingStackCnt++;
697         try {
698             return super.deregister();
699         } finally {
700             executingStackCnt--;
701             maybeRunPendingTasks();
702         }
703     }
704 
705     @Override
706     public Channel flush() {
707         executingStackCnt++;
708         try {
709             return super.flush();
710         } finally {
711             executingStackCnt--;
712             maybeRunPendingTasks();
713         }
714     }
715 
716     @Override
717     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
718         executingStackCnt++;
719         try {
720             return super.bind(localAddress, promise);
721         } finally {
722             executingStackCnt--;
723             maybeRunPendingTasks();
724         }
725     }
726 
727     @Override
728     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
729         executingStackCnt++;
730         try {
731             return super.connect(remoteAddress, promise);
732         } finally {
733             executingStackCnt--;
734             maybeRunPendingTasks();
735         }
736     }
737 
738     @Override
739     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
740         executingStackCnt++;
741         try {
742             return super.connect(remoteAddress, localAddress, promise);
743         } finally {
744             executingStackCnt--;
745             maybeRunPendingTasks();
746         }
747     }
748 
749     @Override
750     public ChannelFuture deregister(ChannelPromise promise) {
751         executingStackCnt++;
752         try {
753             return super.deregister(promise);
754         } finally {
755             executingStackCnt--;
756             maybeRunPendingTasks();
757         }
758     }
759 
760     @Override
761     public Channel read() {
762         executingStackCnt++;
763         try {
764             return super.read();
765         } finally {
766             executingStackCnt--;
767             maybeRunPendingTasks();
768         }
769     }
770 
771     @Override
772     public ChannelFuture write(Object msg) {
773         executingStackCnt++;
774         try {
775             return super.write(msg);
776         } finally {
777             executingStackCnt--;
778             maybeRunPendingTasks();
779         }
780     }
781 
782     @Override
783     public ChannelFuture write(Object msg, ChannelPromise promise) {
784         executingStackCnt++;
785         try {
786             return super.write(msg, promise);
787         } finally {
788             executingStackCnt--;
789             maybeRunPendingTasks();
790         }
791     }
792 
793     @Override
794     public ChannelFuture writeAndFlush(Object msg) {
795         executingStackCnt++;
796         try {
797             return super.writeAndFlush(msg);
798         } finally {
799             executingStackCnt--;
800             maybeRunPendingTasks();
801         }
802     }
803 
804     @Override
805     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
806         executingStackCnt++;
807         try {
808             return super.writeAndFlush(msg, promise);
809         } finally {
810             executingStackCnt--;
811             maybeRunPendingTasks();
812         }
813     }
814 
815     private static boolean isNotEmpty(Queue<Object> queue) {
816         return queue != null && !queue.isEmpty();
817     }
818 
819     private static Object poll(Queue<Object> queue) {
820         return queue != null ? queue.poll() : null;
821     }
822 
823     private void maybeRunPendingTasks() {
824         if (executingStackCnt == 0) {
825             runPendingTasks();
826 
827             if (cancelRemainingScheduledTasks) {
828                 // Cancel all scheduled tasks that are left.
829                 embeddedEventLoop().cancelScheduledTasks();
830             }
831         }
832     }
833 
834     /**
835      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
836      * for this {@link Channel}
837      */
838     public void runPendingTasks() {
839         try {
840             embeddedEventLoop().runTasks();
841         } catch (Exception e) {
842             recordException(e);
843         }
844 
845         try {
846             embeddedEventLoop().runScheduledTasks();
847         } catch (Exception e) {
848             recordException(e);
849         }
850     }
851 
852     /**
853      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
854      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
855      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
856      *
857      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
858      */
859     public boolean hasPendingTasks() {
860         return embeddedEventLoop().hasPendingNormalTasks() ||
861                 embeddedEventLoop().nextScheduledTask() == 0;
862     }
863 
864     /**
865      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
866      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
867      * {@code -1}.
868      */
869     public long runScheduledPendingTasks() {
870         try {
871             return embeddedEventLoop().runScheduledTasks();
872         } catch (Exception e) {
873             recordException(e);
874             return embeddedEventLoop().nextScheduledTask();
875         }
876     }
877 
878     private void recordException(ChannelFuture future) {
879         if (!future.isSuccess()) {
880             recordException(future.cause());
881         }
882     }
883 
884     private void recordException(Throwable cause) {
885         if (lastException == null) {
886             lastException = cause;
887         } else {
888             logger.warn(
889                     "More than one exception was raised. " +
890                             "Will report only the first one and log others.", cause);
891         }
892     }
893 
894     private EmbeddedEventLoop.FreezableTicker freezableTicker() {
895         Ticker ticker = eventLoop().ticker();
896         if (ticker instanceof EmbeddedEventLoop.FreezableTicker) {
897             return (EmbeddedEventLoop.FreezableTicker) ticker;
898         } else {
899             throw new IllegalStateException(
900                     "EmbeddedChannel constructed with custom ticker, time manipulation methods are unavailable.");
901         }
902     }
903 
904     /**
905      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
906      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
907      */
908     public void advanceTimeBy(long duration, TimeUnit unit) {
909         freezableTicker().advance(duration, unit);
910     }
911 
912     /**
913      * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
914      * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
915      * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
916      */
917     public void freezeTime() {
918         freezableTicker().freezeTime();
919     }
920 
921     /**
922      * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
923      * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
924      * was called, it will run ten minutes after this method is called again (assuming no
925      * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
926      * {@link #runScheduledPendingTasks()}).
927      */
928     public void unfreezeTime() {
929         freezableTicker().unfreezeTime();
930     }
931 
932     /**
933      * Checks for the presence of an {@link Exception}.
934      */
935     private ChannelFuture checkException(ChannelPromise promise) {
936       Throwable t = lastException;
937       if (t != null) {
938         lastException = null;
939 
940         if (promise.isVoid()) {
941             PlatformDependent.throwException(t);
942         }
943 
944         return promise.setFailure(t);
945       }
946 
947       return promise.setSuccess();
948     }
949 
950     /**
951      * Check if there was any {@link Throwable} received and if so rethrow it.
952      */
953     public void checkException() {
954       checkException(voidPromise());
955     }
956 
957     /**
958      * Returns {@code true} if the {@link Channel} is open and records optionally
959      * an {@link Exception} if it isn't.
960      */
961     private boolean checkOpen(boolean recordException) {
962         if (!isOpen()) {
963           if (recordException) {
964               recordException(new ClosedChannelException());
965           }
966           return false;
967       }
968 
969       return true;
970     }
971 
972     private EmbeddedEventLoop embeddedEventLoop() {
973         if (isRegistered()) {
974             return (EmbeddedEventLoop) super.eventLoop();
975         }
976 
977         return loop;
978     }
979 
980     /**
981      * Ensure the {@link Channel} is open and if not throw an exception.
982      */
983     protected final void ensureOpen() {
984         if (!checkOpen(true)) {
985             checkException();
986         }
987     }
988 
989     @Override
990     protected boolean isCompatible(EventLoop loop) {
991         return loop instanceof EmbeddedEventLoop;
992     }
993 
994     @Override
995     protected SocketAddress localAddress0() {
996         return isActive()? LOCAL_ADDRESS : null;
997     }
998 
999     @Override
1000     protected SocketAddress remoteAddress0() {
1001         return isActive()? REMOTE_ADDRESS : null;
1002     }
1003 
1004     @Override
1005     protected void doRegister() throws Exception {
1006         state = State.ACTIVE;
1007     }
1008 
1009     @Override
1010     protected void doBind(SocketAddress localAddress) throws Exception {
1011         // NOOP
1012     }
1013 
1014     @Override
1015     protected void doDisconnect() throws Exception {
1016         if (!metadata.hasDisconnect()) {
1017             doClose();
1018         }
1019     }
1020 
1021     @Override
1022     protected void doClose() throws Exception {
1023         state = State.CLOSED;
1024     }
1025 
1026     @Override
1027     protected void doBeginRead() throws Exception {
1028         // NOOP
1029     }
1030 
1031     @Override
1032     protected AbstractUnsafe newUnsafe() {
1033         return new EmbeddedUnsafe();
1034     }
1035 
1036     @Override
1037     public Unsafe unsafe() {
1038         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1039     }
1040 
1041     @Override
1042     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1043         for (;;) {
1044             Object msg = in.current();
1045             if (msg == null) {
1046                 break;
1047             }
1048 
1049             ReferenceCountUtil.retain(msg);
1050             handleOutboundMessage(msg);
1051             in.remove();
1052         }
1053     }
1054 
1055     /**
1056      * Called for each outbound message.
1057      *
1058      * @see #doWrite(ChannelOutboundBuffer)
1059      */
1060     protected void handleOutboundMessage(Object msg) {
1061         outboundMessages().add(msg);
1062     }
1063 
1064     /**
1065      * Called for each inbound message.
1066      */
1067     protected void handleInboundMessage(Object msg) {
1068         inboundMessages().add(msg);
1069     }
1070 
1071     public static Builder builder() {
1072         return new Builder();
1073     }
1074 
1075     public static final class Builder {
1076         Channel parent;
1077         ChannelId channelId = EmbeddedChannelId.INSTANCE;
1078         boolean register = true;
1079         boolean hasDisconnect;
1080         //you should use either handlers or handler variable, but not both.
1081         ChannelHandler[] handlers = EMPTY_HANDLERS;
1082         ChannelHandler handler;
1083         ChannelConfig config;
1084         Ticker ticker;
1085 
1086         private Builder() {
1087         }
1088 
1089         /**
1090          * The parent {@link Channel} of this {@link EmbeddedChannel}.
1091          *
1092          * @param parent the parent {@link Channel} of this {@link EmbeddedChannel}.
1093          * @return This builder
1094          */
1095         public Builder parent(Channel parent) {
1096             this.parent = parent;
1097             return this;
1098         }
1099 
1100         /**
1101          * The {@link ChannelId} that will be used to identify this channel.
1102          *
1103          * @param channelId the {@link ChannelId} that will be used to identify this channel
1104          * @return This builder
1105          */
1106         public Builder channelId(ChannelId channelId) {
1107             this.channelId = Objects.requireNonNull(channelId, "channelId");
1108             return this;
1109         }
1110 
1111         /**
1112          * {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the constructor. If
1113          * {@code false} the user will need to call {@link #register()}.
1114          *
1115          * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
1116          *                 constructor. If {@code false} the user will need to call {@link #register()}.
1117          * @return This builder
1118          */
1119         public Builder register(boolean register) {
1120             this.register = register;
1121             return this;
1122         }
1123 
1124         /**
1125          * {@code false} if this {@link Channel} will delegate {@link #disconnect()} to {@link #close()}, {@code true}
1126          * otherwise.
1127          *
1128          * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()} to
1129          *                      {@link #close()}, {@code true} otherwise
1130          * @return This builder
1131          */
1132         public Builder hasDisconnect(boolean hasDisconnect) {
1133             this.hasDisconnect = hasDisconnect;
1134             return this;
1135         }
1136 
1137         /**
1138          * The {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}.
1139          *
1140          * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
1141          * @return This builder
1142          */
1143         public Builder handlers(ChannelHandler... handlers) {
1144             this.handlers = Objects.requireNonNull(handlers, "handlers");
1145             this.handler = null;
1146             return this;
1147         }
1148 
1149         /**
1150          * The {@link ChannelHandler} which will be added to the {@link ChannelPipeline}.
1151          *
1152          * @param handler the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
1153          * @return This builder
1154          */
1155         public Builder handlers(ChannelHandler handler) {
1156             this.handler = Objects.requireNonNull(handler, "handler");
1157             this.handlers = null;
1158             return this;
1159         }
1160 
1161         /**
1162          * The {@link ChannelConfig} which will be returned by {@link #config()}.
1163          *
1164          * @param config the {@link ChannelConfig} which will be returned by {@link #config()}
1165          * @return This builder
1166          */
1167         public Builder config(ChannelConfig config) {
1168             this.config = Objects.requireNonNull(config, "config");
1169             return this;
1170         }
1171 
1172         /**
1173          * Configure a custom ticker for this event loop.
1174          *
1175          * @param ticker The custom ticker
1176          * @return This builder
1177          */
1178         public Builder ticker(Ticker ticker) {
1179             this.ticker = ticker;
1180             return this;
1181         }
1182 
1183         /**
1184          * Create the channel. If you wish to extend {@link EmbeddedChannel}, please use the
1185          * {@link #EmbeddedChannel(Builder)} constructor instead.
1186          *
1187          * @return The channel
1188          */
1189         public EmbeddedChannel build() {
1190             return new EmbeddedChannel(this);
1191         }
1192     }
1193 
1194     private final class EmbeddedUnsafe extends AbstractUnsafe {
1195 
1196         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
1197         // that may change the state of the Channel and may schedule tasks for later execution.
1198         final Unsafe wrapped = new Unsafe() {
1199             @Override
1200             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1201                 return EmbeddedUnsafe.this.recvBufAllocHandle();
1202             }
1203 
1204             @Override
1205             public SocketAddress localAddress() {
1206                 return EmbeddedUnsafe.this.localAddress();
1207             }
1208 
1209             @Override
1210             public SocketAddress remoteAddress() {
1211                 return EmbeddedUnsafe.this.remoteAddress();
1212             }
1213 
1214             @Override
1215             public void register(EventLoop eventLoop, ChannelPromise promise) {
1216                 executingStackCnt++;
1217                 try {
1218                     EmbeddedUnsafe.this.register(eventLoop, promise);
1219                 } finally {
1220                     executingStackCnt--;
1221                     maybeRunPendingTasks();
1222                 }
1223             }
1224 
1225             @Override
1226             public void bind(SocketAddress localAddress, ChannelPromise promise) {
1227                 executingStackCnt++;
1228                 try {
1229                     EmbeddedUnsafe.this.bind(localAddress, promise);
1230                 } finally {
1231                     executingStackCnt--;
1232                     maybeRunPendingTasks();
1233                 }
1234             }
1235 
1236             @Override
1237             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1238                 executingStackCnt++;
1239                 try {
1240                     EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1241                 } finally {
1242                     executingStackCnt--;
1243                     maybeRunPendingTasks();
1244                 }
1245             }
1246 
1247             @Override
1248             public void disconnect(ChannelPromise promise) {
1249                 executingStackCnt++;
1250                 try {
1251                     EmbeddedUnsafe.this.disconnect(promise);
1252                 } finally {
1253                     executingStackCnt--;
1254                     maybeRunPendingTasks();
1255                 }
1256             }
1257 
1258             @Override
1259             public void close(ChannelPromise promise) {
1260                 executingStackCnt++;
1261                 try {
1262                     EmbeddedUnsafe.this.close(promise);
1263                 } finally {
1264                     executingStackCnt--;
1265                     maybeRunPendingTasks();
1266                 }
1267             }
1268 
1269             @Override
1270             public void closeForcibly() {
1271                 executingStackCnt++;
1272                 try {
1273                     EmbeddedUnsafe.this.closeForcibly();
1274                 } finally {
1275                     executingStackCnt--;
1276                     maybeRunPendingTasks();
1277                 }
1278             }
1279 
1280             @Override
1281             public void deregister(ChannelPromise promise) {
1282                 executingStackCnt++;
1283                 try {
1284                     EmbeddedUnsafe.this.deregister(promise);
1285                 } finally {
1286                     executingStackCnt--;
1287                     maybeRunPendingTasks();
1288                 }
1289             }
1290 
1291             @Override
1292             public void beginRead() {
1293                 executingStackCnt++;
1294                 try {
1295                     EmbeddedUnsafe.this.beginRead();
1296                 } finally {
1297                     executingStackCnt--;
1298                     maybeRunPendingTasks();
1299                 }
1300             }
1301 
1302             @Override
1303             public void write(Object msg, ChannelPromise promise) {
1304                 executingStackCnt++;
1305                 try {
1306                     EmbeddedUnsafe.this.write(msg, promise);
1307                 } finally {
1308                     executingStackCnt--;
1309                     maybeRunPendingTasks();
1310                 }
1311             }
1312 
1313             @Override
1314             public void flush() {
1315                 executingStackCnt++;
1316                 try {
1317                     EmbeddedUnsafe.this.flush();
1318                 } finally {
1319                     executingStackCnt--;
1320                     maybeRunPendingTasks();
1321                 }
1322             }
1323 
1324             @Override
1325             public ChannelPromise voidPromise() {
1326                 return EmbeddedUnsafe.this.voidPromise();
1327             }
1328 
1329             @Override
1330             public ChannelOutboundBuffer outboundBuffer() {
1331                 return EmbeddedUnsafe.this.outboundBuffer();
1332             }
1333         };
1334 
1335         @Override
1336         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1337             safeSetSuccess(promise);
1338         }
1339     }
1340 
1341     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1342         EmbeddedChannelPipeline(EmbeddedChannel channel) {
1343             super(channel);
1344         }
1345 
1346         @Override
1347         protected void onUnhandledInboundException(Throwable cause) {
1348             recordException(cause);
1349         }
1350 
1351         @Override
1352         protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1353             handleInboundMessage(msg);
1354         }
1355     }
1356 }