View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.socket.ChannelInputShutdownEvent;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.SocketChannelConfig;
40  import io.netty.channel.unix.FileDescriptor;
41  import io.netty.channel.unix.UnixChannel;
42  import io.netty.util.ReferenceCountUtil;
43  import io.netty.util.concurrent.Future;
44  import io.netty.util.internal.UnstableApi;
45  
46  import java.io.IOException;
47  import java.net.ConnectException;
48  import java.net.InetSocketAddress;
49  import java.net.SocketAddress;
50  import java.nio.ByteBuffer;
51  import java.nio.channels.AlreadyConnectedException;
52  import java.nio.channels.ConnectionPendingException;
53  import java.nio.channels.NotYetConnectedException;
54  import java.nio.channels.UnresolvedAddressException;
55  import java.util.concurrent.TimeUnit;
56  
57  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59  import static io.netty.util.internal.ObjectUtil.checkNotNull;
60  
61  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62  
63      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64      /**
65       * The future of the current connection attempt.  If not null, subsequent
66       * connection attempts will fail.
67       */
68      private ChannelPromise connectPromise;
69      private Future<?> connectTimeoutFuture;
70      private SocketAddress requestedRemoteAddress;
71  
72      final BsdSocket socket;
73      private IoRegistration registration;
74      private boolean readFilterEnabled;
75      private boolean writeFilterEnabled;
76  
77      boolean readReadyRunnablePending;
78      boolean inputClosedSeenErrorOnRead;
79      protected volatile boolean active;
80      private volatile SocketAddress local;
81      private volatile SocketAddress remote;
82  
83      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
84          super(parent);
85          socket = checkNotNull(fd, "fd");
86          this.active = active;
87          if (active) {
88              // Directly cache the remote and local addresses
89              // See https://github.com/netty/netty/issues/2359
90              local = fd.localAddress();
91              remote = fd.remoteAddress();
92          }
93      }
94  
95      AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
96          super(parent);
97          socket = checkNotNull(fd, "fd");
98          active = true;
99          // Directly cache the remote and local addresses
100         // See https://github.com/netty/netty/issues/2359
101         this.remote = remote;
102         local = fd.localAddress();
103     }
104 
105     @Override
106     protected boolean isCompatible(EventLoop loop) {
107         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
108     }
109 
110     static boolean isSoErrorZero(BsdSocket fd) {
111         try {
112             return fd.getSoError() == 0;
113         } catch (IOException e) {
114             throw new ChannelException(e);
115         }
116     }
117 
118     protected final IoRegistration registration() {
119         assert registration != null;
120         return registration;
121     }
122 
123     @Override
124     public final FileDescriptor fd() {
125         return socket;
126     }
127 
128     @Override
129     public boolean isActive() {
130         return active;
131     }
132 
133     @Override
134     public ChannelMetadata metadata() {
135         return METADATA;
136     }
137 
138     @Override
139     protected void doClose() throws Exception {
140         active = false;
141         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
142         // socket which has not even been connected yet. This has been observed to block during unit tests.
143         inputClosedSeenErrorOnRead = true;
144         socket.close();
145     }
146 
147     @Override
148     protected void doDisconnect() throws Exception {
149         doClose();
150     }
151 
152     void resetCachedAddresses() {
153         local = socket.localAddress();
154         remote = socket.remoteAddress();
155     }
156 
157     @Override
158     public boolean isOpen() {
159         return socket.isOpen();
160     }
161 
162     @Override
163     protected void doDeregister() throws Exception {
164         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
165         // to false to ensure a consistent state in all cases.
166         // Make sure we unregister our filters from kqueue!
167         readFilter(false);
168         writeFilter(false);
169         clearRdHup0();
170 
171         IoRegistration registration = this.registration;
172         if (registration != null) {
173             registration.cancel();
174         }
175     }
176 
177     private void clearRdHup0() {
178         submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
179     }
180 
181     private void submit(KQueueIoOps ops) {
182         if (!isOpen()) {
183             // If the channel was closed in the meantime we should just drop the ops as otherwise we might end up
184             // affected another channel that re-used the fd.
185             return;
186         }
187         try {
188             registration.submit(ops);
189         } catch (Exception e) {
190             throw new ChannelException(e);
191         }
192     }
193 
194     @Override
195     protected final void doBeginRead() throws Exception {
196         // Channel.read() or ChannelHandlerContext.read() was called
197         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
198         unsafe.readPending = true;
199 
200         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
201         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
202         // never get data after this.
203         readFilter(true);
204     }
205 
206     @Override
207     protected void doRegister(ChannelPromise promise) {
208         ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
209             if (f.isSuccess()) {
210                 this.registration = (IoRegistration) f.getNow();
211                 // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old
212                 // EventLoop make sure the readReadyRunnablePending variable is reset so we will be able to execute
213                 // the Runnable on the new EventLoop.
214                 readReadyRunnablePending = false;
215 
216                 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
217 
218                 // Add the write event first so we get notified of connection refused on the client side!
219                 if (writeFilterEnabled) {
220                     submit(Native.WRITE_ENABLED_OPS);
221                 }
222                 if (readFilterEnabled) {
223                     submit(Native.READ_ENABLED_OPS);
224                 }
225                 promise.setSuccess();
226             } else {
227                 promise.setFailure(f.cause());
228             }
229         });
230     }
231 
232     @Override
233     protected abstract AbstractKQueueUnsafe newUnsafe();
234 
235     @Override
236     public abstract KQueueChannelConfig config();
237 
238     /**
239      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
240      */
241     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
242         return newDirectBuffer(buf, buf);
243     }
244 
245     /**
246      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
247      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
248      * this method.
249      */
250     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
251         final int readableBytes = buf.readableBytes();
252         if (readableBytes == 0) {
253             ReferenceCountUtil.release(holder);
254             return Unpooled.EMPTY_BUFFER;
255         }
256 
257         final ByteBufAllocator alloc = alloc();
258         if (alloc.isDirectBufferPooled()) {
259             return newDirectBuffer0(holder, buf, alloc, readableBytes);
260         }
261 
262         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
263         if (directBuf == null) {
264             return newDirectBuffer0(holder, buf, alloc, readableBytes);
265         }
266 
267         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
268         ReferenceCountUtil.safeRelease(holder);
269         return directBuf;
270     }
271 
272     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
273         final ByteBuf directBuf = alloc.directBuffer(capacity);
274         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
275         ReferenceCountUtil.safeRelease(holder);
276         return directBuf;
277     }
278 
279     protected static void checkResolvable(InetSocketAddress addr) {
280         if (addr.isUnresolved()) {
281             throw new UnresolvedAddressException();
282         }
283     }
284 
285     /**
286      * Read bytes into the given {@link ByteBuf} and return the amount.
287      */
288     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
289         int writerIndex = byteBuf.writerIndex();
290         int localReadAmount;
291         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
292         if (byteBuf.hasMemoryAddress()) {
293             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
294         } else {
295             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
296             localReadAmount = socket.read(buf, buf.position(), buf.limit());
297         }
298         if (localReadAmount > 0) {
299             byteBuf.writerIndex(writerIndex + localReadAmount);
300         }
301         return localReadAmount;
302     }
303 
304     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
305         if (buf.hasMemoryAddress()) {
306             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
307             if (localFlushedAmount > 0) {
308                 in.removeBytes(localFlushedAmount);
309                 return 1;
310             }
311         } else {
312             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
313                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
314             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
315             if (localFlushedAmount > 0) {
316                 nioBuf.position(nioBuf.position() + localFlushedAmount);
317                 in.removeBytes(localFlushedAmount);
318                 return 1;
319             }
320         }
321         return WRITE_STATUS_SNDBUF_FULL;
322     }
323 
324     final boolean shouldBreakReadReady(ChannelConfig config) {
325         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
326     }
327 
328     private static boolean isAllowHalfClosure(ChannelConfig config) {
329         if (config instanceof KQueueDomainSocketChannelConfig) {
330             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
331         }
332 
333         return config instanceof SocketChannelConfig &&
334                 ((SocketChannelConfig) config).isAllowHalfClosure();
335     }
336 
337     final void clearReadFilter() {
338         // Only clear if registered with an EventLoop as otherwise
339         if (isRegistered()) {
340             final EventLoop loop = eventLoop();
341             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
342             if (loop.inEventLoop()) {
343                 unsafe.clearReadFilter0();
344             } else {
345                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
346                 loop.execute(new Runnable() {
347                     @Override
348                     public void run() {
349                         if (!unsafe.readPending && !config().isAutoRead()) {
350                             // Still no read triggered so clear it now
351                             unsafe.clearReadFilter0();
352                         }
353                     }
354                 });
355             }
356         } else  {
357             // The EventLoop is not registered atm so just update the flags so the correct value
358             // will be used once the channel is registered
359             readFilterEnabled = false;
360         }
361     }
362 
363     void readFilter(boolean readFilterEnabled) throws IOException {
364         if (this.readFilterEnabled != readFilterEnabled) {
365             this.readFilterEnabled = readFilterEnabled;
366             submit(readFilterEnabled ? Native.READ_ENABLED_OPS : Native.READ_DISABLED_OPS);
367         }
368     }
369 
370     void writeFilter(boolean writeFilterEnabled) throws IOException {
371         if (this.writeFilterEnabled != writeFilterEnabled) {
372             this.writeFilterEnabled = writeFilterEnabled;
373             submit(writeFilterEnabled ? Native.WRITE_ENABLED_OPS : Native.WRITE_DISABLED_OPS);
374         }
375     }
376 
377     @UnstableApi
378     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
379         boolean readPending;
380         private KQueueRecvByteAllocatorHandle allocHandle;
381 
382         Channel channel() {
383             return AbstractKQueueChannel.this;
384         }
385 
386         @Override
387         public int ident() {
388             return fd().intValue();
389         }
390 
391         @Override
392         public void close() {
393             close(voidPromise());
394         }
395 
396         @Override
397         public void handle(IoRegistration registration, IoEvent event) {
398             KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
399             final short filter = kqueueEvent.filter();
400             final short flags = kqueueEvent.flags();
401             final int fflags = kqueueEvent.fflags();
402             final long data = kqueueEvent.data();
403 
404             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
405             // to read from the file descriptor.
406             if (filter == Native.EVFILT_WRITE) {
407                 writeReady();
408             } else if (filter == Native.EVFILT_READ) {
409                 // Check READ before EOF to ensure all data is read before shutting down the input.
410                 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
411                 readReady(allocHandle);
412             } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
413                 readEOF();
414                 return;
415             }
416 
417             // Check if EV_EOF was set, this will notify us for connection-reset in which case
418             // we may close the channel directly or try to read more data depending on the state of the
419             // Channel and also depending on the AbstractKQueueChannel subtype.
420             if ((flags & Native.EV_EOF) != 0) {
421                 readEOF();
422             }
423         }
424 
425         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
426 
427         final boolean shouldStopReading(ChannelConfig config) {
428             // Check if there is a readPending which was not processed yet.
429             // This could be for two reasons:
430             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
431             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
432             //
433             // See https://github.com/netty/netty/issues/2254
434             return !readPending && !config.isAutoRead();
435         }
436 
437         final boolean failConnectPromise(Throwable cause) {
438             if (connectPromise != null) {
439                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
440                 // not set before calling connect. This means finishConnect will not detect any error and would
441                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
442                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
443                 AbstractKQueueChannel.this.connectPromise = null;
444                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
445                                 : new ConnectException("failed to connect").initCause(cause))) {
446                     closeIfClosed();
447                     return true;
448                 }
449             }
450             return false;
451         }
452 
453         private void writeReady() {
454             if (connectPromise != null) {
455                 // pending connect which is now complete so handle it.
456                 finishConnect();
457             } else if (!socket.isOutputShutdown()) {
458                 // directly call super.flush0() to force a flush now
459                 super.flush0();
460             }
461         }
462 
463         /**
464          * Shutdown the input side of the channel.
465          */
466         void shutdownInput(boolean readEOF) {
467             // We need to take special care of calling finishConnect() if readEOF is true and we not
468             // fulfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
469             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
470             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
471             // we observe the correct exception in case of a connect failure.
472             if (readEOF && connectPromise != null) {
473                 finishConnect();
474             }
475             if (!socket.isInputShutdown()) {
476                 if (isAllowHalfClosure(config())) {
477                     try {
478                         socket.shutdown(true, false);
479                     } catch (IOException ignored) {
480                         // We attempted to shutdown and failed, which means the input has already effectively been
481                         // shutdown.
482                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
483                         return;
484                     } catch (NotYetConnectedException ignore) {
485                         // We attempted to shutdown and failed, which means the input has already effectively been
486                         // shutdown.
487                     }
488                     if (shouldStopReading(config())) {
489                         clearReadFilter0();
490                     }
491                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
492                 } else {
493                     close(voidPromise());
494                     return;
495                 }
496             }
497             if (!readEOF && !inputClosedSeenErrorOnRead) {
498                 inputClosedSeenErrorOnRead = true;
499                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
500             }
501         }
502 
503         private void readEOF() {
504             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
505             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
506             allocHandle.readEOF();
507 
508             if (isActive()) {
509                 // If it is still active, we need to call readReady as otherwise we may miss to
510                 // read pending data from the underlying file descriptor.
511                 // See https://github.com/netty/netty/issues/3709
512                 readReady(allocHandle);
513             } else {
514                 // Just to be safe make sure the input marked as closed.
515                 shutdownInput(true);
516             }
517 
518             // Clear the RDHUP flag to prevent continuously getting woken up on this event.
519             clearRdHup0();
520         }
521 
522         @Override
523         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
524             if (allocHandle == null) {
525                 allocHandle = new KQueueRecvByteAllocatorHandle(
526                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
527             }
528             return allocHandle;
529         }
530 
531         @Override
532         protected final void flush0() {
533             // Flush immediately only when there's no pending flush.
534             // If there's a pending flush operation, event loop will call forceFlush() later,
535             // and thus there's no need to call it now.
536             if (!writeFilterEnabled) {
537                 super.flush0();
538             }
539         }
540 
541         protected final void clearReadFilter0() {
542             assert eventLoop().inEventLoop();
543             try {
544                 readPending = false;
545                 readFilter(false);
546             } catch (IOException e) {
547                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
548                 // so fire the exception through the pipeline and close the Channel.
549                 pipeline().fireExceptionCaught(e);
550                 unsafe().close(unsafe().voidPromise());
551             }
552         }
553 
554         private void fireEventAndClose(Object evt) {
555             pipeline().fireUserEventTriggered(evt);
556             close(voidPromise());
557         }
558 
559         @Override
560         public void connect(
561                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
562             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
563             // non-blocking io.
564             if (promise.isDone() || !ensureOpen(promise)) {
565                 return;
566             }
567 
568             try {
569                 if (connectPromise != null) {
570                     throw new ConnectionPendingException();
571                 }
572 
573                 boolean wasActive = isActive();
574                 if (doConnect(remoteAddress, localAddress)) {
575                     fulfillConnectPromise(promise, wasActive);
576                 } else {
577                     connectPromise = promise;
578                     requestedRemoteAddress = remoteAddress;
579 
580                     // Schedule connect timeout.
581                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
582                     if (connectTimeoutMillis > 0) {
583                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
584                             @Override
585                             public void run() {
586                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
587                                 if (connectPromise != null && !connectPromise.isDone()
588                                         && connectPromise.tryFailure(new ConnectTimeoutException(
589                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
590                                                         remoteAddress))) {
591                                     close(voidPromise());
592                                 }
593                             }
594                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
595                     }
596 
597                     promise.addListener(new ChannelFutureListener() {
598                         @Override
599                         public void operationComplete(ChannelFuture future) {
600                             // If the connect future is cancelled we also cancel the timeout and close the
601                             // underlying socket.
602                             if (future.isCancelled()) {
603                                 if (connectTimeoutFuture != null) {
604                                     connectTimeoutFuture.cancel(false);
605                                 }
606                                 connectPromise = null;
607                                 close(voidPromise());
608                             }
609                         }
610                     });
611                 }
612             } catch (Throwable t) {
613                 closeIfClosed();
614                 promise.tryFailure(annotateConnectException(t, remoteAddress));
615             }
616         }
617 
618         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
619             if (promise == null) {
620                 // Closed via cancellation and the promise has been notified already.
621                 return;
622             }
623             active = true;
624 
625             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
626             // We still need to ensure we call fireChannelActive() in this case.
627             boolean active = isActive();
628 
629             // trySuccess() will return false if a user cancelled the connection attempt.
630             boolean promiseSet = promise.trySuccess();
631 
632             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
633             // because what happened is what happened.
634             if (!wasActive && active) {
635                 pipeline().fireChannelActive();
636             }
637 
638             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
639             if (!promiseSet) {
640                 close(voidPromise());
641             }
642         }
643 
644         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
645             if (promise == null) {
646                 // Closed via cancellation and the promise has been notified already.
647                 return;
648             }
649 
650             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
651             promise.tryFailure(cause);
652             closeIfClosed();
653         }
654 
655         private void finishConnect() {
656             // Note this method is invoked by the event loop only if the connection attempt was
657             // neither cancelled nor timed out.
658 
659             assert eventLoop().inEventLoop();
660 
661             boolean connectStillInProgress = false;
662             try {
663                 boolean wasActive = isActive();
664                 if (!doFinishConnect()) {
665                     connectStillInProgress = true;
666                     return;
667                 }
668                 fulfillConnectPromise(connectPromise, wasActive);
669             } catch (Throwable t) {
670                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
671             } finally {
672                 if (!connectStillInProgress) {
673                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
674                     // See https://github.com/netty/netty/issues/1770
675                     if (connectTimeoutFuture != null) {
676                         connectTimeoutFuture.cancel(false);
677                     }
678                     connectPromise = null;
679                 }
680             }
681         }
682 
683         private boolean doFinishConnect() throws Exception {
684             if (socket.finishConnect()) {
685                 writeFilter(false);
686                 if (requestedRemoteAddress instanceof InetSocketAddress) {
687                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
688                 }
689                 requestedRemoteAddress = null;
690                 return true;
691             }
692             writeFilter(true);
693             return false;
694         }
695     }
696 
697     @Override
698     protected void doBind(SocketAddress local) throws Exception {
699         if (local instanceof InetSocketAddress) {
700             checkResolvable((InetSocketAddress) local);
701         }
702         socket.bind(local);
703         this.local = socket.localAddress();
704     }
705 
706     /**
707      * Connect to the remote peer
708      */
709     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
710         if (localAddress instanceof InetSocketAddress) {
711             checkResolvable((InetSocketAddress) localAddress);
712         }
713 
714         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
715                 ? (InetSocketAddress) remoteAddress : null;
716         if (remoteSocketAddr != null) {
717             checkResolvable(remoteSocketAddr);
718         }
719 
720         if (remote != null) {
721             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
722             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
723             // later.
724             throw new AlreadyConnectedException();
725         }
726 
727         if (localAddress != null) {
728             socket.bind(localAddress);
729         }
730 
731         boolean connected = doConnect0(remoteAddress, localAddress);
732         if (connected) {
733             remote = remoteSocketAddr == null?
734                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
735         }
736         // We always need to set the localAddress even if not connected yet as the bind already took place.
737         //
738         // See https://github.com/netty/netty/issues/3463
739         local = socket.localAddress();
740         return connected;
741     }
742 
743     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
744         boolean success = false;
745         try {
746             boolean connected = socket.connect(remoteAddress);
747             if (!connected) {
748                 writeFilter(true);
749             }
750             success = true;
751             return connected;
752         } finally {
753             if (!success) {
754                 doClose();
755             }
756         }
757     }
758 
759     @Override
760     protected SocketAddress localAddress0() {
761         return local;
762     }
763 
764     @Override
765     protected SocketAddress remoteAddress0() {
766         return remote;
767     }
768 }