View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.RecvByteBufAllocator;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.socket.SocketChannelConfig;
37  import io.netty.channel.unix.FileDescriptor;
38  import io.netty.channel.unix.UnixChannel;
39  import io.netty.util.ReferenceCountUtil;
40  import io.netty.util.concurrent.Future;
41  import io.netty.util.internal.UnstableApi;
42  
43  import java.io.IOException;
44  import java.net.ConnectException;
45  import java.net.InetSocketAddress;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.AlreadyConnectedException;
49  import java.nio.channels.ConnectionPendingException;
50  import java.nio.channels.NotYetConnectedException;
51  import java.nio.channels.UnresolvedAddressException;
52  import java.util.concurrent.TimeUnit;
53  
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  
58  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60      /**
61       * The future of the current connection attempt.  If not null, subsequent
62       * connection attempts will fail.
63       */
64      private ChannelPromise connectPromise;
65      private Future<?> connectTimeoutFuture;
66      private SocketAddress requestedRemoteAddress;
67  
68      final BsdSocket socket;
69      private boolean readFilterEnabled;
70      private boolean writeFilterEnabled;
71      boolean readReadyRunnablePending;
72      boolean inputClosedSeenErrorOnRead;
73      protected volatile boolean active;
74      private volatile SocketAddress local;
75      private volatile SocketAddress remote;
76  
77      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
78          super(parent);
79          socket = checkNotNull(fd, "fd");
80          this.active = active;
81          if (active) {
82              // Directly cache the remote and local addresses
83              // See https://github.com/netty/netty/issues/2359
84              local = fd.localAddress();
85              remote = fd.remoteAddress();
86          }
87      }
88  
89      AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
90          super(parent);
91          socket = checkNotNull(fd, "fd");
92          active = true;
93          // Directly cache the remote and local addresses
94          // See https://github.com/netty/netty/issues/2359
95          this.remote = remote;
96          local = fd.localAddress();
97      }
98  
99      static boolean isSoErrorZero(BsdSocket fd) {
100         try {
101             return fd.getSoError() == 0;
102         } catch (IOException e) {
103             throw new ChannelException(e);
104         }
105     }
106 
107     @Override
108     public final FileDescriptor fd() {
109         return socket;
110     }
111 
112     @Override
113     public boolean isActive() {
114         return active;
115     }
116 
117     @Override
118     public ChannelMetadata metadata() {
119         return METADATA;
120     }
121 
122     @Override
123     protected void doClose() throws Exception {
124         active = false;
125         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
126         // socket which has not even been connected yet. This has been observed to block during unit tests.
127         inputClosedSeenErrorOnRead = true;
128         socket.close();
129     }
130 
131     @Override
132     protected void doDisconnect() throws Exception {
133         doClose();
134     }
135 
136     void resetCachedAddresses() {
137         local = socket.localAddress();
138         remote = socket.remoteAddress();
139     }
140 
141     @Override
142     protected boolean isCompatible(EventLoop loop) {
143         return loop instanceof KQueueEventLoop;
144     }
145 
146     @Override
147     public boolean isOpen() {
148         return socket.isOpen();
149     }
150 
151     @Override
152     protected void doDeregister() throws Exception {
153         ((KQueueEventLoop) eventLoop()).remove(this);
154 
155         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
156         // to false to ensure a consistent state in all cases.
157         readFilterEnabled = false;
158         writeFilterEnabled = false;
159     }
160 
161     void unregisterFilters() throws Exception {
162         // Make sure we unregister our filters from kqueue!
163         readFilter(false);
164         writeFilter(false);
165         evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
166     }
167 
168     @Override
169     protected final void doBeginRead() throws Exception {
170         // Channel.read() or ChannelHandlerContext.read() was called
171         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
172         unsafe.readPending = true;
173 
174         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
175         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
176         // never get data after this.
177         readFilter(true);
178 
179         // If auto read was toggled off on the last read loop then we may not be notified
180         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
181         if (unsafe.maybeMoreDataToRead) {
182             unsafe.executeReadReadyRunnable(config());
183         }
184     }
185 
186     @Override
187     protected void doRegister() throws Exception {
188         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
189         // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
190         // new EventLoop.
191         readReadyRunnablePending = false;
192 
193         ((KQueueEventLoop) eventLoop()).add(this);
194 
195         // Add the write event first so we get notified of connection refused on the client side!
196         if (writeFilterEnabled) {
197             evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
198         }
199         if (readFilterEnabled) {
200             evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
201         }
202         evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
203     }
204 
205     @Override
206     protected abstract AbstractKQueueUnsafe newUnsafe();
207 
208     @Override
209     public abstract KQueueChannelConfig config();
210 
211     /**
212      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
213      */
214     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
215         return newDirectBuffer(buf, buf);
216     }
217 
218     /**
219      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
220      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
221      * this method.
222      */
223     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
224         final int readableBytes = buf.readableBytes();
225         if (readableBytes == 0) {
226             ReferenceCountUtil.release(holder);
227             return Unpooled.EMPTY_BUFFER;
228         }
229 
230         final ByteBufAllocator alloc = alloc();
231         if (alloc.isDirectBufferPooled()) {
232             return newDirectBuffer0(holder, buf, alloc, readableBytes);
233         }
234 
235         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
236         if (directBuf == null) {
237             return newDirectBuffer0(holder, buf, alloc, readableBytes);
238         }
239 
240         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
241         ReferenceCountUtil.safeRelease(holder);
242         return directBuf;
243     }
244 
245     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
246         final ByteBuf directBuf = alloc.directBuffer(capacity);
247         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
248         ReferenceCountUtil.safeRelease(holder);
249         return directBuf;
250     }
251 
252     protected static void checkResolvable(InetSocketAddress addr) {
253         if (addr.isUnresolved()) {
254             throw new UnresolvedAddressException();
255         }
256     }
257 
258     /**
259      * Read bytes into the given {@link ByteBuf} and return the amount.
260      */
261     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
262         int writerIndex = byteBuf.writerIndex();
263         int localReadAmount;
264         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
265         if (byteBuf.hasMemoryAddress()) {
266             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
267         } else {
268             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
269             localReadAmount = socket.read(buf, buf.position(), buf.limit());
270         }
271         if (localReadAmount > 0) {
272             byteBuf.writerIndex(writerIndex + localReadAmount);
273         }
274         return localReadAmount;
275     }
276 
277     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
278         if (buf.hasMemoryAddress()) {
279             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
280             if (localFlushedAmount > 0) {
281                 in.removeBytes(localFlushedAmount);
282                 return 1;
283             }
284         } else {
285             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
286                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
287             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
288             if (localFlushedAmount > 0) {
289                 nioBuf.position(nioBuf.position() + localFlushedAmount);
290                 in.removeBytes(localFlushedAmount);
291                 return 1;
292             }
293         }
294         return WRITE_STATUS_SNDBUF_FULL;
295     }
296 
297     final boolean shouldBreakReadReady(ChannelConfig config) {
298         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
299     }
300 
301     private static boolean isAllowHalfClosure(ChannelConfig config) {
302         if (config instanceof KQueueDomainSocketChannelConfig) {
303             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
304         }
305 
306         return config instanceof SocketChannelConfig &&
307                 ((SocketChannelConfig) config).isAllowHalfClosure();
308     }
309 
310     final void clearReadFilter() {
311         // Only clear if registered with an EventLoop as otherwise
312         if (isRegistered()) {
313             final EventLoop loop = eventLoop();
314             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
315             if (loop.inEventLoop()) {
316                 unsafe.clearReadFilter0();
317             } else {
318                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
319                 loop.execute(new Runnable() {
320                     @Override
321                     public void run() {
322                         if (!unsafe.readPending && !config().isAutoRead()) {
323                             // Still no read triggered so clear it now
324                             unsafe.clearReadFilter0();
325                         }
326                     }
327                 });
328             }
329         } else  {
330             // The EventLoop is not registered atm so just update the flags so the correct value
331             // will be used once the channel is registered
332             readFilterEnabled = false;
333         }
334     }
335 
336     void readFilter(boolean readFilterEnabled) throws IOException {
337         if (this.readFilterEnabled != readFilterEnabled) {
338             this.readFilterEnabled = readFilterEnabled;
339             evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
340         }
341     }
342 
343     void writeFilter(boolean writeFilterEnabled) throws IOException {
344         if (this.writeFilterEnabled != writeFilterEnabled) {
345             this.writeFilterEnabled = writeFilterEnabled;
346             evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
347         }
348     }
349 
350     private void evSet(short filter, short flags) {
351         if (isRegistered()) {
352             evSet0(filter, flags);
353         }
354     }
355 
356     private void evSet0(short filter, short flags) {
357         evSet0(filter, flags, 0);
358     }
359 
360     private void evSet0(short filter, short flags, int fflags) {
361         // Only try to add to changeList if the FD is still open, if not we already closed it in the meantime.
362         if (isOpen()) {
363             ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
364         }
365     }
366 
367     @UnstableApi
368     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
369         boolean readPending;
370         boolean maybeMoreDataToRead;
371         private KQueueRecvByteAllocatorHandle allocHandle;
372         private final Runnable readReadyRunnable = new Runnable() {
373             @Override
374             public void run() {
375                 readReadyRunnablePending = false;
376                 readReady(recvBufAllocHandle());
377             }
378         };
379 
380         final void readReady(long numberBytesPending) {
381             KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
382             allocHandle.numberBytesPending(numberBytesPending);
383             readReady(allocHandle);
384         }
385 
386         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
387 
388         final void readReadyBefore() {
389             maybeMoreDataToRead = false;
390         }
391 
392         final void readReadyFinally(ChannelConfig config) {
393             maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
394 
395             if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
396                 // trigger a read again as there may be something left to read and because of ET we
397                 // will not get notified again until we read everything from the socket
398                 //
399                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
400                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
401                 // to false before every read operation to prevent re-entry into readReady() we will not read from
402                 // the underlying OS again unless the user happens to call read again.
403                 executeReadReadyRunnable(config);
404             } else if (!readPending && !config.isAutoRead()) {
405                 // Check if there is a readPending which was not processed yet.
406                 // This could be for two reasons:
407                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
408                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
409                 //
410                 // See https://github.com/netty/netty/issues/2254
411                 clearReadFilter0();
412             }
413         }
414 
415         final boolean failConnectPromise(Throwable cause) {
416             if (connectPromise != null) {
417                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
418                 // not set before calling connect. This means finishConnect will not detect any error and would
419                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
420                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
421                 AbstractKQueueChannel.this.connectPromise = null;
422                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
423                                 : new ConnectException("failed to connect").initCause(cause))) {
424                     closeIfClosed();
425                     return true;
426                 }
427             }
428             return false;
429         }
430 
431         final void writeReady() {
432             if (connectPromise != null) {
433                 // pending connect which is now complete so handle it.
434                 finishConnect();
435             } else if (!socket.isOutputShutdown()) {
436                 // directly call super.flush0() to force a flush now
437                 super.flush0();
438             }
439         }
440 
441         /**
442          * Shutdown the input side of the channel.
443          */
444         void shutdownInput(boolean readEOF) {
445             // We need to take special care of calling finishConnect() if readEOF is true and we not
446             // fullfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
447             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
448             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
449             // we observe the correct exception in case of a connect failure.
450             if (readEOF && connectPromise != null) {
451                 finishConnect();
452             }
453             if (!socket.isInputShutdown()) {
454                 if (isAllowHalfClosure(config())) {
455                     try {
456                         socket.shutdown(true, false);
457                     } catch (IOException ignored) {
458                         // We attempted to shutdown and failed, which means the input has already effectively been
459                         // shutdown.
460                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
461                         return;
462                     } catch (NotYetConnectedException ignore) {
463                         // We attempted to shutdown and failed, which means the input has already effectively been
464                         // shutdown.
465                     }
466                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
467                 } else {
468                     close(voidPromise());
469                 }
470             } else if (!readEOF) {
471                 inputClosedSeenErrorOnRead = true;
472                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
473             }
474         }
475 
476         final void readEOF() {
477             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
478             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
479             allocHandle.readEOF();
480 
481             if (isActive()) {
482                 // If it is still active, we need to call readReady as otherwise we may miss to
483                 // read pending data from the underlying file descriptor.
484                 // See https://github.com/netty/netty/issues/3709
485                 readReady(allocHandle);
486             } else {
487                 // Just to be safe make sure the input marked as closed.
488                 shutdownInput(true);
489             }
490         }
491 
492         @Override
493         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
494             if (allocHandle == null) {
495                 allocHandle = new KQueueRecvByteAllocatorHandle(
496                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
497             }
498             return allocHandle;
499         }
500 
501         @Override
502         protected final void flush0() {
503             // Flush immediately only when there's no pending flush.
504             // If there's a pending flush operation, event loop will call forceFlush() later,
505             // and thus there's no need to call it now.
506             if (!writeFilterEnabled) {
507                 super.flush0();
508             }
509         }
510 
511         final void executeReadReadyRunnable(ChannelConfig config) {
512             if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
513                 return;
514             }
515             readReadyRunnablePending = true;
516             eventLoop().execute(readReadyRunnable);
517         }
518 
519         protected final void clearReadFilter0() {
520             assert eventLoop().inEventLoop();
521             try {
522                 readPending = false;
523                 readFilter(false);
524             } catch (IOException e) {
525                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
526                 // so fire the exception through the pipeline and close the Channel.
527                 pipeline().fireExceptionCaught(e);
528                 unsafe().close(unsafe().voidPromise());
529             }
530         }
531 
532         private void fireEventAndClose(Object evt) {
533             pipeline().fireUserEventTriggered(evt);
534             close(voidPromise());
535         }
536 
537         @Override
538         public void connect(
539                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
540             if (!promise.setUncancellable() || !ensureOpen(promise)) {
541                 return;
542             }
543 
544             try {
545                 if (connectPromise != null) {
546                     throw new ConnectionPendingException();
547                 }
548 
549                 boolean wasActive = isActive();
550                 if (doConnect(remoteAddress, localAddress)) {
551                     fulfillConnectPromise(promise, wasActive);
552                 } else {
553                     connectPromise = promise;
554                     requestedRemoteAddress = remoteAddress;
555 
556                     // Schedule connect timeout.
557                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
558                     if (connectTimeoutMillis > 0) {
559                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
560                             @Override
561                             public void run() {
562                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
563                                 if (connectPromise != null && !connectPromise.isDone()
564                                         && connectPromise.tryFailure(new ConnectTimeoutException(
565                                         "connection timed out: " + remoteAddress))) {
566                                     close(voidPromise());
567                                 }
568                             }
569                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
570                     }
571 
572                     promise.addListener(new ChannelFutureListener() {
573                         @Override
574                         public void operationComplete(ChannelFuture future) throws Exception {
575                             if (future.isCancelled()) {
576                                 if (connectTimeoutFuture != null) {
577                                     connectTimeoutFuture.cancel(false);
578                                 }
579                                 connectPromise = null;
580                                 close(voidPromise());
581                             }
582                         }
583                     });
584                 }
585             } catch (Throwable t) {
586                 closeIfClosed();
587                 promise.tryFailure(annotateConnectException(t, remoteAddress));
588             }
589         }
590 
591         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
592             if (promise == null) {
593                 // Closed via cancellation and the promise has been notified already.
594                 return;
595             }
596             active = true;
597 
598             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
599             // We still need to ensure we call fireChannelActive() in this case.
600             boolean active = isActive();
601 
602             // trySuccess() will return false if a user cancelled the connection attempt.
603             boolean promiseSet = promise.trySuccess();
604 
605             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
606             // because what happened is what happened.
607             if (!wasActive && active) {
608                 pipeline().fireChannelActive();
609             }
610 
611             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
612             if (!promiseSet) {
613                 close(voidPromise());
614             }
615         }
616 
617         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
618             if (promise == null) {
619                 // Closed via cancellation and the promise has been notified already.
620                 return;
621             }
622 
623             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
624             promise.tryFailure(cause);
625             closeIfClosed();
626         }
627 
628         private void finishConnect() {
629             // Note this method is invoked by the event loop only if the connection attempt was
630             // neither cancelled nor timed out.
631 
632             assert eventLoop().inEventLoop();
633 
634             boolean connectStillInProgress = false;
635             try {
636                 boolean wasActive = isActive();
637                 if (!doFinishConnect()) {
638                     connectStillInProgress = true;
639                     return;
640                 }
641                 fulfillConnectPromise(connectPromise, wasActive);
642             } catch (Throwable t) {
643                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
644             } finally {
645                 if (!connectStillInProgress) {
646                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
647                     // See https://github.com/netty/netty/issues/1770
648                     if (connectTimeoutFuture != null) {
649                         connectTimeoutFuture.cancel(false);
650                     }
651                     connectPromise = null;
652                 }
653             }
654         }
655 
656         private boolean doFinishConnect() throws Exception {
657             if (socket.finishConnect()) {
658                 writeFilter(false);
659                 if (requestedRemoteAddress instanceof InetSocketAddress) {
660                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
661                 }
662                 requestedRemoteAddress = null;
663                 return true;
664             }
665             writeFilter(true);
666             return false;
667         }
668     }
669 
670     @Override
671     protected void doBind(SocketAddress local) throws Exception {
672         if (local instanceof InetSocketAddress) {
673             checkResolvable((InetSocketAddress) local);
674         }
675         socket.bind(local);
676         this.local = socket.localAddress();
677     }
678 
679     /**
680      * Connect to the remote peer
681      */
682     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
683         if (localAddress instanceof InetSocketAddress) {
684             checkResolvable((InetSocketAddress) localAddress);
685         }
686 
687         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
688                 ? (InetSocketAddress) remoteAddress : null;
689         if (remoteSocketAddr != null) {
690             checkResolvable(remoteSocketAddr);
691         }
692 
693         if (remote != null) {
694             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
695             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
696             // later.
697             throw new AlreadyConnectedException();
698         }
699 
700         if (localAddress != null) {
701             socket.bind(localAddress);
702         }
703 
704         boolean connected = doConnect0(remoteAddress, localAddress);
705         if (connected) {
706             remote = remoteSocketAddr == null?
707                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
708         }
709         // We always need to set the localAddress even if not connected yet as the bind already took place.
710         //
711         // See https://github.com/netty/netty/issues/3463
712         local = socket.localAddress();
713         return connected;
714     }
715 
716     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
717         boolean success = false;
718         try {
719             boolean connected = socket.connect(remoteAddress);
720             if (!connected) {
721                 writeFilter(true);
722             }
723             success = true;
724             return connected;
725         } finally {
726             if (!success) {
727                 doClose();
728             }
729         }
730     }
731 
732     @Override
733     protected SocketAddress localAddress0() {
734         return local;
735     }
736 
737     @Override
738     protected SocketAddress remoteAddress0() {
739         return remote;
740     }
741 }