View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import io.netty.channel.AbstractChannel;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelConfig;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelHandler;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelId;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOutboundBuffer;
29  import io.netty.channel.ChannelPipeline;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.DefaultChannelConfig;
32  import io.netty.channel.DefaultChannelPipeline;
33  import io.netty.channel.EventLoop;
34  import io.netty.channel.RecvByteBufAllocator;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.concurrent.Ticker;
37  import io.netty.util.internal.PlatformDependent;
38  import io.netty.util.internal.RecyclableArrayList;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.net.SocketAddress;
43  import java.nio.channels.ClosedChannelException;
44  import java.util.ArrayDeque;
45  import java.util.Objects;
46  import java.util.Queue;
47  import java.util.concurrent.TimeUnit;
48  
49  /**
50   * Base class for {@link Channel} implementations that are used in an embedded fashion.
51   */
52  public class EmbeddedChannel extends AbstractChannel {
53  
    // Shared address placeholders; their use is outside this chunk
    // (presumably returned as the channel's local/remote address - TODO confirm).
    private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
    private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();

    // Shared zero-length handler array.
    private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
    // Lifecycle of the channel: isOpen() is true unless CLOSED, isActive() is true only when ACTIVE.
    private enum State { OPEN, ACTIVE, CLOSED }

    // Used by recordException(Throwable) to log every exception after the first recorded one.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);

    // Metadata instances selected by metadata(boolean hasDisconnect).
    private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
    private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);

    // Event loop created in the constructor; setup(...) and register() register this channel on it.
    private final EmbeddedEventLoop loop;
    // Attached by writeOutbound(...) to write futures that are not yet done, so a later failure is recorded.
    private final ChannelFutureListener recordExceptionListener = this::recordException;

    // Values returned by metadata() and config().
    private final ChannelMetadata metadata;
    private final ChannelConfig config;

    // Message queues; created lazily by inboundMessages()/outboundMessages(), so they may be null.
    private Queue<Object> inboundMessages;
    private Queue<Object> outboundMessages;
    // First exception recorded through recordException(Throwable); later ones are only logged.
    private Throwable lastException;
    // Current lifecycle state; it is assigned outside this chunk.
    private State state;
    // Nesting depth of the operations currently executing; maybeRunPendingTasks() only acts when it is 0.
    private int executingStackCnt;
    // Set by close(...) (and by disconnect(...) when the metadata reports no disconnect support);
    // makes maybeRunPendingTasks() cancel the scheduled tasks that are left.
    private boolean cancelRemainingScheduledTasks;
77  
78      /**
79       * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
80       */
81      public EmbeddedChannel() {
82          this(builder());
83      }
84  
85      /**
86       * Create a new instance with the specified ID and an empty pipeline.
87       *
88       * @param channelId the {@link ChannelId} that will be used to identify this channel
89       */
90      public EmbeddedChannel(ChannelId channelId) {
91          this(builder().channelId(channelId));
92      }
93  
94      /**
95       * Create a new instance with the pipeline initialized with the specified handlers.
96       *
97       * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
98       */
99      public EmbeddedChannel(ChannelHandler... handlers) {
100         this(builder().handlers(handlers));
101     }
102 
103     /**
104      * Create a new instance with the pipeline initialized with the specified handlers.
105      *
106      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
107      *                      to {@link #close()}, {@code true} otherwise.
108      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
109      */
110     public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
111         this(builder().hasDisconnect(hasDisconnect).handlers(handlers));
112     }
113 
114     /**
115      * Create a new instance with the pipeline initialized with the specified handlers.
116      *
117      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
118      *                 constructor. If {@code false} the user will need to call {@link #register()}.
119      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
120      *                      to {@link #close()}, {@code true} otherwise.
121      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
122      */
123     public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
124         this(builder().register(register).hasDisconnect(hasDisconnect).handlers(handlers));
125     }
126 
127     /**
128      * Create a new instance with the channel ID set to the given ID and the pipeline
129      * initialized with the specified handlers.
130      *
131      * @param channelId the {@link ChannelId} that will be used to identify this channel
132      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
133      */
134     public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
135         this(builder().channelId(channelId).handlers(handlers));
136     }
137 
138     /**
139      * Create a new instance with the channel ID set to the given ID and the pipeline
140      * initialized with the specified handlers.
141      *
142      * @param channelId the {@link ChannelId} that will be used to identify this channel
143      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
144      *                      to {@link #close()}, {@code true} otherwise.
145      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
146      */
147     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
148         this(builder().channelId(channelId).hasDisconnect(hasDisconnect).handlers(handlers));
149     }
150 
151     /**
152      * Create a new instance with the channel ID set to the given ID and the pipeline
153      * initialized with the specified handlers.
154      *
155      * @param channelId the {@link ChannelId} that will be used to identify this channel
156      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
157      *                 constructor. If {@code false} the user will need to call {@link #register()}.
158      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
159      *                      to {@link #close()}, {@code true} otherwise.
160      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
161      */
162     public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
163                            ChannelHandler... handlers) {
164         this(builder().channelId(channelId).register(register).hasDisconnect(hasDisconnect).handlers(handlers));
165     }
166 
167     /**
168      * Create a new instance with the channel ID set to the given ID and the pipeline
169      * initialized with the specified handlers.
170      *
171      * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
172      * @param channelId the {@link ChannelId} that will be used to identify this channel
173      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
174      *                 constructor. If {@code false} the user will need to call {@link #register()}.
175      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
176      *                      to {@link #close()}, {@code true} otherwise.
177      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
178      */
179     public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
180                            final ChannelHandler... handlers) {
181         this(builder()
182                 .parent(parent)
183                 .channelId(channelId)
184                 .register(register)
185                 .hasDisconnect(hasDisconnect)
186                 .handlers(handlers));
187     }
188 
189     /**
190      * Create a new instance with the channel ID set to the given ID and the pipeline
191      * initialized with the specified handlers.
192      *
193      * @param channelId the {@link ChannelId} that will be used to identify this channel
194      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
195      *                      to {@link #close()}, {@code true} otherwise.
196      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
197      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
198      */
199     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
200                            final ChannelHandler... handlers) {
201         this(builder().channelId(channelId).hasDisconnect(hasDisconnect).config(config).handlers(handlers));
202     }
203 
204     /**
205      * Create a new instance with the configuration from the given builder. This method is {@code protected} for use by
206      * subclasses; Otherwise, please use {@link Builder#build()}.
207      *
208      * @param builder The builder
209      */
210     protected EmbeddedChannel(Builder builder) {
211         super(builder.parent, builder.channelId);
212         loop = new EmbeddedEventLoop(builder.ticker == null ? new EmbeddedEventLoop.FreezableTicker() : builder.ticker);
213         metadata = metadata(builder.hasDisconnect);
214         config = builder.config == null ? new DefaultChannelConfig(this) : builder.config;
215         if (builder.handler == null) {
216             setup(builder.register, builder.handlers);
217         } else {
218             setup(builder.register, builder.handler);
219         }
220     }
221 
222     private static ChannelMetadata metadata(boolean hasDisconnect) {
223         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
224     }
225 
226     private void setup(boolean register, final ChannelHandler... handlers) {
227         ChannelPipeline p = pipeline();
228         p.addLast(new ChannelInitializer<Channel>() {
229             @Override
230             protected void initChannel(Channel ch) {
231                 ChannelPipeline pipeline = ch.pipeline();
232                 for (ChannelHandler h: handlers) {
233                     if (h == null) {
234                         break;
235                     }
236                     pipeline.addLast(h);
237                 }
238             }
239         });
240         if (register) {
241             ChannelFuture future = loop.register(this);
242             assert future.isDone();
243         }
244     }
245 
246     private void setup(boolean register, final ChannelHandler handler) {
247         ChannelPipeline p = pipeline();
248         p.addLast(handler);
249         if (register) {
250             ChannelFuture future = loop.register(this);
251             assert future.isDone();
252         }
253     }
254 
255     /**
256      * Register this {@code Channel} on its {@link EventLoop}.
257      */
258     public void register() throws Exception {
259         ChannelFuture future = loop.register(this);
260         assert future.isDone();
261         Throwable cause = future.cause();
262         if (cause != null) {
263             PlatformDependent.throwException(cause);
264         }
265     }
266 
267     @Override
268     protected final DefaultChannelPipeline newChannelPipeline() {
269         return new EmbeddedChannelPipeline(this);
270     }
271 
272     @Override
273     public ChannelMetadata metadata() {
274         return metadata;
275     }
276 
277     @Override
278     public ChannelConfig config() {
279         return config;
280     }
281 
282     @Override
283     public boolean isOpen() {
284         return state != State.CLOSED;
285     }
286 
287     @Override
288     public boolean isActive() {
289         return state == State.ACTIVE;
290     }
291 
292     /**
293      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
294      */
295     public Queue<Object> inboundMessages() {
296         if (inboundMessages == null) {
297             inboundMessages = new ArrayDeque<Object>();
298         }
299         return inboundMessages;
300     }
301 
302     /**
303      * @deprecated use {@link #inboundMessages()}
304      */
305     @Deprecated
306     public Queue<Object> lastInboundBuffer() {
307         return inboundMessages();
308     }
309 
310     /**
311      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
312      */
313     public Queue<Object> outboundMessages() {
314         if (outboundMessages == null) {
315             outboundMessages = new ArrayDeque<Object>();
316         }
317         return outboundMessages;
318     }
319 
320     /**
321      * @deprecated use {@link #outboundMessages()}
322      */
323     @Deprecated
324     public Queue<Object> lastOutboundBuffer() {
325         return outboundMessages();
326     }
327 
328     /**
329      * Return received data from this {@link Channel}
330      */
331     @SuppressWarnings("unchecked")
332     public <T> T readInbound() {
333         T message = (T) poll(inboundMessages);
334         if (message != null) {
335             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
336         }
337         return message;
338     }
339 
340     /**
341      * Read data from the outbound. This may return {@code null} if nothing is readable.
342      */
343     @SuppressWarnings("unchecked")
344     public <T> T readOutbound() {
345         T message =  (T) poll(outboundMessages);
346         if (message != null) {
347             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
348         }
349         return message;
350     }
351 
352     /**
353      * Write messages to the inbound of this {@link Channel}.
354      *
355      * @param msgs the messages to be written
356      *
357      * @return {@code true} if the write operation did add something to the inbound buffer
358      */
359     public boolean writeInbound(Object... msgs) {
360         ensureOpen();
361         if (msgs.length == 0) {
362             return isNotEmpty(inboundMessages);
363         }
364 
365         executingStackCnt++;
366         try {
367             ChannelPipeline p = pipeline();
368             for (Object m : msgs) {
369                 p.fireChannelRead(m);
370             }
371 
372             flushInbound(false, voidPromise());
373         } finally {
374             executingStackCnt--;
375             maybeRunPendingTasks();
376         }
377         return isNotEmpty(inboundMessages);
378     }
379 
380     /**
381      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
382      * method is conceptually equivalent to {@link #write(Object)}.
383      *
384      * @see #writeOneOutbound(Object)
385      */
386     public ChannelFuture writeOneInbound(Object msg) {
387         return writeOneInbound(msg, newPromise());
388     }
389 
390     /**
391      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
392      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
393      *
394      * @see #writeOneOutbound(Object, ChannelPromise)
395      */
396     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
397         executingStackCnt++;
398         try {
399             if (checkOpen(true)) {
400                 pipeline().fireChannelRead(msg);
401             }
402         } finally {
403             executingStackCnt--;
404             maybeRunPendingTasks();
405         }
406         return checkException(promise);
407     }
408 
409     /**
410      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
411      *
412      * @see #flushOutbound()
413      */
414     public EmbeddedChannel flushInbound() {
415         flushInbound(true, voidPromise());
416         return this;
417     }
418 
419     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
420         executingStackCnt++;
421         try {
422             if (checkOpen(recordException)) {
423                 pipeline().fireChannelReadComplete();
424                 runPendingTasks();
425             }
426         } finally {
427             executingStackCnt--;
428             maybeRunPendingTasks();
429         }
430 
431       return checkException(promise);
432     }
433 
434     /**
435      * Write messages to the outbound of this {@link Channel}.
436      *
437      * @param msgs              the messages to be written
438      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
439      */
440     public boolean writeOutbound(Object... msgs) {
441         ensureOpen();
442         if (msgs.length == 0) {
443             return isNotEmpty(outboundMessages);
444         }
445 
446         executingStackCnt++;
447         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
448         try {
449             try {
450                 for (Object m : msgs) {
451                     if (m == null) {
452                         break;
453                     }
454                     futures.add(write(m));
455                 }
456 
457                 flushOutbound0();
458 
459                 int size = futures.size();
460                 for (int i = 0; i < size; i++) {
461                     ChannelFuture future = (ChannelFuture) futures.get(i);
462                     if (future.isDone()) {
463                         recordException(future);
464                     } else {
465                         // The write may be delayed to run later by runPendingTasks()
466                         future.addListener(recordExceptionListener);
467                     }
468                 }
469             } finally {
470                 executingStackCnt--;
471                 maybeRunPendingTasks();
472             }
473             checkException();
474             return isNotEmpty(outboundMessages);
475         } finally {
476             futures.recycle();
477         }
478     }
479 
480     /**
481      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
482      * method is conceptually equivalent to {@link #write(Object)}.
483      *
484      * @see #writeOneInbound(Object)
485      */
486     public ChannelFuture writeOneOutbound(Object msg) {
487         return writeOneOutbound(msg, newPromise());
488     }
489 
490     /**
491      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
492      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
493      *
494      * @see #writeOneInbound(Object, ChannelPromise)
495      */
496     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
497         executingStackCnt++;
498         try {
499             if (checkOpen(true)) {
500                 return write(msg, promise);
501             }
502         } finally {
503             executingStackCnt--;
504             maybeRunPendingTasks();
505         }
506 
507         return checkException(promise);
508     }
509 
510     /**
511      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
512      *
513      * @see #flushInbound()
514      */
515     public EmbeddedChannel flushOutbound() {
516         executingStackCnt++;
517         try {
518             if (checkOpen(true)) {
519                 flushOutbound0();
520             }
521         } finally {
522             executingStackCnt--;
523             maybeRunPendingTasks();
524         }
525         checkException(voidPromise());
526         return this;
527     }
528 
529     private void flushOutbound0() {
530         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
531         // delay the write on the next eventloop run.
532         runPendingTasks();
533 
534         flush();
535     }
536 
537     /**
538      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
539      *
540      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
541      */
542     public boolean finish() {
543         return finish(false);
544     }
545 
546     /**
547      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
548      * Any further try to write data to it will fail.
549      *
550      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
551      */
552     public boolean finishAndReleaseAll() {
553         return finish(true);
554     }
555 
556     /**
557      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
558      *
559      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
560      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
561      */
562     private boolean finish(boolean releaseAll) {
563         executingStackCnt++;
564         try {
565             close();
566         } finally {
567             executingStackCnt--;
568             maybeRunPendingTasks();
569         }
570         try {
571             checkException();
572             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
573         } finally {
574             if (releaseAll) {
575                 releaseAll(inboundMessages);
576                 releaseAll(outboundMessages);
577             }
578         }
579     }
580 
581     /**
582      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
583      * otherwise.
584      */
585     public boolean releaseInbound() {
586         return releaseAll(inboundMessages);
587     }
588 
589     /**
590      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
591      * otherwise.
592      */
593     public boolean releaseOutbound() {
594         return releaseAll(outboundMessages);
595     }
596 
597     private static boolean releaseAll(Queue<Object> queue) {
598         if (isNotEmpty(queue)) {
599             for (;;) {
600                 Object msg = queue.poll();
601                 if (msg == null) {
602                     break;
603                 }
604                 ReferenceCountUtil.release(msg);
605             }
606             return true;
607         }
608         return false;
609     }
610 
611     @Override
612     public final ChannelFuture close() {
613         return close(newPromise());
614     }
615 
616     @Override
617     public final ChannelFuture disconnect() {
618         return disconnect(newPromise());
619     }
620 
621     @Override
622     public final ChannelFuture close(ChannelPromise promise) {
623         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
624         // that needs to be run before the actual close takes place.
625         executingStackCnt++;
626         ChannelFuture future;
627         try {
628             runPendingTasks();
629             future = super.close(promise);
630 
631             cancelRemainingScheduledTasks = true;
632         } finally {
633             executingStackCnt--;
634             maybeRunPendingTasks();
635         }
636         return future;
637     }
638 
639     @Override
640     public final ChannelFuture disconnect(ChannelPromise promise) {
641         executingStackCnt++;
642         ChannelFuture future;
643         try {
644             future = super.disconnect(promise);
645 
646             if (!metadata.hasDisconnect()) {
647                 cancelRemainingScheduledTasks = true;
648             }
649         } finally {
650             executingStackCnt--;
651             maybeRunPendingTasks();
652         }
653         return future;
654     }
655 
656     @Override
657     public ChannelFuture bind(SocketAddress localAddress) {
658         executingStackCnt++;
659         try {
660             return super.bind(localAddress);
661         } finally {
662             executingStackCnt--;
663             maybeRunPendingTasks();
664         }
665     }
666 
667     @Override
668     public ChannelFuture connect(SocketAddress remoteAddress) {
669         executingStackCnt++;
670         try {
671             return super.connect(remoteAddress);
672         } finally {
673             executingStackCnt--;
674             maybeRunPendingTasks();
675         }
676     }
677 
678     @Override
679     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
680         executingStackCnt++;
681         try {
682             return super.connect(remoteAddress, localAddress);
683         } finally {
684             executingStackCnt--;
685             maybeRunPendingTasks();
686         }
687     }
688 
689     @Override
690     public ChannelFuture deregister() {
691         executingStackCnt++;
692         try {
693             return super.deregister();
694         } finally {
695             executingStackCnt--;
696             maybeRunPendingTasks();
697         }
698     }
699 
700     @Override
701     public Channel flush() {
702         executingStackCnt++;
703         try {
704             return super.flush();
705         } finally {
706             executingStackCnt--;
707             maybeRunPendingTasks();
708         }
709     }
710 
711     @Override
712     public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
713         executingStackCnt++;
714         try {
715             return super.bind(localAddress, promise);
716         } finally {
717             executingStackCnt--;
718             maybeRunPendingTasks();
719         }
720     }
721 
722     @Override
723     public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
724         executingStackCnt++;
725         try {
726             return super.connect(remoteAddress, promise);
727         } finally {
728             executingStackCnt--;
729             maybeRunPendingTasks();
730         }
731     }
732 
733     @Override
734     public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
735         executingStackCnt++;
736         try {
737             return super.connect(remoteAddress, localAddress, promise);
738         } finally {
739             executingStackCnt--;
740             maybeRunPendingTasks();
741         }
742     }
743 
744     @Override
745     public ChannelFuture deregister(ChannelPromise promise) {
746         executingStackCnt++;
747         try {
748             return super.deregister(promise);
749         } finally {
750             executingStackCnt--;
751             maybeRunPendingTasks();
752         }
753     }
754 
755     @Override
756     public Channel read() {
757         executingStackCnt++;
758         try {
759             return super.read();
760         } finally {
761             executingStackCnt--;
762             maybeRunPendingTasks();
763         }
764     }
765 
766     @Override
767     public ChannelFuture write(Object msg) {
768         executingStackCnt++;
769         try {
770             return super.write(msg);
771         } finally {
772             executingStackCnt--;
773             maybeRunPendingTasks();
774         }
775     }
776 
777     @Override
778     public ChannelFuture write(Object msg, ChannelPromise promise) {
779         executingStackCnt++;
780         try {
781             return super.write(msg, promise);
782         } finally {
783             executingStackCnt--;
784             maybeRunPendingTasks();
785         }
786     }
787 
788     @Override
789     public ChannelFuture writeAndFlush(Object msg) {
790         executingStackCnt++;
791         try {
792             return super.writeAndFlush(msg);
793         } finally {
794             executingStackCnt--;
795             maybeRunPendingTasks();
796         }
797     }
798 
799     @Override
800     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
801         executingStackCnt++;
802         try {
803             return super.writeAndFlush(msg, promise);
804         } finally {
805             executingStackCnt--;
806             maybeRunPendingTasks();
807         }
808     }
809 
810     private static boolean isNotEmpty(Queue<Object> queue) {
811         return queue != null && !queue.isEmpty();
812     }
813 
814     private static Object poll(Queue<Object> queue) {
815         return queue != null ? queue.poll() : null;
816     }
817 
818     private void maybeRunPendingTasks() {
819         if (executingStackCnt == 0) {
820             runPendingTasks();
821 
822             if (cancelRemainingScheduledTasks) {
823                 // Cancel all scheduled tasks that are left.
824                 embeddedEventLoop().cancelScheduledTasks();
825             }
826         }
827     }
828 
829     /**
830      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
831      * for this {@link Channel}
832      */
833     public void runPendingTasks() {
834         try {
835             embeddedEventLoop().runTasks();
836         } catch (Exception e) {
837             recordException(e);
838         }
839 
840         try {
841             embeddedEventLoop().runScheduledTasks();
842         } catch (Exception e) {
843             recordException(e);
844         }
845     }
846 
847     /**
848      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
849      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
850      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
851      *
852      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
853      */
854     public boolean hasPendingTasks() {
855         return embeddedEventLoop().hasPendingNormalTasks() ||
856                 embeddedEventLoop().nextScheduledTask() == 0;
857     }
858 
859     /**
860      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
861      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
862      * {@code -1}.
863      */
864     public long runScheduledPendingTasks() {
865         try {
866             return embeddedEventLoop().runScheduledTasks();
867         } catch (Exception e) {
868             recordException(e);
869             return embeddedEventLoop().nextScheduledTask();
870         }
871     }
872 
873     private void recordException(ChannelFuture future) {
874         if (!future.isSuccess()) {
875             recordException(future.cause());
876         }
877     }
878 
879     private void recordException(Throwable cause) {
880         if (lastException == null) {
881             lastException = cause;
882         } else {
883             logger.warn(
884                     "More than one exception was raised. " +
885                             "Will report only the first one and log others.", cause);
886         }
887     }
888 
889     private EmbeddedEventLoop.FreezableTicker freezableTicker() {
890         Ticker ticker = eventLoop().ticker();
891         if (ticker instanceof EmbeddedEventLoop.FreezableTicker) {
892             return (EmbeddedEventLoop.FreezableTicker) ticker;
893         } else {
894             throw new IllegalStateException(
895                     "EmbeddedChannel constructed with custom ticker, time manipulation methods are unavailable.");
896         }
897     }
898 
899     /**
900      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
901      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
902      */
903     public void advanceTimeBy(long duration, TimeUnit unit) {
904         freezableTicker().advance(duration, unit);
905     }
906 
907     /**
908      * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
909      * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
910      * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
911      */
912     public void freezeTime() {
913         freezableTicker().freezeTime();
914     }
915 
916     /**
917      * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
918      * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
919      * was called, it will run ten minutes after this method is called again (assuming no
920      * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
921      * {@link #runScheduledPendingTasks()}).
922      */
923     public void unfreezeTime() {
924         freezableTicker().unfreezeTime();
925     }
926 
927     /**
928      * Checks for the presence of an {@link Exception}.
929      */
930     private ChannelFuture checkException(ChannelPromise promise) {
931       Throwable t = lastException;
932       if (t != null) {
933         lastException = null;
934 
935         if (promise.isVoid()) {
936             PlatformDependent.throwException(t);
937         }
938 
939         return promise.setFailure(t);
940       }
941 
942       return promise.setSuccess();
943     }
944 
945     /**
946      * Check if there was any {@link Throwable} received and if so rethrow it.
947      */
948     public void checkException() {
949       checkException(voidPromise());
950     }
951 
952     /**
953      * Returns {@code true} if the {@link Channel} is open and records optionally
954      * an {@link Exception} if it isn't.
955      */
956     private boolean checkOpen(boolean recordException) {
957         if (!isOpen()) {
958           if (recordException) {
959               recordException(new ClosedChannelException());
960           }
961           return false;
962       }
963 
964       return true;
965     }
966 
967     private EmbeddedEventLoop embeddedEventLoop() {
968         if (isRegistered()) {
969             return (EmbeddedEventLoop) super.eventLoop();
970         }
971 
972         return loop;
973     }
974 
975     /**
976      * Ensure the {@link Channel} is open and if not throw an exception.
977      */
978     protected final void ensureOpen() {
979         if (!checkOpen(true)) {
980             checkException();
981         }
982     }
983 
984     @Override
985     protected boolean isCompatible(EventLoop loop) {
986         return loop instanceof EmbeddedEventLoop;
987     }
988 
989     @Override
990     protected SocketAddress localAddress0() {
991         return isActive()? LOCAL_ADDRESS : null;
992     }
993 
994     @Override
995     protected SocketAddress remoteAddress0() {
996         return isActive()? REMOTE_ADDRESS : null;
997     }
998 
999     @Override
1000     protected void doRegister() throws Exception {
1001         state = State.ACTIVE;
1002     }
1003 
1004     @Override
1005     protected void doBind(SocketAddress localAddress) throws Exception {
1006         // NOOP
1007     }
1008 
1009     @Override
1010     protected void doDisconnect() throws Exception {
1011         if (!metadata.hasDisconnect()) {
1012             doClose();
1013         }
1014     }
1015 
1016     @Override
1017     protected void doClose() throws Exception {
1018         state = State.CLOSED;
1019     }
1020 
1021     @Override
1022     protected void doBeginRead() throws Exception {
1023         // NOOP
1024     }
1025 
1026     @Override
1027     protected AbstractUnsafe newUnsafe() {
1028         return new EmbeddedUnsafe();
1029     }
1030 
1031     @Override
1032     public Unsafe unsafe() {
1033         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
1034     }
1035 
1036     @Override
1037     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
1038         for (;;) {
1039             Object msg = in.current();
1040             if (msg == null) {
1041                 break;
1042             }
1043 
1044             ReferenceCountUtil.retain(msg);
1045             handleOutboundMessage(msg);
1046             in.remove();
1047         }
1048     }
1049 
1050     /**
1051      * Called for each outbound message.
1052      *
1053      * @see #doWrite(ChannelOutboundBuffer)
1054      */
1055     protected void handleOutboundMessage(Object msg) {
1056         outboundMessages().add(msg);
1057     }
1058 
1059     /**
1060      * Called for each inbound message.
1061      */
1062     protected void handleInboundMessage(Object msg) {
1063         inboundMessages().add(msg);
1064     }
1065 
1066     public static Builder builder() {
1067         return new Builder();
1068     }
1069 
1070     public static final class Builder {
1071         Channel parent;
1072         ChannelId channelId = EmbeddedChannelId.INSTANCE;
1073         boolean register = true;
1074         boolean hasDisconnect;
1075         //you should use either handlers or handler variable, but not both.
1076         ChannelHandler[] handlers = EMPTY_HANDLERS;
1077         ChannelHandler handler;
1078         ChannelConfig config;
1079         Ticker ticker;
1080 
1081         private Builder() {
1082         }
1083 
1084         /**
1085          * The parent {@link Channel} of this {@link EmbeddedChannel}.
1086          *
1087          * @param parent the parent {@link Channel} of this {@link EmbeddedChannel}.
1088          * @return This builder
1089          */
1090         public Builder parent(Channel parent) {
1091             this.parent = parent;
1092             return this;
1093         }
1094 
1095         /**
1096          * The {@link ChannelId} that will be used to identify this channel.
1097          *
1098          * @param channelId the {@link ChannelId} that will be used to identify this channel
1099          * @return This builder
1100          */
1101         public Builder channelId(ChannelId channelId) {
1102             this.channelId = Objects.requireNonNull(channelId, "channelId");
1103             return this;
1104         }
1105 
1106         /**
1107          * {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the constructor. If
1108          * {@code false} the user will need to call {@link #register()}.
1109          *
1110          * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
1111          *                 constructor. If {@code false} the user will need to call {@link #register()}.
1112          * @return This builder
1113          */
1114         public Builder register(boolean register) {
1115             this.register = register;
1116             return this;
1117         }
1118 
1119         /**
1120          * {@code false} if this {@link Channel} will delegate {@link #disconnect()} to {@link #close()}, {@code true}
1121          * otherwise.
1122          *
1123          * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()} to
1124          *                      {@link #close()}, {@code true} otherwise
1125          * @return This builder
1126          */
1127         public Builder hasDisconnect(boolean hasDisconnect) {
1128             this.hasDisconnect = hasDisconnect;
1129             return this;
1130         }
1131 
1132         /**
1133          * The {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}.
1134          *
1135          * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
1136          * @return This builder
1137          */
1138         public Builder handlers(ChannelHandler... handlers) {
1139             this.handlers = Objects.requireNonNull(handlers, "handlers");
1140             this.handler = null;
1141             return this;
1142         }
1143 
1144         /**
1145          * The {@link ChannelHandler} which will be added to the {@link ChannelPipeline}.
1146          *
1147          * @param handler the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
1148          * @return This builder
1149          */
1150         public Builder handlers(ChannelHandler handler) {
1151             this.handler = Objects.requireNonNull(handler, "handler");
1152             this.handlers = null;
1153             return this;
1154         }
1155 
1156         /**
1157          * The {@link ChannelConfig} which will be returned by {@link #config()}.
1158          *
1159          * @param config the {@link ChannelConfig} which will be returned by {@link #config()}
1160          * @return This builder
1161          */
1162         public Builder config(ChannelConfig config) {
1163             this.config = Objects.requireNonNull(config, "config");
1164             return this;
1165         }
1166 
1167         /**
1168          * Configure a custom ticker for this event loop.
1169          *
1170          * @param ticker The custom ticker
1171          * @return This builder
1172          */
1173         public Builder ticker(Ticker ticker) {
1174             this.ticker = ticker;
1175             return this;
1176         }
1177 
1178         /**
1179          * Create the channel. If you wish to extend {@link EmbeddedChannel}, please use the
1180          * {@link #EmbeddedChannel(Builder)} constructor instead.
1181          *
1182          * @return The channel
1183          */
1184         public EmbeddedChannel build() {
1185             return new EmbeddedChannel(this);
1186         }
1187     }
1188 
1189     private final class EmbeddedUnsafe extends AbstractUnsafe {
1190 
1191         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
1192         // that may change the state of the Channel and may schedule tasks for later execution.
1193         final Unsafe wrapped = new Unsafe() {
1194             @Override
1195             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
1196                 return EmbeddedUnsafe.this.recvBufAllocHandle();
1197             }
1198 
1199             @Override
1200             public SocketAddress localAddress() {
1201                 return EmbeddedUnsafe.this.localAddress();
1202             }
1203 
1204             @Override
1205             public SocketAddress remoteAddress() {
1206                 return EmbeddedUnsafe.this.remoteAddress();
1207             }
1208 
1209             @Override
1210             public void register(EventLoop eventLoop, ChannelPromise promise) {
1211                 executingStackCnt++;
1212                 try {
1213                     EmbeddedUnsafe.this.register(eventLoop, promise);
1214                 } finally {
1215                     executingStackCnt--;
1216                     maybeRunPendingTasks();
1217                 }
1218             }
1219 
1220             @Override
1221             public void bind(SocketAddress localAddress, ChannelPromise promise) {
1222                 executingStackCnt++;
1223                 try {
1224                     EmbeddedUnsafe.this.bind(localAddress, promise);
1225                 } finally {
1226                     executingStackCnt--;
1227                     maybeRunPendingTasks();
1228                 }
1229             }
1230 
1231             @Override
1232             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1233                 executingStackCnt++;
1234                 try {
1235                     EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
1236                 } finally {
1237                     executingStackCnt--;
1238                     maybeRunPendingTasks();
1239                 }
1240             }
1241 
1242             @Override
1243             public void disconnect(ChannelPromise promise) {
1244                 executingStackCnt++;
1245                 try {
1246                     EmbeddedUnsafe.this.disconnect(promise);
1247                 } finally {
1248                     executingStackCnt--;
1249                     maybeRunPendingTasks();
1250                 }
1251             }
1252 
1253             @Override
1254             public void close(ChannelPromise promise) {
1255                 executingStackCnt++;
1256                 try {
1257                     EmbeddedUnsafe.this.close(promise);
1258                 } finally {
1259                     executingStackCnt--;
1260                     maybeRunPendingTasks();
1261                 }
1262             }
1263 
1264             @Override
1265             public void closeForcibly() {
1266                 executingStackCnt++;
1267                 try {
1268                     EmbeddedUnsafe.this.closeForcibly();
1269                 } finally {
1270                     executingStackCnt--;
1271                     maybeRunPendingTasks();
1272                 }
1273             }
1274 
1275             @Override
1276             public void deregister(ChannelPromise promise) {
1277                 executingStackCnt++;
1278                 try {
1279                     EmbeddedUnsafe.this.deregister(promise);
1280                 } finally {
1281                     executingStackCnt--;
1282                     maybeRunPendingTasks();
1283                 }
1284             }
1285 
1286             @Override
1287             public void beginRead() {
1288                 executingStackCnt++;
1289                 try {
1290                     EmbeddedUnsafe.this.beginRead();
1291                 } finally {
1292                     executingStackCnt--;
1293                     maybeRunPendingTasks();
1294                 }
1295             }
1296 
1297             @Override
1298             public void write(Object msg, ChannelPromise promise) {
1299                 executingStackCnt++;
1300                 try {
1301                     EmbeddedUnsafe.this.write(msg, promise);
1302                 } finally {
1303                     executingStackCnt--;
1304                     maybeRunPendingTasks();
1305                 }
1306             }
1307 
1308             @Override
1309             public void flush() {
1310                 executingStackCnt++;
1311                 try {
1312                     EmbeddedUnsafe.this.flush();
1313                 } finally {
1314                     executingStackCnt--;
1315                     maybeRunPendingTasks();
1316                 }
1317             }
1318 
1319             @Override
1320             public ChannelPromise voidPromise() {
1321                 return EmbeddedUnsafe.this.voidPromise();
1322             }
1323 
1324             @Override
1325             public ChannelOutboundBuffer outboundBuffer() {
1326                 return EmbeddedUnsafe.this.outboundBuffer();
1327             }
1328         };
1329 
1330         @Override
1331         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
1332             safeSetSuccess(promise);
1333         }
1334     }
1335 
1336     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
1337         EmbeddedChannelPipeline(EmbeddedChannel channel) {
1338             super(channel);
1339         }
1340 
1341         @Override
1342         protected void onUnhandledInboundException(Throwable cause) {
1343             recordException(cause);
1344         }
1345 
1346         @Override
1347         protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
1348             handleInboundMessage(msg);
1349         }
1350     }
1351 }