View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.embedded;
17  
18  import io.netty5.buffer.api.internal.ResourceSupport;
19  import io.netty5.buffer.api.internal.Statics;
20  import io.netty5.channel.AdaptiveRecvBufferAllocator;
21  import io.netty5.channel.ChannelShutdownDirection;
22  import io.netty5.util.Resource;
23  import io.netty5.channel.AbstractChannel;
24  import io.netty5.channel.Channel;
25  import io.netty5.channel.ChannelHandler;
26  import io.netty5.channel.ChannelHandlerContext;
27  import io.netty5.channel.ChannelId;
28  import io.netty5.channel.ChannelInitializer;
29  import io.netty5.channel.ChannelMetadata;
30  import io.netty5.channel.ChannelOutboundBuffer;
31  import io.netty5.channel.ChannelPipeline;
32  import io.netty5.channel.DefaultChannelPipeline;
33  import io.netty5.channel.EventLoop;
34  import io.netty5.util.ReferenceCountUtil;
35  import io.netty5.util.concurrent.Future;
36  import io.netty5.util.concurrent.FutureListener;
37  import io.netty5.util.internal.RecyclableArrayList;
38  import io.netty5.util.internal.logging.InternalLogger;
39  import io.netty5.util.internal.logging.InternalLoggerFactory;
40  
41  import java.net.SocketAddress;
42  import java.nio.channels.ClosedChannelException;
43  import java.util.ArrayDeque;
44  import java.util.Queue;
45  import java.util.concurrent.TimeUnit;
46  
47  import static io.netty5.util.internal.PlatformDependent.throwException;
48  import static java.util.Objects.requireNonNull;
49  
50  /**
51   * Base class for {@link Channel} implementations that are used in an embedded fashion.
52   */
53  public class EmbeddedChannel extends AbstractChannel<Channel, SocketAddress, SocketAddress> {
54  
    // Fixed addresses returned by localAddress0()/remoteAddress0() while the channel is active.
    private static final SocketAddress LOCAL_ADDRESS = new EmbeddedSocketAddress();
    private static final SocketAddress REMOTE_ADDRESS = new EmbeddedSocketAddress();

    // Shared empty handler array used by the constructors that start with an empty pipeline.
    private static final ChannelHandler[] EMPTY_HANDLERS = new ChannelHandler[0];
    // Lifecycle of the channel; isOpen() and isActive() are derived from the current value.
    private enum State { OPEN, ACTIVE, CLOSED }

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(EmbeddedChannel.class);

    // Metadata instances selected by metadata(boolean) from the hasDisconnect constructor flag.
    private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
    private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);

    // Attached by writeOutbound() to writes that have not completed yet; records the failure cause, if any.
    private final FutureListener<Void> recordExceptionListener = this::recordException;

    // Both queues are created lazily by inboundMessages()/outboundMessages() and may therefore be null.
    private Queue<Object> inboundMessages;
    private Queue<Object> outboundMessages;
    // First recorded failure; rethrown and cleared by checkException().
    private Throwable lastException;
    // Assigned ACTIVE by setActive() and CLOSED by doClose(); no other assignment exists in this class.
    private State state;
    // Set by doShutdown(...) and reported by isShutdown(...).
    private boolean inputShutdown;
    private boolean outputShutdown;
74  
    /**
     * Create a new instance with an {@link EmbeddedChannelId} and an empty pipeline.
     */
    public EmbeddedChannel() {
        // Delegates to the ChannelHandler... constructor with the shared empty handler array.
        this(EMPTY_HANDLERS);
    }
81  
    /**
     * Create a new instance with the specified ID and an empty pipeline.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     */
    public EmbeddedChannel(ChannelId channelId) {
        // Delegates to the (ChannelId, ChannelHandler...) constructor with the shared empty handler array.
        this(channelId, EMPTY_HANDLERS);
    }
90  
    /**
     * Create a new instance with the pipeline initialized with the specified handlers.
     *
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelHandler... handlers) {
        // Uses the shared EmbeddedChannelId.INSTANCE as the channel id.
        this(EmbeddedChannelId.INSTANCE, handlers);
    }
99  
    /**
     * Create a new instance with the pipeline initialized with the specified handlers.
     *
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(boolean hasDisconnect, ChannelHandler... handlers) {
        // Uses the shared EmbeddedChannelId.INSTANCE as the channel id.
        this(EmbeddedChannelId.INSTANCE, hasDisconnect, handlers);
    }
110 
    /**
     * Create a new instance with the pipeline initialized with the specified handlers.
     *
     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
     *                 constructor. If {@code false} the user will need to call {@link #register()}.
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(boolean register, boolean hasDisconnect, ChannelHandler... handlers) {
        // Uses the shared EmbeddedChannelId.INSTANCE as the channel id.
        this(EmbeddedChannelId.INSTANCE, register, hasDisconnect, handlers);
    }
123 
    /**
     * Create a new instance with the channel ID set to the given ID and the pipeline
     * initialized with the specified handlers.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelId channelId, ChannelHandler... handlers) {
        // The literal false is the hasDisconnect flag of the (ChannelId, boolean, ChannelHandler...) constructor.
        this(channelId, false, handlers);
    }
134 
    /**
     * Create a new instance with the channel ID set to the given ID and the pipeline
     * initialized with the specified handlers.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelId channelId, boolean hasDisconnect, ChannelHandler... handlers) {
        // The literal true is the register flag: the channel is registered inside the constructor.
        this(channelId, true, hasDisconnect, handlers);
    }
147 
    /**
     * Create a new instance with the channel ID set to the given ID and the pipeline
     * initialized with the specified handlers.
     *
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
     *                 constructor. If {@code false} the user will need to call {@link #register()}.
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(ChannelId channelId, boolean register, boolean hasDisconnect,
                           ChannelHandler... handlers) {
        // The leading null is the parent channel: this variant creates a channel without a parent.
        this(null, channelId, register, hasDisconnect, handlers);
    }
163 
    /**
     * Create a new instance with the channel ID set to the given ID and the pipeline
     * initialized with the specified handlers. Every other constructor of this class ends up here.
     *
     * @param parent    the parent {@link Channel} of this {@link EmbeddedChannel}.
     * @param channelId the {@link ChannelId} that will be used to identify this channel
     * @param register {@code true} if this {@link Channel} is registered to the {@link EventLoop} in the
     *                 constructor. If {@code false} the user will need to call {@link #register()}.
     * @param hasDisconnect {@code false} if this {@link Channel} will delegate {@link #disconnect()}
     *                      to {@link #close()}, {@code true} otherwise.
     * @param handlers the {@link ChannelHandler}s which will be added to the {@link ChannelPipeline}
     */
    public EmbeddedChannel(Channel parent, ChannelId channelId, boolean register, boolean hasDisconnect,
                           final ChannelHandler... handlers) {
        // Each channel gets its own EmbeddedEventLoop; the metadata instance is picked from hasDisconnect.
        super(parent, new EmbeddedEventLoop(), metadata(hasDisconnect), new AdaptiveRecvBufferAllocator(), channelId);
        // Installs the handlers and, when requested, registers the channel right away.
        setup(register, handlers);
    }
181 
182     private static ChannelMetadata metadata(boolean hasDisconnect) {
183         return hasDisconnect ? METADATA_DISCONNECT : METADATA_NO_DISCONNECT;
184     }
185 
186     private void setup(boolean register, final ChannelHandler... handlers) {
187         requireNonNull(handlers, "handlers");
188         ChannelPipeline p = pipeline();
189         p.addLast(new ChannelInitializer<>() {
190             @Override
191             protected void initChannel(Channel ch) throws Exception {
192                 ChannelPipeline pipeline = ch.pipeline();
193                 for (ChannelHandler h : handlers) {
194                     if (h == null) {
195                         break;
196                     }
197                     pipeline.addLast(h);
198                 }
199             }
200         });
201         if (register) {
202             Future<Void> future = register();
203             assert future.isDone();
204         }
205     }
206 
207     @Override
208     public Future<Void> register() {
209         Future<Void> future = super.register();
210         assert future.isDone();
211         Throwable cause = future.cause();
212         if (cause != null) {
213             throwException(cause);
214         }
215         return future;
216     }
217 
    /**
     * Creates the pipeline for this channel: an {@code EmbeddedChannelPipeline} bound to this instance.
     */
    @Override
    protected final DefaultChannelPipeline newChannelPipeline() {
        return new EmbeddedChannelPipeline(this);
    }
222 
223     @Override
224     public boolean isOpen() {
225         return state != State.CLOSED;
226     }
227 
228     @Override
229     public boolean isActive() {
230         return state == State.ACTIVE;
231     }
232 
233     /**
234      * Returns the {@link Queue} which holds all the {@link Object}s that were received by this {@link Channel}.
235      */
236     public Queue<Object> inboundMessages() {
237         if (inboundMessages == null) {
238             inboundMessages = new ArrayDeque<>();
239         }
240         return inboundMessages;
241     }
242 
    /**
     * Returns the same queue as {@link #inboundMessages()}.
     *
     * @deprecated use {@link #inboundMessages()}
     */
    @Deprecated
    public Queue<Object> lastInboundBuffer() {
        return inboundMessages();
    }
250 
251     /**
252      * Returns the {@link Queue} which holds all the {@link Object}s that were written by this {@link Channel}.
253      */
254     public Queue<Object> outboundMessages() {
255         if (outboundMessages == null) {
256             outboundMessages = new ArrayDeque<>();
257         }
258         return outboundMessages;
259     }
260 
    /**
     * Returns the same queue as {@link #outboundMessages()}.
     *
     * @deprecated use {@link #outboundMessages()}
     */
    @Deprecated
    public Queue<Object> lastOutboundBuffer() {
        return outboundMessages();
    }
268 
269     /**
270      * Return received data from this {@link Channel}
271      */
272     @SuppressWarnings("unchecked")
273     public <T> T readInbound() {
274         T message = (T) poll(inboundMessages);
275         if (message != null) {
276             Resource.touch(message, "Caller of readInbound() will handle the message from this point");
277         }
278         return message;
279     }
280 
281     /**
282      * Read data from the outbound. This may return {@code null} if nothing is readable.
283      */
284     @SuppressWarnings("unchecked")
285     public <T> T readOutbound() {
286         T message =  (T) poll(outboundMessages);
287         if (message != null) {
288             Resource.touch(message, "Caller of readOutbound() will handle the message from this point.");
289         }
290         return message;
291     }
292 
293     /**
294      * Write messages to the inbound of this {@link Channel}.
295      *
296      * @param msgs the messages to be written
297      *
298      * @return {@code true} if the write operation did add something to the inbound buffer
299      */
300     public boolean writeInbound(Object... msgs) {
301         ensureOpen();
302         if (msgs.length == 0) {
303             return isNotEmpty(inboundMessages);
304         }
305 
306         ChannelPipeline p = pipeline();
307         for (Object m: msgs) {
308             p.fireChannelRead(m);
309         }
310 
311         flushInbound(false);
312         return isNotEmpty(inboundMessages);
313     }
314 
315     /**
316      * Writes one message to the inbound of this {@link Channel} and does not flush it. This
317      * method is conceptually equivalent to {@link #write(Object)}.
318      *
319      * @see #writeOneOutbound(Object)
320      */
321     public Future<Void> writeOneInbound(Object msg) {
322         if (checkOpen(true)) {
323             pipeline().fireChannelRead(msg);
324         }
325         return checkException0();
326     }
327 
    /**
     * Flushes the inbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
     *
     * @return this channel
     * @see #flushOutbound()
     */
    public EmbeddedChannel flushInbound() {
        // true: record a ClosedChannelException when the channel is no longer open.
        flushInbound(true);
        return this;
    }
337 
338     private void flushInbound(boolean recordException) {
339       if (checkOpen(recordException)) {
340           pipeline().fireChannelReadComplete();
341           embeddedEventLoop().execute(this::readIfIsAutoRead);
342           runPendingTasks();
343       }
344       checkException();
345     }
346 
347     /**
348      * Write messages to the outbound of this {@link Channel}.
349      *
350      * @param msgs              the messages to be written
351      * @return bufferReadable   returns {@code true} if the write operation did add something to the outbound buffer
352      */
353     public boolean writeOutbound(Object... msgs) {
354         ensureOpen();
355         if (msgs.length == 0) {
356             return isNotEmpty(outboundMessages);
357         }
358 
359         RecyclableArrayList futures = RecyclableArrayList.newInstance(msgs.length);
360         try {
361             for (Object m: msgs) {
362                 if (m == null) {
363                     break;
364                 }
365                 futures.add(write(m));
366             }
367 
368             flushOutbound0();
369 
370             int size = futures.size();
371             for (int i = 0; i < size; i++) {
372                 Future<Void> future = (Future<Void>) futures.get(i);
373                 if (future.isDone()) {
374                     recordException(future);
375                 } else {
376                     // The write may be delayed to run later by runPendingTasks()
377                     future.addListener(recordExceptionListener);
378                 }
379             }
380 
381             checkException();
382             return isNotEmpty(outboundMessages);
383         } finally {
384             futures.recycle();
385         }
386     }
387 
388     /**
389      * Writes one message to the outbound of this {@link Channel} and does not flush it. This
390      * method is conceptually equivalent to {@link #write(Object)}.
391      *
392      * @see #writeOneInbound(Object)
393      */
394     public Future<Void> writeOneOutbound(Object msg) {
395         if (checkOpen(true)) {
396             return write(msg);
397         }
398         return checkException0();
399     }
400 
401     /**
402      * Flushes the outbound of this {@link Channel}. This method is conceptually equivalent to {@link #flush()}.
403      *
404      * @see #flushInbound()
405      */
406     public EmbeddedChannel flushOutbound() {
407         if (checkOpen(true)) {
408             flushOutbound0();
409         }
410         checkException();
411         return this;
412     }
413 
    /**
     * Runs the pending tasks and then flushes the channel.
     */
    private void flushOutbound0() {
        // runPendingTasks() has to come first: a ChannelOutboundHandler may have used eventloop.execute(...)
        // to delay the write until the next event loop run.
        runPendingTasks();

        flush();
    }
421 
    /**
     * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
     * Pending messages are left in the inbound and outbound buffers.
     *
     * @return {@code true} if any of the used buffers has something left to read
     */
    public boolean finish() {
        return finish(false);
    }
430 
    /**
     * Mark this {@link Channel} as finished and release all pending messages in the inbound and outbound buffer.
     * Any further try to write data to it will fail.
     *
     * @return {@code true} if any of the used buffers had something left to read
     */
    public boolean finishAndReleaseAll() {
        return finish(true);
    }
440 
441     /**
442      * Mark this {@link Channel} as finished. Any further try to write data to it will fail.
443      *
444      * @param releaseAll if {@code true} all pending message in the inbound and outbound buffer are released.
445      * @return bufferReadable returns {@code true} if any of the used buffers has something left to read
446      */
447     private boolean finish(boolean releaseAll) {
448         close();
449         try {
450             checkException();
451             return isNotEmpty(inboundMessages) || isNotEmpty(outboundMessages);
452         } finally {
453             if (releaseAll) {
454                 releaseAll(inboundMessages);
455                 releaseAll(outboundMessages);
456             }
457         }
458     }
459 
    /**
     * Release all buffered inbound messages and return {@code true} if any were in the inbound buffer, {@code false}
     * otherwise.
     */
    public boolean releaseInbound() {
        // Passes the field directly (it may be null) so that no queue is created just to release it.
        return releaseAll(inboundMessages);
    }
467 
    /**
     * Release all buffered outbound messages and return {@code true} if any were in the outbound buffer, {@code false}
     * otherwise.
     */
    public boolean releaseOutbound() {
        // Passes the field directly (it may be null) so that no queue is created just to release it.
        return releaseAll(outboundMessages);
    }
475 
476     private static boolean releaseAll(Queue<Object> queue) {
477         Exception closeFailed = null;
478         if (isNotEmpty(queue)) {
479             for (;;) {
480                 Object msg = queue.poll();
481                 if (msg == null) {
482                     break;
483                 }
484                 try {
485                     Resource.dispose(msg);
486                 } catch (Exception e) {
487                     if (closeFailed == null) {
488                         closeFailed = e;
489                     } else {
490                         closeFailed.addSuppressed(e);
491                     }
492                 }
493             }
494             if (closeFailed != null) {
495                 throwException(closeFailed);
496             }
497             return true;
498         }
499         return false;
500     }
501 
502     private void finishPendingTasks(boolean cancel) {
503         runPendingTasks();
504         if (cancel) {
505             // Cancel all scheduled tasks that are left.
506             ((EmbeddedEventLoop) executor()).cancelScheduled();
507         }
508     }
509 
    /**
     * Closes this channel: runs the pending tasks, performs the close, then runs the remaining tasks and
     * cancels the scheduled tasks that are still left.
     *
     * @return the future returned by the superclass close
     */
    @Override
    public final Future<Void> close() {
        // We need to call runPendingTasks() before calling super.close() as there may be something in the queue
        // that needs to be run before the actual close takes place.
        runPendingTasks();
        Future<Void> future = super.close();

        // Now finish everything else and cancel all scheduled tasks that were not ready yet.
        finishPendingTasks(true);
        return future;
    }
521 
522     @Override
523     public final Future<Void> disconnect() {
524         Future<Void> future = super.disconnect();
525         finishPendingTasks(!metadata().hasDisconnect());
526         return future;
527     }
528 
529     private static boolean isNotEmpty(Queue<Object> queue) {
530         return queue != null && !queue.isEmpty();
531     }
532 
533     private static Object poll(Queue<Object> queue) {
534         return queue != null ? queue.poll() : null;
535     }
536 
537     /**
538      * Run all tasks (which also includes scheduled tasks) that are pending in the {@link EventLoop}
539      * for this {@link Channel}
540      */
541     public void runPendingTasks() {
542         EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) executor();
543         try {
544             embeddedEventLoop.runTasks();
545         } catch (Exception e) {
546             recordException(e);
547         }
548 
549         runScheduledPendingTasks();
550     }
551 
552     /**
553      * Run all pending scheduled tasks in the {@link EventLoop} for this {@link Channel} and return the
554      * {@code nanoseconds} when the next scheduled task is ready to run. If no other task was scheduled it will return
555      * {@code -1}.
556      */
557     public long runScheduledPendingTasks() {
558         EmbeddedEventLoop embeddedEventLoop = (EmbeddedEventLoop) executor();
559 
560         try {
561             return embeddedEventLoop.runScheduledTasks();
562         } catch (Exception e) {
563             recordException(e);
564             return embeddedEventLoop.nextScheduledTask();
565         } finally {
566             // A scheduled task may put something on the taskQueue so lets run it.
567             embeddedEventLoop.runTasks();
568         }
569     }
570 
571     /**
572      * Check whether this channel has any pending tasks that would be executed by a call to {@link #runPendingTasks()}.
573      * This includes normal tasks, and scheduled tasks where the deadline has expired. If this method returns
574      * {@code false}, a call to {@link #runPendingTasks()} would do nothing.
575      *
576      * @return {@code true} if there are any pending tasks, {@code false} otherwise.
577      */
578     public boolean hasPendingTasks() {
579         return embeddedEventLoop().hasPendingNormalTasks() ||
580                 embeddedEventLoop().nextScheduledTask() == 0;
581     }
582 
583     private void recordException(Future<?> future) {
584         if (future.isFailed()) {
585             recordException(future.cause());
586         }
587     }
588 
589     private void recordException(Throwable cause) {
590         if (lastException == null) {
591             lastException = cause;
592         } else {
593             logger.warn(
594                     "More than one exception was raised. " +
595                             "Will report only the first one and log others.", cause);
596         }
597     }
598 
    /**
     * Returns this channel's executor cast to {@link EmbeddedEventLoop}; the constructor always
     * passes an {@code EmbeddedEventLoop} to the superclass.
     */
    private EmbeddedEventLoop embeddedEventLoop() {
        return (EmbeddedEventLoop) executor();
    }
602     /**
603      * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
604      * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
605      */
606     public void advanceTimeBy(long duration, TimeUnit unit) {
607         embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
608     }
609 
    /**
     * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
     * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
     * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
     */
    public void freezeTime() {
        // Pure delegation to the embedded event loop.
        embeddedEventLoop().freezeTime();
    }
618 
    /**
     * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
     * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
     * was called, it will run ten minutes after this method is called again (assuming no
     * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
     * {@link #runScheduledPendingTasks()}).
     */
    public void unfreezeTime() {
        // Pure delegation to the embedded event loop.
        embeddedEventLoop().unfreezeTime();
    }
629 
630     /**
631      * Checks for the presence of an {@link Exception}.
632      */
633     private Future<Void> checkException0() {
634         try {
635             checkException();
636         } catch (Throwable cause) {
637             return newFailedFuture(cause);
638         }
639         return newSucceededFuture();
640     }
641 
642     /**
643      * Check if there was any {@link Throwable} received and if so rethrow it.
644      */
645     public void checkException() {
646         Throwable t = lastException;
647         if (t != null) {
648             lastException = null;
649 
650             throwException(t);
651         }
652     }
653 
654     /**
655      * Returns {@code true} if the {@link Channel} is open and records optionally
656      * an {@link Exception} if it isn't.
657      */
658     private boolean checkOpen(boolean recordException) {
659         if (!isOpen()) {
660           if (recordException) {
661               recordException(new ClosedChannelException());
662           }
663           return false;
664       }
665 
666       return true;
667     }
668 
669     /**
670      * Ensure the {@link Channel} is open and if not throw an exception.
671      */
672     protected final void ensureOpen() {
673         if (!checkOpen(true)) {
674             checkException();
675         }
676     }
677 
678     @Override
679     protected SocketAddress localAddress0() {
680         return isActive()? LOCAL_ADDRESS : null;
681     }
682 
683     @Override
684     protected SocketAddress remoteAddress0() {
685         return isActive()? REMOTE_ADDRESS : null;
686     }
687 
    /**
     * Switches the channel state to {@code ACTIVE}. Package-private; no caller is visible in this class.
     */
    void setActive() {
        state = State.ACTIVE;
    }
691 
    /**
     * Binding is a no-op for this channel; the given address is ignored.
     */
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        // NOOP
    }
696 
697     @Override
698     protected void doShutdown(ChannelShutdownDirection direction) {
699         switch (direction) {
700             case Inbound:
701                 inputShutdown = true;
702                 break;
703             case Outbound:
704                 outputShutdown = true;
705                 break;
706             default:
707                 throw new AssertionError();
708         }
709     }
710 
711     @Override
712     public boolean isShutdown(ChannelShutdownDirection direction) {
713         if (!isActive()) {
714             return true;
715         }
716         switch (direction) {
717             case Inbound:
718                 return inputShutdown;
719             case Outbound:
720                 return outputShutdown;
721             default:
722                 throw new AssertionError();
723         }
724     }
725 
726     @Override
727     protected void doDisconnect() throws Exception {
728         if (!metadata().hasDisconnect()) {
729             doClose();
730         }
731     }
732 
    /**
     * Closing only switches the state to {@code CLOSED}, after which {@link #isOpen()} returns {@code false}.
     */
    @Override
    protected void doClose() throws Exception {
        state = State.CLOSED;
    }
737 
    /**
     * Beginning a read is a no-op for this channel.
     */
    @Override
    protected void doBeginRead() throws Exception {
        // NOOP
    }
742 
743     @Override
744     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
745         for (;;) {
746             Object msg = in.current();
747             if (msg == null) {
748                 break;
749             }
750 
751             if (msg instanceof ResourceSupport<?, ?>) {
752                 // Prevent the close in ChannelOutboundBuffer.remove() from ending the lifecycle of this message.
753                 // This allows tests to examine the message.
754                 handleOutboundMessage(Statics.acquire((ResourceSupport<?, ?>) msg));
755             } else if (msg instanceof Resource<?>) {
756                 // Resource life-cycle otherwise normally ends in ChannelOutboundBuffer.remove(), but using send()
757                 // here allows the close() in remove() to become a no-op.
758                 // Since message isn't a subclass of ResourceSupport, we can't rely on its internal reference counting.
759                 handleOutboundMessage(((Resource<?>) msg).send().receive());
760             } else {
761                 handleOutboundMessage(ReferenceCountUtil.retain(msg));
762             }
763             in.remove();
764         }
765     }
766 
    /**
     * Called for each outbound message. This implementation appends the message to the
     * {@link #outboundMessages()} queue.
     *
     * @param msg the message taken from the outbound buffer
     * @see #doWrite(ChannelOutboundBuffer)
     */
    protected void handleOutboundMessage(Object msg) {
        outboundMessages().add(msg);
    }
775 
    /**
     * Called for each inbound message. This implementation appends the message to the
     * {@link #inboundMessages()} queue.
     *
     * @param msg the message that reached the end of the pipeline unhandled
     */
    protected void handleInboundMessage(Object msg) {
        inboundMessages().add(msg);
    }
782 
783     @Override
784     protected void runAfterTransportAction() {
785         super.runAfterTransportAction();
786         if (!((EmbeddedEventLoop) executor()).running) {
787             runPendingTasks();
788         }
789     }
790 
    /**
     * Connecting always reports {@code true}; both addresses are ignored.
     */
    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return true;
    }
795 
    /**
     * Finishing a connect always reports {@code true}; the requested address is ignored.
     */
    @Override
    protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) {
        return true;
    }
800 
    /**
     * Pipeline used by {@link EmbeddedChannel}: events that reach the end of the pipeline unhandled are
     * handed back to the enclosing channel.
     */
    private final class EmbeddedChannelPipeline extends DefaultAbstractChannelPipeline {
        EmbeddedChannelPipeline(EmbeddedChannel channel) {
            super(channel);
        }

        // Unhandled inbound exceptions are passed to recordException(...).
        @Override
        protected void onUnhandledInboundException(Throwable cause) {
            recordException(cause);
        }

        // Unhandled inbound messages are passed to handleInboundMessage(...).
        @Override
        protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
            handleInboundMessage(msg);
        }
    }
816 }