/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.embedded;
17  
18  import java.net.SocketAddress;
19  import java.nio.channels.ClosedChannelException;
20  import java.util.ArrayDeque;
21  import java.util.Queue;
22  
23  import io.netty.channel.AbstractChannel;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelHandler;
29  import io.netty.channel.ChannelId;
30  import io.netty.channel.ChannelInitializer;
31  import io.netty.channel.ChannelMetadata;
32  import io.netty.channel.ChannelOutboundBuffer;
33  import io.netty.channel.ChannelPipeline;
34  import io.netty.channel.ChannelPromise;
35  import io.netty.channel.DefaultChannelConfig;
36  import io.netty.channel.DefaultChannelPipeline;
37  import io.netty.channel.EventLoop;
38  import io.netty.channel.RecvByteBufAllocator;
39  import io.netty.util.ReferenceCountUtil;
40  import io.netty.util.internal.ObjectUtil;
41  import io.netty.util.internal.PlatformDependent;
42  import io.netty.util.internal.RecyclableArrayList;
43  import io.netty.util.internal.UnstableApi;
44  import io.netty.util.internal.logging.InternalLogger;
45  import io.netty.util.internal.logging.InternalLoggerFactory;
46  
47  /**
48   * Base class for {@link Channel} implementations that are used in an embedded fashion.
49   */
50  public class EmbeddedChannel extends AbstractChannel {
51  
52      private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
53      private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();
54  
55      private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
56      private enum State { OPEN, ACTIVE, CLOSED }
57  
58      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);
59  
60      private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
61      private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
62  
63      private final EmbeddedEventLoop loop = new EmbeddedEventLoop();
64      private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
65          @Override
66          public void operationComplete(ChannelFuture future) throws Exception {
67              recordException(future);
68          }
69      };
70  
71      private final ChannelMetadata metadata;
72      private final ChannelConfig config;
73  
74      private Queue<Object> inboundMessages;
75      private Queue<Object> outboundMessages;
76      private Throwable lastException;
77      private State state;
78  
79      /**
80       * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
81       */
82      public EmbeddedChannel() {
83          this(EMPTY_HANDLERS);
84      }
85  
86      /**
87       * Create a new instance with the specified ID and an empty pipeline.
88       *
89       * @param channelId the {@link ChannelId} that will be used to identify this channel
90       */
91      public EmbeddedChannel(ChannelId channelId) {
92          this(channelId, EMPTY_HANDLERS);
93      }
94  
95      /**
96       * Create a new instance with the pipeline initialized with the specified handlers.
97       *
98       * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
99       */
100     public EmbeddedChannel(ChannelHandler... handlers) {
101         this(EmbeddedChannelId.INSTANCE, handlers);
102     }
103 
104     /**
105      * Create a new instance with the pipeline initialized with the specified handlers.
106      *
107      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
108      *                      to {@link #close()}, {@link false} otherwise.
109      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
110      */
111     public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
112         this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
113     }
114 
115     /**
116      * Create a new instance with the pipeline initialized with the specified handlers.
117      *
118      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
119      *                 constructor. If {@code false} the user will need to call {@link #register()}.
120      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
121      *                      to {@link #close()}, {@link false} otherwise.
122      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
123      */
124     public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
125         this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
126     }
127 
128     /**
129      * Create a new instance with the channel ID set to the given ID and the pipeline
130      * initialized with the specified handlers.
131      *
132      * @param channelId the {@link ChannelId} that will be used to identify this channel
133      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
134      */
135     public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
136         this(channelId, false, handlers);
137     }
138 
139     /**
140      * Create a new instance with the channel ID set to the given ID and the pipeline
141      * initialized with the specified handlers.
142      *
143      * @param channelId the {@link ChannelId} that will be used to identify this channel
144      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
145      *                      to {@link #close()}, {@link false} otherwise.
146      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
147      */
148     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
149         this(channelId, true, hasDisconnect, handlers);
150     }
151 
152     /**
153      * Create a new instance with the channel ID set to the given ID and the pipeline
154      * initialized with the specified handlers.
155      *
156      * @param channelId the {@link ChannelId} that will be used to identify this channel
157      * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
158      *                 constructor. If {@code false} the user will need to call {@link #register()}.
159      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
160      *                      to {@link #close()}, {@link false} otherwise.
161      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
162      */
163     public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
164                            final ChannelHandler... handlers) {
165         super(null, channelId);
166         metadata = metadata(hasDisconnect);
167         config = new DefaultChannelConfig(this);
168         setup(register, handlers);
169     }
170 
171     /**
172      * Create a new instance with the channel ID set to the given ID and the pipeline
173      * initialized with the specified handlers.
174      *
175      * @param channelId the {@link ChannelId} that will be used to identify this channel
176      * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
177      *                      to {@link #close()}, {@link false} otherwise.
178      * @param config the {@link ChannelConfig} which will be returned by {@link #config()}.
179      * @param handlers the {@link ChannelHandler}s which will be add in the {@link ChannelPipeline}
180      */
181     public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, final ChannelConfig config,
182                            final ChannelHandler... handlers) {
183         super(null, channelId);
184         metadata = metadata(hasDisconnect);
185         this.config = ObjectUtil.checkNotNull(config, "config");
186         setup(true, handlers);
187     }
188 
189     private static ChannelMetadata metadata(boolean hasDisconnect) {
190         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
191     }
192 
193     private void setup(boolean register, final ChannelHandler... handlers) {
194         ObjectUtil.checkNotNull(handlers, "handlers");
195         ChannelPipeline p = pipeline();
196         p.addLast(new ChannelInitializer<Channel>() {
197             @Override
198             protected void initChannel(Channel ch) throws Exception {
199                 ChannelPipeline pipeline = ch.pipeline();
200                 for (ChannelHandler h: handlers) {
201                     if (h == null) {
202                         break;
203                     }
204                     pipeline.addLast(h);
205                 }
206             }
207         });
208         if (register) {
209             ChannelFuture future = loop.register(this);
210             assert future.isDone();
211         }
212     }
213 
214     /**
215      * Register this {@code Channel} on its {@link EventLoop}.
216      */
217     public void register() throws Exception {
218         ChannelFuture future = loop.register(this);
219         assert future.isDone();
220         Throwable cause = future.cause();
221         if (cause != null) {
222             PlatformDependent.throwException(cause);
223         }
224     }
225 
226     @Override
227     protected final DefaultChannelPipeline newChannelPipeline() {
228         return new EmbeddedChannelPipeline(this);
229     }
230 
231     @Override
232     public ChannelMetadata metadata() {
233         return metadata;
234     }
235 
236     @Override
237     public ChannelConfig config() {
238         return config;
239     }
240 
241     @Override
242     public boolean isOpen() {
243         return state != State.CLOSED;
244     }
245 
246     @Override
247     public boolean isActive() {
248         return state == State.ACTIVE;
249     }
250 
251     /**
252      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
253      */
254     public Queue<Object> inboundMessages() {
255         if (inboundMessages == null) {
256             inboundMessages = new ArrayDeque<Object>();
257         }
258         return inboundMessages;
259     }
260 
261     /**
262      * @deprecated use {@link #inboundMessages()}
263      */
264     @Deprecated
265     public Queue<Object> lastInboundBuffer() {
266         return inboundMessages();
267     }
268 
269     /**
270      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
271      */
272     public Queue<Object> outboundMessages() {
273         if (outboundMessages == null) {
274             outboundMessages = new ArrayDeque<Object>();
275         }
276         return outboundMessages;
277     }
278 
279     /**
280      * @deprecated use {@link #outboundMessages()}
281      */
282     @Deprecated
283     public Queue<Object> lastOutboundBuffer() {
284         return outboundMessages();
285     }
286 
287     /**
288      * Return received data from this {@link Channel}
289      */
290     @SuppressWarnings("unchecked")
291     public <T> T readInbound() {
292         return (T) poll(inboundMessages);
293     }
294 
295     /**
296      * Read data from the outbound. This may return {@code null} if nothing is readable.
297      */
298     @SuppressWarnings("unchecked")
299     public <T> T readOutbound() {
300         return (T) poll(outboundMessages);
301     }
302 
303     /**
304      * Write messages to the inbound of this {@link Channel}.
305      *
306      * @param msgs the messages to be written
307      *
308      * @return {@code true} if the write operation did add something to the inbound buffer
309      */
310     public boolean writeInbound(Object... msgs) {
311         ensureOpen();
312         if (msgs.length == 0) {
313             return isNotEmpty(inboundMessages);
314         }
315 
316         ChannelPipeline p = pipeline();
317         for (Object m: msgs) {
318             p.fireChannelRead(m);
319         }
320 
321         flushInbound(false, voidPromise());
322         return isNotEmpty(inboundMessages);
323     }
324 
325     /**
326      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
327      * method is conceptually equivalent to {@link #write(Object)}.
328      *
329      * @see #writeOneOutbound(Object)
330      */
331     public ChannelFuture writeOneInbound(Object msg) {
332         return writeOneInbound(msg, newPromise());
333     }
334 
335     /**
336      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
337      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
338      *
339      * @see #writeOneOutbound(Object, ChannelPromise)
340      */
341     public ChannelFuture writeOneInbound(Object msg, ChannelPromise promise) {
342         if (checkOpen(true)) {
343             pipeline().fireChannelRead(msg);
344         }
345         return checkException(promise);
346     }
347 
348     /**
349      * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
350      *
351      * @see #flushOutbound()
352      */
353     public EmbeddedChannel flushInbound() {
354         flushInbound(true, voidPromise());
355         return this;
356     }
357 
358     private ChannelFuture flushInbound(boolean recordException, ChannelPromise promise) {
359       if (checkOpen(recordException)) {
360           pipeline().fireChannelReadComplete();
361           runPendingTasks();
362       }
363 
364       return checkException(promise);
365     }
366 
367     /**
368      * Write messages to the outbound of this {@link Channel}.
369      *
370      * @param msgs              the messages to be written
371      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
372      */
373     public boolean writeOutbound(Object... msgs) {
374         ensureOpen();
375         if (msgs.length == 0) {
376             return isNotEmpty(outboundMessages);
377         }
378 
379         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
380         try {
381             for (Object m: msgs) {
382                 if (m == null) {
383                     break;
384                 }
385                 futures.add(write(m));
386             }
387 
388             flushOutbound0();
389 
390             int size = futures.size();
391             for (int i = 0; i < size; i++) {
392                 ChannelFuture future = (ChannelFuture) futures.get(i);
393                 if (future.isDone()) {
394                     recordException(future);
395                 } else {
396                     // The write may be delayed to run later by runPendingTasks()
397                     future.addListener(recordExceptionListener);
398                 }
399             }
400 
401             checkException();
402             return isNotEmpty(outboundMessages);
403         } finally {
404             futures.recycle();
405         }
406     }
407 
408     /**
409      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
410      * method is conceptually equivalent to {@link #write(Object)}.
411      *
412      * @see #writeOneInbound(Object)
413      */
414     public ChannelFuture writeOneOutbound(Object msg) {
415         return writeOneOutbound(msg, newPromise());
416     }
417 
418     /**
419      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
420      * method is conceptually equivalent to {@link #write(Object, ChannelPromise)}.
421      *
422      * @see #writeOneInbound(Object, ChannelPromise)
423      */
424     public ChannelFuture writeOneOutbound(Object msg, ChannelPromise promise) {
425         if (checkOpen(true)) {
426             return write(msg, promise);
427         }
428         return checkException(promise);
429     }
430 
431     /**
432      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
433      *
434      * @see #flushInbound()
435      */
436     public EmbeddedChannel flushOutbound() {
437         if (checkOpen(true)) {
438             flushOutbound0();
439         }
440         checkException(voidPromise());
441         return this;
442     }
443 
444     private void flushOutbound0() {
445         // We need to call runPendingTasks first as a ChannelOutboundHandler may used eventloop.execute(...) to
446         // delay the write on the next eventloop run.
447         runPendingTasks();
448 
449         flush();
450     }
451 
452     /**
453      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
454      *
455      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
456      */
457     public boolean finish() {
458         return finish(false);
459     }
460 
461     /**
462      * Mark this {@link Channel} as finished and release all pending message in the inbound and outbound buffer.
463      * Any further try to write data to it will fail.
464      *
465      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
466      */
467     public boolean finishAndReleaseAll() {
468         return finish(true);
469     }
470 
471     /**
472      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
473      *
474      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
475      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
476      */
477     private boolean finish(boolean releaseAll) {
478         close();
479         try {
480             checkException();
481             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
482         } finally {
483             if (releaseAll) {
484                 releaseAll(inboundMessages);
485                 releaseAll(outboundMessages);
486             }
487         }
488     }
489 
490     /**
491      * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
492      * otherwise.
493      */
494     public boolean releaseInbound() {
495         return releaseAll(inboundMessages);
496     }
497 
498     /**
499      * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
500      * otherwise.
501      */
502     public boolean releaseOutbound() {
503         return releaseAll(outboundMessages);
504     }
505 
506     private static boolean releaseAll(Queue<Object> queue) {
507         if (isNotEmpty(queue)) {
508             for (;;) {
509                 Object msg = queue.poll();
510                 if (msg == null) {
511                     break;
512                 }
513                 ReferenceCountUtil.release(msg);
514             }
515             return true;
516         }
517         return false;
518     }
519 
520     private void finishPendingTasks(boolean cancel) {
521         runPendingTasks();
522         if (cancel) {
523             // Cancel all scheduled tasks that are left.
524             loop.cancelScheduledTasks();
525         }
526     }
527 
528     @Override
529     public final ChannelFuture close() {
530         return close(newPromise());
531     }
532 
533     @Override
534     public final ChannelFuture disconnect() {
535         return disconnect(newPromise());
536     }
537 
538     @Override
539     public final ChannelFuture close(ChannelPromise promise) {
540         // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
541         // that needs to be run before the actual close takes place.
542         runPendingTasks();
543         ChannelFuture future = super.close(promise);
544 
545         // Now finish everything else and cancel all scheduled tasks that were not ready set.
546         finishPendingTasks(true);
547         return future;
548     }
549 
550     @Override
551     public final ChannelFuture disconnect(ChannelPromise promise) {
552         ChannelFuture future = super.disconnect(promise);
553         finishPendingTasks(!metadata.hasDisconnect());
554         return future;
555     }
556 
557     private static boolean isNotEmpty(Queue<Object> queue) {
558         return queue != null && !queue.isEmpty();
559     }
560 
561     private static Object poll(Queue<Object> queue) {
562         return queue != null ? queue.poll() : null;
563     }
564 
565     /**
566      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
567      * for this {@link Channel}
568      */
569     public void runPendingTasks() {
570         try {
571             loop.runTasks();
572         } catch (Exception e) {
573             recordException(e);
574         }
575 
576         try {
577             loop.runScheduledTasks();
578         } catch (Exception e) {
579             recordException(e);
580         }
581     }
582 
583     /**
584      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
585      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
586      * {@code -1}.
587      */
588     public long runScheduledPendingTasks() {
589         try {
590             return loop.runScheduledTasks();
591         } catch (Exception e) {
592             recordException(e);
593             return loop.nextScheduledTask();
594         }
595     }
596 
597     private void recordException(ChannelFuture future) {
598         if (!future.isSuccess()) {
599             recordException(future.cause());
600         }
601     }
602 
603     private void recordException(Throwable cause) {
604         if (lastException == null) {
605             lastException = cause;
606         } else {
607             logger.warn(
608                     "More than one exception was raised. " +
609                             "Will report only the first one and log others.", cause);
610         }
611     }
612 
613     /**
614      * Checks for the presence of an {@link Exception}.
615      */
616     private ChannelFuture checkException(ChannelPromise promise) {
617       Throwable t = lastException;
618       if (t != null) {
619         lastException = null;
620 
621         if (promise.isVoid()) {
622             PlatformDependent.throwException(t);
623         }
624 
625         return promise.setFailure(t);
626       }
627 
628       return promise.setSuccess();
629     }
630 
631     /**
632      * Check if there was any {@link Throwable} received and if so rethrow it.
633      */
634     public void checkException() {
635       checkException(voidPromise());
636     }
637 
638     /**
639      * Returns {@code true} if the {@link Channel} is open and records optionally
640      * an {@link Exception} if it isn't.
641      */
642     private boolean checkOpen(boolean recordException) {
643         if (!isOpen()) {
644           if (recordException) {
645               recordException(new ClosedChannelException());
646           }
647           return false;
648       }
649 
650       return true;
651     }
652 
653     /**
654      * Ensure the {@link Channel} is open and if not throw an exception.
655      */
656     protected final void ensureOpen() {
657         if (!checkOpen(true)) {
658             checkException();
659         }
660     }
661 
662     @Override
663     protected boolean isCompatible(EventLoop loop) {
664         return loop instanceof EmbeddedEventLoop;
665     }
666 
667     @Override
668     protected SocketAddress localAddress0() {
669         return isActive()? LOCAL_ADDRESS : null;
670     }
671 
672     @Override
673     protected SocketAddress remoteAddress0() {
674         return isActive()? REMOTE_ADDRESS : null;
675     }
676 
677     @Override
678     protected void doRegister() throws Exception {
679         state = State.ACTIVE;
680     }
681 
682     @Override
683     protected void doBind(SocketAddress localAddress) throws Exception {
684         // NOOP
685     }
686 
687     @Override
688     protected void doDisconnect() throws Exception {
689         if (!metadata.hasDisconnect()) {
690             doClose();
691         }
692     }
693 
694     @Override
695     protected void doClose() throws Exception {
696         state = State.CLOSED;
697     }
698 
699     @Override
700     protected void doBeginRead() throws Exception {
701         // NOOP
702     }
703 
704     @Override
705     protected AbstractUnsafe newUnsafe() {
706         return new EmbeddedUnsafe();
707     }
708 
709     @Override
710     public Unsafe unsafe() {
711         return ((EmbeddedUnsafe) super.unsafe()).wrapped;
712     }
713 
714     @Override
715     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
716         for (;;) {
717             Object msg = in.current();
718             if (msg == null) {
719                 break;
720             }
721 
722             ReferenceCountUtil.retain(msg);
723             handleOutboundMessage(msg);
724             in.remove();
725         }
726     }
727 
728     /**
729      * Called for each outbound message.
730      *
731      * @see #doWrite(ChannelOutboundBuffer)
732      */
733     protected void handleOutboundMessage(Object msg) {
734         outboundMessages().add(msg);
735     }
736 
737     /**
738      * Called for each inbound message.
739      */
740     protected void handleInboundMessage(Object msg) {
741         inboundMessages().add(msg);
742     }
743 
744     private final class EmbeddedUnsafe extends AbstractUnsafe {
745 
746         // Delegates to the EmbeddedUnsafe instance but ensures runPendingTasks() is called after each operation
747         // that may change the state of the Channel and may schedule tasks for later execution.
748         final Unsafe wrapped = new Unsafe() {
749             @Override
750             public RecvByteBufAllocator.Handle recvBufAllocHandle() {
751                 return EmbeddedUnsafe.this.recvBufAllocHandle();
752             }
753 
754             @Override
755             public SocketAddress localAddress() {
756                 return EmbeddedUnsafe.this.localAddress();
757             }
758 
759             @Override
760             public SocketAddress remoteAddress() {
761                 return EmbeddedUnsafe.this.remoteAddress();
762             }
763 
764             @Override
765             public void register(EventLoop eventLoop, ChannelPromise promise) {
766                 EmbeddedUnsafe.this.register(eventLoop, promise);
767                 runPendingTasks();
768             }
769 
770             @Override
771             public void bind(SocketAddress localAddress, ChannelPromise promise) {
772                 EmbeddedUnsafe.this.bind(localAddress, promise);
773                 runPendingTasks();
774             }
775 
776             @Override
777             public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
778                 EmbeddedUnsafe.this.connect(remoteAddress, localAddress, promise);
779                 runPendingTasks();
780             }
781 
782             @Override
783             public void disconnect(ChannelPromise promise) {
784                 EmbeddedUnsafe.this.disconnect(promise);
785                 runPendingTasks();
786             }
787 
788             @Override
789             public void close(ChannelPromise promise) {
790                 EmbeddedUnsafe.this.close(promise);
791                 runPendingTasks();
792             }
793 
794             @Override
795             public void closeForcibly() {
796                 EmbeddedUnsafe.this.closeForcibly();
797                 runPendingTasks();
798             }
799 
800             @Override
801             public void deregister(ChannelPromise promise) {
802                 EmbeddedUnsafe.this.deregister(promise);
803                 runPendingTasks();
804             }
805 
806             @Override
807             public void beginRead() {
808                 EmbeddedUnsafe.this.beginRead();
809                 runPendingTasks();
810             }
811 
812             @Override
813             public void write(Object msg, ChannelPromise promise) {
814                 EmbeddedUnsafe.this.write(msg, promise);
815                 runPendingTasks();
816             }
817 
818             @Override
819             public void flush() {
820                 EmbeddedUnsafe.this.flush();
821                 runPendingTasks();
822             }
823 
824             @Override
825             public ChannelPromise voidPromise() {
826                 return EmbeddedUnsafe.this.voidPromise();
827             }
828 
829             @Override
830             public ChannelOutboundBuffer outboundBuffer() {
831                 return EmbeddedUnsafe.this.outboundBuffer();
832             }
833         };
834 
835         @Override
836         public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
837             safeSetSuccess(promise);
838         }
839     }
840 
841     private final class EmbeddedChannelPipeline extends DefaultChannelPipeline {
842         EmbeddedChannelPipeline(EmbeddedChannel channel) {
843             super(channel);
844         }
845 
846         @Override
847         protected void onUnhandledInboundException(Throwable cause) {
848             recordException(cause);
849         }
850 
851         @Override
852         protected void onUnhandledInboundMessage(Object msg) {
853           handleInboundMessage(msg);
854         }
855     }
856 }