View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.RecvByteBufAllocator;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.socket.SocketChannelConfig;
37  import io.netty.channel.unix.FileDescriptor;
38  import io.netty.channel.unix.UnixChannel;
39  import io.netty.util.ReferenceCountUtil;
40  import io.netty.util.concurrent.Future;
41  import io.netty.util.internal.UnstableApi;
42  
43  import java.io.IOException;
44  import java.net.ConnectException;
45  import java.net.InetSocketAddress;
46  import java.net.SocketAddress;
47  import java.nio.ByteBuffer;
48  import java.nio.channels.AlreadyConnectedException;
49  import java.nio.channels.ConnectionPendingException;
50  import java.nio.channels.NotYetConnectedException;
51  import java.nio.channels.UnresolvedAddressException;
52  import java.util.concurrent.TimeUnit;
53  
54  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
55  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
56  import static io.netty.util.internal.ObjectUtil.checkNotNull;
57  
58  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
    // Shared metadata instance handed out by metadata().
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // Scheduled connect-timeout task; only created by connect(...) when the configured timeout is > 0.
    private Future<?> connectTimeoutFuture;
    // Remote address of the pending connect(...); reset to null once the connect has finished successfully.
    private SocketAddress requestedRemoteAddress;

    // The socket backing this channel; also exposed through fd().
    final BsdSocket socket;
    // Last state requested through readFilter(...) / writeFilter(...); both are reset by doDeregister().
    private boolean readFilterEnabled;
    private boolean writeFilterEnabled;
    // True while the unsafe's read-ready task has been handed to the EventLoop and has not run yet.
    boolean readReadyRunnablePending;
    // Set by doClose() and by shutdownInput(false) once the input is already shut down;
    // consulted by shouldBreakReadReady(...).
    boolean inputClosedSeenErrorOnRead;
    // Value returned by isActive().
    protected volatile boolean active;
    // Cached addresses returned by localAddress0() / remoteAddress0().
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
76  
77      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
78          super(parent);
79          socket = checkNotNull(fd, "fd");
80          this.active = active;
81          if (active) {
82              // Directly cache the remote and local addresses
83              // See https://github.com/netty/netty/issues/2359
84              local = fd.localAddress();
85              remote = fd.remoteAddress();
86          }
87      }
88  
89      AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
90          super(parent);
91          socket = checkNotNull(fd, "fd");
92          active = true;
93          // Directly cache the remote and local addresses
94          // See https://github.com/netty/netty/issues/2359
95          this.remote = remote;
96          local = fd.localAddress();
97      }
98  
99      static boolean isSoErrorZero(BsdSocket fd) {
100         try {
101             return fd.getSoError() == 0;
102         } catch (IOException e) {
103             throw new ChannelException(e);
104         }
105     }
106 
107     @Override
108     public final FileDescriptor fd() {
109         return socket;
110     }
111 
112     @Override
113     public boolean isActive() {
114         return active;
115     }
116 
117     @Override
118     public ChannelMetadata metadata() {
119         return METADATA;
120     }
121 
122     @Override
123     protected void doClose() throws Exception {
124         active = false;
125         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
126         // socket which has not even been connected yet. This has been observed to block during unit tests.
127         inputClosedSeenErrorOnRead = true;
128         socket.close();
129     }
130 
131     @Override
132     protected void doDisconnect() throws Exception {
133         doClose();
134     }
135 
136     void resetCachedAddresses() {
137         local = socket.localAddress();
138         remote = socket.remoteAddress();
139     }
140 
141     @Override
142     protected boolean isCompatible(EventLoop loop) {
143         return loop instanceof KQueueEventLoop;
144     }
145 
146     @Override
147     public boolean isOpen() {
148         return socket.isOpen();
149     }
150 
151     @Override
152     protected void doDeregister() throws Exception {
153         ((KQueueEventLoop) eventLoop()).remove(this);
154 
155         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
156         // to false to ensure a consistent state in all cases.
157         readFilterEnabled = false;
158         writeFilterEnabled = false;
159     }
160 
161     void unregisterFilters() throws Exception {
162         // Make sure we unregister our filters from kqueue!
163         readFilter(false);
164         writeFilter(false);
165         clearRdHup0();
166     }
167 
168     private void clearRdHup0() {
169         evSet0(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP);
170     }
171 
172     @Override
173     protected final void doBeginRead() throws Exception {
174         // Channel.read() or ChannelHandlerContext.read() was called
175         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
176         unsafe.readPending = true;
177 
178         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
179         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
180         // never get data after this.
181         readFilter(true);
182 
183         // If auto read was toggled off on the last read loop then we may not be notified
184         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
185         if (unsafe.maybeMoreDataToRead) {
186             unsafe.executeReadReadyRunnable(config());
187         }
188     }
189 
190     @Override
191     protected void doRegister() throws Exception {
192         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
193         // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
194         // new EventLoop.
195         readReadyRunnablePending = false;
196 
197         ((KQueueEventLoop) eventLoop()).add(this);
198 
199         // Add the write event first so we get notified of connection refused on the client side!
200         if (writeFilterEnabled) {
201             evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
202         }
203         if (readFilterEnabled) {
204             evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
205         }
206         evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
207     }
208 
    /**
     * Returns the unsafe for this channel; the return type is narrowed to {@link AbstractKQueueUnsafe}, which
     * other methods of this class rely on when casting {@code unsafe()}.
     */
    @Override
    protected abstract AbstractKQueueUnsafe newUnsafe();
211 
    /**
     * Returns the configuration of this channel, narrowed to {@link KQueueChannelConfig}.
     */
    @Override
    public abstract KQueueChannelConfig config();
214 
215     /**
216      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
217      */
218     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
219         return newDirectBuffer(buf, buf);
220     }
221 
222     /**
223      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
224      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
225      * this method.
226      */
227     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
228         final int readableBytes = buf.readableBytes();
229         if (readableBytes == 0) {
230             ReferenceCountUtil.release(holder);
231             return Unpooled.EMPTY_BUFFER;
232         }
233 
234         final ByteBufAllocator alloc = alloc();
235         if (alloc.isDirectBufferPooled()) {
236             return newDirectBuffer0(holder, buf, alloc, readableBytes);
237         }
238 
239         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
240         if (directBuf == null) {
241             return newDirectBuffer0(holder, buf, alloc, readableBytes);
242         }
243 
244         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
245         ReferenceCountUtil.safeRelease(holder);
246         return directBuf;
247     }
248 
249     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
250         final ByteBuf directBuf = alloc.directBuffer(capacity);
251         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
252         ReferenceCountUtil.safeRelease(holder);
253         return directBuf;
254     }
255 
256     protected static void checkResolvable(InetSocketAddress addr) {
257         if (addr.isUnresolved()) {
258             throw new UnresolvedAddressException();
259         }
260     }
261 
262     /**
263      * Read bytes into the given {@link ByteBuf} and return the amount.
264      */
265     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
266         int writerIndex = byteBuf.writerIndex();
267         int localReadAmount;
268         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
269         if (byteBuf.hasMemoryAddress()) {
270             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
271         } else {
272             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
273             localReadAmount = socket.read(buf, buf.position(), buf.limit());
274         }
275         if (localReadAmount > 0) {
276             byteBuf.writerIndex(writerIndex + localReadAmount);
277         }
278         return localReadAmount;
279     }
280 
281     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
282         if (buf.hasMemoryAddress()) {
283             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
284             if (localFlushedAmount > 0) {
285                 in.removeBytes(localFlushedAmount);
286                 return 1;
287             }
288         } else {
289             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
290                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
291             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
292             if (localFlushedAmount > 0) {
293                 nioBuf.position(nioBuf.position() + localFlushedAmount);
294                 in.removeBytes(localFlushedAmount);
295                 return 1;
296             }
297         }
298         return WRITE_STATUS_SNDBUF_FULL;
299     }
300 
301     final boolean shouldBreakReadReady(ChannelConfig config) {
302         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
303     }
304 
305     private static boolean isAllowHalfClosure(ChannelConfig config) {
306         if (config instanceof KQueueDomainSocketChannelConfig) {
307             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
308         }
309 
310         return config instanceof SocketChannelConfig &&
311                 ((SocketChannelConfig) config).isAllowHalfClosure();
312     }
313 
314     final void clearReadFilter() {
315         // Only clear if registered with an EventLoop as otherwise
316         if (isRegistered()) {
317             final EventLoop loop = eventLoop();
318             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
319             if (loop.inEventLoop()) {
320                 unsafe.clearReadFilter0();
321             } else {
322                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
323                 loop.execute(new Runnable() {
324                     @Override
325                     public void run() {
326                         if (!unsafe.readPending && !config().isAutoRead()) {
327                             // Still no read triggered so clear it now
328                             unsafe.clearReadFilter0();
329                         }
330                     }
331                 });
332             }
333         } else  {
334             // The EventLoop is not registered atm so just update the flags so the correct value
335             // will be used once the channel is registered
336             readFilterEnabled = false;
337         }
338     }
339 
340     void readFilter(boolean readFilterEnabled) throws IOException {
341         if (this.readFilterEnabled != readFilterEnabled) {
342             this.readFilterEnabled = readFilterEnabled;
343             evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
344         }
345     }
346 
347     void writeFilter(boolean writeFilterEnabled) throws IOException {
348         if (this.writeFilterEnabled != writeFilterEnabled) {
349             this.writeFilterEnabled = writeFilterEnabled;
350             evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
351         }
352     }
353 
354     private void evSet(short filter, short flags) {
355         if (isRegistered()) {
356             evSet0(filter, flags);
357         }
358     }
359 
360     private void evSet0(short filter, short flags) {
361         evSet0(filter, flags, 0);
362     }
363 
364     private void evSet0(short filter, short flags, int fflags) {
365         // Only try to add to changeList if the FD is still open, if not we already closed it in the meantime.
366         if (isOpen()) {
367             ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
368         }
369     }
370 
371     @UnstableApi
372     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
        // Set by doBeginRead() when a read was requested; reset by clearReadFilter0().
        boolean readPending;
        // Reset by readReadyBefore() and refreshed from the allocator handle in readReadyFinally(...).
        boolean maybeMoreDataToRead;
        // Created lazily by recvBufAllocHandle().
        private KQueueRecvByteAllocatorHandle allocHandle;
        // Task queued by executeReadReadyRunnable(...): clears the pending flag and runs readReady(...) again.
        private final Runnable readReadyRunnable = new Runnable() {
            @Override
            public void run() {
                readReadyRunnablePending = false;
                readReady(recvBufAllocHandle());
            }
        };
383 
384         final void readReady(long numberBytesPending) {
385             KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
386             allocHandle.numberBytesPending(numberBytesPending);
387             readReady(allocHandle);
388         }
389 
        /**
         * Handles a read-ready notification using the given allocator handle. Called from
         * {@link #readReady(long)}, from the queued read-ready task and from {@link #readEOF()} while the channel
         * is still active.
         */
        abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
391 
392         final void readReadyBefore() {
393             maybeMoreDataToRead = false;
394         }
395 
396         final void readReadyFinally(ChannelConfig config) {
397             maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
398 
399             if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
400                 // trigger a read again as there may be something left to read and because of ET we
401                 // will not get notified again until we read everything from the socket
402                 //
403                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
404                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
405                 // to false before every read operation to prevent re-entry into readReady() we will not read from
406                 // the underlying OS again unless the user happens to call read again.
407                 executeReadReadyRunnable(config);
408             } else if (!readPending && !config.isAutoRead()) {
409                 // Check if there is a readPending which was not processed yet.
410                 // This could be for two reasons:
411                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
412                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
413                 //
414                 // See https://github.com/netty/netty/issues/2254
415                 clearReadFilter0();
416             }
417         }
418 
419         final boolean failConnectPromise(Throwable cause) {
420             if (connectPromise != null) {
421                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
422                 // not set before calling connect. This means finishConnect will not detect any error and would
423                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
424                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
425                 AbstractKQueueChannel.this.connectPromise = null;
426                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
427                                 : new ConnectException("failed to connect").initCause(cause))) {
428                     closeIfClosed();
429                     return true;
430                 }
431             }
432             return false;
433         }
434 
435         final void writeReady() {
436             if (connectPromise != null) {
437                 // pending connect which is now complete so handle it.
438                 finishConnect();
439             } else if (!socket.isOutputShutdown()) {
440                 // directly call super.flush0() to force a flush now
441                 super.flush0();
442             }
443         }
444 
445         /**
446          * Shutdown the input side of the channel.
447          */
448         void shutdownInput(boolean readEOF) {
449             // We need to take special care of calling finishConnect() if readEOF is true and we not
450             // fullfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
451             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
452             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
453             // we observe the correct exception in case of a connect failure.
454             if (readEOF && connectPromise != null) {
455                 finishConnect();
456             }
457             if (!socket.isInputShutdown()) {
458                 if (isAllowHalfClosure(config())) {
459                     try {
460                         socket.shutdown(true, false);
461                     } catch (IOException ignored) {
462                         // We attempted to shutdown and failed, which means the input has already effectively been
463                         // shutdown.
464                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
465                         return;
466                     } catch (NotYetConnectedException ignore) {
467                         // We attempted to shutdown and failed, which means the input has already effectively been
468                         // shutdown.
469                     }
470                     clearReadFilter0();
471                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
472                 } else {
473                     close(voidPromise());
474                 }
475             } else if (!readEOF && !inputClosedSeenErrorOnRead) {
476                 inputClosedSeenErrorOnRead = true;
477                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
478             }
479         }
480 
481         final void readEOF() {
482             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
483             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
484             allocHandle.readEOF();
485 
486             if (isActive()) {
487                 // If it is still active, we need to call readReady as otherwise we may miss to
488                 // read pending data from the underlying file descriptor.
489                 // See https://github.com/netty/netty/issues/3709
490                 readReady(allocHandle);
491             } else {
492                 // Just to be safe make sure the input marked as closed.
493                 shutdownInput(true);
494             }
495 
496             // Clear the RDHUP flag to prevent continuously getting woken up on this event.
497             clearRdHup0();
498         }
499 
500         @Override
501         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
502             if (allocHandle == null) {
503                 allocHandle = new KQueueRecvByteAllocatorHandle(
504                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
505             }
506             return allocHandle;
507         }
508 
509         @Override
510         protected final void flush0() {
511             // Flush immediately only when there's no pending flush.
512             // If there's a pending flush operation, event loop will call forceFlush() later,
513             // and thus there's no need to call it now.
514             if (!writeFilterEnabled) {
515                 super.flush0();
516             }
517         }
518 
519         final void executeReadReadyRunnable(ChannelConfig config) {
520             if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
521                 return;
522             }
523             readReadyRunnablePending = true;
524             eventLoop().execute(readReadyRunnable);
525         }
526 
527         protected final void clearReadFilter0() {
528             assert eventLoop().inEventLoop();
529             try {
530                 readPending = false;
531                 readFilter(false);
532             } catch (IOException e) {
533                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
534                 // so fire the exception through the pipeline and close the Channel.
535                 pipeline().fireExceptionCaught(e);
536                 unsafe().close(unsafe().voidPromise());
537             }
538         }
539 
540         private void fireEventAndClose(Object evt) {
541             pipeline().fireUserEventTriggered(evt);
542             close(voidPromise());
543         }
544 
545         @Override
546         public void connect(
547                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
548             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
549             // non-blocking io.
550             if (promise.isDone() || !ensureOpen(promise)) {
551                 return;
552             }
553 
554             try {
555                 if (connectPromise != null) {
556                     throw new ConnectionPendingException();
557                 }
558 
559                 boolean wasActive = isActive();
560                 if (doConnect(remoteAddress, localAddress)) {
561                     fulfillConnectPromise(promise, wasActive);
562                 } else {
563                     connectPromise = promise;
564                     requestedRemoteAddress = remoteAddress;
565 
566                     // Schedule connect timeout.
567                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
568                     if (connectTimeoutMillis > 0) {
569                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
570                             @Override
571                             public void run() {
572                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
573                                 if (connectPromise != null && !connectPromise.isDone()
574                                         && connectPromise.tryFailure(new ConnectTimeoutException(
575                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
576                                                         remoteAddress))) {
577                                     close(voidPromise());
578                                 }
579                             }
580                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
581                     }
582 
583                     promise.addListener(new ChannelFutureListener() {
584                         @Override
585                         public void operationComplete(ChannelFuture future) {
586                             // If the connect future is cancelled we also cancel the timeout and close the
587                             // underlying socket.
588                             if (future.isCancelled()) {
589                                 if (connectTimeoutFuture != null) {
590                                     connectTimeoutFuture.cancel(false);
591                                 }
592                                 connectPromise = null;
593                                 close(voidPromise());
594                             }
595                         }
596                     });
597                 }
598             } catch (Throwable t) {
599                 closeIfClosed();
600                 promise.tryFailure(annotateConnectException(t, remoteAddress));
601             }
602         }
603 
604         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
605             if (promise == null) {
606                 // Closed via cancellation and the promise has been notified already.
607                 return;
608             }
609             active = true;
610 
611             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
612             // We still need to ensure we call fireChannelActive() in this case.
613             boolean active = isActive();
614 
615             // trySuccess() will return false if a user cancelled the connection attempt.
616             boolean promiseSet = promise.trySuccess();
617 
618             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
619             // because what happened is what happened.
620             if (!wasActive && active) {
621                 pipeline().fireChannelActive();
622             }
623 
624             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
625             if (!promiseSet) {
626                 close(voidPromise());
627             }
628         }
629 
630         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
631             if (promise == null) {
632                 // Closed via cancellation and the promise has been notified already.
633                 return;
634             }
635 
636             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
637             promise.tryFailure(cause);
638             closeIfClosed();
639         }
640 
641         private void finishConnect() {
642             // Note this method is invoked by the event loop only if the connection attempt was
643             // neither cancelled nor timed out.
644 
645             assert eventLoop().inEventLoop();
646 
647             boolean connectStillInProgress = false;
648             try {
649                 boolean wasActive = isActive();
650                 if (!doFinishConnect()) {
651                     connectStillInProgress = true;
652                     return;
653                 }
654                 fulfillConnectPromise(connectPromise, wasActive);
655             } catch (Throwable t) {
656                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
657             } finally {
658                 if (!connectStillInProgress) {
659                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
660                     // See https://github.com/netty/netty/issues/1770
661                     if (connectTimeoutFuture != null) {
662                         connectTimeoutFuture.cancel(false);
663                     }
664                     connectPromise = null;
665                 }
666             }
667         }
668 
669         private boolean doFinishConnect() throws Exception {
670             if (socket.finishConnect()) {
671                 writeFilter(false);
672                 if (requestedRemoteAddress instanceof InetSocketAddress) {
673                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
674                 }
675                 requestedRemoteAddress = null;
676                 return true;
677             }
678             writeFilter(true);
679             return false;
680         }
681     }
682 
683     @Override
684     protected void doBind(SocketAddress local) throws Exception {
685         if (local instanceof InetSocketAddress) {
686             checkResolvable((InetSocketAddress) local);
687         }
688         socket.bind(local);
689         this.local = socket.localAddress();
690     }
691 
692     /**
693      * Connect to the remote peer
694      */
695     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
696         if (localAddress instanceof InetSocketAddress) {
697             checkResolvable((InetSocketAddress) localAddress);
698         }
699 
700         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
701                 ? (InetSocketAddress) remoteAddress : null;
702         if (remoteSocketAddr != null) {
703             checkResolvable(remoteSocketAddr);
704         }
705 
706         if (remote != null) {
707             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
708             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
709             // later.
710             throw new AlreadyConnectedException();
711         }
712 
713         if (localAddress != null) {
714             socket.bind(localAddress);
715         }
716 
717         boolean connected = doConnect0(remoteAddress, localAddress);
718         if (connected) {
719             remote = remoteSocketAddr == null?
720                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
721         }
722         // We always need to set the localAddress even if not connected yet as the bind already took place.
723         //
724         // See https://github.com/netty/netty/issues/3463
725         local = socket.localAddress();
726         return connected;
727     }
728 
729     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
730         boolean success = false;
731         try {
732             boolean connected = socket.connect(remoteAddress);
733             if (!connected) {
734                 writeFilter(true);
735             }
736             success = true;
737             return connected;
738         } finally {
739             if (!success) {
740                 doClose();
741             }
742         }
743     }
744 
745     @Override
746     protected SocketAddress localAddress0() {
747         return local;
748     }
749 
750     @Override
751     protected SocketAddress remoteAddress0() {
752         return remote;
753     }
754 }