View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.embedded;
17  
18  import java.net.SocketAddress;
19  import java.nio.channels.ClosedChannelException;
20  import java.util.ArrayDeque;
21  import java.util.Queue;
22  
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelHandler;
29  import io.netty.channel.ChannelId;
30  import io.netty.channel.ChannelInitializer;
31  import io.netty.channel.ChannelMetadata;
32  import io.netty.channel.ChannelOutboundBuffer;
33  import io.netty.channel.ChannelPipeline;
34  import io.netty.channel.ChannelPromise;
35  import io.netty.channel.DefaultChannelConfig;
36  import io.netty.channel.DefaultChannelPipeline;
37  import io.netty.channel.EventLoop;
38  import io.netty.channel.RecvByteBufAllocator;
39  import io.netty.util.ReferenceCountUtil;
40  import io.netty.util.internal.ObjectUtil;
41  import io.netty.util.internal.PlatformDependent;
42  import io.netty.util.internal.RecyclableArrayList;
43  import io.netty.util.internal.logging.InternalLogger;
44  import io.netty.util.internal.logging.InternalLoggerFactory;
45  
46  /**
47   * Base class for {@link Channel} implementations that are used in an embedded fashion.
48   */
49  public class EmbeddedChannel extends AbstractChannel {
50  
51      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
52      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
53  
54      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
55      private enum State { OPEN, ACTIVE, CLOSED }
56  
57      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
58  
59      private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
60      private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
61  
62      private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
63      private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
64          @Override
65          public void operationComplete(ChannelFuture future) throws Exception {
66              recordException(future);
67          }
68      };
69  
70      private final ChannelMetadata metadata;
71      private final ChannelConfig config;
72  
73      private Queue<Object> inboundMessages;
74      private Queue<Object> outboundMessages;
75      private Throwable lastException;
76      private State state;
77  
78      /**
79       * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
80       */
81      public EmbeddedChannel() {
82          this(EMPTY_HANDLERS);
83      }
84  
85      /**
86       * Create a new instance with the specified ID and an empty pipeline.
87       *
88       * @param channelId the {@link ChannelId} that will be used to identify this channel
89       */
90      public EmbeddedChannel(ChannelId channelId) {
91          this(channelId, EMPTY_HANDLERS);
92      }
93  
94      /**
95       * Create a new instance with the pipeline initialized with the specified handlers.
96       *
97       * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
98       */
99      public EmbeddedChannel(ChannelHandler... handlers) {
100         this(EmbeddedChannelId.INSTANCE, handlers);
101     }
102 
103     /**
104      * Create a new instance with the pipeline initialized with the specified handlers.
105      *
106      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
107      *                      to {@link #close()}, {@link false} otherwise.
108      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
109      */
110     public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
111         this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
112     }
113 
114     /**
115      * Create a new instance with the pipeline initialized with the specified handlers.
116      *
117      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
118      *                 constructor. If {@code false} the user will need to call {@link #register()}.
119      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
120      *                      to {@link #close()}, {@link false} otherwise.
121      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
122      */
123     public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
124         this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
125     }
126 
127     /**
128      * Create a new instance with the channel ID set to the given ID and the pipeline
129      * initialized with the specified handlers.
130      *
131      * @param channelId the {@link ChannelId} that will be used to identify this channel
132      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
133      */
134     public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
135         this(channelId, false, handlers);
136     }
137 
138     /**
139      * Create a new instance with the channel ID set to the given ID and the pipeline
140      * initialized with the specified handlers.
141      *
142      * @param channelId the {@link ChannelId} that will be used to identify this channel
143      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
144      *                      to {@link #close()}, {@link false} otherwise.
145      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
146      */
147     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
148         this(channelId, true, hasDisconnect, handlers);
149     }
150 
151     /**
152      * Create a new instance with the channel ID set to the given ID and the pipeline
153      * initialized with the specified handlers.
154      *
155      * @param channelId the {@link ChannelId} that will be used to identify this channel
156      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
157      *                 constructor. If {@code false} the user will need to call {@link #register()}.
158      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
159      *                      to {@link #close()}, {@link false} otherwise.
160      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
161      */
162     public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
163                            final ChannelHandler... handlers) {
164         super(null, channelId);
165         metadata = metadata(hasDisconnect);
166         config = new DefaultChannelConfig(this);
167         setup(register, handlers);
168     }
169 
170     /**
171      * Create a new instance with the channel ID set to the given ID and the pipeline
172      * initialized with the specified handlers.
173      *
174      * @param channelId the {@link ChannelId} that will be used to identify this channel
175      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
176      *                      to {@link #close()}, {@link false} otherwise.
177      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
178      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
179      */
180     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
181                            final ChannelHandler... handlers) {
182         super(null, channelId);
183         metadata = metadata(hasDisconnect);
184         this.config = ObjectUtil.checkNotNull(config, "config");
185         setup(true, handlers);
186     }
187 
188     private static ChannelMetadata metadata(boolean hasDisconnect) {
189         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
190     }
191 
192     private void setup(boolean register, final ChannelHandler... handlers) {
193         ObjectUtil.checkNotNull(handlers, "handlers");
194         ChannelPipeline p = pipeline();
195         p.addLast(new ChannelInitializer<Channel>() {
196             @Override
197             protected void initChannel(Channel ch) throws Exception {
198                 ChannelPipeline pipeline = ch.pipeline();
199                 for (ChannelHandler h: handlers) {
200                     if (h == null) {
201                         break;
202                     }
203                     pipeline.addLast(h);
204                 }
205             }
206         });
207         if (register) {
208             ChannelFuture future = loop.register(this);
209             assert future.isDone();
210         }
211     }
212 
213     /**
214      * Register this {@code Channel} on its {@link EventLoop}.
215      */
216     public void register() throws Exception {
217         ChannelFuture future = loop.register(this);
218         assert future.isDone();
219         Throwable cause = future.cause();
220         if (cause != null) {
221             PlatformDependent.throwException(cause);
222         }
223     }
224 
225     @Override
226     protected final DefaultChannelPipeline newChannelPipeline() {
227         return new EmbeddedChannelPipeline(this);
228     }
229 
230     @Override
231     public ChannelMetadata metadata() {
232         return metadata;
233     }
234 
235     @Override
236     public ChannelConfig config() {
237         return config;
238     }
239 
240     @Override
241     public boolean isOpen() {
242         return state != State.CLOSED;
243     }
244 
245     @Override
246     public boolean isActive() {
247         return state == State.ACTIVE;
248     }
249 
250     /**
251      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
252      */
253     public Queue<Object> inboundMessages() {
254         if (inboundMessages == null) {
255             inboundMessages = new ArrayDeque<Object>();
256         }
257         return inboundMessages;
258     }
259 
260     /**
261      * @deprecated use {@link #inboundMessages()}
262      */
263     @Deprecated
264     public Queue<Object> lastInboundBuffer() {
265         return inboundMessages();
266     }
267 
268     /**
269      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
270      */
271     public Queue<Object> outboundMessages() {
272         if (outboundMessages == null) {
273             outboundMessages = new ArrayDeque<Object>();
274         }
275         return outboundMessages;
276     }
277 
278     /**
279      * @deprecated use {@link #outboundMessages()}
280      */
281     @Deprecated
282     public Queue<Object> lastOutboundBuffer() {
283         return outboundMessages();
284     }
285 
286     /**
287      * Return received data from this {@link Channel}
288      */
289     @SuppressWarnings("unchecked")
290     public <T> T readInbound() {
291         T message = (T) poll(inboundMessages);
292         if (message != null) {
293             ReferenceCountUtil.touch(message, "Caller of readInbound() will handle the message from this point");
294         }
295         return message;
296     }
297 
298     /**
299      * Read data from the outbound. This may return {@code null} if nothing is readable.
300      */
301     @SuppressWarnings("unchecked")
302     public <T> T readOutbound() {
303         T message =  (T) poll(outboundMessages);
304         if (message != null) {
305             ReferenceCountUtil.touch(message, "Caller of readOutbound() will handle the message from this point.");
306         }
307         return message;
308     }
309 
310     /**
311      * Write messages to the inbound of this {@link Channel}.
312      *
313      * @param msgs the messages to be written
314      *
315      * @return {@code true} if the write operation did add something to the inbound buffer
316      */
317     public boolean writeInbound(Object... msgs) {
318         ensureOpen();
319         if (msgs.length == 0) {
320             return isNotEmpty(inboundMessages);
321         }
322 
323         ChannelPipeline p = pipeline();
324         for (Object m: msgs) {
325             p.fireChannelRead(m);
326         }
327 
328         flushInbound(false, voidPromise());
329         return isNotEmpty(inboundMessages);
330     }
331 
332     /**
333      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
334      * method is conceptually equivalent to {@link #write(Object)}.
335      *
336      * @see #writeOneOutbound(Object)
337      */
338     public ChannelFuture writeOneInbound(Object msg) {
339         return writeOneInbound(msg, newPromise());
340     }
341 
342     /**
343      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
344      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
345      *
346      * @see #writeOneOutbound(Object, ChannelPromise)
347      */
348     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
349         if (checkOpen(true)) {
350             pipeline().fireChannelRead(msg);
351         }
352         return checkException(promise);
353     }
354 
355     /**
356      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
357      *
358      * @see #flushOutbound()
359      */
360     public EmbeddedChannel flushInbound() {
361         flushInbound(true, voidPromise());
362         return this;
363     }
364 
365     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
366       if (checkOpen(recordException)) {
367           pipeline().fireChannelReadComplete();
368           runPendingTasks();
369       }
370 
371       return checkException(promise);
372     }
373 
374     /**
375      * Write messages to the outbound of this {@link Channel}.
376      *
377      * @param msgs              the messages to be written
378      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
379      */
380     public boolean writeOutbound(Object... msgs) {
381         ensureOpen();
382         if (msgs.length == 0) {
383             return isNotEmpty(outboundMessages);
384         }
385 
386         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
387         try {
388             for (Object m: msgs) {
389                 if (m == null) {
390                     break;
391                 }
392                 futures.add(write(m));
393             }
394 
395             flushOutbound0();
396 
397             int size = futures.size();
398             for (int i = 0; i < size; i++) {
399                 ChannelFuture future = (ChannelFuture) futures.get(i);
400                 if (future.isDone()) {
401                     recordException(future);
402                 } else {
403                     // The write may be delayed to run later by runPendingTasks()
404                     future.addListener(recordExceptionListener);
405                 }
406             }
407 
408             checkException();
409             return isNotEmpty(outboundMessages);
410         } finally {
411             futures.recycle();
412         }
413     }
414 
415     /**
416      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
417      * method is conceptually equivalent to {@link #write(Object)}.
418      *
419      * @see #writeOneInbound(Object)
420      */
421     public ChannelFuture writeOneOutbound(Object msg) {
422         return writeOneOutbound(msg, newPromise());
423     }
424 
425     /**
426      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
427      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
428      *
429      * @see #writeOneInbound(Object, ChannelPromise)
430      */
431     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
432         if (checkOpen(true)) {
433             return write(msg, promise);
434         }
435         return checkException(promise);
436     }
437 
438     /**
439      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
440      *
441      * @see #flushInbound()
442      */
443     public EmbeddedChannel flushOutbound() {
444         if (checkOpen(true)) {
445             flushOutbound0();
446         }
447         checkException(voidPromise());
448         return this;
449     }
450 
451     private void flushOutbound0() {
452         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
453         // delay the write on the next eventloop run.
454         runPendingTasks();
455 
456         flush();
457     }
458 
459     /**
460      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
461      *
462      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
463      */
464     public boolean finish() {
465         return finish(false);
466     }
467 
468     /**
469      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
470      * Any further try to write data to it will fail.
471      *
472      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
473      */
474     public boolean finishAndReleaseAll() {
475         return finish(true);
476     }
477 
478     /**
479      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
480      *
481      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
482      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
483      */
484     private boolean finish(boolean releaseAll) {
485         close();
486         try {
487             checkException();
488             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
489         } finally {
490             if (releaseAll) {
491                 releaseAll(inboundMessages);
492                 releaseAll(outboundMessages);
493             }
494         }
495     }
496 
497     /**
498      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
499      * otherwise.
500      */
501     public boolean releaseInbound() {
502         return releaseAll(inboundMessages);
503     }
504 
505     /**
506      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
507      * otherwise.
508      */
509     public boolean releaseOutbound() {
510         return releaseAll(outboundMessages);
511     }
512 
513     private static boolean releaseAll(Queue<Object> queue) {
514         if (isNotEmpty(queue)) {
515             for (;;) {
516                 Object msg = queue.poll();
517                 if (msg == null) {
518                     break;
519                 }
520                 ReferenceCountUtil.release(msg);
521             }
522             return true;
523         }
524         return false;
525     }
526 
527     private void finishPendingTasks(boolean cancel) {
528         runPendingTasks();
529         if (cancel) {
530             // Cancel all scheduled tasks that are left.
531             loop.cancelScheduledTasks();
532         }
533     }
534 
535     @Override
536     public final ChannelFuture close() {
537         return close(newPromise());
538     }
539 
540     @Override
541     public final ChannelFuture disconnect() {
542         return disconnect(newPromise());
543     }
544 
545     @Override
546     public final ChannelFuture close(ChannelPromise promise) {
547         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
548         // that needs to be run before the actual close takes place.
549         runPendingTasks();
550         ChannelFuture future = super.close(promise);
551 
552         // Now finish everything else and cancel all scheduled tasks that were not ready set.
553         finishPendingTasks(true);
554         return future;
555     }
556 
557     @Override
558     public final ChannelFuture disconnect(ChannelPromise promise) {
559         ChannelFuture future = super.disconnect(promise);
560         finishPendingTasks(!metadata.hasDisconnect());
561         return future;
562     }
563 
564     private static boolean isNotEmpty(Queue<Object> queue) {
565         return queue != null && !queue.isEmpty();
566     }
567 
568     private static Object poll(Queue<Object> queue) {
569         return queue != null ? queue.poll() : null;
570     }
571 
572     /**
573      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
574      * for this {@link Channel}
575      */
576     public void runPendingTasks() {
577         try {
578             loop.runTasks();
579         } catch (Exception e) {
580             recordException(e);
581         }
582 
583         try {
584             loop.runScheduledTasks();
585         } catch (Exception e) {
586             recordException(e);
587         }
588     }
589 
590     /**
591      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
592      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
593      * {@code -1}.
594      */
595     public long runScheduledPendingTasks() {
596         try {
597             return loop.runScheduledTasks();
598         } catch (Exception e) {
599             recordException(e);
600             return loop.nextScheduledTask();
601         }
602     }
603 
604     private void recordException(ChannelFuture future) {
605         if (!future.isSuccess()) {
606             recordException(future.cause());
607         }
608     }
609 
610     private void recordException(Throwable cause) {
611         if (lastException == null) {
612             lastException = cause;
613         } else {
614             logger.warn(
615                     "More than one exception was raised. " +
616                             "Will report only the first one and log others.", cause);
617         }
618     }
619 
620     /**
621      * Checks for the presence of an {@link Exception}.
622      */
623     private ChannelFuture checkException(ChannelPromise promise) {
624       Throwable t = lastException;
625       if (t != null) {
626         lastException = null;
627 
628         if (promise.isVoid()) {
629             PlatformDependent.throwException(t);
630         }
631 
632         return promise.setFailure(t);
633       }
634 
635       return promise.setSuccess();
636     }
637 
638     /**
639      * Check if there was any {@link Throwable} received and if so rethrow it.
640      */
641     public void checkException() {
642       checkException(voidPromise());
643     }
644 
645     /**
646      * Returns {@code true} if the {@link Channel} is open and records optionally
647      * an {@link Exception} if it isn't.
648      */
649     private boolean checkOpen(boolean recordException) {
650         if (!isOpen()) {
651           if (recordException) {
652               recordException(new ClosedChannelException());
653           }
654           return false;
655       }
656 
657       return true;
658     }
659 
660     /**
661      * Ensure the {@link Channel} is open and if not throw an exception.
662      */
663     protected final void ensureOpen() {
664         if (!checkOpen(true)) {
665             checkException();
666         }
667     }
668 
669     @Override
670     protected boolean isCompatible(EventLoop loop) {
671         return loop instanceof EmbeddedEventLoop;
672     }
673 
674     @Override
675     protected SocketAddress localAddress0() {
676         return isActive()? LOCAL_ADDRESS : null;
677     }
678 
679     @Override
680     protected SocketAddress remoteAddress0() {
681         return isActive()? REMOTE_ADDRESS : null;
682     }
683 
684     @Override
685     protected void doRegister() throws Exception {
686         state = State.ACTIVE;
687     }
688 
689     @Override
690     protected void doBind(SocketAddress localAddress) throws Exception {
691         // NOOP
692     }
693 
694     @Override
695     protected void doDisconnect() throws Exception {
696         if (!metadata.hasDisconnect()) {
697             doClose();
698         }
699     }
700 
701     @Override
702     protected void doClose() throws Exception {
703         state = State.CLOSED;
704     }
705 
706     @Override
707     protected void doBeginRead() throws Exception {
708         // NOOP
709     }
710 
711     @Override
712     protected AbstractUnsafe newUnsafe() {
713         return new EmbeddedUnsafe();
714     }
715 
716     @Override
717     public Unsafe unsafe() {
718         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
719     }
720 
721     @Override
722     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
723         for (;;) {
724             Object msg = in.current();
725             if (msg == null) {
726                 break;
727             }
728 
729             ReferenceCountUtil.retain(msg);
730             handleOutboundMessage(msg);
731             in.remove();
732         }
733     }
734 
735     /**
736      * Called for each outbound message.
737      *
738      * @see #doWrite(ChannelOutboundBuffer)
739      */
740     protected void handleOutboundMessage(Object msg) {
741         outboundMessages().add(msg);
742     }
743 
744     /**
745      * Called for each inbound message.
746      */
747     protected void handleInboundMessage(Object msg) {
748         inboundMessages().add(msg);
749     }
750 
751     private final class EmbeddedUnsafe extends AbstractUnsafe {
752 
753         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
754         // that may change the state of the Channel and may schedule tasks for later execution.
755         final Unsafe wrapped = new Unsafe() {
756             @Override
757             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
758                 return EmbeddedUnsafe.this.recvBufAllocHandle();
759             }
760 
761             @Override
762             public SocketAddress localAddress() {
763                 return EmbeddedUnsafe.this.localAddress();
764             }
765 
766             @Override
767             public SocketAddress remoteAddress() {
768                 return EmbeddedUnsafe.this.remoteAddress();
769             }
770 
771             @Override
772             public void register(EventLoop eventLoop, ChannelPromise promise) {
773                 EmbeddedUnsafe.this.register(eventLoop, promise);
774                 runPendingTasks();
775             }
776 
777             @Override
778             public void bind(SocketAddress localAddress, ChannelPromise promise) {
779                 EmbeddedUnsafe.this.bind(localAddress, promise);
780                 runPendingTasks();
781             }
782 
783             @Override
784             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
785                 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
786                 runPendingTasks();
787             }
788 
789             @Override
790             public void disconnect(ChannelPromise promise) {
791                 EmbeddedUnsafe.this.disconnect(promise);
792                 runPendingTasks();
793             }
794 
795             @Override
796             public void close(ChannelPromise promise) {
797                 EmbeddedUnsafe.this.close(promise);
798                 runPendingTasks();
799             }
800 
801             @Override
802             public void closeForcibly() {
803                 EmbeddedUnsafe.this.closeForcibly();
804                 runPendingTasks();
805             }
806 
807             @Override
808             public void deregister(ChannelPromise promise) {
809                 EmbeddedUnsafe.this.deregister(promise);
810                 runPendingTasks();
811             }
812 
813             @Override
814             public void beginRead() {
815                 EmbeddedUnsafe.this.beginRead();
816                 runPendingTasks();
817             }
818 
819             @Override
820             public void write(Object msg, ChannelPromise promise) {
821                 EmbeddedUnsafe.this.write(msg, promise);
822                 runPendingTasks();
823             }
824 
825             @Override
826             public void flush() {
827                 EmbeddedUnsafe.this.flush();
828                 runPendingTasks();
829             }
830 
831             @Override
832             public ChannelPromise voidPromise() {
833                 return EmbeddedUnsafe.this.voidPromise();
834             }
835 
836             @Override
837             public ChannelOutboundBuffer outboundBuffer() {
838                 return EmbeddedUnsafe.this.outboundBuffer();
839             }
840         };
841 
842         @Override
843         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
844             safeSetSuccess(promise);
845         }
846     }
847 
848     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
849         EmbeddedChannelPipeline(EmbeddedChannel channel) {
850             super(channel);
851         }
852 
853         @Override
854         protected void onUnhandledInboundException(Throwable cause) {
855             recordException(cause);
856         }
857 
858         @Override
859         protected void onUnhandledInboundMessage(Object msg) {
860           handleInboundMessage(msg);
861         }
862     }
863 }