1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.buffer.api.BufferAllocator;
19 import io.netty5.buffer.api.DefaultBufferAllocators;
20 import io.netty5.util.Resource;
21 import io.netty5.util.DefaultAttributeMap;
22 import io.netty5.util.concurrent.DefaultPromise;
23 import io.netty5.util.concurrent.EventExecutor;
24 import io.netty5.util.concurrent.Future;
25 import io.netty5.util.concurrent.Promise;
26 import io.netty5.util.internal.PlatformDependent;
27 import io.netty5.util.internal.StringUtil;
28 import io.netty5.util.internal.logging.InternalLogger;
29 import io.netty5.util.internal.logging.InternalLoggerFactory;
30
31 import java.io.IOException;
32 import java.net.ConnectException;
33 import java.net.InetSocketAddress;
34 import java.net.NoRouteToHostException;
35 import java.net.SocketAddress;
36 import java.net.SocketException;
37 import java.nio.channels.AlreadyConnectedException;
38 import java.nio.channels.ClosedChannelException;
39 import java.nio.channels.ConnectionPendingException;
40 import java.nio.channels.NotYetConnectedException;
41 import java.util.Collections;
42 import java.util.IdentityHashMap;
43 import java.util.NoSuchElementException;
44 import java.util.Set;
45 import java.util.concurrent.Executor;
46 import java.util.concurrent.RejectedExecutionException;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
50
51 import static io.netty5.channel.ChannelOption.ALLOW_HALF_CLOSURE;
52 import static io.netty5.channel.ChannelOption.AUTO_CLOSE;
53 import static io.netty5.channel.ChannelOption.AUTO_READ;
54 import static io.netty5.channel.ChannelOption.BUFFER_ALLOCATOR;
55 import static io.netty5.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
56 import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_READ;
57 import static io.netty5.channel.ChannelOption.MAX_MESSAGES_PER_WRITE;
58 import static io.netty5.channel.ChannelOption.MESSAGE_SIZE_ESTIMATOR;
59 import static io.netty5.channel.ChannelOption.RCVBUFFER_ALLOCATOR;
60 import static io.netty5.channel.ChannelOption.WRITE_BUFFER_WATER_MARK;
61 import static io.netty5.channel.ChannelOption.WRITE_SPIN_COUNT;
62 import static io.netty5.util.internal.ObjectUtil.checkPositive;
63 import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
64 import static java.util.Objects.requireNonNull;
65
66
67
68
69 public abstract class AbstractChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
70 extends DefaultAttributeMap implements Channel {
71
72 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
73 private static final MessageSizeEstimator DEFAULT_MSG_SIZE_ESTIMATOR = DefaultMessageSizeEstimator.DEFAULT;
74
75 private static final int DEFAULT_CONNECT_TIMEOUT = 30000;
76
77 private static final Set<ChannelOption<?>> SUPPORTED_CHANNEL_OPTIONS = supportedOptions();
78
79 private final P parent;
80 private final ChannelId id;
81 private final ChannelPipeline pipeline;
82 private final ClosePromise closePromise;
83 private final Runnable fireChannelWritabilityChangedTask;
84 private final EventLoop eventLoop;
85 private final ChannelMetadata metadata;
86
87
88 private boolean strValActive;
89 private String strVal;
90
91 @SuppressWarnings("rawtypes")
92 private static final AtomicIntegerFieldUpdater<AbstractChannel> WRITABLE_UPDATER =
93 AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "writable");
94 private volatile int writable = 1;
95 private volatile ChannelOutboundBuffer outboundBuffer;
96 private volatile L localAddress;
97 private volatile R remoteAddress;
98 private volatile boolean registered;
99
100 private static final AtomicIntegerFieldUpdater<AbstractChannel> AUTOREAD_UPDATER =
101 AtomicIntegerFieldUpdater.newUpdater(AbstractChannel.class, "autoRead");
102 private static final AtomicReferenceFieldUpdater<AbstractChannel, WriteBufferWaterMark> WATERMARK_UPDATER =
103 AtomicReferenceFieldUpdater.newUpdater(
104 AbstractChannel.class, WriteBufferWaterMark.class, "writeBufferWaterMark");
105
106 private volatile BufferAllocator bufferAllocator = DefaultBufferAllocators.preferredAllocator();
107 private volatile RecvBufferAllocator rcvBufAllocator;
108 private volatile MessageSizeEstimator msgSizeEstimator = DEFAULT_MSG_SIZE_ESTIMATOR;
109
110 private volatile int connectTimeoutMillis = DEFAULT_CONNECT_TIMEOUT;
111 private volatile int writeSpinCount = 16;
112 private volatile int maxMessagesPerWrite = Integer.MAX_VALUE;
113
114 @SuppressWarnings("FieldMayBeFinal")
115 private volatile int autoRead = 1;
116 private volatile boolean autoClose = true;
117 private volatile WriteBufferWaterMark writeBufferWaterMark = WriteBufferWaterMark.DEFAULT;
118 private volatile boolean allowHalfClosure;
119
120
121 private boolean closeInitiated;
122 private Throwable initialCloseCause;
123 private boolean readBeforeActive;
124 private RecvBufferAllocator.Handle recvHandle;
125 private MessageSizeEstimator.Handle estimatorHandler;
126 private boolean inWriteFlushed;
127
128 private boolean neverRegistered = true;
129 private boolean neverActive = true;
130
131
132
133
134
135 private Promise<Void> connectPromise;
136 private Future<?> connectTimeoutFuture;
137 private R requestedRemoteAddress;
138
139
140
141
142
143
144
145
146 protected AbstractChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata) {
147 this(parent, eventLoop, metadata, new AdaptiveRecvBufferAllocator());
148 }
149
150
151
152
153
154
155
156
157
158 protected AbstractChannel(P parent, EventLoop eventLoop,
159 ChannelMetadata metadata, RecvBufferAllocator defaultRecvBufferAllocator) {
160 this(parent, eventLoop, metadata, defaultRecvBufferAllocator, DefaultChannelId.newInstance());
161 }
162
163
164
165
166
167
168
169
170
171
172 protected AbstractChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
173 RecvBufferAllocator defaultRecvBufferAllocator, ChannelId id) {
174 this.parent = parent;
175 this.eventLoop = validateEventLoopGroup(eventLoop, "eventLoop", getClass());
176 this.metadata = requireNonNull(metadata, "metadata");
177 closePromise = new ClosePromise(eventLoop);
178 outboundBuffer = new ChannelOutboundBuffer(eventLoop);
179 this.id = id;
180 pipeline = newChannelPipeline();
181 fireChannelWritabilityChangedTask = () -> pipeline().fireChannelWritabilityChanged();
182 rcvBufAllocator = validateAndConfigure(defaultRecvBufferAllocator, metadata);
183 }
184
185 private static RecvBufferAllocator validateAndConfigure(RecvBufferAllocator defaultRecvBufferAllocator,
186 ChannelMetadata metadata) {
187 requireNonNull(defaultRecvBufferAllocator, "defaultRecvBufferAllocator");
188 if (defaultRecvBufferAllocator instanceof MaxMessagesRecvBufferAllocator) {
189 ((MaxMessagesRecvBufferAllocator) defaultRecvBufferAllocator)
190 .maxMessagesPerRead(metadata.defaultMaxMessagesPerRead());
191 }
192 return defaultRecvBufferAllocator;
193 }
194
195 protected static <T extends EventLoopGroup> T validateEventLoopGroup(
196 T group, String name, Class<? extends Channel> channelType) {
197 requireNonNull(group, name);
198 if (!group.isCompatible(channelType)) {
199 throw new IllegalArgumentException(group + " does not support channel of type " +
200 StringUtil.simpleClassName(channelType));
201 }
202 return group;
203 }
204
205 @Override
206 public final ChannelId id() {
207 return id;
208 }
209
210 @Override
211 public final ChannelMetadata metadata() {
212 return metadata;
213 }
214
215
216
217
218 protected ChannelPipeline newChannelPipeline() {
219 return new DefaultAbstractChannelPipeline(this);
220 }
221
222 @Override
223 public BufferAllocator bufferAllocator() {
224 return bufferAllocator;
225 }
226
227 @Override
228 public final P parent() {
229 return parent;
230 }
231
232 @Override
233 public final ChannelPipeline pipeline() {
234 return pipeline;
235 }
236
237 @Override
238 public final EventLoop executor() {
239 return eventLoop;
240 }
241
242 @Override
243 public final L localAddress() {
244 L localAddress = this.localAddress;
245 if (localAddress == null) {
246 try {
247 this.localAddress = localAddress = localAddress0();
248 } catch (Error e) {
249 throw e;
250 } catch (Throwable t) {
251
252 return null;
253 }
254 }
255 return localAddress;
256 }
257
258 @Override
259 public final R remoteAddress() {
260 R remoteAddress = this.remoteAddress;
261 if (remoteAddress == null) {
262 try {
263 this.remoteAddress = remoteAddress = remoteAddress0();
264 } catch (Error e) {
265 throw e;
266 } catch (Throwable t) {
267
268 return null;
269 }
270 }
271 return remoteAddress;
272 }
273
274 protected final void cacheAddresses(L localAddress, R remoteAddress) {
275 this.localAddress = localAddress;
276 this.remoteAddress = remoteAddress;
277 }
278
279 @Override
280 public final boolean isRegistered() {
281 return registered;
282 }
283
284 @Override
285 public final Future<Void> closeFuture() {
286 return closePromise;
287 }
288
289 private long totalPending() {
290 ChannelOutboundBuffer buf = outboundBuffer();
291 if (buf == null) {
292 return -1;
293 }
294 return buf.totalPendingWriteBytes() + pipeline().pendingOutboundBytes();
295 }
296
297 @Override
298 public final long writableBytes() {
299 long totalPending = totalPending();
300 if (totalPending == -1) {
301
302 return 0;
303 }
304
305 long bytes = writeBufferWaterMark.high() -
306 totalPending;
307
308 if (bytes > 0) {
309 return WRITABLE_UPDATER.get(this) == 0 ? 0: bytes;
310 }
311 return 0;
312 }
313
314
315
316
317 @Override
318 public final int hashCode() {
319 return id.hashCode();
320 }
321
322
323
324
325
326 @Override
327 public final boolean equals(Object o) {
328 return this == o;
329 }
330
331 @Override
332 public final int compareTo(Channel o) {
333 if (this == o) {
334 return 0;
335 }
336
337 return id().compareTo(o.id());
338 }
339
340
341
342
343
344
345
346 @Override
347 public String toString() {
348 boolean active = isActive();
349 if (strValActive == active && strVal != null) {
350 return strVal;
351 }
352
353 SocketAddress remoteAddr = remoteAddress();
354 SocketAddress localAddr = localAddress();
355 if (remoteAddr != null) {
356 StringBuilder buf = new StringBuilder(96)
357 .append("[id: 0x")
358 .append(id.asShortText())
359 .append(", L:")
360 .append(localAddr)
361 .append(active? " - " : " ! ")
362 .append("R:")
363 .append(remoteAddr)
364 .append(']');
365 strVal = buf.toString();
366 } else if (localAddr != null) {
367 StringBuilder buf = new StringBuilder(64)
368 .append("[id: 0x")
369 .append(id.asShortText())
370 .append(", L:")
371 .append(localAddr)
372 .append(']');
373 strVal = buf.toString();
374 } else {
375 StringBuilder buf = new StringBuilder(16)
376 .append("[id: 0x")
377 .append(id.asShortText())
378 .append(']');
379 strVal = buf.toString();
380 }
381
382 strValActive = active;
383 return strVal;
384 }
385
386 protected final void readIfIsAutoRead() {
387 assertEventLoop();
388
389 if (isAutoRead() || readBeforeActive) {
390 readBeforeActive = false;
391 read();
392 }
393 }
394
395 protected final void assertEventLoop() {
396 assert eventLoop.inEventLoop();
397 }
398
399 protected RecvBufferAllocator.Handle recvBufAllocHandle() {
400 assertEventLoop();
401
402 if (recvHandle == null) {
403 recvHandle = getRecvBufferAllocator().newHandle();
404 }
405 return recvHandle;
406 }
407
408 private void registerTransport(final Promise<Void> promise) {
409 assertEventLoop();
410
411 if (isRegistered()) {
412 promise.setFailure(new IllegalStateException("registered to an event loop already"));
413 return;
414 }
415
416 try {
417
418
419 if (!promise.setUncancellable() || !ensureOpen(promise)) {
420 return;
421 }
422 boolean firstRegistration = neverRegistered;
423 executor().registerForIo(this).addListener(f -> {
424 if (f.isSuccess()) {
425
426 neverRegistered = false;
427 registered = true;
428
429 safeSetSuccess(promise);
430 pipeline.fireChannelRegistered();
431
432
433 if (isActive()) {
434 if (firstRegistration) {
435 fireChannelActiveIfNotActiveBefore();
436 }
437 readIfIsAutoRead();
438 }
439 } else {
440
441 closeNowAndFail(promise, f.cause());
442 }
443 });
444
445 } catch (Throwable t) {
446
447 closeNowAndFail(promise, t);
448 }
449 }
450
451
452
453
454
455
456 protected final boolean fireChannelActiveIfNotActiveBefore() {
457 assertEventLoop();
458
459 if (neverActive) {
460 neverActive = false;
461 pipeline().fireChannelActive();
462 return true;
463 }
464 return false;
465 }
466
467 private void closeNowAndFail(Promise<Void> promise, Throwable cause) {
468 closeForciblyTransport();
469 closePromise.setClosed();
470 safeSetFailure(promise, cause);
471 }
472
473 private void bindTransport(final SocketAddress localAddress, final Promise<Void> promise) {
474 assertEventLoop();
475
476 if (!promise.setUncancellable() || !ensureOpen(promise)) {
477 return;
478 }
479
480
481 if (localAddress instanceof InetSocketAddress && isOptionSupported(ChannelOption.SO_BROADCAST) &&
482 Boolean.TRUE.equals(getOption(ChannelOption.SO_BROADCAST)) &&
483 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
484 !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
485
486
487 logger.warn(
488 "A non-root user can't receive a broadcast packet if the socket " +
489 "is not bound to a wildcard address; binding to a non-wildcard " +
490 "address (" + localAddress + ") anyway as requested.");
491 }
492
493 boolean wasActive = isActive();
494 try {
495 doBind(localAddress);
496 } catch (Throwable t) {
497 safeSetFailure(promise, t);
498 closeIfClosed();
499 return;
500 }
501
502 if (!wasActive && isActive()) {
503 invokeLater(() -> {
504 if (fireChannelActiveIfNotActiveBefore()) {
505 readIfIsAutoRead();
506 }
507 });
508 }
509
510 safeSetSuccess(promise);
511 }
512
513 private void disconnectTransport(final Promise<Void> promise) {
514 assertEventLoop();
515
516 if (!promise.setUncancellable()) {
517 return;
518 }
519
520 boolean wasActive = isActive();
521 try {
522 doDisconnect();
523
524 remoteAddress = null;
525 localAddress = null;
526 neverActive = true;
527 } catch (Throwable t) {
528 safeSetFailure(promise, t);
529 closeIfClosed();
530 return;
531 }
532
533 if (wasActive && !isActive()) {
534 invokeLater(pipeline::fireChannelInactive);
535 }
536
537 safeSetSuccess(promise);
538 closeIfClosed();
539 }
540
541 protected void closeTransport(final Promise<Void> promise) {
542 assertEventLoop();
543
544 ClosedChannelException closedChannelException =
545 StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(Promise)");
546 close(promise, closedChannelException, closedChannelException);
547 }
548
549 private void updateWritabilityIfNeeded(boolean notify, boolean notifyLater) {
550 long totalPending = totalPending();
551
552 if (totalPending > writeBufferWaterMark.high()) {
553 if (WRITABLE_UPDATER.compareAndSet(this, 1, 0)) {
554 fireChannelWritabilityChangedIfNeeded(notify, notifyLater);
555 }
556 } else if (totalPending < writeBufferWaterMark.low()) {
557 if (WRITABLE_UPDATER.compareAndSet(this, 0, 1)) {
558 fireChannelWritabilityChangedIfNeeded(notify, notifyLater);
559 }
560 }
561 }
562
563 private void fireChannelWritabilityChangedIfNeeded(boolean notify, boolean notifyLater) {
564 if (!notify) {
565 return;
566 }
567 if (notifyLater) {
568 executor().execute(fireChannelWritabilityChangedTask);
569 } else {
570 pipeline().fireChannelWritabilityChanged();
571 }
572 }
573
574
575
576
577
578
579 private boolean shutdownOutput(final Promise<Void> promise, Throwable cause) {
580 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
581 if (outboundBuffer == null) {
582 promise.setFailure(new ClosedChannelException());
583 return false;
584 }
585 this.outboundBuffer = null;
586
587 final Throwable shutdownCause = cause == null ?
588 new ChannelOutputShutdownException("Channel output shutdown") :
589 new ChannelOutputShutdownException("Channel output shutdown", cause);
590
591
592
593
594 try {
595
596
597 doShutdown(ChannelShutdownDirection.Outbound);
598 promise.setSuccess(null);
599 } catch (Throwable err) {
600 promise.setFailure(err);
601 } finally {
602 outboundBuffer.failFlushedAndClose(shutdownCause, shutdownCause);
603 }
604 return true;
605 }
606
607 private void close(final Promise<Void> promise, final Throwable cause,
608 final ClosedChannelException closeCause) {
609 if (!promise.setUncancellable()) {
610 return;
611 }
612
613 if (closeInitiated) {
614 if (closePromise.isDone()) {
615
616 safeSetSuccess(promise);
617 } else {
618
619 closePromise.addListener(promise, (p, future) -> p.setSuccess(null));
620 }
621 return;
622 }
623
624 closeInitiated = true;
625
626 final boolean wasActive = isActive();
627 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
628 this.outboundBuffer = null;
629 Future<Executor> closeExecutorFuture = prepareToClose();
630 if (closeExecutorFuture != null) {
631 closeExecutorFuture.addListener(f -> {
632 if (f.isFailed()) {
633 logger.warn("We couldnt obtain the closeExecutor", f.cause());
634 closeNow(outboundBuffer, wasActive, promise, cause, closeCause);
635 } else {
636 Executor closeExecutor = f.getNow();
637 closeExecutor.execute(() -> {
638 try {
639
640 doClose0(promise);
641 } finally {
642
643 invokeLater(() -> {
644 closeAndUpdateWritability(outboundBuffer, cause, closeCause);
645 fireChannelInactiveAndDeregister(wasActive);
646 });
647 }
648 });
649 }
650 });
651 } else {
652 closeNow(outboundBuffer, wasActive, promise, cause, closeCause);
653 }
654 }
655
656 private void closeNow(ChannelOutboundBuffer outboundBuffer, boolean wasActive, Promise<Void> promise,
657 Throwable cause, ClosedChannelException closeCause) {
658 try {
659
660 doClose0(promise);
661 } finally {
662 closeAndUpdateWritability(outboundBuffer, cause, closeCause);
663 }
664 if (inWriteFlushed) {
665 invokeLater(() -> fireChannelInactiveAndDeregister(wasActive));
666 } else {
667 fireChannelInactiveAndDeregister(wasActive);
668 }
669 }
670
671 private void closeAndUpdateWritability(
672 ChannelOutboundBuffer outboundBuffer, Throwable cause, Throwable closeCause) {
673 if (outboundBuffer != null) {
674
675 outboundBuffer.failFlushedAndClose(cause, closeCause);
676 updateWritabilityIfNeeded(false, false);
677 }
678 }
679
680 private void doClose0(Promise<Void> promise) {
681 try {
682 cancelConnect();
683 doClose();
684 closePromise.setClosed();
685 safeSetSuccess(promise);
686 } catch (Throwable t) {
687 closePromise.setClosed();
688 safeSetFailure(promise, t);
689 }
690 }
691
692 private void fireChannelInactiveAndDeregister(final boolean wasActive) {
693 deregister(newPromise(), wasActive && !isActive());
694 }
695
696 protected final void closeForciblyTransport() {
697 assertEventLoop();
698
699 try {
700 cancelConnect();
701 doClose();
702 } catch (Exception e) {
703 logger.warn("Failed to close a channel.", e);
704 }
705 }
706
707 private void cancelConnect() {
708 Promise<Void> promise = connectPromise;
709 if (promise != null) {
710
711 promise.tryFailure(new ClosedChannelException());
712 connectPromise = null;
713 }
714
715 Future<?> future = connectTimeoutFuture;
716 if (future != null) {
717 future.cancel();
718 connectTimeoutFuture = null;
719 }
720 }
721
722 protected final void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
723 assertEventLoop();
724
725 if (!promise.setUncancellable()) {
726 return;
727 }
728 if (!isActive()) {
729 if (isOpen()) {
730 promise.setFailure(new NotYetConnectedException());
731 } else {
732 promise.setFailure(new ClosedChannelException());
733 }
734 return;
735 }
736 if (isShutdown(direction)) {
737
738 promise.setSuccess(null);
739 return;
740 }
741 boolean fireEvent = false;
742 switch (direction) {
743 case Outbound:
744 fireEvent = shutdownOutput(promise, null);
745 break;
746 case Inbound:
747 try {
748 doShutdown(direction);
749 promise.setSuccess(null);
750 fireEvent = true;
751 } catch (Throwable cause) {
752 promise.setFailure(cause);
753 }
754 break;
755 default:
756
757 promise.setFailure(new AssertionError());
758 break;
759 }
760 if (fireEvent) {
761 pipeline().fireChannelShutdown(direction);
762 }
763 }
764
765 protected void deregisterTransport(final Promise<Void> promise) {
766 assertEventLoop();
767
768 deregister(promise, false);
769 }
770
771 private void deregister(final Promise<Void> promise, final boolean fireChannelInactive) {
772 if (!promise.setUncancellable()) {
773 return;
774 }
775
776 if (!registered) {
777 safeSetSuccess(promise);
778 return;
779 }
780
781
782
783
784
785
786
787
788
789
790 invokeLater(() -> {
791 try {
792 eventLoop.deregisterForIo(this).addListener(f -> {
793 if (f.isFailed()) {
794 logger.warn("Unexpected exception occurred while deregistering a channel.", f.cause());
795 }
796 deregisterDone(fireChannelInactive, promise);
797 });
798 } catch (Throwable t) {
799 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
800 deregisterDone(fireChannelInactive, promise);
801 }
802 });
803 }
804
805 private void deregisterDone(boolean fireChannelInactive, Promise<Void> promise) {
806 if (fireChannelInactive) {
807 pipeline.fireChannelInactive();
808 }
809
810
811
812
813 if (registered) {
814 registered = false;
815 pipeline.fireChannelUnregistered();
816
817 if (!isOpen()) {
818
819
820 while (!pipeline.isEmpty()) {
821 try {
822 pipeline.removeLast();
823 } catch (NoSuchElementException ignore) {
824
825
826 }
827 }
828 }
829 }
830 safeSetSuccess(promise);
831 }
832
833 private void readTransport() {
834 assertEventLoop();
835
836 if (!isActive()) {
837 readBeforeActive = true;
838 return;
839 }
840
841 if (isShutdown(ChannelShutdownDirection.Inbound)) {
842
843 return;
844 }
845 try {
846 doBeginRead();
847 } catch (final Exception e) {
848 invokeLater(() -> pipeline.fireChannelExceptionCaught(e));
849 closeTransport(newPromise());
850 }
851 }
852
853 private void writeTransport(Object msg, Promise<Void> promise) {
854 assertEventLoop();
855
856 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
857 if (outboundBuffer == null) {
858 try {
859
860 Resource.dispose(msg);
861 } finally {
862
863
864
865
866 final Throwable cause;
867 if (!isActive()) {
868 cause = newClosedChannelException(initialCloseCause, "write(Object, Promise)");
869 } else {
870 cause = ChannelOutputShutdownException.newInstance(AbstractChannel.class,
871 "writeTransport(Object, Promise)");
872 }
873 safeSetFailure(promise, cause);
874 }
875 return;
876 }
877
878 int size;
879 try {
880 msg = filterOutboundMessage(msg);
881 if (estimatorHandler == null) {
882 estimatorHandler = getMessageSizeEstimator().newHandle();
883 }
884 size = estimatorHandler.size(msg);
885 if (size < 0) {
886 size = 0;
887 }
888 } catch (Throwable t) {
889 try {
890 Resource.dispose(msg);
891 } catch (Throwable inner) {
892 t.addSuppressed(inner);
893 } finally {
894 safeSetFailure(promise, t);
895 }
896 return;
897 }
898
899 outboundBuffer.addMessage(msg, size, promise);
900 updateWritabilityIfNeeded(true, false);
901 }
902
903 private void flushTransport() {
904 assertEventLoop();
905
906 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
907 if (outboundBuffer == null) {
908 return;
909 }
910
911 outboundBuffer.addFlush();
912 writeFlushed();
913 }
914
915
916
917
918 protected void writeFlushed() {
919 assertEventLoop();
920
921 if (inWriteFlushed) {
922
923 return;
924 }
925
926 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
927 if (outboundBuffer == null || outboundBuffer.isEmpty()) {
928 return;
929 }
930
931 inWriteFlushed = true;
932
933
934 if (!isActive()) {
935 try {
936
937 if (!outboundBuffer.isEmpty()) {
938 if (isOpen()) {
939 outboundBuffer.failFlushed(new NotYetConnectedException());
940 updateWritabilityIfNeeded(true, true);
941 } else {
942
943 outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "writeFlushed()"));
944 }
945 }
946 } finally {
947 inWriteFlushed = false;
948 }
949 return;
950 }
951
952 try {
953 doWrite(outboundBuffer);
954 } catch (Throwable t) {
955 handleWriteError(t);
956 } finally {
957
958
959 updateWritabilityIfNeeded(true, true);
960 inWriteFlushed = false;
961 }
962 }
963
964 protected final void handleWriteError(Throwable t) {
965 assertEventLoop();
966
967 if (t instanceof IOException && isAutoClose()) {
968
969
970
971
972
973
974
975
976
977 initialCloseCause = t;
978 close(newPromise(), t, newClosedChannelException(t, "writeFlushed()"));
979 } else {
980 try {
981 if (shutdownOutput(newPromise(), t)) {
982 pipeline().fireChannelShutdown(ChannelShutdownDirection.Outbound);
983 }
984 } catch (Throwable t2) {
985 initialCloseCause = t;
986 close(newPromise(), t2, newClosedChannelException(t, "writeFlushed()"));
987 }
988 }
989 }
990
991 private ClosedChannelException newClosedChannelException(Throwable cause, String method) {
992 ClosedChannelException exception =
993 StacklessClosedChannelException.newInstance(AbstractChannel.class, method);
994 if (cause != null) {
995 exception.initCause(cause);
996 }
997 return exception;
998 }
999
1000 private void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1001 Resource.dispose(event);
1002 promise.setSuccess(null);
1003 }
1004
1005 protected final boolean ensureOpen(Promise<Void> promise) {
1006 if (isOpen()) {
1007 return true;
1008 }
1009
1010 safeSetFailure(promise, newClosedChannelException(initialCloseCause, "ensureOpen(Promise)"));
1011 return false;
1012 }
1013
1014
1015
1016
1017 protected final void safeSetSuccess(Promise<Void> promise) {
1018 if (!promise.trySuccess(null)) {
1019 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
1020 }
1021 }
1022
1023
1024
1025
1026 protected final void safeSetFailure(Promise<Void> promise, Throwable cause) {
1027 if (!promise.tryFailure(cause)) {
1028 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
1029 }
1030 }
1031
1032 protected final void closeIfClosed() {
1033 assertEventLoop();
1034
1035 if (isOpen()) {
1036 return;
1037 }
1038 closeTransport(newPromise());
1039 }
1040
1041 private void invokeLater(Runnable task) {
1042 try {
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054 executor().execute(task);
1055 } catch (RejectedExecutionException e) {
1056 logger.warn("Can't invoke task later as EventLoop rejected it", e);
1057 }
1058 }
1059
1060
1061
1062
1063 protected static Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
1064 if (cause instanceof ConnectException) {
1065 return new AnnotatedConnectException((ConnectException) cause, remoteAddress);
1066 }
1067 if (cause instanceof NoRouteToHostException) {
1068 return new AnnotatedNoRouteToHostException((NoRouteToHostException) cause, remoteAddress);
1069 }
1070 if (cause instanceof SocketException) {
1071 return new AnnotatedSocketException((SocketException) cause, remoteAddress);
1072 }
1073
1074 return cause;
1075 }
1076
1077
1078
1079
1080
1081
1082
1083 protected Future<Executor> prepareToClose() {
1084 return null;
1085 }
1086
1087
1088
1089
1090
1091
1092
1093 protected final ChannelOutboundBuffer outboundBuffer() {
1094 return outboundBuffer;
1095 }
1096
1097
1098
1099
1100 protected abstract L localAddress0();
1101
1102
1103
1104
1105 protected abstract R remoteAddress0();
1106
1107
1108
1109
1110 protected abstract void doBind(SocketAddress localAddress) throws Exception;
1111
1112
1113
1114
1115 protected abstract void doDisconnect() throws Exception;
1116
1117
1118
1119
1120 protected abstract void doClose() throws Exception;
1121
1122
1123
1124
1125
1126
1127
1128 protected abstract void doShutdown(ChannelShutdownDirection direction) throws Exception;
1129
1130
1131
1132
1133 protected abstract void doBeginRead() throws Exception;
1134
1135
1136
1137
1138 protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149 protected abstract boolean doConnect(
1150 SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160 protected abstract boolean doFinishConnect(R requestedRemoteAddress) throws Exception;
1161
1162
1163
1164
1165
1166
1167 protected final boolean isConnectPending() {
1168 assertEventLoop();
1169 return connectPromise != null;
1170 }
1171
1172 @SuppressWarnings("unchecked")
1173 private void connectTransport(
1174 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1175 assertEventLoop();
1176 if (!promise.setUncancellable() || !ensureOpen(promise)) {
1177 return;
1178 }
1179
1180 try {
1181 if (connectPromise != null) {
1182 throw new ConnectionPendingException();
1183 }
1184
1185 if (remoteAddress() != null) {
1186
1187 throw new AlreadyConnectedException();
1188 }
1189
1190 boolean wasActive = isActive();
1191 if (doConnect(remoteAddress, localAddress)) {
1192 fulfillConnectPromise(promise, wasActive);
1193 } else {
1194 connectPromise = promise;
1195 requestedRemoteAddress = (R) remoteAddress;
1196
1197
1198 int connectTimeoutMillis = getConnectTimeoutMillis();
1199 if (connectTimeoutMillis > 0) {
1200 connectTimeoutFuture = executor().schedule(() -> {
1201 Promise<Void> connectPromise = this.connectPromise;
1202 if (connectPromise != null && !connectPromise.isDone()
1203 && connectPromise.tryFailure(new ConnectTimeoutException(
1204 "connection timed out: " + remoteAddress))) {
1205 closeTransport(newPromise());
1206 }
1207 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
1208 }
1209
1210 promise.asFuture().addListener(future -> {
1211 if (future.isCancelled()) {
1212 if (connectTimeoutFuture != null) {
1213 connectTimeoutFuture.cancel();
1214 }
1215 connectPromise = null;
1216 closeTransport(newPromise());
1217 }
1218 });
1219 }
1220 } catch (Throwable t) {
1221 closeIfClosed();
1222 promise.tryFailure(annotateConnectException(t, remoteAddress));
1223 }
1224 }
1225
1226 private void fulfillConnectPromise(Promise<Void> promise, boolean wasActive) {
1227 if (promise == null) {
1228
1229 return;
1230 }
1231
1232
1233
1234 boolean active = isActive();
1235
1236
1237 boolean promiseSet = promise.trySuccess(null);
1238
1239
1240
1241 if (!wasActive && active) {
1242 if (fireChannelActiveIfNotActiveBefore()) {
1243 readIfIsAutoRead();
1244 }
1245 }
1246
1247
1248 if (!promiseSet) {
1249 closeTransport(newPromise());
1250 }
1251 }
1252
1253 private void fulfillConnectPromise(Promise<Void> promise, Throwable cause) {
1254 if (promise == null) {
1255
1256 return;
1257 }
1258
1259
1260 promise.tryFailure(cause);
1261 closeIfClosed();
1262 }
1263
1264
1265
1266
1267 protected final void finishConnect() {
1268
1269
1270 assertEventLoop();
1271
1272 boolean connectStillInProgress = false;
1273 try {
1274 boolean wasActive = isActive();
1275 if (!doFinishConnect(requestedRemoteAddress)) {
1276 connectStillInProgress = true;
1277 return;
1278 }
1279 requestedRemoteAddress = null;
1280 fulfillConnectPromise(connectPromise, wasActive);
1281 } catch (Throwable t) {
1282 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
1283 } finally {
1284 if (!connectStillInProgress) {
1285
1286
1287 if (connectTimeoutFuture != null) {
1288 connectTimeoutFuture.cancel();
1289 }
1290 connectPromise = null;
1291 }
1292 }
1293 }
1294
1295
1296
1297
1298
1299 protected Object filterOutboundMessage(Object msg) throws Exception {
1300 return msg;
1301 }
1302
1303 protected static void validateFileRegion(DefaultFileRegion region, long position) throws IOException {
1304 DefaultFileRegion.validate(region, position);
1305 }
1306
1307 @Override
1308 @SuppressWarnings({ "unchecked", "deprecation" })
1309 public final <T> T getOption(ChannelOption<T> option) {
1310 requireNonNull(option, "option");
1311 if (option == AUTO_READ) {
1312 return (T) Boolean.valueOf(isAutoRead());
1313 }
1314 if (option == WRITE_BUFFER_WATER_MARK) {
1315 return (T) getWriteBufferWaterMark();
1316 }
1317 if (option == CONNECT_TIMEOUT_MILLIS) {
1318 return (T) Integer.valueOf(getConnectTimeoutMillis());
1319 }
1320 if (option == MAX_MESSAGES_PER_READ) {
1321 return (T) Integer.valueOf(getMaxMessagesPerRead());
1322 }
1323 if (option == WRITE_SPIN_COUNT) {
1324 return (T) Integer.valueOf(getWriteSpinCount());
1325 }
1326 if (option == BUFFER_ALLOCATOR) {
1327 return (T) getBufferAllocator();
1328 }
1329 if (option == RCVBUFFER_ALLOCATOR) {
1330 return getRecvBufferAllocator();
1331 }
1332 if (option == AUTO_CLOSE) {
1333 return (T) Boolean.valueOf(isAutoClose());
1334 }
1335 if (option == MESSAGE_SIZE_ESTIMATOR) {
1336 return (T) getMessageSizeEstimator();
1337 }
1338 if (option == MAX_MESSAGES_PER_WRITE) {
1339 return (T) Integer.valueOf(getMaxMessagesPerWrite());
1340 }
1341 if (option == ALLOW_HALF_CLOSURE) {
1342 return (T) Boolean.valueOf(isAllowHalfClosure());
1343 }
1344
1345 return getExtendedOption(option);
1346 }
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357 protected <T> T getExtendedOption(ChannelOption<T> option) {
1358 throw new UnsupportedOperationException("ChannelOption not supported: " + option);
1359 }
1360
1361 @Override
1362 @SuppressWarnings("deprecation")
1363 public final <T> Channel setOption(ChannelOption<T> option, T value) {
1364 validate(option, value);
1365
1366 if (option == AUTO_READ) {
1367 setAutoRead((Boolean) value);
1368 } else if (option == WRITE_BUFFER_WATER_MARK) {
1369 setWriteBufferWaterMark((WriteBufferWaterMark) value);
1370 } else if (option == CONNECT_TIMEOUT_MILLIS) {
1371 setConnectTimeoutMillis((Integer) value);
1372 } else if (option == MAX_MESSAGES_PER_READ) {
1373 setMaxMessagesPerRead((Integer) value);
1374 } else if (option == WRITE_SPIN_COUNT) {
1375 setWriteSpinCount((Integer) value);
1376 } else if (option == BUFFER_ALLOCATOR) {
1377 setBufferAllocator((BufferAllocator) value);
1378 } else if (option == RCVBUFFER_ALLOCATOR) {
1379 setRecvBufferAllocator((RecvBufferAllocator) value);
1380 } else if (option == AUTO_CLOSE) {
1381 setAutoClose((Boolean) value);
1382 } else if (option == MESSAGE_SIZE_ESTIMATOR) {
1383 setMessageSizeEstimator((MessageSizeEstimator) value);
1384 } else if (option == MAX_MESSAGES_PER_WRITE) {
1385 setMaxMessagesPerWrite((Integer) value);
1386 } else if (option == ALLOW_HALF_CLOSURE) {
1387 setAllowHalfClosure((Boolean) value);
1388 } else {
1389 setExtendedOption(option, value);
1390 }
1391
1392 return this;
1393 }
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
1404 throw new UnsupportedOperationException("ChannelOption not supported: " + option);
1405 }
1406
1407 @Override
1408 public final boolean isOptionSupported(ChannelOption<?> option) {
1409 if (SUPPORTED_CHANNEL_OPTIONS.contains(option)) {
1410 return true;
1411 }
1412 return isExtendedOptionSupported(option);
1413 }
1414
1415
1416
1417
1418
1419
1420
1421
1422 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
1423 return false;
1424 }
1425
1426 private static Set<ChannelOption<?>> supportedOptions() {
1427 return newSupportedIdentityOptionsSet(
1428 AUTO_READ, WRITE_BUFFER_WATER_MARK, CONNECT_TIMEOUT_MILLIS, MAX_MESSAGES_PER_READ,
1429 WRITE_SPIN_COUNT, BUFFER_ALLOCATOR, RCVBUFFER_ALLOCATOR, AUTO_CLOSE, MESSAGE_SIZE_ESTIMATOR,
1430 MAX_MESSAGES_PER_WRITE, ALLOW_HALF_CLOSURE);
1431 }
1432
1433 protected static Set<ChannelOption<?>> newSupportedIdentityOptionsSet(ChannelOption<?>... options) {
1434 Set<ChannelOption<?>> supportedOptionsSet = Collections.newSetFromMap(new IdentityHashMap<>());
1435 Collections.addAll(supportedOptionsSet, options);
1436 return Collections.unmodifiableSet(supportedOptionsSet);
1437 }
1438
1439 protected <T> void validate(ChannelOption<T> option, T value) {
1440 requireNonNull(option, "option");
1441 option.validate(value);
1442 }
1443
1444 private int getConnectTimeoutMillis() {
1445 return connectTimeoutMillis;
1446 }
1447
1448 private void setConnectTimeoutMillis(int connectTimeoutMillis) {
1449 checkPositiveOrZero(connectTimeoutMillis, "connectTimeoutMillis");
1450 this.connectTimeoutMillis = connectTimeoutMillis;
1451 }
1452
1453
1454
1455
1456
1457
1458 @Deprecated
1459 private int getMaxMessagesPerRead() {
1460 try {
1461 MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1462 return allocator.maxMessagesPerRead();
1463 } catch (ClassCastException e) {
1464 throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1465 "MaxMessagesRecvBufferAllocator", e);
1466 }
1467 }
1468
1469
1470
1471
1472
1473
1474 @Deprecated
1475 private void setMaxMessagesPerRead(int maxMessagesPerRead) {
1476 try {
1477 MaxMessagesRecvBufferAllocator allocator = getRecvBufferAllocator();
1478 allocator.maxMessagesPerRead(maxMessagesPerRead);
1479 } catch (ClassCastException e) {
1480 throw new IllegalStateException("getRecvBufferAllocator() must return an object of type " +
1481 "MaxMessagesRecvBufferAllocator", e);
1482 }
1483 }
1484
1485
1486
1487
1488
1489 protected final int getMaxMessagesPerWrite() {
1490 return maxMessagesPerWrite;
1491 }
1492
1493
1494
1495
1496
1497 private void setMaxMessagesPerWrite(int maxMessagesPerWrite) {
1498 this.maxMessagesPerWrite = checkPositive(maxMessagesPerWrite, "maxMessagesPerWrite");
1499 }
1500
1501 protected final int getWriteSpinCount() {
1502 return writeSpinCount;
1503 }
1504
1505 private void setWriteSpinCount(int writeSpinCount) {
1506 checkPositive(writeSpinCount, "writeSpinCount");
1507
1508
1509
1510
1511 if (writeSpinCount == Integer.MAX_VALUE) {
1512 --writeSpinCount;
1513 }
1514 this.writeSpinCount = writeSpinCount;
1515 }
1516
1517 private BufferAllocator getBufferAllocator() {
1518 return bufferAllocator;
1519 }
1520
1521 public void setBufferAllocator(BufferAllocator bufferAllocator) {
1522 requireNonNull(bufferAllocator, "bufferAllocator");
1523 this.bufferAllocator = bufferAllocator;
1524 }
1525
1526 @SuppressWarnings("unchecked")
1527 private <T extends RecvBufferAllocator> T getRecvBufferAllocator() {
1528 return (T) rcvBufAllocator;
1529 }
1530
1531 private void setRecvBufferAllocator(RecvBufferAllocator allocator) {
1532 rcvBufAllocator = requireNonNull(allocator, "allocator");
1533 }
1534
1535 protected final boolean isAutoRead() {
1536 return autoRead == 1;
1537 }
1538
1539 private void setAutoRead(boolean autoRead) {
1540 boolean oldAutoRead = AUTOREAD_UPDATER.getAndSet(this, autoRead ? 1 : 0) == 1;
1541 if (autoRead && !oldAutoRead) {
1542 read();
1543 } else if (!autoRead && oldAutoRead) {
1544 autoReadCleared();
1545 }
1546 }
1547
1548
1549
1550
1551
1552 protected void autoReadCleared() { }
1553
1554 private boolean isAutoClose() {
1555 return autoClose;
1556 }
1557
1558 private void setAutoClose(boolean autoClose) {
1559 this.autoClose = autoClose;
1560 }
1561
1562 private void setWriteBufferWaterMark(WriteBufferWaterMark writeBufferWaterMark) {
1563 this.writeBufferWaterMark = requireNonNull(writeBufferWaterMark, "writeBufferWaterMark");
1564 }
1565
1566 private WriteBufferWaterMark getWriteBufferWaterMark() {
1567 return writeBufferWaterMark;
1568 }
1569
1570 private MessageSizeEstimator getMessageSizeEstimator() {
1571 return msgSizeEstimator;
1572 }
1573
1574 private void setMessageSizeEstimator(MessageSizeEstimator estimator) {
1575 requireNonNull(estimator, "estimator");
1576 msgSizeEstimator = estimator;
1577 }
1578
1579 protected final boolean isAllowHalfClosure() {
1580 return allowHalfClosure;
1581 }
1582
1583 private void setAllowHalfClosure(boolean allowHalfClosure) {
1584 this.allowHalfClosure = allowHalfClosure;
1585 }
1586
1587 private static final class ClosePromise extends DefaultPromise<Void> {
1588
1589 ClosePromise(EventExecutor eventExecutor) {
1590 super(eventExecutor);
1591 }
1592
1593 @Override
1594 public Promise<Void> setSuccess(Void result) {
1595 throw new IllegalStateException();
1596 }
1597
1598 @Override
1599 public Promise<Void> setFailure(Throwable cause) {
1600 throw new IllegalStateException();
1601 }
1602
1603 @Override
1604 public boolean trySuccess(Void result) {
1605 throw new IllegalStateException();
1606 }
1607
1608 @Override
1609 public boolean tryFailure(Throwable cause) {
1610 throw new IllegalStateException();
1611 }
1612
1613 @Override
1614 public boolean setUncancellable() {
1615 return false;
1616 }
1617
1618 void setClosed() {
1619 super.trySuccess(null);
1620 }
1621 }
1622
1623 private static final class AnnotatedConnectException extends ConnectException {
1624
1625 private static final long serialVersionUID = 3901958112696433556L;
1626
1627 AnnotatedConnectException(ConnectException exception, SocketAddress remoteAddress) {
1628 super(exception.getMessage() + ": " + remoteAddress);
1629 initCause(exception);
1630 }
1631
1632
1633 @Override
1634 public Throwable fillInStackTrace() {
1635 return this;
1636 }
1637 }
1638
1639 private static final class AnnotatedNoRouteToHostException extends NoRouteToHostException {
1640
1641 private static final long serialVersionUID = -6801433937592080623L;
1642
1643 AnnotatedNoRouteToHostException(NoRouteToHostException exception, SocketAddress remoteAddress) {
1644 super(exception.getMessage() + ": " + remoteAddress);
1645 initCause(exception);
1646 }
1647
1648
1649 @Override
1650 public Throwable fillInStackTrace() {
1651 return this;
1652 }
1653 }
1654
1655 private static final class AnnotatedSocketException extends SocketException {
1656
1657 private static final long serialVersionUID = 3896743275010454039L;
1658
1659 AnnotatedSocketException(SocketException exception, SocketAddress remoteAddress) {
1660 super(exception.getMessage() + ": " + remoteAddress);
1661 initCause(exception);
1662 }
1663
1664
1665 @Override
1666 public Throwable fillInStackTrace() {
1667 return this;
1668 }
1669 }
1670
1671 protected void runAfterTransportAction() {
1672
1673 }
1674
1675 protected static class DefaultAbstractChannelPipeline extends DefaultChannelPipeline {
1676 protected DefaultAbstractChannelPipeline(AbstractChannel<?, ?, ?> channel) {
1677 super(channel);
1678 }
1679
1680 protected final AbstractChannel<?, ?, ?> abstractChannel() {
1681 return (AbstractChannel<?, ?, ?>) channel();
1682 }
1683
1684 @Override
1685 protected final EventExecutor transportExecutor() {
1686 return abstractChannel().executor();
1687 }
1688
1689 @Override
1690 protected final void pendingOutboundBytesUpdated(long pendingOutboundBytes) {
1691 abstractChannel().updateWritabilityIfNeeded(true, false);
1692 }
1693
1694 @Override
1695 protected final void registerTransport(Promise<Void> promise) {
1696 AbstractChannel<?, ?, ?> channel = abstractChannel();
1697 channel.registerTransport(promise);
1698 channel.runAfterTransportAction();
1699 }
1700
1701 @Override
1702 protected final void bindTransport(SocketAddress localAddress, Promise<Void> promise) {
1703 AbstractChannel<?, ?, ?> channel = abstractChannel();
1704 channel.bindTransport(localAddress, promise);
1705 channel.runAfterTransportAction();
1706 }
1707
1708 @Override
1709 protected final void connectTransport(
1710 SocketAddress remoteAddress, SocketAddress localAddress, Promise<Void> promise) {
1711 AbstractChannel<?, ?, ?> channel = abstractChannel();
1712 channel.connectTransport(remoteAddress, localAddress, promise);
1713 channel.runAfterTransportAction();
1714 }
1715
1716 @Override
1717 protected final void disconnectTransport(Promise<Void> promise) {
1718 AbstractChannel<?, ?, ?> channel = abstractChannel();
1719 channel.disconnectTransport(promise);
1720 channel.runAfterTransportAction();
1721 }
1722
1723 @Override
1724 protected final void closeTransport(Promise<Void> promise) {
1725 AbstractChannel<?, ?, ?> channel = abstractChannel();
1726 abstractChannel().closeTransport(promise);
1727 channel.runAfterTransportAction();
1728 }
1729
1730 @Override
1731 protected final void shutdownTransport(ChannelShutdownDirection direction, Promise<Void> promise) {
1732 AbstractChannel<?, ?, ?> channel = abstractChannel();
1733 channel.shutdownTransport(direction, promise);
1734 channel.runAfterTransportAction();
1735 }
1736
1737 @Override
1738 protected final void deregisterTransport(Promise<Void> promise) {
1739 AbstractChannel<?, ?, ?> channel = abstractChannel();
1740 channel.deregisterTransport(promise);
1741 channel.runAfterTransportAction();
1742 }
1743
1744 @Override
1745 protected final void readTransport() {
1746 AbstractChannel<?, ?, ?> channel = abstractChannel();
1747 channel.readTransport();
1748 channel.runAfterTransportAction();
1749 }
1750
1751 @Override
1752 protected final void writeTransport(Object msg, Promise<Void> promise) {
1753 AbstractChannel<?, ?, ?> channel = abstractChannel();
1754 channel.writeTransport(msg, promise);
1755 channel.runAfterTransportAction();
1756 }
1757
1758 @Override
1759 protected final void flushTransport() {
1760 AbstractChannel<?, ?, ?> channel = abstractChannel();
1761 channel.flushTransport();
1762 channel.runAfterTransportAction();
1763 }
1764
1765 @Override
1766 protected final void sendOutboundEventTransport(Object event, Promise<Void> promise) {
1767 AbstractChannel<?, ?, ?> channel = abstractChannel();
1768 channel.sendOutboundEventTransport(event, promise);
1769 channel.runAfterTransportAction();
1770 }
1771 }
1772 }