View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.socket.ChannelInputShutdownEvent;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.SocketChannelConfig;
40  import io.netty.channel.unix.FileDescriptor;
41  import io.netty.channel.unix.UnixChannel;
42  import io.netty.util.ReferenceCountUtil;
43  import io.netty.util.concurrent.Future;
44  import io.netty.util.internal.UnstableApi;
45  
46  import java.io.IOException;
47  import java.net.ConnectException;
48  import java.net.InetSocketAddress;
49  import java.net.SocketAddress;
50  import java.nio.ByteBuffer;
51  import java.nio.channels.AlreadyConnectedException;
52  import java.nio.channels.ConnectionPendingException;
53  import java.nio.channels.NotYetConnectedException;
54  import java.nio.channels.UnresolvedAddressException;
55  import java.util.concurrent.TimeUnit;
56  
57  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59  import static io.netty.util.internal.ObjectUtil.checkNotNull;
60  
61  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
63      /**
64       * The future of the current connection attempt.  If not null, subsequent
65       * connection attempts will fail.
66       */
67      private ChannelPromise connectPromise;
68      private Future<?> connectTimeoutFuture;
69      private SocketAddress requestedRemoteAddress;
70  
71      final BsdSocket socket;
72      private IoRegistration registration;
73      private boolean readFilterEnabled;
74      private boolean writeFilterEnabled;
75  
76      private static final KQueueIoOps READ_ENABLED_OPS =
77              KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_ADD_ENABLE, 0);
78      private static final KQueueIoOps WRITE_ENABLED_OPS =
79              KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_ADD_ENABLE, 0);
80      private static final KQueueIoOps READ_DISABLED_OPS =
81              KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_DELETE_DISABLE, 0);
82      private static final KQueueIoOps WRITE_DISABLED_OPS =
83              KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_DELETE_DISABLE, 0);
84  
85      boolean readReadyRunnablePending;
86      boolean inputClosedSeenErrorOnRead;
87      protected volatile boolean active;
88      private volatile SocketAddress local;
89      private volatile SocketAddress remote;
90  
91      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
92          super(parent);
93          socket = checkNotNull(fd, "fd");
94          this.active = active;
95          if (active) {
96              // Directly cache the remote and local addresses
97              // See https://github.com/netty/netty/issues/2359
98              local = fd.localAddress();
99              remote = fd.remoteAddress();
100         }
101     }
102 
103     AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
104         super(parent);
105         socket = checkNotNull(fd, "fd");
106         active = true;
107         // Directly cache the remote and local addresses
108         // See https://github.com/netty/netty/issues/2359
109         this.remote = remote;
110         local = fd.localAddress();
111     }
112 
113     @Override
114     protected boolean isCompatible(EventLoop loop) {
115         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
116     }
117 
118     static boolean isSoErrorZero(BsdSocket fd) {
119         try {
120             return fd.getSoError() == 0;
121         } catch (IOException e) {
122             throw new ChannelException(e);
123         }
124     }
125 
126     protected final IoRegistration registration() {
127         assert registration != null;
128         return registration;
129     }
130 
131     @Override
132     public final FileDescriptor fd() {
133         return socket;
134     }
135 
136     @Override
137     public boolean isActive() {
138         return active;
139     }
140 
141     @Override
142     public ChannelMetadata metadata() {
143         return METADATA;
144     }
145 
146     @Override
147     protected void doClose() throws Exception {
148         active = false;
149         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
150         // socket which has not even been connected yet. This has been observed to block during unit tests.
151         inputClosedSeenErrorOnRead = true;
152         socket.close();
153     }
154 
155     @Override
156     protected void doDisconnect() throws Exception {
157         doClose();
158     }
159 
160     void resetCachedAddresses() {
161         local = socket.localAddress();
162         remote = socket.remoteAddress();
163     }
164 
165     @Override
166     public boolean isOpen() {
167         return socket.isOpen();
168     }
169 
170     @Override
171     protected void doDeregister() throws Exception {
172         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
173         // to false to ensure a consistent state in all cases.
174         // Make sure we unregister our filters from kqueue!
175         readFilter(false);
176         writeFilter(false);
177         clearRdHup0();
178 
179         IoRegistration registration = this.registration;
180         if (registration != null) {
181             registration.cancel();
182         }
183     }
184 
185     private void clearRdHup0() {
186         submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
187     }
188 
189     private void submit(KQueueIoOps ops) {
190         try {
191             registration.submit(ops);
192         } catch (Exception e) {
193             throw new ChannelException(e);
194         }
195     }
196 
197     @Override
198     protected final void doBeginRead() throws Exception {
199         // Channel.read() or ChannelHandlerContext.read() was called
200         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
201         unsafe.readPending = true;
202 
203         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
204         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
205         // never get data after this.
206         readFilter(true);
207     }
208 
209     @Override
210     protected void doRegister(ChannelPromise promise) {
211         ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
212             if (f.isSuccess()) {
213                 this.registration = (IoRegistration) f.getNow();
214                 // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old
215                 // EventLoop make sure the readReadyRunnablePending variable is reset so we will be able to execute
216                 // the Runnable on the new EventLoop.
217                 readReadyRunnablePending = false;
218 
219                 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
220 
221                 // Add the write event first so we get notified of connection refused on the client side!
222                 if (writeFilterEnabled) {
223                     submit(WRITE_ENABLED_OPS);
224                 }
225                 if (readFilterEnabled) {
226                     submit(READ_ENABLED_OPS);
227                 }
228                 promise.setSuccess();
229             } else {
230                 promise.setFailure(f.cause());
231             }
232         });
233     }
234 
235     @Override
236     protected abstract AbstractKQueueUnsafe newUnsafe();
237 
238     @Override
239     public abstract KQueueChannelConfig config();
240 
241     /**
242      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
243      */
244     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
245         return newDirectBuffer(buf, buf);
246     }
247 
248     /**
249      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
250      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
251      * this method.
252      */
253     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
254         final int readableBytes = buf.readableBytes();
255         if (readableBytes == 0) {
256             ReferenceCountUtil.release(holder);
257             return Unpooled.EMPTY_BUFFER;
258         }
259 
260         final ByteBufAllocator alloc = alloc();
261         if (alloc.isDirectBufferPooled()) {
262             return newDirectBuffer0(holder, buf, alloc, readableBytes);
263         }
264 
265         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
266         if (directBuf == null) {
267             return newDirectBuffer0(holder, buf, alloc, readableBytes);
268         }
269 
270         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
271         ReferenceCountUtil.safeRelease(holder);
272         return directBuf;
273     }
274 
275     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
276         final ByteBuf directBuf = alloc.directBuffer(capacity);
277         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
278         ReferenceCountUtil.safeRelease(holder);
279         return directBuf;
280     }
281 
282     protected static void checkResolvable(InetSocketAddress addr) {
283         if (addr.isUnresolved()) {
284             throw new UnresolvedAddressException();
285         }
286     }
287 
288     /**
289      * Read bytes into the given {@link ByteBuf} and return the amount.
290      */
291     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
292         int writerIndex = byteBuf.writerIndex();
293         int localReadAmount;
294         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
295         if (byteBuf.hasMemoryAddress()) {
296             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
297         } else {
298             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
299             localReadAmount = socket.read(buf, buf.position(), buf.limit());
300         }
301         if (localReadAmount > 0) {
302             byteBuf.writerIndex(writerIndex + localReadAmount);
303         }
304         return localReadAmount;
305     }
306 
307     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
308         if (buf.hasMemoryAddress()) {
309             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
310             if (localFlushedAmount > 0) {
311                 in.removeBytes(localFlushedAmount);
312                 return 1;
313             }
314         } else {
315             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
316                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
317             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
318             if (localFlushedAmount > 0) {
319                 nioBuf.position(nioBuf.position() + localFlushedAmount);
320                 in.removeBytes(localFlushedAmount);
321                 return 1;
322             }
323         }
324         return WRITE_STATUS_SNDBUF_FULL;
325     }
326 
327     final boolean shouldBreakReadReady(ChannelConfig config) {
328         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
329     }
330 
331     private static boolean isAllowHalfClosure(ChannelConfig config) {
332         if (config instanceof KQueueDomainSocketChannelConfig) {
333             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
334         }
335 
336         return config instanceof SocketChannelConfig &&
337                 ((SocketChannelConfig) config).isAllowHalfClosure();
338     }
339 
340     final void clearReadFilter() {
341         // Only clear if registered with an EventLoop as otherwise
342         if (isRegistered()) {
343             final EventLoop loop = eventLoop();
344             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
345             if (loop.inEventLoop()) {
346                 unsafe.clearReadFilter0();
347             } else {
348                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
349                 loop.execute(new Runnable() {
350                     @Override
351                     public void run() {
352                         if (!unsafe.readPending && !config().isAutoRead()) {
353                             // Still no read triggered so clear it now
354                             unsafe.clearReadFilter0();
355                         }
356                     }
357                 });
358             }
359         } else  {
360             // The EventLoop is not registered atm so just update the flags so the correct value
361             // will be used once the channel is registered
362             readFilterEnabled = false;
363         }
364     }
365 
366     void readFilter(boolean readFilterEnabled) throws IOException {
367         if (this.readFilterEnabled != readFilterEnabled) {
368             this.readFilterEnabled = readFilterEnabled;
369             submit(readFilterEnabled ? READ_ENABLED_OPS : READ_DISABLED_OPS);
370         }
371     }
372 
373     void writeFilter(boolean writeFilterEnabled) throws IOException {
374         if (this.writeFilterEnabled != writeFilterEnabled) {
375             this.writeFilterEnabled = writeFilterEnabled;
376             submit(writeFilterEnabled ? WRITE_ENABLED_OPS : WRITE_DISABLED_OPS);
377         }
378     }
379 
380     @UnstableApi
381     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
382         boolean readPending;
383         private KQueueRecvByteAllocatorHandle allocHandle;
384 
385         Channel channel() {
386             return AbstractKQueueChannel.this;
387         }
388 
389         @Override
390         public int ident() {
391             return fd().intValue();
392         }
393 
394         @Override
395         public void close() {
396             close(voidPromise());
397         }
398 
399         @Override
400         public void handle(IoRegistration registration, IoEvent event) {
401             KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
402             final short filter = kqueueEvent.filter();
403             final short flags = kqueueEvent.flags();
404             final int fflags = kqueueEvent.fflags();
405             final long data = kqueueEvent.data();
406 
407             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
408             // to read from the file descriptor.
409             if (filter == Native.EVFILT_WRITE) {
410                 writeReady();
411             } else if (filter == Native.EVFILT_READ) {
412                 // Check READ before EOF to ensure all data is read before shutting down the input.
413                 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
414                 readReady(allocHandle);
415             } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
416                 readEOF();
417                 return;
418             }
419 
420             // Check if EV_EOF was set, this will notify us for connection-reset in which case
421             // we may close the channel directly or try to read more data depending on the state of the
422             // Channel and also depending on the AbstractKQueueChannel subtype.
423             if ((flags & Native.EV_EOF) != 0) {
424                 readEOF();
425             }
426         }
427 
428         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
429 
430         final boolean shouldStopReading(ChannelConfig config) {
431             // Check if there is a readPending which was not processed yet.
432             // This could be for two reasons:
433             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
434             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
435             //
436             // See https://github.com/netty/netty/issues/2254
437             return !readPending && !config.isAutoRead();
438         }
439 
440         final boolean failConnectPromise(Throwable cause) {
441             if (connectPromise != null) {
442                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
443                 // not set before calling connect. This means finishConnect will not detect any error and would
444                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
445                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
446                 AbstractKQueueChannel.this.connectPromise = null;
447                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
448                                 : new ConnectException("failed to connect").initCause(cause))) {
449                     closeIfClosed();
450                     return true;
451                 }
452             }
453             return false;
454         }
455 
456         private void writeReady() {
457             if (connectPromise != null) {
458                 // pending connect which is now complete so handle it.
459                 finishConnect();
460             } else if (!socket.isOutputShutdown()) {
461                 // directly call super.flush0() to force a flush now
462                 super.flush0();
463             }
464         }
465 
466         /**
467          * Shutdown the input side of the channel.
468          */
469         void shutdownInput(boolean readEOF) {
470             // We need to take special care of calling finishConnect() if readEOF is true and we not
471             // fulfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
472             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
473             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
474             // we observe the correct exception in case of a connect failure.
475             if (readEOF && connectPromise != null) {
476                 finishConnect();
477             }
478             if (!socket.isInputShutdown()) {
479                 if (isAllowHalfClosure(config())) {
480                     try {
481                         socket.shutdown(true, false);
482                     } catch (IOException ignored) {
483                         // We attempted to shutdown and failed, which means the input has already effectively been
484                         // shutdown.
485                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
486                         return;
487                     } catch (NotYetConnectedException ignore) {
488                         // We attempted to shutdown and failed, which means the input has already effectively been
489                         // shutdown.
490                     }
491                     if (shouldStopReading(config())) {
492                         clearReadFilter0();
493                     }
494                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
495                 } else {
496                     close(voidPromise());
497                     return;
498                 }
499             }
500             if (!readEOF && !inputClosedSeenErrorOnRead) {
501                 inputClosedSeenErrorOnRead = true;
502                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
503             }
504         }
505 
506         private void readEOF() {
507             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
508             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
509             allocHandle.readEOF();
510 
511             if (isActive()) {
512                 // If it is still active, we need to call readReady as otherwise we may miss to
513                 // read pending data from the underlying file descriptor.
514                 // See https://github.com/netty/netty/issues/3709
515                 readReady(allocHandle);
516             } else {
517                 // Just to be safe make sure the input marked as closed.
518                 shutdownInput(true);
519             }
520 
521             // Clear the RDHUP flag to prevent continuously getting woken up on this event.
522             clearRdHup0();
523         }
524 
525         @Override
526         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
527             if (allocHandle == null) {
528                 allocHandle = new KQueueRecvByteAllocatorHandle(
529                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
530             }
531             return allocHandle;
532         }
533 
534         @Override
535         protected final void flush0() {
536             // Flush immediately only when there's no pending flush.
537             // If there's a pending flush operation, event loop will call forceFlush() later,
538             // and thus there's no need to call it now.
539             if (!writeFilterEnabled) {
540                 super.flush0();
541             }
542         }
543 
544         protected final void clearReadFilter0() {
545             assert eventLoop().inEventLoop();
546             try {
547                 readPending = false;
548                 readFilter(false);
549             } catch (IOException e) {
550                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
551                 // so fire the exception through the pipeline and close the Channel.
552                 pipeline().fireExceptionCaught(e);
553                 unsafe().close(unsafe().voidPromise());
554             }
555         }
556 
557         private void fireEventAndClose(Object evt) {
558             pipeline().fireUserEventTriggered(evt);
559             close(voidPromise());
560         }
561 
562         @Override
563         public void connect(
564                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
565             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
566             // non-blocking io.
567             if (promise.isDone() || !ensureOpen(promise)) {
568                 return;
569             }
570 
571             try {
572                 if (connectPromise != null) {
573                     throw new ConnectionPendingException();
574                 }
575 
576                 boolean wasActive = isActive();
577                 if (doConnect(remoteAddress, localAddress)) {
578                     fulfillConnectPromise(promise, wasActive);
579                 } else {
580                     connectPromise = promise;
581                     requestedRemoteAddress = remoteAddress;
582 
583                     // Schedule connect timeout.
584                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
585                     if (connectTimeoutMillis > 0) {
586                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
587                             @Override
588                             public void run() {
589                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
590                                 if (connectPromise != null && !connectPromise.isDone()
591                                         && connectPromise.tryFailure(new ConnectTimeoutException(
592                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
593                                                         remoteAddress))) {
594                                     close(voidPromise());
595                                 }
596                             }
597                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
598                     }
599 
600                     promise.addListener(new ChannelFutureListener() {
601                         @Override
602                         public void operationComplete(ChannelFuture future) {
603                             // If the connect future is cancelled we also cancel the timeout and close the
604                             // underlying socket.
605                             if (future.isCancelled()) {
606                                 if (connectTimeoutFuture != null) {
607                                     connectTimeoutFuture.cancel(false);
608                                 }
609                                 connectPromise = null;
610                                 close(voidPromise());
611                             }
612                         }
613                     });
614                 }
615             } catch (Throwable t) {
616                 closeIfClosed();
617                 promise.tryFailure(annotateConnectException(t, remoteAddress));
618             }
619         }
620 
621         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
622             if (promise == null) {
623                 // Closed via cancellation and the promise has been notified already.
624                 return;
625             }
626             active = true;
627 
628             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
629             // We still need to ensure we call fireChannelActive() in this case.
630             boolean active = isActive();
631 
632             // trySuccess() will return false if a user cancelled the connection attempt.
633             boolean promiseSet = promise.trySuccess();
634 
635             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
636             // because what happened is what happened.
637             if (!wasActive && active) {
638                 pipeline().fireChannelActive();
639             }
640 
641             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
642             if (!promiseSet) {
643                 close(voidPromise());
644             }
645         }
646 
647         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
648             if (promise == null) {
649                 // Closed via cancellation and the promise has been notified already.
650                 return;
651             }
652 
653             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
654             promise.tryFailure(cause);
655             closeIfClosed();
656         }
657 
658         private void finishConnect() {
659             // Note this method is invoked by the event loop only if the connection attempt was
660             // neither cancelled nor timed out.
661 
662             assert eventLoop().inEventLoop();
663 
664             boolean connectStillInProgress = false;
665             try {
666                 boolean wasActive = isActive();
667                 if (!doFinishConnect()) {
668                     connectStillInProgress = true;
669                     return;
670                 }
671                 fulfillConnectPromise(connectPromise, wasActive);
672             } catch (Throwable t) {
673                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
674             } finally {
675                 if (!connectStillInProgress) {
676                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
677                     // See https://github.com/netty/netty/issues/1770
678                     if (connectTimeoutFuture != null) {
679                         connectTimeoutFuture.cancel(false);
680                     }
681                     connectPromise = null;
682                 }
683             }
684         }
685 
686         private boolean doFinishConnect() throws Exception {
687             if (socket.finishConnect()) {
688                 writeFilter(false);
689                 if (requestedRemoteAddress instanceof InetSocketAddress) {
690                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
691                 }
692                 requestedRemoteAddress = null;
693                 return true;
694             }
695             writeFilter(true);
696             return false;
697         }
698     }
699 
700     @Override
701     protected void doBind(SocketAddress local) throws Exception {
702         if (local instanceof InetSocketAddress) {
703             checkResolvable((InetSocketAddress) local);
704         }
705         socket.bind(local);
706         this.local = socket.localAddress();
707     }
708 
709     /**
710      * Connect to the remote peer
711      */
712     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
713         if (localAddress instanceof InetSocketAddress) {
714             checkResolvable((InetSocketAddress) localAddress);
715         }
716 
717         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
718                 ? (InetSocketAddress) remoteAddress : null;
719         if (remoteSocketAddr != null) {
720             checkResolvable(remoteSocketAddr);
721         }
722 
723         if (remote != null) {
724             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
725             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
726             // later.
727             throw new AlreadyConnectedException();
728         }
729 
730         if (localAddress != null) {
731             socket.bind(localAddress);
732         }
733 
734         boolean connected = doConnect0(remoteAddress, localAddress);
735         if (connected) {
736             remote = remoteSocketAddr == null?
737                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
738         }
739         // We always need to set the localAddress even if not connected yet as the bind already took place.
740         //
741         // See https://github.com/netty/netty/issues/3463
742         local = socket.localAddress();
743         return connected;
744     }
745 
746     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
747         boolean success = false;
748         try {
749             boolean connected = socket.connect(remoteAddress);
750             if (!connected) {
751                 writeFilter(true);
752             }
753             success = true;
754             return connected;
755         } finally {
756             if (!success) {
757                 doClose();
758             }
759         }
760     }
761 
762     @Override
763     protected SocketAddress localAddress0() {
764         return local;
765     }
766 
767     @Override
768     protected SocketAddress remoteAddress0() {
769         return remote;
770     }
771 }