View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.RecvByteBufAllocator;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.socket.SocketChannelConfig;
37  import io.netty.channel.unix.FileDescriptor;
38  import io.netty.channel.unix.UnixChannel;
39  import io.netty.util.ReferenceCountUtil;
40  
41  import java.io.IOException;
42  import java.net.ConnectException;
43  import java.net.InetSocketAddress;
44  import java.net.SocketAddress;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.AlreadyConnectedException;
47  import java.nio.channels.ConnectionPendingException;
48  import java.nio.channels.NotYetConnectedException;
49  import java.nio.channels.UnresolvedAddressException;
50  import java.util.concurrent.ScheduledFuture;
51  import java.util.concurrent.TimeUnit;
52  
53  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
54  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
55  import static io.netty.util.internal.ObjectUtil.checkNotNull;
56  
57  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
58      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
59      /**
60       * The future of the current connection attempt.  If not null, subsequent
61       * connection attempts will fail.
62       */
63      private ChannelPromise connectPromise;
64      private ScheduledFuture<?> connectTimeoutFuture;
65      private SocketAddress requestedRemoteAddress;
66  
67      final BsdSocket socket;
68      private boolean readFilterEnabled = true;
69      private boolean writeFilterEnabled;
70      boolean readReadyRunnablePending;
71      boolean inputClosedSeenErrorOnRead;
72      /**
73       * This member variable means we don't have to have a map in {@link KQueueEventLoop} which associates the FDs
74       * from kqueue to instances of this class. This field will be initialized by JNI when modifying kqueue events.
75       * If there is no global reference when JNI gets a kqueue evSet call (aka this field is 0) then a global reference
76       * will be created and the address will be saved in this member variable. Then when we process a kevent in Java
77       * we can ask JNI to give us the {@link AbstractKQueueChannel} that corresponds to that event.
78       */
79      long jniSelfPtr;
80  
81      protected volatile boolean active;
82      private volatile SocketAddress local;
83      private volatile SocketAddress remote;
84  
85      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
86          super(parent);
87          socket = checkNotNull(fd, "fd");
88          this.active = active;
89          if (active) {
90              // Directly cache the remote and local addresses
91              // See https://github.com/netty/netty/issues/2359
92              local = fd.localAddress();
93              remote = fd.remoteAddress();
94          }
95      }
96  
97      AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
98          super(parent);
99          socket = checkNotNull(fd, "fd");
100         active = true;
101         // Directly cache the remote and local addresses
102         // See https://github.com/netty/netty/issues/2359
103         this.remote = remote;
104         local = fd.localAddress();
105     }
106 
107     static boolean isSoErrorZero(BsdSocket fd) {
108         try {
109             return fd.getSoError() == 0;
110         } catch (IOException e) {
111             throw new ChannelException(e);
112         }
113     }
114 
115     @Override
116     public final FileDescriptor fd() {
117         return socket;
118     }
119 
120     @Override
121     public boolean isActive() {
122         return active;
123     }
124 
125     @Override
126     public ChannelMetadata metadata() {
127         return METADATA;
128     }
129 
130     @Override
131     protected void doClose() throws Exception {
132         active = false;
133         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
134         // socket which has not even been connected yet. This has been observed to block during unit tests.
135         inputClosedSeenErrorOnRead = true;
136         try {
137             if (isRegistered()) {
138                 // The FD will be closed, which should take care of deleting any associated events from kqueue, but
139                 // since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for
140                 // all events which are pending in kqueue to avoid referencing a deleted pointer at a later time.
141 
142                 // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
143                 // if SO_LINGER is used.
144                 //
145                 // See https://github.com/netty/netty/issues/7159
146                 EventLoop loop = eventLoop();
147                 if (loop.inEventLoop()) {
148                     doDeregister();
149                 } else {
150                     loop.execute(new Runnable() {
151                         @Override
152                         public void run() {
153                             try {
154                                 doDeregister();
155                             } catch (Throwable cause) {
156                                 pipeline().fireExceptionCaught(cause);
157                             }
158                         }
159                     });
160                 }
161             }
162         } finally {
163             socket.close();
164         }
165     }
166 
167     @Override
168     protected void doDisconnect() throws Exception {
169         doClose();
170     }
171 
172     @Override
173     protected boolean isCompatible(EventLoop loop) {
174         return loop instanceof KQueueEventLoop;
175     }
176 
177     @Override
178     public boolean isOpen() {
179         return socket.isOpen();
180     }
181 
182     @Override
183     protected void doDeregister() throws Exception {
184         // Make sure we unregister our filters from kqueue!
185         readFilter(false);
186         writeFilter(false);
187         evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
188 
189         ((KQueueEventLoop) eventLoop()).remove(this);
190 
191         // Set the filters back to the initial state in case this channel is registered with another event loop.
192         readFilterEnabled = true;
193     }
194 
195     @Override
196     protected final void doBeginRead() throws Exception {
197         // Channel.read() or ChannelHandlerContext.read() was called
198         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
199         unsafe.readPending = true;
200 
201         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
202         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
203         // never get data after this.
204         readFilter(true);
205 
206         // If auto read was toggled off on the last read loop then we may not be notified
207         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
208         if (unsafe.maybeMoreDataToRead) {
209             unsafe.executeReadReadyRunnable(config());
210         }
211     }
212 
213     @Override
214     protected void doRegister() throws Exception {
215         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
216         // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
217         // new EventLoop.
218         readReadyRunnablePending = false;
219         // Add the write event first so we get notified of connection refused on the client side!
220         if (writeFilterEnabled) {
221             evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
222         }
223         if (readFilterEnabled) {
224             evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
225         }
226         evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
227     }
228 
229     @Override
230     protected abstract AbstractKQueueUnsafe newUnsafe();
231 
232     @Override
233     public abstract KQueueChannelConfig config();
234 
235     /**
236      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
237      */
238     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
239         return newDirectBuffer(buf, buf);
240     }
241 
242     /**
243      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
244      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
245      * this method.
246      */
247     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
248         final int readableBytes = buf.readableBytes();
249         if (readableBytes == 0) {
250             ReferenceCountUtil.release(holder);
251             return Unpooled.EMPTY_BUFFER;
252         }
253 
254         final ByteBufAllocator alloc = alloc();
255         if (alloc.isDirectBufferPooled()) {
256             return newDirectBuffer0(holder, buf, alloc, readableBytes);
257         }
258 
259         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
260         if (directBuf == null) {
261             return newDirectBuffer0(holder, buf, alloc, readableBytes);
262         }
263 
264         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
265         ReferenceCountUtil.safeRelease(holder);
266         return directBuf;
267     }
268 
269     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
270         final ByteBuf directBuf = alloc.directBuffer(capacity);
271         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
272         ReferenceCountUtil.safeRelease(holder);
273         return directBuf;
274     }
275 
276     protected static void checkResolvable(InetSocketAddress addr) {
277         if (addr.isUnresolved()) {
278             throw new UnresolvedAddressException();
279         }
280     }
281 
282     /**
283      * Read bytes into the given {@link ByteBuf} and return the amount.
284      */
285     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
286         int writerIndex = byteBuf.writerIndex();
287         int localReadAmount;
288         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
289         if (byteBuf.hasMemoryAddress()) {
290             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
291         } else {
292             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
293             localReadAmount = socket.read(buf, buf.position(), buf.limit());
294         }
295         if (localReadAmount > 0) {
296             byteBuf.writerIndex(writerIndex + localReadAmount);
297         }
298         return localReadAmount;
299     }
300 
301     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
302         if (buf.hasMemoryAddress()) {
303             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
304             if (localFlushedAmount > 0) {
305                 in.removeBytes(localFlushedAmount);
306                 return 1;
307             }
308         } else {
309             final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
310                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
311             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
312             if (localFlushedAmount > 0) {
313                 nioBuf.position(nioBuf.position() + localFlushedAmount);
314                 in.removeBytes(localFlushedAmount);
315                 return 1;
316             }
317         }
318         return WRITE_STATUS_SNDBUF_FULL;
319     }
320 
321     final boolean shouldBreakReadReady(ChannelConfig config) {
322         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
323     }
324 
325     private static boolean isAllowHalfClosure(ChannelConfig config) {
326         return config instanceof SocketChannelConfig &&
327                 ((SocketChannelConfig) config).isAllowHalfClosure();
328     }
329 
330     final void clearReadFilter() {
331         // Only clear if registered with an EventLoop as otherwise
332         if (isRegistered()) {
333             final EventLoop loop = eventLoop();
334             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
335             if (loop.inEventLoop()) {
336                 unsafe.clearReadFilter0();
337             } else {
338                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
339                 loop.execute(new Runnable() {
340                     @Override
341                     public void run() {
342                         if (!unsafe.readPending && !config().isAutoRead()) {
343                             // Still no read triggered so clear it now
344                             unsafe.clearReadFilter0();
345                         }
346                     }
347                 });
348             }
349         } else  {
350             // The EventLoop is not registered atm so just update the flags so the correct value
351             // will be used once the channel is registered
352             readFilterEnabled = false;
353         }
354     }
355 
356     void readFilter(boolean readFilterEnabled) throws IOException {
357         if (this.readFilterEnabled != readFilterEnabled) {
358             this.readFilterEnabled = readFilterEnabled;
359             evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
360         }
361     }
362 
363     void writeFilter(boolean writeFilterEnabled) throws IOException {
364         if (this.writeFilterEnabled != writeFilterEnabled) {
365             this.writeFilterEnabled = writeFilterEnabled;
366             evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
367         }
368     }
369 
370     private void evSet(short filter, short flags) {
371         if (isOpen() && isRegistered()) {
372             evSet0(filter, flags);
373         }
374     }
375 
376     private void evSet0(short filter, short flags) {
377         evSet0(filter, flags, 0);
378     }
379 
380     private void evSet0(short filter, short flags, int fflags) {
381         ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
382     }
383 
384     abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
385         boolean readPending;
386         boolean maybeMoreDataToRead;
387         private KQueueRecvByteAllocatorHandle allocHandle;
388         private final Runnable readReadyRunnable = new Runnable() {
389             @Override
390             public void run() {
391                 readReadyRunnablePending = false;
392                 readReady(recvBufAllocHandle());
393             }
394         };
395 
396         final void readReady(long numberBytesPending) {
397             KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
398             allocHandle.numberBytesPending(numberBytesPending);
399             readReady(allocHandle);
400         }
401 
402         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
403 
404         final void readReadyBefore() { maybeMoreDataToRead = false; }
405 
406         final void readReadyFinally(ChannelConfig config) {
407             maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
408             // Check if there is a readPending which was not processed yet.
409             // This could be for two reasons:
410             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
411             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
412             //
413             // See https://github.com/netty/netty/issues/2254
414             if (!readPending && !config.isAutoRead()) {
415                 clearReadFilter0();
416             } else if (readPending && maybeMoreDataToRead) {
417                 // trigger a read again as there may be something left to read and because of ET we
418                 // will not get notified again until we read everything from the socket
419                 //
420                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
421                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
422                 // to false before every read operation to prevent re-entry into readReady() we will not read from
423                 // the underlying OS again unless the user happens to call read again.
424                 executeReadReadyRunnable(config);
425             }
426         }
427 
428         final boolean failConnectPromise(Throwable cause) {
429             if (connectPromise != null) {
430                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
431                 // not set before calling connect. This means finishConnect will not detect any error and would
432                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
433                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
434                 AbstractKQueueChannel.this.connectPromise = null;
435                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
436                                 : new ConnectException("failed to connect").initCause(cause))) {
437                     closeIfClosed();
438                     return true;
439                 }
440             }
441             return false;
442         }
443 
444         final void writeReady() {
445             if (connectPromise != null) {
446                 // pending connect which is now complete so handle it.
447                 finishConnect();
448             } else if (!socket.isOutputShutdown()) {
449                 // directly call super.flush0() to force a flush now
450                 super.flush0();
451             }
452         }
453 
454         /**
455          * Shutdown the input side of the channel.
456          */
457         void shutdownInput(boolean readEOF) {
458             // We need to take special care of calling finishConnect() if readEOF is true and we not
459             // fullfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
460             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
461             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
462             // we observe the correct exception in case of a connect failure.
463             if (readEOF && connectPromise != null) {
464                 finishConnect();
465             }
466             if (!socket.isInputShutdown()) {
467                 if (isAllowHalfClosure(config())) {
468                     try {
469                         socket.shutdown(true, false);
470                     } catch (IOException ignored) {
471                         // We attempted to shutdown and failed, which means the input has already effectively been
472                         // shutdown.
473                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
474                         return;
475                     } catch (NotYetConnectedException ignore) {
476                         // We attempted to shutdown and failed, which means the input has already effectively been
477                         // shutdown.
478                     }
479                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
480                 } else {
481                     close(voidPromise());
482                 }
483             } else if (!readEOF) {
484                 inputClosedSeenErrorOnRead = true;
485                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
486             }
487         }
488 
489         final void readEOF() {
490             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
491             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
492             allocHandle.readEOF();
493 
494             if (isActive()) {
495                 // If it is still active, we need to call readReady as otherwise we may miss to
496                 // read pending data from the underlying file descriptor.
497                 // See https://github.com/netty/netty/issues/3709
498                 readReady(allocHandle);
499             } else {
500                 // Just to be safe make sure the input marked as closed.
501                 shutdownInput(true);
502             }
503         }
504 
505         @Override
506         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
507             if (allocHandle == null) {
508                 allocHandle = new KQueueRecvByteAllocatorHandle(
509                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
510             }
511             return allocHandle;
512         }
513 
514         @Override
515         protected final void flush0() {
516             // Flush immediately only when there's no pending flush.
517             // If there's a pending flush operation, event loop will call forceFlush() later,
518             // and thus there's no need to call it now.
519             if (!writeFilterEnabled) {
520                 super.flush0();
521             }
522         }
523 
524         final void executeReadReadyRunnable(ChannelConfig config) {
525             if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
526                 return;
527             }
528             readReadyRunnablePending = true;
529             eventLoop().execute(readReadyRunnable);
530         }
531 
532         protected final void clearReadFilter0() {
533             assert eventLoop().inEventLoop();
534             try {
535                 readPending = false;
536                 readFilter(false);
537             } catch (IOException e) {
538                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
539                 // so fire the exception through the pipeline and close the Channel.
540                 pipeline().fireExceptionCaught(e);
541                 unsafe().close(unsafe().voidPromise());
542             }
543         }
544 
545         private void fireEventAndClose(Object evt) {
546             pipeline().fireUserEventTriggered(evt);
547             close(voidPromise());
548         }
549 
550         @Override
551         public void connect(
552                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
553             if (!promise.setUncancellable() || !ensureOpen(promise)) {
554                 return;
555             }
556 
557             try {
558                 if (connectPromise != null) {
559                     throw new ConnectionPendingException();
560                 }
561 
562                 boolean wasActive = isActive();
563                 if (doConnect(remoteAddress, localAddress)) {
564                     fulfillConnectPromise(promise, wasActive);
565                 } else {
566                     connectPromise = promise;
567                     requestedRemoteAddress = remoteAddress;
568 
569                     // Schedule connect timeout.
570                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
571                     if (connectTimeoutMillis > 0) {
572                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
573                             @Override
574                             public void run() {
575                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
576                                 ConnectTimeoutException cause =
577                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
578                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
579                                     close(voidPromise());
580                                 }
581                             }
582                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
583                     }
584 
585                     promise.addListener(new ChannelFutureListener() {
586                         @Override
587                         public void operationComplete(ChannelFuture future) throws Exception {
588                             if (future.isCancelled()) {
589                                 if (connectTimeoutFuture != null) {
590                                     connectTimeoutFuture.cancel(false);
591                                 }
592                                 connectPromise = null;
593                                 close(voidPromise());
594                             }
595                         }
596                     });
597                 }
598             } catch (Throwable t) {
599                 closeIfClosed();
600                 promise.tryFailure(annotateConnectException(t, remoteAddress));
601             }
602         }
603 
604         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
605             if (promise == null) {
606                 // Closed via cancellation and the promise has been notified already.
607                 return;
608             }
609             active = true;
610 
611             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
612             // We still need to ensure we call fireChannelActive() in this case.
613             boolean active = isActive();
614 
615             // trySuccess() will return false if a user cancelled the connection attempt.
616             boolean promiseSet = promise.trySuccess();
617 
618             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
619             // because what happened is what happened.
620             if (!wasActive && active) {
621                 pipeline().fireChannelActive();
622             }
623 
624             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
625             if (!promiseSet) {
626                 close(voidPromise());
627             }
628         }
629 
630         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
631             if (promise == null) {
632                 // Closed via cancellation and the promise has been notified already.
633                 return;
634             }
635 
636             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
637             promise.tryFailure(cause);
638             closeIfClosed();
639         }
640 
641         private void finishConnect() {
642             // Note this method is invoked by the event loop only if the connection attempt was
643             // neither cancelled nor timed out.
644 
645             assert eventLoop().inEventLoop();
646 
647             boolean connectStillInProgress = false;
648             try {
649                 boolean wasActive = isActive();
650                 if (!doFinishConnect()) {
651                     connectStillInProgress = true;
652                     return;
653                 }
654                 fulfillConnectPromise(connectPromise, wasActive);
655             } catch (Throwable t) {
656                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
657             } finally {
658                 if (!connectStillInProgress) {
659                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
660                     // See https://github.com/netty/netty/issues/1770
661                     if (connectTimeoutFuture != null) {
662                         connectTimeoutFuture.cancel(false);
663                     }
664                     connectPromise = null;
665                 }
666             }
667         }
668 
669         private boolean doFinishConnect() throws Exception {
670             if (socket.finishConnect()) {
671                 writeFilter(false);
672                 if (requestedRemoteAddress instanceof InetSocketAddress) {
673                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
674                 }
675                 requestedRemoteAddress = null;
676                 return true;
677             }
678             writeFilter(true);
679             return false;
680         }
681     }
682 
683     @Override
684     protected void doBind(SocketAddress local) throws Exception {
685         if (local instanceof InetSocketAddress) {
686             checkResolvable((InetSocketAddress) local);
687         }
688         socket.bind(local);
689         this.local = socket.localAddress();
690     }
691 
692     /**
693      * Connect to the remote peer
694      */
695     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
696         if (localAddress instanceof InetSocketAddress) {
697             checkResolvable((InetSocketAddress) localAddress);
698         }
699 
700         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
701                 ? (InetSocketAddress) remoteAddress : null;
702         if (remoteSocketAddr != null) {
703             checkResolvable(remoteSocketAddr);
704         }
705 
706         if (remote != null) {
707             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
708             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
709             // later.
710             throw new AlreadyConnectedException();
711         }
712 
713         if (localAddress != null) {
714             socket.bind(localAddress);
715         }
716 
717         boolean connected = doConnect0(remoteAddress);
718         if (connected) {
719             remote = remoteSocketAddr == null ?
720                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
721         }
722         // We always need to set the localAddress even if not connected yet as the bind already took place.
723         //
724         // See https://github.com/netty/netty/issues/3463
725         local = socket.localAddress();
726         return connected;
727     }
728 
729     private boolean doConnect0(SocketAddress remote) throws Exception {
730         boolean success = false;
731         try {
732             boolean connected = socket.connect(remote);
733             if (!connected) {
734                 writeFilter(true);
735             }
736             success = true;
737             return connected;
738         } finally {
739             if (!success) {
740                 doClose();
741             }
742         }
743     }
744 
745     @Override
746     protected SocketAddress localAddress0() {
747         return local;
748     }
749 
750     @Override
751     protected SocketAddress remoteAddress0() {
752         return remote;
753     }
754 }