View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.RecvByteBufAllocator;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.unix.FileDescriptor;
37  import io.netty.channel.unix.UnixChannel;
38  import io.netty.util.ReferenceCountUtil;
39  
40  import java.io.IOException;
41  import java.net.InetSocketAddress;
42  import java.net.SocketAddress;
43  import java.nio.ByteBuffer;
44  import java.nio.channels.AlreadyConnectedException;
45  import java.nio.channels.ConnectionPendingException;
46  import java.nio.channels.NotYetConnectedException;
47  import java.nio.channels.UnresolvedAddressException;
48  import java.util.concurrent.ScheduledFuture;
49  import java.util.concurrent.TimeUnit;
50  
51  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
52  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
53  import static io.netty.util.internal.ObjectUtil.checkNotNull;
54  
55  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57      /**
58       * The future of the current connection attempt.  If not null, subsequent
59       * connection attempts will fail.
60       */
61      private ChannelPromise connectPromise;
62      private ScheduledFuture<?> connectTimeoutFuture;
63      private SocketAddress requestedRemoteAddress;
64  
65      final BsdSocket socket;
66      private boolean readFilterEnabled = true;
67      private boolean writeFilterEnabled;
68      boolean readReadyRunnablePending;
69      boolean inputClosedSeenErrorOnRead;
70      /**
71       * This member variable means we don't have to have a map in {@link KQueueEventLoop} which associates the FDs
72       * from kqueue to instances of this class. This field will be initialized by JNI when modifying kqueue events.
73       * If there is no global reference when JNI gets a kqueue evSet call (aka this field is 0) then a global reference
74       * will be created and the address will be saved in this member variable. Then when we process a kevent in Java
75       * we can ask JNI to give us the {@link AbstractKQueueChannel} that corresponds to that event.
76       */
77      long jniSelfPtr;
78  
79      protected volatile boolean active;
80      private volatile SocketAddress local;
81      private volatile SocketAddress remote;
82  
83      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
84          this(parent, fd, active, false);
85      }
86  
87      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active, boolean writeFilterEnabled) {
88          super(parent);
89          socket = checkNotNull(fd, "fd");
90          this.active = active;
91          this.writeFilterEnabled = writeFilterEnabled;
92          if (active) {
93              // Directly cache the remote and local addresses
94              // See https://github.com/netty/netty/issues/2359
95              local = fd.localAddress();
96              remote = fd.remoteAddress();
97          }
98      }
99  
100     AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
101         super(parent);
102         socket = checkNotNull(fd, "fd");
103         active = true;
104         // Directly cache the remote and local addresses
105         // See https://github.com/netty/netty/issues/2359
106         this.remote = remote;
107         local = fd.localAddress();
108     }
109 
110     static boolean isSoErrorZero(BsdSocket fd) {
111         try {
112             return fd.getSoError() == 0;
113         } catch (IOException e) {
114             throw new ChannelException(e);
115         }
116     }
117 
118     @Override
119     public final FileDescriptor fd() {
120         return socket;
121     }
122 
123     @Override
124     public boolean isActive() {
125         return active;
126     }
127 
128     @Override
129     public ChannelMetadata metadata() {
130         return METADATA;
131     }
132 
133     @Override
134     protected void doClose() throws Exception {
135         active = false;
136         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
137         // socket which has not even been connected yet. This has been observed to block during unit tests.
138         inputClosedSeenErrorOnRead = true;
139         try {
140             if (isRegistered()) {
141                 // The FD will be closed, which should take care of deleting any associated events from kqueue, but
142                 // since we rely upon jniSelfRef to be consistent we make sure that we clear this reference out for
143                 // all events which are pending in kqueue to avoid referencing a deleted pointer at a later time.
144 
145                 // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
146                 // if SO_LINGER is used.
147                 //
148                 // See https://github.com/netty/netty/issues/7159
149                 EventLoop loop = eventLoop();
150                 if (loop.inEventLoop()) {
151                     doDeregister();
152                 } else {
153                     loop.execute(new Runnable() {
154                         @Override
155                         public void run() {
156                             try {
157                                 doDeregister();
158                             } catch (Throwable cause) {
159                                 pipeline().fireExceptionCaught(cause);
160                             }
161                         }
162                     });
163                 }
164             }
165         } finally {
166             socket.close();
167         }
168     }
169 
170     @Override
171     protected void doDisconnect() throws Exception {
172         doClose();
173     }
174 
175     @Override
176     protected boolean isCompatible(EventLoop loop) {
177         return loop instanceof KQueueEventLoop;
178     }
179 
180     @Override
181     public boolean isOpen() {
182         return socket.isOpen();
183     }
184 
185     @Override
186     protected void doDeregister() throws Exception {
187         // Make sure we unregister our filters from kqueue!
188         readFilter(false);
189         writeFilter(false);
190         evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
191 
192         ((KQueueEventLoop) eventLoop()).remove(this);
193 
194         // Set the filters back to the initial state in case this channel is registered with another event loop.
195         readFilterEnabled = true;
196     }
197 
198     @Override
199     protected final void doBeginRead() throws Exception {
200         // Channel.read() or ChannelHandlerContext.read() was called
201         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
202         unsafe.readPending = true;
203 
204         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
205         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
206         // never get data after this.
207         readFilter(true);
208 
209         // If auto read was toggled off on the last read loop then we may not be notified
210         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
211         if (unsafe.maybeMoreDataToRead) {
212             unsafe.executeReadReadyRunnable(config());
213         }
214     }
215 
216     @Override
217     protected void doRegister() throws Exception {
218         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
219         // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
220         // new EventLoop.
221         readReadyRunnablePending = false;
222         // Add the write event first so we get notified of connection refused on the client side!
223         if (writeFilterEnabled) {
224             evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
225         }
226         if (readFilterEnabled) {
227             evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
228         }
229         evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
230     }
231 
232     @Override
233     protected abstract AbstractKQueueUnsafe newUnsafe();
234 
235     @Override
236     public abstract KQueueChannelConfig config();
237 
238     /**
239      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
240      */
241     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
242         return newDirectBuffer(buf, buf);
243     }
244 
245     /**
246      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
247      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
248      * this method.
249      */
250     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
251         final int readableBytes = buf.readableBytes();
252         if (readableBytes == 0) {
253             ReferenceCountUtil.release(holder);
254             return Unpooled.EMPTY_BUFFER;
255         }
256 
257         final ByteBufAllocator alloc = alloc();
258         if (alloc.isDirectBufferPooled()) {
259             return newDirectBuffer0(holder, buf, alloc, readableBytes);
260         }
261 
262         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
263         if (directBuf == null) {
264             return newDirectBuffer0(holder, buf, alloc, readableBytes);
265         }
266 
267         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
268         ReferenceCountUtil.safeRelease(holder);
269         return directBuf;
270     }
271 
272     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
273         final ByteBuf directBuf = alloc.directBuffer(capacity);
274         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
275         ReferenceCountUtil.safeRelease(holder);
276         return directBuf;
277     }
278 
279     protected static void checkResolvable(InetSocketAddress addr) {
280         if (addr.isUnresolved()) {
281             throw new UnresolvedAddressException();
282         }
283     }
284 
285     /**
286      * Read bytes into the given {@link ByteBuf} and return the amount.
287      */
288     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
289         int writerIndex = byteBuf.writerIndex();
290         int localReadAmount;
291         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
292         if (byteBuf.hasMemoryAddress()) {
293             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
294         } else {
295             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
296             localReadAmount = socket.read(buf, buf.position(), buf.limit());
297         }
298         if (localReadAmount > 0) {
299             byteBuf.writerIndex(writerIndex + localReadAmount);
300         }
301         return localReadAmount;
302     }
303 
304     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
305         if (buf.hasMemoryAddress()) {
306             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
307             if (localFlushedAmount > 0) {
308                 in.removeBytes(localFlushedAmount);
309                 return 1;
310             }
311         } else {
312             final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
313                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
314             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
315             if (localFlushedAmount > 0) {
316                 nioBuf.position(nioBuf.position() + localFlushedAmount);
317                 in.removeBytes(localFlushedAmount);
318                 return 1;
319             }
320         }
321         return WRITE_STATUS_SNDBUF_FULL;
322     }
323 
324     final boolean shouldBreakReadReady(ChannelConfig config) {
325         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
326     }
327 
328     private static boolean isAllowHalfClosure(ChannelConfig config) {
329         return config instanceof KQueueSocketChannelConfig &&
330                 ((KQueueSocketChannelConfig) config).isAllowHalfClosure();
331     }
332 
333     final void clearReadFilter() {
334         // Only clear if registered with an EventLoop as otherwise
335         if (isRegistered()) {
336             final EventLoop loop = eventLoop();
337             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
338             if (loop.inEventLoop()) {
339                 unsafe.clearReadFilter0();
340             } else {
341                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
342                 loop.execute(new Runnable() {
343                     @Override
344                     public void run() {
345                         if (!unsafe.readPending && !config().isAutoRead()) {
346                             // Still no read triggered so clear it now
347                             unsafe.clearReadFilter0();
348                         }
349                     }
350                 });
351             }
352         } else  {
353             // The EventLoop is not registered atm so just update the flags so the correct value
354             // will be used once the channel is registered
355             readFilterEnabled = false;
356         }
357     }
358 
359     void readFilter(boolean readFilterEnabled) throws IOException {
360         if (this.readFilterEnabled != readFilterEnabled) {
361             this.readFilterEnabled = readFilterEnabled;
362             evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
363         }
364     }
365 
366     void writeFilter(boolean writeFilterEnabled) throws IOException {
367         if (this.writeFilterEnabled != writeFilterEnabled) {
368             this.writeFilterEnabled = writeFilterEnabled;
369             evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
370         }
371     }
372 
373     private void evSet(short filter, short flags) {
374         if (isOpen() && isRegistered()) {
375             evSet0(filter, flags);
376         }
377     }
378 
379     private void evSet0(short filter, short flags) {
380         evSet0(filter, flags, 0);
381     }
382 
383     private void evSet0(short filter, short flags, int fflags) {
384         ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
385     }
386 
387     abstract class AbstractKQueueUnsafe extends AbstractUnsafe {
388         boolean readPending;
389         boolean maybeMoreDataToRead;
390         private KQueueRecvByteAllocatorHandle allocHandle;
391         private final Runnable readReadyRunnable = new Runnable() {
392             @Override
393             public void run() {
394                 readReadyRunnablePending = false;
395                 readReady(recvBufAllocHandle());
396             }
397         };
398 
399         final void readReady(long numberBytesPending) {
400             KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
401             allocHandle.numberBytesPending(numberBytesPending);
402             readReady(allocHandle);
403         }
404 
405         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
406 
407         final void readReadyBefore() { maybeMoreDataToRead = false; }
408 
409         final void readReadyFinally(ChannelConfig config) {
410             maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
411             // Check if there is a readPending which was not processed yet.
412             // This could be for two reasons:
413             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
414             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
415             //
416             // See https://github.com/netty/netty/issues/2254
417             if (!readPending && !config.isAutoRead()) {
418                 clearReadFilter0();
419             } else if (readPending && maybeMoreDataToRead) {
420                 // trigger a read again as there may be something left to read and because of ET we
421                 // will not get notified again until we read everything from the socket
422                 //
423                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
424                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
425                 // to false before every read operation to prevent re-entry into readReady() we will not read from
426                 // the underlying OS again unless the user happens to call read again.
427                 executeReadReadyRunnable(config);
428             }
429         }
430 
431         final void writeReady() {
432             if (connectPromise != null) {
433                 // pending connect which is now complete so handle it.
434                 finishConnect();
435             } else if (!socket.isOutputShutdown()) {
436                 // directly call super.flush0() to force a flush now
437                 super.flush0();
438             }
439         }
440 
441         /**
442          * Shutdown the input side of the channel.
443          */
444         void shutdownInput(boolean readEOF) {
445             // We need to take special care of calling finishConnect() if readEOF is true and we not
446             // fullfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
447             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
448             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
449             // we observe the correct exception in case of a connect failure.
450             if (readEOF && connectPromise != null) {
451                 finishConnect();
452             }
453             if (!socket.isInputShutdown()) {
454                 if (isAllowHalfClosure(config())) {
455                     try {
456                         socket.shutdown(true, false);
457                     } catch (IOException ignored) {
458                         // We attempted to shutdown and failed, which means the input has already effectively been
459                         // shutdown.
460                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
461                         return;
462                     } catch (NotYetConnectedException ignore) {
463                         // We attempted to shutdown and failed, which means the input has already effectively been
464                         // shutdown.
465                     }
466                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
467                 } else {
468                     close(voidPromise());
469                 }
470             } else if (!readEOF) {
471                 inputClosedSeenErrorOnRead = true;
472                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
473             }
474         }
475 
476         final void readEOF() {
477             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
478             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
479             allocHandle.readEOF();
480 
481             if (isActive()) {
482                 // If it is still active, we need to call readReady as otherwise we may miss to
483                 // read pending data from the underlying file descriptor.
484                 // See https://github.com/netty/netty/issues/3709
485                 readReady(allocHandle);
486             } else {
487                 // Just to be safe make sure the input marked as closed.
488                 shutdownInput(true);
489             }
490         }
491 
492         @Override
493         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
494             if (allocHandle == null) {
495                 allocHandle = new KQueueRecvByteAllocatorHandle(
496                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
497             }
498             return allocHandle;
499         }
500 
501         final void executeReadReadyRunnable(ChannelConfig config) {
502             if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
503                 return;
504             }
505             readReadyRunnablePending = true;
506             eventLoop().execute(readReadyRunnable);
507         }
508 
509         protected final void clearReadFilter0() {
510             assert eventLoop().inEventLoop();
511             try {
512                 readPending = false;
513                 readFilter(false);
514             } catch (IOException e) {
515                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
516                 // so fire the exception through the pipeline and close the Channel.
517                 pipeline().fireExceptionCaught(e);
518                 unsafe().close(unsafe().voidPromise());
519             }
520         }
521 
522         private void fireEventAndClose(Object evt) {
523             pipeline().fireUserEventTriggered(evt);
524             close(voidPromise());
525         }
526 
527         @Override
528         public void connect(
529                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
530             if (!promise.setUncancellable() || !ensureOpen(promise)) {
531                 return;
532             }
533 
534             try {
535                 if (connectPromise != null) {
536                     throw new ConnectionPendingException();
537                 }
538 
539                 boolean wasActive = isActive();
540                 if (doConnect(remoteAddress, localAddress)) {
541                     fulfillConnectPromise(promise, wasActive);
542                 } else {
543                     connectPromise = promise;
544                     requestedRemoteAddress = remoteAddress;
545 
546                     // Schedule connect timeout.
547                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
548                     if (connectTimeoutMillis > 0) {
549                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
550                             @Override
551                             public void run() {
552                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
553                                 ConnectTimeoutException cause =
554                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
555                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
556                                     close(voidPromise());
557                                 }
558                             }
559                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
560                     }
561 
562                     promise.addListener(new ChannelFutureListener() {
563                         @Override
564                         public void operationComplete(ChannelFuture future) throws Exception {
565                             if (future.isCancelled()) {
566                                 if (connectTimeoutFuture != null) {
567                                     connectTimeoutFuture.cancel(false);
568                                 }
569                                 connectPromise = null;
570                                 close(voidPromise());
571                             }
572                         }
573                     });
574                 }
575             } catch (Throwable t) {
576                 closeIfClosed();
577                 promise.tryFailure(annotateConnectException(t, remoteAddress));
578             }
579         }
580 
581         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
582             if (promise == null) {
583                 // Closed via cancellation and the promise has been notified already.
584                 return;
585             }
586             active = true;
587 
588             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
589             // We still need to ensure we call fireChannelActive() in this case.
590             boolean active = isActive();
591 
592             // trySuccess() will return false if a user cancelled the connection attempt.
593             boolean promiseSet = promise.trySuccess();
594 
595             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
596             // because what happened is what happened.
597             if (!wasActive && active) {
598                 pipeline().fireChannelActive();
599             }
600 
601             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
602             if (!promiseSet) {
603                 close(voidPromise());
604             }
605         }
606 
607         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
608             if (promise == null) {
609                 // Closed via cancellation and the promise has been notified already.
610                 return;
611             }
612 
613             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
614             promise.tryFailure(cause);
615             closeIfClosed();
616         }
617 
618         private void finishConnect() {
619             // Note this method is invoked by the event loop only if the connection attempt was
620             // neither cancelled nor timed out.
621 
622             assert eventLoop().inEventLoop();
623 
624             boolean connectStillInProgress = false;
625             try {
626                 boolean wasActive = isActive();
627                 if (!doFinishConnect()) {
628                     connectStillInProgress = true;
629                     return;
630                 }
631                 fulfillConnectPromise(connectPromise, wasActive);
632             } catch (Throwable t) {
633                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
634             } finally {
635                 if (!connectStillInProgress) {
636                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
637                     // See https://github.com/netty/netty/issues/1770
638                     if (connectTimeoutFuture != null) {
639                         connectTimeoutFuture.cancel(false);
640                     }
641                     connectPromise = null;
642                 }
643             }
644         }
645 
646         private boolean doFinishConnect() throws Exception {
647             if (socket.finishConnect()) {
648                 writeFilter(false);
649                 if (requestedRemoteAddress instanceof InetSocketAddress) {
650                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
651                 }
652                 requestedRemoteAddress = null;
653                 return true;
654             }
655             writeFilter(true);
656             return false;
657         }
658     }
659 
660     @Override
661     protected void doBind(SocketAddress local) throws Exception {
662         if (local instanceof InetSocketAddress) {
663             checkResolvable((InetSocketAddress) local);
664         }
665         socket.bind(local);
666         this.local = socket.localAddress();
667     }
668 
669     /**
670      * Connect to the remote peer
671      */
672     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
673         if (localAddress instanceof InetSocketAddress) {
674             checkResolvable((InetSocketAddress) localAddress);
675         }
676 
677         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
678                 ? (InetSocketAddress) remoteAddress : null;
679         if (remoteSocketAddr != null) {
680             checkResolvable(remoteSocketAddr);
681         }
682 
683         if (remote != null) {
684             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
685             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
686             // later.
687             throw new AlreadyConnectedException();
688         }
689 
690         if (localAddress != null) {
691             socket.bind(localAddress);
692         }
693 
694         boolean connected = doConnect0(remoteAddress);
695         if (connected) {
696             remote = remoteSocketAddr == null ?
697                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
698         }
699         // We always need to set the localAddress even if not connected yet as the bind already took place.
700         //
701         // See https://github.com/netty/netty/issues/3463
702         local = socket.localAddress();
703         return connected;
704     }
705 
706     private boolean doConnect0(SocketAddress remote) throws Exception {
707         boolean success = false;
708         try {
709             boolean connected = socket.connect(remote);
710             if (!connected) {
711                 writeFilter(true);
712             }
713             success = true;
714             return connected;
715         } finally {
716             if (!success) {
717                 doClose();
718             }
719         }
720     }
721 
722     @Override
723     protected SocketAddress localAddress0() {
724         return local;
725     }
726 
727     @Override
728     protected SocketAddress remoteAddress0() {
729         return remote;
730     }
731 }