View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.RecvByteBufAllocator;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.socket.SocketChannelConfig;
37  import io.netty.channel.unix.FileDescriptor;
38  import io.netty.channel.unix.UnixChannel;
39  import io.netty.util.ReferenceCountUtil;
40  import io.netty.util.concurrent.Future;
41  import io.netty.util.internal.UnstableApi;
42  
43  import java.io.IOException;
44  import java.net.ConnectException;
45  import java.net.InetSocketAddress;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.AlreadyConnectedException;
49  import java.nio.channels.ConnectionPendingException;
50  import java.nio.channels.NotYetConnectedException;
51  import java.nio.channels.UnresolvedAddressException;
52  import java.util.concurrent.TimeUnit;
53  
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  
58  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
59      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
60      /**
61       * The future of the current connection attempt.  If not null, subsequent
62       * connection attempts will fail.
63       */
64      private ChannelPromise connectPromise;
65      private Future<?> connectTimeoutFuture;
66      private SocketAddress requestedRemoteAddress;
67  
68      final BsdSocket socket;
69      private boolean readFilterEnabled;
70      private boolean writeFilterEnabled;
71      KQueueEventLoop.KQueueRegistration registration;
72      boolean readReadyRunnablePending;
73      boolean inputClosedSeenErrorOnRead;
74      protected volatile boolean active;
75      private volatile SocketAddress local;
76      private volatile SocketAddress remote;
77  
78      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
79          super(parent);
80          socket = checkNotNull(fd, "fd");
81          this.active = active;
82          if (active) {
83              // Directly cache the remote and local addresses
84              // See https://github.com/netty/netty/issues/2359
85              local = fd.localAddress();
86              remote = fd.remoteAddress();
87          }
88      }
89  
90      AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
91          super(parent);
92          socket = checkNotNull(fd, "fd");
93          active = true;
94          // Directly cache the remote and local addresses
95          // See https://github.com/netty/netty/issues/2359
96          this.remote = remote;
97          local = fd.localAddress();
98      }
99  
100     static boolean isSoErrorZero(BsdSocket fd) {
101         try {
102             return fd.getSoError() == 0;
103         } catch (IOException e) {
104             throw new ChannelException(e);
105         }
106     }
107 
108     @Override
109     public final FileDescriptor fd() {
110         return socket;
111     }
112 
113     @Override
114     public boolean isActive() {
115         return active;
116     }
117 
118     @Override
119     public ChannelMetadata metadata() {
120         return METADATA;
121     }
122 
123     @Override
124     protected void doClose() throws Exception {
125         active = false;
126         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
127         // socket which has not even been connected yet. This has been observed to block during unit tests.
128         inputClosedSeenErrorOnRead = true;
129         socket.close();
130     }
131 
132     @Override
133     protected void doDisconnect() throws Exception {
134         doClose();
135     }
136 
137     void resetCachedAddresses() {
138         local = socket.localAddress();
139         remote = socket.remoteAddress();
140     }
141 
142     @Override
143     protected boolean isCompatible(EventLoop loop) {
144         return loop instanceof KQueueEventLoop;
145     }
146 
147     @Override
148     public boolean isOpen() {
149         return socket.isOpen();
150     }
151 
152     @Override
153     protected void doDeregister() throws Exception {
154         if (registration != null) {
155             registration.remove();
156         }
157 
158         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
159         // to false to ensure a consistent state in all cases.
160         readFilterEnabled = false;
161         writeFilterEnabled = false;
162     }
163 
164     void unregisterFilters() throws Exception {
165         // Make sure we unregister our filters from kqueue!
166         readFilter(false);
167         writeFilter(false);
168         clearRdHup0();
169     }
170 
171     private void clearRdHup0() {
172         evSet0(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP);
173     }
174 
175     @Override
176     protected final void doBeginRead() throws Exception {
177         // Channel.read() or ChannelHandlerContext.read() was called
178         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
179         unsafe.readPending = true;
180 
181         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
182         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
183         // never get data after this.
184         readFilter(true);
185 
186         // If auto read was toggled off on the last read loop then we may not be notified
187         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
188         if (unsafe.maybeMoreDataToRead) {
189             unsafe.executeReadReadyRunnable(config());
190         }
191     }
192 
193     @Override
194     protected void doRegister() throws Exception {
195         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
196         // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
197         // new EventLoop.
198         readReadyRunnablePending = false;
199 
200         registration = ((KQueueEventLoop) eventLoop()).add(this);
201 
202         // Add the write event first so we get notified of connection refused on the client side!
203         if (writeFilterEnabled) {
204             evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
205         }
206         if (readFilterEnabled) {
207             evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
208         }
209         evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
210     }
211 
212     @Override
213     protected abstract AbstractKQueueUnsafe newUnsafe();
214 
215     @Override
216     public abstract KQueueChannelConfig config();
217 
218     /**
219      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
220      */
221     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
222         return newDirectBuffer(buf, buf);
223     }
224 
225     /**
226      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
227      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
228      * this method.
229      */
230     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
231         final int readableBytes = buf.readableBytes();
232         if (readableBytes == 0) {
233             ReferenceCountUtil.release(holder);
234             return Unpooled.EMPTY_BUFFER;
235         }
236 
237         final ByteBufAllocator alloc = alloc();
238         if (alloc.isDirectBufferPooled()) {
239             return newDirectBuffer0(holder, buf, alloc, readableBytes);
240         }
241 
242         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
243         if (directBuf == null) {
244             return newDirectBuffer0(holder, buf, alloc, readableBytes);
245         }
246 
247         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
248         ReferenceCountUtil.safeRelease(holder);
249         return directBuf;
250     }
251 
252     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
253         final ByteBuf directBuf = alloc.directBuffer(capacity);
254         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
255         ReferenceCountUtil.safeRelease(holder);
256         return directBuf;
257     }
258 
259     protected static void checkResolvable(InetSocketAddress addr) {
260         if (addr.isUnresolved()) {
261             throw new UnresolvedAddressException();
262         }
263     }
264 
265     /**
266      * Read bytes into the given {@link ByteBuf} and return the amount.
267      */
268     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
269         int writerIndex = byteBuf.writerIndex();
270         int localReadAmount;
271         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
272         if (byteBuf.hasMemoryAddress()) {
273             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
274         } else {
275             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
276             localReadAmount = socket.read(buf, buf.position(), buf.limit());
277         }
278         if (localReadAmount > 0) {
279             byteBuf.writerIndex(writerIndex + localReadAmount);
280         }
281         return localReadAmount;
282     }
283 
284     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
285         if (buf.hasMemoryAddress()) {
286             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
287             if (localFlushedAmount > 0) {
288                 in.removeBytes(localFlushedAmount);
289                 return 1;
290             }
291         } else {
292             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
293                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
294             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
295             if (localFlushedAmount > 0) {
296                 nioBuf.position(nioBuf.position() + localFlushedAmount);
297                 in.removeBytes(localFlushedAmount);
298                 return 1;
299             }
300         }
301         return WRITE_STATUS_SNDBUF_FULL;
302     }
303 
304     final boolean shouldBreakReadReady(ChannelConfig config) {
305         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
306     }
307 
308     private static boolean isAllowHalfClosure(ChannelConfig config) {
309         if (config instanceof KQueueDomainSocketChannelConfig) {
310             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
311         }
312 
313         return config instanceof SocketChannelConfig &&
314                 ((SocketChannelConfig) config).isAllowHalfClosure();
315     }
316 
317     final void clearReadFilter() {
318         // Only clear if registered with an EventLoop as otherwise
319         if (isRegistered()) {
320             final EventLoop loop = eventLoop();
321             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
322             if (loop.inEventLoop()) {
323                 unsafe.clearReadFilter0();
324             } else {
325                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
326                 loop.execute(new Runnable() {
327                     @Override
328                     public void run() {
329                         if (!unsafe.readPending && !config().isAutoRead()) {
330                             // Still no read triggered so clear it now
331                             unsafe.clearReadFilter0();
332                         }
333                     }
334                 });
335             }
336         } else  {
337             // The EventLoop is not registered atm so just update the flags so the correct value
338             // will be used once the channel is registered
339             readFilterEnabled = false;
340         }
341     }
342 
343     void readFilter(boolean readFilterEnabled) throws IOException {
344         if (this.readFilterEnabled != readFilterEnabled) {
345             this.readFilterEnabled = readFilterEnabled;
346             evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
347         }
348     }
349 
350     void writeFilter(boolean writeFilterEnabled) throws IOException {
351         if (this.writeFilterEnabled != writeFilterEnabled) {
352             this.writeFilterEnabled = writeFilterEnabled;
353             evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
354         }
355     }
356 
357     private void evSet(short filter, short flags) {
358         if (isRegistered()) {
359             evSet0(filter, flags);
360         }
361     }
362 
363     private void evSet0(short filter, short flags) {
364         evSet0(filter, flags, 0);
365     }
366 
367     private void evSet0(short filter, short flags, int fflags) {
368         // Only try to add to changeList if the FD is still open, if not we already closed it in the meantime.
369         if (isOpen() && registration != null) {
370             registration.evSet(filter, flags, fflags, 0);
371         }
372     }
373 
374     @UnstableApi
375     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
376         boolean readPending;
377         boolean maybeMoreDataToRead;
378         private KQueueRecvByteAllocatorHandle allocHandle;
379         private final Runnable readReadyRunnable = new Runnable() {
380             @Override
381             public void run() {
382                 readReadyRunnablePending = false;
383                 readReady(recvBufAllocHandle());
384             }
385         };
386 
387         final void readReady(long numberBytesPending) {
388             KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
389             allocHandle.numberBytesPending(numberBytesPending);
390             readReady(allocHandle);
391         }
392 
393         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
394 
395         final void readReadyBefore() {
396             maybeMoreDataToRead = false;
397         }
398 
399         final void readReadyFinally(ChannelConfig config) {
400             maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
401 
402             if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
403                 // trigger a read again as there may be something left to read and because of ET we
404                 // will not get notified again until we read everything from the socket
405                 //
406                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
407                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
408                 // to false before every read operation to prevent re-entry into readReady() we will not read from
409                 // the underlying OS again unless the user happens to call read again.
410                 executeReadReadyRunnable(config);
411             } else if (!readPending && !config.isAutoRead()) {
412                 // Check if there is a readPending which was not processed yet.
413                 // This could be for two reasons:
414                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
415                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
416                 //
417                 // See https://github.com/netty/netty/issues/2254
418                 clearReadFilter0();
419             }
420         }
421 
422         final boolean failConnectPromise(Throwable cause) {
423             if (connectPromise != null) {
424                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
425                 // not set before calling connect. This means finishConnect will not detect any error and would
426                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
427                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
428                 AbstractKQueueChannel.this.connectPromise = null;
429                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
430                                 : new ConnectException("failed to connect").initCause(cause))) {
431                     closeIfClosed();
432                     return true;
433                 }
434             }
435             return false;
436         }
437 
438         final void writeReady() {
439             if (connectPromise != null) {
440                 // pending connect which is now complete so handle it.
441                 finishConnect();
442             } else if (!socket.isOutputShutdown()) {
443                 // directly call super.flush0() to force a flush now
444                 super.flush0();
445             }
446         }
447 
448         /**
449          * Shutdown the input side of the channel.
450          */
451         void shutdownInput(boolean readEOF) {
452             // We need to take special care of calling finishConnect() if readEOF is true and we not
453             // fulfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
454             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
455             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
456             // we observe the correct exception in case of a connect failure.
457             if (readEOF && connectPromise != null) {
458                 finishConnect();
459             }
460             if (!socket.isInputShutdown()) {
461                 if (isAllowHalfClosure(config())) {
462                     try {
463                         socket.shutdown(true, false);
464                     } catch (IOException ignored) {
465                         // We attempted to shutdown and failed, which means the input has already effectively been
466                         // shutdown.
467                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
468                         return;
469                     } catch (NotYetConnectedException ignore) {
470                         // We attempted to shutdown and failed, which means the input has already effectively been
471                         // shutdown.
472                     }
473                     clearReadFilter0();
474                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
475                 } else {
476                     close(voidPromise());
477                 }
478             } else if (!readEOF && !inputClosedSeenErrorOnRead) {
479                 inputClosedSeenErrorOnRead = true;
480                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
481             }
482         }
483 
484         final void readEOF() {
485             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
486             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
487             allocHandle.readEOF();
488 
489             if (isActive()) {
490                 // If it is still active, we need to call readReady as otherwise we may miss to
491                 // read pending data from the underlying file descriptor.
492                 // See https://github.com/netty/netty/issues/3709
493                 readReady(allocHandle);
494             } else {
495                 // Just to be safe make sure the input marked as closed.
496                 shutdownInput(true);
497             }
498 
499             // Clear the RDHUP flag to prevent continuously getting woken up on this event.
500             clearRdHup0();
501         }
502 
503         @Override
504         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
505             if (allocHandle == null) {
506                 allocHandle = new KQueueRecvByteAllocatorHandle(
507                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
508             }
509             return allocHandle;
510         }
511 
512         @Override
513         protected final void flush0() {
514             // Flush immediately only when there's no pending flush.
515             // If there's a pending flush operation, event loop will call forceFlush() later,
516             // and thus there's no need to call it now.
517             if (!writeFilterEnabled) {
518                 super.flush0();
519             }
520         }
521 
522         final void executeReadReadyRunnable(ChannelConfig config) {
523             if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
524                 return;
525             }
526             readReadyRunnablePending = true;
527             eventLoop().execute(readReadyRunnable);
528         }
529 
530         protected final void clearReadFilter0() {
531             assert eventLoop().inEventLoop();
532             try {
533                 readPending = false;
534                 readFilter(false);
535             } catch (IOException e) {
536                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
537                 // so fire the exception through the pipeline and close the Channel.
538                 pipeline().fireExceptionCaught(e);
539                 unsafe().close(unsafe().voidPromise());
540             }
541         }
542 
543         private void fireEventAndClose(Object evt) {
544             pipeline().fireUserEventTriggered(evt);
545             close(voidPromise());
546         }
547 
548         @Override
549         public void connect(
550                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
551             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
552             // non-blocking io.
553             if (promise.isDone() || !ensureOpen(promise)) {
554                 return;
555             }
556 
557             try {
558                 if (connectPromise != null) {
559                     throw new ConnectionPendingException();
560                 }
561 
562                 boolean wasActive = isActive();
563                 if (doConnect(remoteAddress, localAddress)) {
564                     fulfillConnectPromise(promise, wasActive);
565                 } else {
566                     connectPromise = promise;
567                     requestedRemoteAddress = remoteAddress;
568 
569                     // Schedule connect timeout.
570                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
571                     if (connectTimeoutMillis > 0) {
572                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
573                             @Override
574                             public void run() {
575                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
576                                 if (connectPromise != null && !connectPromise.isDone()
577                                         && connectPromise.tryFailure(new ConnectTimeoutException(
578                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
579                                                         remoteAddress))) {
580                                     close(voidPromise());
581                                 }
582                             }
583                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
584                     }
585 
586                     promise.addListener(new ChannelFutureListener() {
587                         @Override
588                         public void operationComplete(ChannelFuture future) {
589                             // If the connect future is cancelled we also cancel the timeout and close the
590                             // underlying socket.
591                             if (future.isCancelled()) {
592                                 if (connectTimeoutFuture != null) {
593                                     connectTimeoutFuture.cancel(false);
594                                 }
595                                 connectPromise = null;
596                                 close(voidPromise());
597                             }
598                         }
599                     });
600                 }
601             } catch (Throwable t) {
602                 closeIfClosed();
603                 promise.tryFailure(annotateConnectException(t, remoteAddress));
604             }
605         }
606 
607         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
608             if (promise == null) {
609                 // Closed via cancellation and the promise has been notified already.
610                 return;
611             }
612             active = true;
613 
614             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
615             // We still need to ensure we call fireChannelActive() in this case.
616             boolean active = isActive();
617 
618             // trySuccess() will return false if a user cancelled the connection attempt.
619             boolean promiseSet = promise.trySuccess();
620 
621             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
622             // because what happened is what happened.
623             if (!wasActive && active) {
624                 pipeline().fireChannelActive();
625             }
626 
627             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
628             if (!promiseSet) {
629                 close(voidPromise());
630             }
631         }
632 
633         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
634             if (promise == null) {
635                 // Closed via cancellation and the promise has been notified already.
636                 return;
637             }
638 
639             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
640             promise.tryFailure(cause);
641             closeIfClosed();
642         }
643 
644         private void finishConnect() {
645             // Note this method is invoked by the event loop only if the connection attempt was
646             // neither cancelled nor timed out.
647 
648             assert eventLoop().inEventLoop();
649 
650             boolean connectStillInProgress = false;
651             try {
652                 boolean wasActive = isActive();
653                 if (!doFinishConnect()) {
654                     connectStillInProgress = true;
655                     return;
656                 }
657                 fulfillConnectPromise(connectPromise, wasActive);
658             } catch (Throwable t) {
659                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
660             } finally {
661                 if (!connectStillInProgress) {
662                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
663                     // See https://github.com/netty/netty/issues/1770
664                     if (connectTimeoutFuture != null) {
665                         connectTimeoutFuture.cancel(false);
666                     }
667                     connectPromise = null;
668                 }
669             }
670         }
671 
672         private boolean doFinishConnect() throws Exception {
673             if (socket.finishConnect()) {
674                 writeFilter(false);
675                 if (requestedRemoteAddress instanceof InetSocketAddress) {
676                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
677                 }
678                 requestedRemoteAddress = null;
679                 return true;
680             }
681             writeFilter(true);
682             return false;
683         }
684     }
685 
686     @Override
687     protected void doBind(SocketAddress local) throws Exception {
688         if (local instanceof InetSocketAddress) {
689             checkResolvable((InetSocketAddress) local);
690         }
691         socket.bind(local);
692         this.local = socket.localAddress();
693     }
694 
695     /**
696      * Connect to the remote peer
697      */
698     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
699         if (localAddress instanceof InetSocketAddress) {
700             checkResolvable((InetSocketAddress) localAddress);
701         }
702 
703         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
704                 ? (InetSocketAddress) remoteAddress : null;
705         if (remoteSocketAddr != null) {
706             checkResolvable(remoteSocketAddr);
707         }
708 
709         if (remote != null) {
710             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
711             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
712             // later.
713             throw new AlreadyConnectedException();
714         }
715 
716         if (localAddress != null) {
717             socket.bind(localAddress);
718         }
719 
720         boolean connected = doConnect0(remoteAddress, localAddress);
721         if (connected) {
722             remote = remoteSocketAddr == null?
723                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
724         }
725         // We always need to set the localAddress even if not connected yet as the bind already took place.
726         //
727         // See https://github.com/netty/netty/issues/3463
728         local = socket.localAddress();
729         return connected;
730     }
731 
732     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
733         boolean success = false;
734         try {
735             boolean connected = socket.connect(remoteAddress);
736             if (!connected) {
737                 writeFilter(true);
738             }
739             success = true;
740             return connected;
741         } finally {
742             if (!success) {
743                 doClose();
744             }
745         }
746     }
747 
748     @Override
749     protected SocketAddress localAddress0() {
750         return local;
751     }
752 
753     @Override
754     protected SocketAddress remoteAddress0() {
755         return remote;
756     }
757 }