View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import java.net.SocketAddress;
19  import java.nio.channels.ClosedChannelException;
20  import java.util.ArrayDeque;
21  import java.util.Queue;
22  import java.util.concurrent.TimeUnit;
23  
24  import io.netty.channel.AbstractChannel;
25  import io.netty.channel.Channel;
26  import io.netty.channel.ChannelConfig;
27  import io.netty.channel.ChannelFuture;
28  import io.netty.channel.ChannelFutureListener;
29  import io.netty.channel.ChannelHandler;
30  import io.netty.channel.ChannelHandlerContext;
31  import io.netty.channel.ChannelId;
32  import io.netty.channel.ChannelInitializer;
33  import io.netty.channel.ChannelMetadata;
34  import io.netty.channel.ChannelOutboundBuffer;
35  import io.netty.channel.ChannelPipeline;
36  import io.netty.channel.ChannelPromise;
37  import io.netty.channel.DefaultChannelConfig;
38  import io.netty.channel.DefaultChannelPipeline;
39  import io.netty.channel.EventLoop;
40  import io.netty.channel.RecvByteBufAllocator;
41  import io.netty.util.ReferenceCountUtil;
42  import io.netty.util.internal.ObjectUtil;
43  import io.netty.util.internal.PlatformDependent;
44  import io.netty.util.internal.RecyclableArrayList;
45  import io.netty.util.internal.logging.InternalLogger;
46  import io.netty.util.internal.logging.InternalLoggerFactory;
47  
48  /**
49   * Base class for {@link Channel} implementations that are used in an embedded fashion.
50   */
51  public class EmbeddedChannel extends AbstractChannel {
52  
53      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
54      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
55  
56      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
57      private enum State { OPEN, ACTIVE, CLOSED }
58  
59      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
60  
61      private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
62      private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
63  
64      private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
65      private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
66          @Override
67          public void operationComplete(ChannelFuture future) throws Exception {
68              recordException(future);
69          }
70      };
71  
72      private final ChannelMetadata metadata;
73      private final ChannelConfig config;
74  
75      private Queue<Object> inboundMessages;
76      private Queue<Object> outboundMessages;
77      private Throwable lastException;
78      private State state;
79  
80      /**
81       * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
82       */
83      public EmbeddedChannel() {
84          this(EMPTY_HANDLERS);
85      }
86  
87      /**
88       * Create a new instance with the specified ID and an empty pipeline.
89       *
90       * @param channelId the {@link ChannelId} that will be used to identify this channel
91       */
92      public EmbeddedChannel(ChannelId channelId) {
93          this(channelId, EMPTY_HANDLERS);
94      }
95  
96      /**
97       * Create a new instance with the pipeline initialized with the specified handlers.
98       *
99       * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
100      */
101     public EmbeddedChannel(ChannelHandler... handlers) {
102         this(EmbeddedChannelId.INSTANCE, handlers);
103     }
104 
105     /**
106      * Create a new instance with the pipeline initialized with the specified handlers.
107      *
108      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
109      *                      to {@link #close()}, {@code true} otherwise.
110      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
111      */
112     public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
113         this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
114     }
115 
116     /**
117      * Create a new instance with the pipeline initialized with the specified handlers.
118      *
119      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
120      *                 constructor. If {@code false} the user will need to call {@link #register()}.
121      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
122      *                      to {@link #close()}, {@code true} otherwise.
123      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
124      */
125     public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
126         this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
127     }
128 
129     /**
130      * Create a new instance with the channel ID set to the given ID and the pipeline
131      * initialized with the specified handlers.
132      *
133      * @param channelId the {@link ChannelId} that will be used to identify this channel
134      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
135      */
136     public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
137         this(channelId, false, handlers);
138     }
139 
140     /**
141      * Create a new instance with the channel ID set to the given ID and the pipeline
142      * initialized with the specified handlers.
143      *
144      * @param channelId the {@link ChannelId} that will be used to identify this channel
145      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
146      *                      to {@link #close()}, {@code true} otherwise.
147      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
148      */
149     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
150         this(channelId, true, hasDisconnect, handlers);
151     }
152 
153     /**
154      * Create a new instance with the channel ID set to the given ID and the pipeline
155      * initialized with the specified handlers.
156      *
157      * @param channelId the {@link ChannelId} that will be used to identify this channel
158      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
159      *                 constructor. If {@code false} the user will need to call {@link #register()}.
160      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
161      *                      to {@link #close()}, {@code true} otherwise.
162      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
163      */
164     public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
165                            ChannelHandler... handlers) {
166         this(null, channelId, register, hasDisconnect, handlers);
167     }
168 
169     /**
170      * Create a new instance with the channel ID set to the given ID and the pipeline
171      * initialized with the specified handlers.
172      *
173      * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
174      * @param channelId the {@link ChannelId} that will be used to identify this channel
175      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
176      *                 constructor. If {@code false} the user will need to call {@link #register()}.
177      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
178      *                      to {@link #close()}, {@code true} otherwise.
179      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
180      */
181     public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
182                            final ChannelHandler... handlers) {
183         super(parent, channelId);
184         metadata = metadata(hasDisconnect);
185         config = new DefaultChannelConfig(this);
186         setup(register, handlers);
187     }
188 
189     /**
190      * Create a new instance with the channel ID set to the given ID and the pipeline
191      * initialized with the specified handlers.
192      *
193      * @param channelId the {@link ChannelId} that will be used to identify this channel
194      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
195      *                      to {@link #close()}, {@code true} otherwise.
196      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
197      * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
198      */
199     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
200                            final ChannelHandler... handlers) {
201         super(null, channelId);
202         metadata = metadata(hasDisconnect);
203         this.config = ObjectUtil.checkNotNull(config, "config");
204         setup(true, handlers);
205     }
206 
207     private static ChannelMetadata metadata(boolean hasDisconnect) {
208         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
209     }
210 
211     private void setup(boolean register, final ChannelHandler... handlers) {
212         ObjectUtil.checkNotNull(handlers, "handlers");
213         ChannelPipeline p = pipeline();
214         p.addLast(new ChannelInitializer<Channel>() {
215             @Override
216             protected void initChannel(Channel ch) throws Exception {
217                 ChannelPipeline pipeline = ch.pipeline();
218                 for (ChannelHandler h: handlers) {
219                     if (h == null) {
220                         break;
221                     }
222                     pipeline.addLast(h);
223                 }
224             }
225         });
226         if (register) {
227             ChannelFuture future = loop.register(this);
228             assert future.isDone();
229         }
230     }
231 
232     /**
233      * Register this {@code Channel} on its {@link EventLoop}.
234      */
235     public void register() throws Exception {
236         ChannelFuture future = loop.register(this);
237         assert future.isDone();
238         Throwable cause = future.cause();
239         if (cause != null) {
240             PlatformDependent.throwException(cause);
241         }
242     }
243 
244     @Override
245     protected final DefaultChannelPipeline newChannelPipeline() {
246         return new EmbeddedChannelPipeline(this);
247     }
248 
249     @Override
250     public ChannelMetadata metadata() {
251         return metadata;
252     }
253 
254     @Override
255     public ChannelConfig config() {
256         return config;
257     }
258 
259     @Override
260     public boolean isOpen() {
261         return state != State.CLOSED;
262     }
263 
264     @Override
265     public boolean isActive() {
266         return state == State.ACTIVE;
267     }
268 
269     /**
270      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
271      */
272     public Queue<Object> inboundMessages() {
273         if (inboundMessages == null) {
274             inboundMessages = new ArrayDeque<Object>();
275         }
276         return inboundMessages;
277     }
278 
279     /**
280      * @deprecated use {@link #inboundMessages()}
281      */
282     @Deprecated
283     public Queue<Object> lastInboundBuffer() {
284         return inboundMessages();
285     }
286 
287     /**
288      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
289      */
290     public Queue<Object> outboundMessages() {
291         if (outboundMessages == null) {
292             outboundMessages = new ArrayDeque<Object>();
293         }
294         return outboundMessages;
295     }
296 
297     /**
298      * @deprecated use {@link #outboundMessages()}
299      */
300     @Deprecated
301     public Queue<Object> lastOutboundBuffer() {
302         return outboundMessages();
303     }
304 
305     /**
306      * Return received data from this {@link Channel}
307      */
308     @SuppressWarnings("unchecked")
309     public <T> T readInbound() {
310         T message = (T) poll(inboundMessages);
311         if (message != null) {
312             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
313         }
314         return message;
315     }
316 
317     /**
318      * Read data from the outbound. This may return {@code null} if nothing is readable.
319      */
320     @SuppressWarnings("unchecked")
321     public <T> T readOutbound() {
322         T message =  (T) poll(outboundMessages);
323         if (message != null) {
324             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
325         }
326         return message;
327     }
328 
329     /**
330      * Write messages to the inbound of this {@link Channel}.
331      *
332      * @param msgs the messages to be written
333      *
334      * @return {@code true} if the write operation did add something to the inbound buffer
335      */
336     public boolean writeInbound(Object... msgs) {
337         ensureOpen();
338         if (msgs.length == 0) {
339             return isNotEmpty(inboundMessages);
340         }
341 
342         ChannelPipeline p = pipeline();
343         for (Object m: msgs) {
344             p.fireChannelRead(m);
345         }
346 
347         flushInbound(false, voidPromise());
348         return isNotEmpty(inboundMessages);
349     }
350 
351     /**
352      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
353      * method is conceptually equivalent to {@link #write(Object)}.
354      *
355      * @see #writeOneOutbound(Object)
356      */
357     public ChannelFuture writeOneInbound(Object msg) {
358         return writeOneInbound(msg, newPromise());
359     }
360 
361     /**
362      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
363      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
364      *
365      * @see #writeOneOutbound(Object, ChannelPromise)
366      */
367     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
368         if (checkOpen(true)) {
369             pipeline().fireChannelRead(msg);
370         }
371         return checkException(promise);
372     }
373 
374     /**
375      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
376      *
377      * @see #flushOutbound()
378      */
379     public EmbeddedChannel flushInbound() {
380         flushInbound(true, voidPromise());
381         return this;
382     }
383 
384     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
385       if (checkOpen(recordException)) {
386           pipeline().fireChannelReadComplete();
387           runPendingTasks();
388       }
389 
390       return checkException(promise);
391     }
392 
393     /**
394      * Write messages to the outbound of this {@link Channel}.
395      *
396      * @param msgs              the messages to be written
397      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
398      */
399     public boolean writeOutbound(Object... msgs) {
400         ensureOpen();
401         if (msgs.length == 0) {
402             return isNotEmpty(outboundMessages);
403         }
404 
405         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
406         try {
407             for (Object m: msgs) {
408                 if (m == null) {
409                     break;
410                 }
411                 futures.add(write(m));
412             }
413 
414             flushOutbound0();
415 
416             int size = futures.size();
417             for (int i = 0; i < size; i++) {
418                 ChannelFuture future = (ChannelFuture) futures.get(i);
419                 if (future.isDone()) {
420                     recordException(future);
421                 } else {
422                     // The write may be delayed to run later by runPendingTasks()
423                     future.addListener(recordExceptionListener);
424                 }
425             }
426 
427             checkException();
428             return isNotEmpty(outboundMessages);
429         } finally {
430             futures.recycle();
431         }
432     }
433 
434     /**
435      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
436      * method is conceptually equivalent to {@link #write(Object)}.
437      *
438      * @see #writeOneInbound(Object)
439      */
440     public ChannelFuture writeOneOutbound(Object msg) {
441         return writeOneOutbound(msg, newPromise());
442     }
443 
444     /**
445      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
446      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
447      *
448      * @see #writeOneInbound(Object, ChannelPromise)
449      */
450     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
451         if (checkOpen(true)) {
452             return write(msg, promise);
453         }
454         return checkException(promise);
455     }
456 
457     /**
458      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
459      *
460      * @see #flushInbound()
461      */
462     public EmbeddedChannel flushOutbound() {
463         if (checkOpen(true)) {
464             flushOutbound0();
465         }
466         checkException(voidPromise());
467         return this;
468     }
469 
470     private void flushOutbound0() {
471         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
472         // delay the write on the next eventloop run.
473         runPendingTasks();
474 
475         flush();
476     }
477 
478     /**
479      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
480      *
481      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
482      */
483     public boolean finish() {
484         return finish(false);
485     }
486 
487     /**
488      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
489      * Any further try to write data to it will fail.
490      *
491      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
492      */
493     public boolean finishAndReleaseAll() {
494         return finish(true);
495     }
496 
497     /**
498      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
499      *
500      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
501      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
502      */
503     private boolean finish(boolean releaseAll) {
504         close();
505         try {
506             checkException();
507             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
508         } finally {
509             if (releaseAll) {
510                 releaseAll(inboundMessages);
511                 releaseAll(outboundMessages);
512             }
513         }
514     }
515 
516     /**
517      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
518      * otherwise.
519      */
520     public boolean releaseInbound() {
521         return releaseAll(inboundMessages);
522     }
523 
524     /**
525      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
526      * otherwise.
527      */
528     public boolean releaseOutbound() {
529         return releaseAll(outboundMessages);
530     }
531 
532     private static boolean releaseAll(Queue<Object> queue) {
533         if (isNotEmpty(queue)) {
534             for (;;) {
535                 Object msg = queue.poll();
536                 if (msg == null) {
537                     break;
538                 }
539                 ReferenceCountUtil.release(msg);
540             }
541             return true;
542         }
543         return false;
544     }
545 
546     private void finishPendingTasks(boolean cancel) {
547         runPendingTasks();
548         if (cancel) {
549             // Cancel all scheduled tasks that are left.
550             embeddedEventLoop().cancelScheduledTasks();
551         }
552     }
553 
554     @Override
555     public final ChannelFuture close() {
556         return close(newPromise());
557     }
558 
559     @Override
560     public final ChannelFuture disconnect() {
561         return disconnect(newPromise());
562     }
563 
564     @Override
565     public final ChannelFuture close(ChannelPromise promise) {
566         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
567         // that needs to be run before the actual close takes place.
568         runPendingTasks();
569         ChannelFuture future = super.close(promise);
570 
571         // Now finish everything else and cancel all scheduled tasks that were not ready set.
572         finishPendingTasks(true);
573         return future;
574     }
575 
576     @Override
577     public final ChannelFuture disconnect(ChannelPromise promise) {
578         ChannelFuture future = super.disconnect(promise);
579         finishPendingTasks(!metadata.hasDisconnect());
580         return future;
581     }
582 
583     private static boolean isNotEmpty(Queue<Object> queue) {
584         return queue != null && !queue.isEmpty();
585     }
586 
587     private static Object poll(Queue<Object> queue) {
588         return queue != null ? queue.poll() : null;
589     }
590 
591     /**
592      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
593      * for this {@link Channel}
594      */
595     public void runPendingTasks() {
596         try {
597             embeddedEventLoop().runTasks();
598         } catch (Exception e) {
599             recordException(e);
600         }
601 
602         try {
603             embeddedEventLoop().runScheduledTasks();
604         } catch (Exception e) {
605             recordException(e);
606         }
607     }
608 
609     /**
610      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
611      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
612      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
613      *
614      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
615      */
616     public boolean hasPendingTasks() {
617         return embeddedEventLoop().hasPendingNormalTasks() ||
618                 embeddedEventLoop().nextScheduledTask() == 0;
619     }
620 
621     /**
622      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
623      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
624      * {@code -1}.
625      */
626     public long runScheduledPendingTasks() {
627         try {
628             return embeddedEventLoop().runScheduledTasks();
629         } catch (Exception e) {
630             recordException(e);
631             return embeddedEventLoop().nextScheduledTask();
632         }
633     }
634 
635     private void recordException(ChannelFuture future) {
636         if (!future.isSuccess()) {
637             recordException(future.cause());
638         }
639     }
640 
641     private void recordException(Throwable cause) {
642         if (lastException == null) {
643             lastException = cause;
644         } else {
645             logger.warn(
646                     "More than one exception was raised. " +
647                             "Will report only the first one and log others.", cause);
648         }
649     }
650 
651     /**
652      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
653      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
654      */
655     public void advanceTimeBy(long duration, TimeUnit unit) {
656         embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
657     }
658 
659     /**
660      * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
661      * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
662      * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
663      */
664     public void freezeTime() {
665         embeddedEventLoop().freezeTime();
666     }
667 
668     /**
669      * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
670      * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
671      * was called, it will run ten minutes after this method is called again (assuming no
672      * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
673      * {@link #runScheduledPendingTasks()}).
674      */
675     public void unfreezeTime() {
676         embeddedEventLoop().unfreezeTime();
677     }
678 
679     /**
680      * Checks for the presence of an {@link Exception}.
681      */
682     private ChannelFuture checkException(ChannelPromise promise) {
683       Throwable t = lastException;
684       if (t != null) {
685         lastException = null;
686 
687         if (promise.isVoid()) {
688             PlatformDependent.throwException(t);
689         }
690 
691         return promise.setFailure(t);
692       }
693 
694       return promise.setSuccess();
695     }
696 
697     /**
698      * Check if there was any {@link Throwable} received and if so rethrow it.
699      */
700     public void checkException() {
701       checkException(voidPromise());
702     }
703 
704     /**
705      * Returns {@code true} if the {@link Channel} is open and records optionally
706      * an {@link Exception} if it isn't.
707      */
708     private boolean checkOpen(boolean recordException) {
709         if (!isOpen()) {
710           if (recordException) {
711               recordException(new ClosedChannelException());
712           }
713           return false;
714       }
715 
716       return true;
717     }
718 
719     private EmbeddedEventLoop embeddedEventLoop() {
720         if (isRegistered()) {
721             return (EmbeddedEventLoop) super.eventLoop();
722         }
723 
724         return loop;
725     }
726 
727     /**
728      * Ensure the {@link Channel} is open and if not throw an exception.
729      */
730     protected final void ensureOpen() {
731         if (!checkOpen(true)) {
732             checkException();
733         }
734     }
735 
736     @Override
737     protected boolean isCompatible(EventLoop loop) {
738         return loop instanceof EmbeddedEventLoop;
739     }
740 
741     @Override
742     protected SocketAddress localAddress0() {
743         return isActive()? LOCAL_ADDRESS : null;
744     }
745 
746     @Override
747     protected SocketAddress remoteAddress0() {
748         return isActive()? REMOTE_ADDRESS : null;
749     }
750 
751     @Override
752     protected void doRegister() throws Exception {
753         state = State.ACTIVE;
754     }
755 
756     @Override
757     protected void doBind(SocketAddress localAddress) throws Exception {
758         // NOOP
759     }
760 
761     @Override
762     protected void doDisconnect() throws Exception {
763         if (!metadata.hasDisconnect()) {
764             doClose();
765         }
766     }
767 
768     @Override
769     protected void doClose() throws Exception {
770         state = State.CLOSED;
771     }
772 
773     @Override
774     protected void doBeginRead() throws Exception {
775         // NOOP
776     }
777 
778     @Override
779     protected AbstractUnsafe newUnsafe() {
780         return new EmbeddedUnsafe();
781     }
782 
783     @Override
784     public Unsafe unsafe() {
785         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
786     }
787 
788     @Override
789     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
790         for (;;) {
791             Object msg = in.current();
792             if (msg == null) {
793                 break;
794             }
795 
796             ReferenceCountUtil.retain(msg);
797             handleOutboundMessage(msg);
798             in.remove();
799         }
800     }
801 
802     /**
803      * Called for each outbound message.
804      *
805      * @see #doWrite(ChannelOutboundBuffer)
806      */
807     protected void handleOutboundMessage(Object msg) {
808         outboundMessages().add(msg);
809     }
810 
811     /**
812      * Called for each inbound message.
813      */
814     protected void handleInboundMessage(Object msg) {
815         inboundMessages().add(msg);
816     }
817 
818     private final class EmbeddedUnsafe extends AbstractUnsafe {
819 
820         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
821         // that may change the state of the Channel and may schedule tasks for later execution.
822         final Unsafe wrapped = new Unsafe() {
823             @Override
824             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
825                 return EmbeddedUnsafe.this.recvBufAllocHandle();
826             }
827 
828             @Override
829             public SocketAddress localAddress() {
830                 return EmbeddedUnsafe.this.localAddress();
831             }
832 
833             @Override
834             public SocketAddress remoteAddress() {
835                 return EmbeddedUnsafe.this.remoteAddress();
836             }
837 
838             @Override
839             public void register(EventLoop eventLoop, ChannelPromise promise) {
840                 EmbeddedUnsafe.this.register(eventLoop, promise);
841                 runPendingTasks();
842             }
843 
844             @Override
845             public void bind(SocketAddress localAddress, ChannelPromise promise) {
846                 EmbeddedUnsafe.this.bind(localAddress, promise);
847                 runPendingTasks();
848             }
849 
850             @Override
851             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
852                 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
853                 runPendingTasks();
854             }
855 
856             @Override
857             public void disconnect(ChannelPromise promise) {
858                 EmbeddedUnsafe.this.disconnect(promise);
859                 runPendingTasks();
860             }
861 
862             @Override
863             public void close(ChannelPromise promise) {
864                 EmbeddedUnsafe.this.close(promise);
865                 runPendingTasks();
866             }
867 
868             @Override
869             public void closeForcibly() {
870                 EmbeddedUnsafe.this.closeForcibly();
871                 runPendingTasks();
872             }
873 
874             @Override
875             public void deregister(ChannelPromise promise) {
876                 EmbeddedUnsafe.this.deregister(promise);
877                 runPendingTasks();
878             }
879 
880             @Override
881             public void beginRead() {
882                 EmbeddedUnsafe.this.beginRead();
883                 runPendingTasks();
884             }
885 
886             @Override
887             public void write(Object msg, ChannelPromise promise) {
888                 EmbeddedUnsafe.this.write(msg, promise);
889                 runPendingTasks();
890             }
891 
892             @Override
893             public void flush() {
894                 EmbeddedUnsafe.this.flush();
895                 runPendingTasks();
896             }
897 
898             @Override
899             public ChannelPromise voidPromise() {
900                 return EmbeddedUnsafe.this.voidPromise();
901             }
902 
903             @Override
904             public ChannelOutboundBuffer outboundBuffer() {
905                 return EmbeddedUnsafe.this.outboundBuffer();
906             }
907         };
908 
909         @Override
910         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
911             safeSetSuccess(promise);
912         }
913     }
914 
915     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
916         EmbeddedChannelPipeline(EmbeddedChannel channel) {
917             super(channel);
918         }
919 
920         @Override
921         protected void onUnhandledInboundException(Throwable cause) {
922             recordException(cause);
923         }
924 
925         @Override
926         protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
927             handleInboundMessage(msg);
928         }
929     }
930 }