View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.socket.ChannelInputShutdownEvent;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.SocketChannelConfig;
40  import io.netty.channel.unix.FileDescriptor;
41  import io.netty.channel.unix.UnixChannel;
42  import io.netty.util.ReferenceCountUtil;
43  import io.netty.util.concurrent.Future;
44  import io.netty.util.internal.UnstableApi;
45  
46  import java.io.IOException;
47  import java.net.ConnectException;
48  import java.net.InetSocketAddress;
49  import java.net.SocketAddress;
50  import java.nio.ByteBuffer;
51  import java.nio.channels.AlreadyConnectedException;
52  import java.nio.channels.ConnectionPendingException;
53  import java.nio.channels.NotYetConnectedException;
54  import java.nio.channels.UnresolvedAddressException;
55  import java.util.concurrent.TimeUnit;
56  
57  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59  import static io.netty.util.internal.ObjectUtil.checkNotNull;
60  
61  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
62  
63      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
64      /**
65       * The future of the current connection attempt.  If not null, subsequent
66       * connection attempts will fail.
67       */
68      private ChannelPromise connectPromise;
69      private Future<?> connectTimeoutFuture;
70      private SocketAddress requestedRemoteAddress;
71  
72      final BsdSocket socket;
73      private IoRegistration registration;
74      private boolean readFilterEnabled;
75      private boolean writeFilterEnabled;
76  
77      boolean readReadyRunnablePending;
78      boolean inputClosedSeenErrorOnRead;
79      protected volatile boolean active;
80      private volatile SocketAddress local;
81      private volatile SocketAddress remote;
82  
83      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
84          super(parent);
85          socket = checkNotNull(fd, "fd");
86          this.active = active;
87          if (active) {
88              // Directly cache the remote and local addresses
89              // See https://github.com/netty/netty/issues/2359
90              local = fd.localAddress();
91              remote = fd.remoteAddress();
92          }
93      }
94  
95      AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
96          super(parent);
97          socket = checkNotNull(fd, "fd");
98          active = true;
99          // Directly cache the remote and local addresses
100         // See https://github.com/netty/netty/issues/2359
101         this.remote = remote;
102         local = fd.localAddress();
103     }
104 
105     @Override
106     protected boolean isCompatible(EventLoop loop) {
107         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
108     }
109 
110     static boolean isSoErrorZero(BsdSocket fd) {
111         try {
112             return fd.getSoError() == 0;
113         } catch (IOException e) {
114             throw new ChannelException(e);
115         }
116     }
117 
118     protected final IoRegistration registration() {
119         assert registration != null;
120         return registration;
121     }
122 
123     @Override
124     public final FileDescriptor fd() {
125         return socket;
126     }
127 
128     @Override
129     public boolean isActive() {
130         return active;
131     }
132 
133     @Override
134     public ChannelMetadata metadata() {
135         return METADATA;
136     }
137 
138     @Override
139     protected void doClose() throws Exception {
140         // Clear kqueue registrations *before* we close the file descriptor.
141         // Otherwise, the filter removals might hit a reused file descriptor.
142         doDeregister();
143 
144         active = false;
145         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
146         // socket which has not even been connected yet. This has been observed to block during unit tests.
147         inputClosedSeenErrorOnRead = true;
148         socket.close();
149     }
150 
151     @Override
152     protected void doDisconnect() throws Exception {
153         doClose();
154     }
155 
156     void resetCachedAddresses() {
157         local = socket.localAddress();
158         remote = socket.remoteAddress();
159     }
160 
161     @Override
162     public boolean isOpen() {
163         return socket.isOpen();
164     }
165 
166     @Override
167     protected void doDeregister() throws Exception {
168         IoRegistration registration = this.registration;
169         if (registration != null) {
170             // As unregisteredFilters() may have not been called because isOpen() returned false we just set both
171             // filters to false, to ensure a consistent state in all cases.
172             // Make sure we unregister our filters from kqueue!
173             readFilter(false);
174             writeFilter(false);
175             clearRdHup0();
176 
177             registration.cancel();
178             this.registration = null;
179         }
180     }
181 
182     private void clearRdHup0() {
183         submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
184     }
185 
186     private void submit(KQueueIoOps ops) {
187         if (!isOpen()) {
188             // If the channel was closed in the meantime we should just drop the ops as otherwise we might end up
189             // affected another channel that re-used the fd.
190             return;
191         }
192         try {
193             registration.submit(ops);
194         } catch (Exception e) {
195             throw new ChannelException(e);
196         }
197     }
198 
199     @Override
200     protected final void doBeginRead() throws Exception {
201         // Channel.read() or ChannelHandlerContext.read() was called
202         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
203         unsafe.readPending = true;
204 
205         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
206         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
207         // never get data after this.
208         readFilter(true);
209     }
210 
211     @Override
212     protected void doRegister(ChannelPromise promise) {
213         ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
214             if (f.isSuccess()) {
215                 this.registration = (IoRegistration) f.getNow();
216                 // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old
217                 // EventLoop make sure the readReadyRunnablePending variable is reset so we will be able to execute
218                 // the Runnable on the new EventLoop.
219                 readReadyRunnablePending = false;
220 
221                 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
222 
223                 // Add the write event first so we get notified of connection refused on the client side!
224                 if (writeFilterEnabled) {
225                     submit(Native.WRITE_ENABLED_OPS);
226                 }
227                 if (readFilterEnabled) {
228                     submit(Native.READ_ENABLED_OPS);
229                 }
230                 promise.setSuccess();
231             } else {
232                 promise.setFailure(f.cause());
233             }
234         });
235     }
236 
237     @Override
238     protected abstract AbstractKQueueUnsafe newUnsafe();
239 
240     @Override
241     public abstract KQueueChannelConfig config();
242 
243     /**
244      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
245      */
246     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
247         return newDirectBuffer(buf, buf);
248     }
249 
250     /**
251      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
252      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
253      * this method.
254      */
255     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
256         final int readableBytes = buf.readableBytes();
257         if (readableBytes == 0) {
258             ReferenceCountUtil.release(holder);
259             return Unpooled.EMPTY_BUFFER;
260         }
261 
262         final ByteBufAllocator alloc = alloc();
263         if (alloc.isDirectBufferPooled()) {
264             return newDirectBuffer0(holder, buf, alloc, readableBytes);
265         }
266 
267         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
268         if (directBuf == null) {
269             return newDirectBuffer0(holder, buf, alloc, readableBytes);
270         }
271 
272         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
273         ReferenceCountUtil.safeRelease(holder);
274         return directBuf;
275     }
276 
277     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
278         final ByteBuf directBuf = alloc.directBuffer(capacity);
279         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
280         ReferenceCountUtil.safeRelease(holder);
281         return directBuf;
282     }
283 
284     protected static void checkResolvable(InetSocketAddress addr) {
285         if (addr.isUnresolved()) {
286             throw new UnresolvedAddressException();
287         }
288     }
289 
290     /**
291      * Read bytes into the given {@link ByteBuf} and return the amount.
292      */
293     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
294         int writerIndex = byteBuf.writerIndex();
295         int localReadAmount;
296         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
297         if (byteBuf.hasMemoryAddress()) {
298             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
299         } else {
300             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
301             localReadAmount = socket.read(buf, buf.position(), buf.limit());
302         }
303         if (localReadAmount > 0) {
304             byteBuf.writerIndex(writerIndex + localReadAmount);
305         }
306         return localReadAmount;
307     }
308 
309     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
310         if (buf.hasMemoryAddress()) {
311             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
312             if (localFlushedAmount > 0) {
313                 in.removeBytes(localFlushedAmount);
314                 return 1;
315             }
316         } else {
317             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
318                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
319             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
320             if (localFlushedAmount > 0) {
321                 nioBuf.position(nioBuf.position() + localFlushedAmount);
322                 in.removeBytes(localFlushedAmount);
323                 return 1;
324             }
325         }
326         return WRITE_STATUS_SNDBUF_FULL;
327     }
328 
329     final boolean shouldBreakReadReady(ChannelConfig config) {
330         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
331     }
332 
333     private static boolean isAllowHalfClosure(ChannelConfig config) {
334         if (config instanceof KQueueDomainSocketChannelConfig) {
335             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
336         }
337 
338         return config instanceof SocketChannelConfig &&
339                 ((SocketChannelConfig) config).isAllowHalfClosure();
340     }
341 
342     final void clearReadFilter() {
343         // Only clear if registered with an EventLoop as otherwise
344         if (isRegistered()) {
345             final EventLoop loop = eventLoop();
346             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
347             if (loop.inEventLoop()) {
348                 unsafe.clearReadFilter0();
349             } else {
350                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
351                 loop.execute(new Runnable() {
352                     @Override
353                     public void run() {
354                         if (!unsafe.readPending && !config().isAutoRead()) {
355                             // Still no read triggered so clear it now
356                             unsafe.clearReadFilter0();
357                         }
358                     }
359                 });
360             }
361         } else  {
362             // The EventLoop is not registered atm so just update the flags so the correct value
363             // will be used once the channel is registered
364             readFilterEnabled = false;
365         }
366     }
367 
368     void readFilter(boolean readFilterEnabled) throws IOException {
369         if (this.readFilterEnabled != readFilterEnabled) {
370             this.readFilterEnabled = readFilterEnabled;
371             submit(readFilterEnabled ? Native.READ_ENABLED_OPS : Native.READ_DISABLED_OPS);
372         }
373     }
374 
375     void writeFilter(boolean writeFilterEnabled) throws IOException {
376         if (this.writeFilterEnabled != writeFilterEnabled) {
377             this.writeFilterEnabled = writeFilterEnabled;
378             submit(writeFilterEnabled ? Native.WRITE_ENABLED_OPS : Native.WRITE_DISABLED_OPS);
379         }
380     }
381 
382     @UnstableApi
383     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
384         boolean readPending;
385         private KQueueRecvByteAllocatorHandle allocHandle;
386 
387         Channel channel() {
388             return AbstractKQueueChannel.this;
389         }
390 
391         @Override
392         public int ident() {
393             return fd().intValue();
394         }
395 
396         @Override
397         public void close() {
398             close(voidPromise());
399         }
400 
401         @Override
402         public void handle(IoRegistration registration, IoEvent event) {
403             KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
404             final short filter = kqueueEvent.filter();
405             final short flags = kqueueEvent.flags();
406             final int fflags = kqueueEvent.fflags();
407             final long data = kqueueEvent.data();
408 
409             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
410             // to read from the file descriptor.
411             if (filter == Native.EVFILT_WRITE) {
412                 writeReady();
413             } else if (filter == Native.EVFILT_READ) {
414                 // Check READ before EOF to ensure all data is read before shutting down the input.
415                 KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
416                 readReady(allocHandle);
417             } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
418                 readEOF();
419                 return;
420             }
421 
422             // Check if EV_EOF was set, this will notify us for connection-reset in which case
423             // we may close the channel directly or try to read more data depending on the state of the
424             // Channel and also depending on the AbstractKQueueChannel subtype.
425             if ((flags & Native.EV_EOF) != 0) {
426                 readEOF();
427             }
428         }
429 
430         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
431 
432         final boolean shouldStopReading(ChannelConfig config) {
433             // Check if there is a readPending which was not processed yet.
434             // This could be for two reasons:
435             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
436             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
437             //
438             // See https://github.com/netty/netty/issues/2254
439             return !readPending && !config.isAutoRead();
440         }
441 
442         final boolean failConnectPromise(Throwable cause) {
443             if (connectPromise != null) {
444                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
445                 // not set before calling connect. This means finishConnect will not detect any error and would
446                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
447                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
448                 AbstractKQueueChannel.this.connectPromise = null;
449                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
450                                 : new ConnectException("failed to connect").initCause(cause))) {
451                     closeIfClosed();
452                     return true;
453                 }
454             }
455             return false;
456         }
457 
458         private void writeReady() {
459             if (connectPromise != null) {
460                 // pending connect which is now complete so handle it.
461                 finishConnect();
462             } else if (!socket.isOutputShutdown()) {
463                 // directly call super.flush0() to force a flush now
464                 super.flush0();
465             }
466         }
467 
468         /**
469          * Shutdown the input side of the channel.
470          */
471         void shutdownInput(boolean readEOF) {
472             // We need to take special care of calling finishConnect() if readEOF is true and we not
473             // fulfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
474             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
475             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
476             // we observe the correct exception in case of a connect failure.
477             if (readEOF && connectPromise != null) {
478                 finishConnect();
479             }
480             if (!socket.isInputShutdown()) {
481                 if (isAllowHalfClosure(config())) {
482                     try {
483                         socket.shutdown(true, false);
484                     } catch (IOException ignored) {
485                         // We attempted to shutdown and failed, which means the input has already effectively been
486                         // shutdown.
487                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
488                         return;
489                     } catch (NotYetConnectedException ignore) {
490                         // We attempted to shutdown and failed, which means the input has already effectively been
491                         // shutdown.
492                     }
493                     if (shouldStopReading(config())) {
494                         clearReadFilter0();
495                     }
496                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
497                 } else {
498                     close(voidPromise());
499                     return;
500                 }
501             }
502             if (!readEOF && !inputClosedSeenErrorOnRead) {
503                 inputClosedSeenErrorOnRead = true;
504                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
505             }
506         }
507 
508         private void readEOF() {
509             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
510             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
511             allocHandle.readEOF();
512 
513             if (isActive()) {
514                 // If it is still active, we need to call readReady as otherwise we may miss to
515                 // read pending data from the underlying file descriptor.
516                 // See https://github.com/netty/netty/issues/3709
517                 readReady(allocHandle);
518             } else {
519                 // Just to be safe make sure the input marked as closed.
520                 shutdownInput(true);
521             }
522 
523             // Clear the RDHUP flag to prevent continuously getting woken up on this event.
524             clearRdHup0();
525         }
526 
527         @Override
528         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
529             if (allocHandle == null) {
530                 allocHandle = new KQueueRecvByteAllocatorHandle(
531                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
532             }
533             return allocHandle;
534         }
535 
536         @Override
537         protected final void flush0() {
538             // Flush immediately only when there's no pending flush.
539             // If there's a pending flush operation, event loop will call forceFlush() later,
540             // and thus there's no need to call it now.
541             if (!writeFilterEnabled) {
542                 super.flush0();
543             }
544         }
545 
546         protected final void clearReadFilter0() {
547             assert eventLoop().inEventLoop();
548             try {
549                 readPending = false;
550                 readFilter(false);
551             } catch (IOException e) {
552                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
553                 // so fire the exception through the pipeline and close the Channel.
554                 pipeline().fireExceptionCaught(e);
555                 unsafe().close(unsafe().voidPromise());
556             }
557         }
558 
559         private void fireEventAndClose(Object evt) {
560             pipeline().fireUserEventTriggered(evt);
561             close(voidPromise());
562         }
563 
564         @Override
565         public void connect(
566                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
567             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
568             // non-blocking io.
569             if (promise.isDone() || !ensureOpen(promise)) {
570                 return;
571             }
572 
573             try {
574                 if (connectPromise != null) {
575                     throw new ConnectionPendingException();
576                 }
577 
578                 boolean wasActive = isActive();
579                 if (doConnect(remoteAddress, localAddress)) {
580                     fulfillConnectPromise(promise, wasActive);
581                 } else {
582                     connectPromise = promise;
583                     requestedRemoteAddress = remoteAddress;
584 
585                     // Schedule connect timeout.
586                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
587                     if (connectTimeoutMillis > 0) {
588                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
589                             @Override
590                             public void run() {
591                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
592                                 if (connectPromise != null && !connectPromise.isDone()
593                                         && connectPromise.tryFailure(new ConnectTimeoutException(
594                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
595                                                         remoteAddress))) {
596                                     close(voidPromise());
597                                 }
598                             }
599                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
600                     }
601 
602                     promise.addListener(new ChannelFutureListener() {
603                         @Override
604                         public void operationComplete(ChannelFuture future) {
605                             // If the connect future is cancelled we also cancel the timeout and close the
606                             // underlying socket.
607                             if (future.isCancelled()) {
608                                 if (connectTimeoutFuture != null) {
609                                     connectTimeoutFuture.cancel(false);
610                                 }
611                                 connectPromise = null;
612                                 close(voidPromise());
613                             }
614                         }
615                     });
616                 }
617             } catch (Throwable t) {
618                 closeIfClosed();
619                 promise.tryFailure(annotateConnectException(t, remoteAddress));
620             }
621         }
622 
623         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
624             if (promise == null) {
625                 // Closed via cancellation and the promise has been notified already.
626                 return;
627             }
628             active = true;
629 
630             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
631             // We still need to ensure we call fireChannelActive() in this case.
632             boolean active = isActive();
633 
634             // trySuccess() will return false if a user cancelled the connection attempt.
635             boolean promiseSet = promise.trySuccess();
636 
637             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
638             // because what happened is what happened.
639             if (!wasActive && active) {
640                 pipeline().fireChannelActive();
641             }
642 
643             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
644             if (!promiseSet) {
645                 close(voidPromise());
646             }
647         }
648 
649         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
650             if (promise == null) {
651                 // Closed via cancellation and the promise has been notified already.
652                 return;
653             }
654 
655             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
656             promise.tryFailure(cause);
657             closeIfClosed();
658         }
659 
660         private void finishConnect() {
661             // Note this method is invoked by the event loop only if the connection attempt was
662             // neither cancelled nor timed out.
663 
664             assert eventLoop().inEventLoop();
665 
666             boolean connectStillInProgress = false;
667             try {
668                 boolean wasActive = isActive();
669                 if (!doFinishConnect()) {
670                     connectStillInProgress = true;
671                     return;
672                 }
673                 fulfillConnectPromise(connectPromise, wasActive);
674             } catch (Throwable t) {
675                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
676             } finally {
677                 if (!connectStillInProgress) {
678                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
679                     // See https://github.com/netty/netty/issues/1770
680                     if (connectTimeoutFuture != null) {
681                         connectTimeoutFuture.cancel(false);
682                     }
683                     connectPromise = null;
684                 }
685             }
686         }
687 
688         private boolean doFinishConnect() throws Exception {
689             if (socket.finishConnect()) {
690                 writeFilter(false);
691                 if (requestedRemoteAddress instanceof InetSocketAddress) {
692                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
693                 }
694                 requestedRemoteAddress = null;
695                 return true;
696             }
697             writeFilter(true);
698             return false;
699         }
700     }
701 
702     @Override
703     protected void doBind(SocketAddress local) throws Exception {
704         if (local instanceof InetSocketAddress) {
705             checkResolvable((InetSocketAddress) local);
706         }
707         socket.bind(local);
708         this.local = socket.localAddress();
709     }
710 
711     /**
712      * Connect to the remote peer
713      */
714     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
715         if (localAddress instanceof InetSocketAddress) {
716             checkResolvable((InetSocketAddress) localAddress);
717         }
718 
719         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
720                 ? (InetSocketAddress) remoteAddress : null;
721         if (remoteSocketAddr != null) {
722             checkResolvable(remoteSocketAddr);
723         }
724 
725         if (remote != null) {
726             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
727             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
728             // later.
729             throw new AlreadyConnectedException();
730         }
731 
732         if (localAddress != null) {
733             socket.bind(localAddress);
734         }
735 
736         boolean connected = doConnect0(remoteAddress, localAddress);
737         if (connected) {
738             remote = remoteSocketAddr == null?
739                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
740         }
741         // We always need to set the localAddress even if not connected yet as the bind already took place.
742         //
743         // See https://github.com/netty/netty/issues/3463
744         local = socket.localAddress();
745         return connected;
746     }
747 
748     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
749         boolean success = false;
750         try {
751             boolean connected = socket.connect(remoteAddress);
752             if (!connected) {
753                 writeFilter(true);
754             }
755             success = true;
756             return connected;
757         } finally {
758             if (!success) {
759                 doClose();
760             }
761         }
762     }
763 
764     @Override
765     protected SocketAddress localAddress0() {
766         return local;
767     }
768 
769     @Override
770     protected SocketAddress remoteAddress0() {
771         return remote;
772     }
773 }