View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.socket.ChannelInputShutdownEvent;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.SocketChannelConfig;
40  import io.netty.channel.unix.FileDescriptor;
41  import io.netty.channel.unix.UnixChannel;
42  import io.netty.util.ReferenceCountUtil;
43  import io.netty.util.concurrent.Future;
44  import io.netty.util.internal.UnstableApi;
45  
46  import java.io.IOException;
47  import java.net.ConnectException;
48  import java.net.InetSocketAddress;
49  import java.net.SocketAddress;
50  import java.nio.ByteBuffer;
51  import java.nio.channels.AlreadyConnectedException;
52  import java.nio.channels.ConnectionPendingException;
53  import java.nio.channels.NotYetConnectedException;
54  import java.nio.channels.UnresolvedAddressException;
55  import java.util.concurrent.TimeUnit;
56  
57  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
58  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
59  import static io.netty.util.internal.ObjectUtil.checkNotNull;
60  
61  abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChannel {
    /** Metadata returned by {@link #metadata()}; shared by all instances. */
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    // Timeout task scheduled by connect(...); only created when the configured connect timeout is > 0.
    private Future<?> connectTimeoutFuture;
    // Address handed to the pending connect(...); reset to null by doFinishConnect() once the connect succeeded.
    private SocketAddress requestedRemoteAddress;

    // The underlying socket, also exposed through fd().
    final BsdSocket socket;
    // Assigned in doRegister(...) once registration with the event loop succeeded.
    private KQueueIoRegistration registration;
    // Last requested state of the read / write filter; readFilter(...) / writeFilter(...) only submit on a change.
    private boolean readFilterEnabled;
    private boolean writeFilterEnabled;

    // Pre-built filter operations submitted by readFilter(...) / writeFilter(...) and doRegister(...).
    private final KQueueIoOps readEnabledOps;
    private final KQueueIoOps writeEnabledOps;
    private final KQueueIoOps readDisabledOps;
    private final KQueueIoOps writeDisabledOps;

    // True while the read-ready task has been handed to the event loop but has not run yet.
    boolean readReadyRunnablePending;
    // Consulted by shouldBreakReadReady(...); set by doClose() and by shutdownInput(false).
    boolean inputClosedSeenErrorOnRead;
    // Value returned by isActive().
    protected volatile boolean active;
    // Cached addresses, populated by the constructors and by resetCachedAddresses().
    private volatile SocketAddress local;
    private volatile SocketAddress remote;
86  
87      AbstractKQueueChannel(Channel parent, BsdSocket fd, boolean active) {
88          super(parent);
89          socket = checkNotNull(fd, "fd");
90          this.active = active;
91          if (active) {
92              // Directly cache the remote and local addresses
93              // See https://github.com/netty/netty/issues/2359
94              local = fd.localAddress();
95              remote = fd.remoteAddress();
96          }
97  
98          readEnabledOps = KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE, 0);
99          writeEnabledOps = KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE, 0);
100         readDisabledOps = KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_DELETE_DISABLE, 0);
101         writeDisabledOps = KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_DELETE_DISABLE, 0);
102     }
103 
104     AbstractKQueueChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
105         super(parent);
106         socket = checkNotNull(fd, "fd");
107         active = true;
108         // Directly cache the remote and local addresses
109         // See https://github.com/netty/netty/issues/2359
110         this.remote = remote;
111         local = fd.localAddress();
112 
113         readEnabledOps = KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE, 0);
114         writeEnabledOps = KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE, 0);
115         readDisabledOps = KQueueIoOps.newOps(Native.EVFILT_READ, Native.EV_DELETE_DISABLE, 0);
116         writeDisabledOps = KQueueIoOps.newOps(Native.EVFILT_WRITE, Native.EV_DELETE_DISABLE, 0);
117     }
118 
119     @Override
120     protected boolean isCompatible(EventLoop loop) {
121         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractKQueueUnsafe.class);
122     }
123 
124     static boolean isSoErrorZero(BsdSocket fd) {
125         try {
126             return fd.getSoError() == 0;
127         } catch (IOException e) {
128             throw new ChannelException(e);
129         }
130     }
131 
132     protected final KQueueIoRegistration registration() {
133         assert registration != null;
134         return registration;
135     }
136 
137     @Override
138     public final FileDescriptor fd() {
139         return socket;
140     }
141 
142     @Override
143     public boolean isActive() {
144         return active;
145     }
146 
147     @Override
148     public ChannelMetadata metadata() {
149         return METADATA;
150     }
151 
152     @Override
153     protected void doClose() throws Exception {
154         active = false;
155         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
156         // socket which has not even been connected yet. This has been observed to block during unit tests.
157         inputClosedSeenErrorOnRead = true;
158         socket.close();
159     }
160 
161     @Override
162     protected void doDisconnect() throws Exception {
163         doClose();
164     }
165 
166     void resetCachedAddresses() {
167         local = socket.localAddress();
168         remote = socket.remoteAddress();
169     }
170 
171     @Override
172     public boolean isOpen() {
173         return socket.isOpen();
174     }
175 
176     @Override
177     protected void doDeregister() throws Exception {
178         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
179         // to false to ensure a consistent state in all cases.
180         // Make sure we unregister our filters from kqueue!
181         readFilter(false);
182         writeFilter(false);
183         clearRdHup0();
184 
185         KQueueIoRegistration registration = this.registration;
186         if (registration != null) {
187             registration.cancel();
188         }
189     }
190 
191     private void clearRdHup0() {
192         submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_DELETE_DISABLE, Native.NOTE_RDHUP));
193     }
194 
195     private void submit(KQueueIoOps ops) {
196         try {
197             registration.submit(ops);
198         } catch (Exception e) {
199             throw new ChannelException(e);
200         }
201     }
202 
203     @Override
204     protected final void doBeginRead() throws Exception {
205         // Channel.read() or ChannelHandlerContext.read() was called
206         final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
207         unsafe.readPending = true;
208 
209         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
210         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
211         // never get data after this.
212         readFilter(true);
213 
214         // If auto read was toggled off on the last read loop then we may not be notified
215         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
216         if (unsafe.maybeMoreDataToRead) {
217             unsafe.executeReadReadyRunnable(config());
218         }
219     }
220 
221     @Override
222     protected void doRegister(ChannelPromise promise) {
223         ((IoEventLoop) eventLoop()).register((AbstractKQueueUnsafe) unsafe()).addListener(f -> {
224             if (f.isSuccess()) {
225                 this.registration = (KQueueIoRegistration) f.getNow();
226                 // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old
227                 // EventLoop make sure the readReadyRunnablePending variable is reset so we will be able to execute
228                 // the Runnable on the new EventLoop.
229                 readReadyRunnablePending = false;
230 
231                 submit(KQueueIoOps.newOps(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP));
232 
233                 // Add the write event first so we get notified of connection refused on the client side!
234                 if (writeFilterEnabled) {
235                     submit(writeEnabledOps);
236                 }
237                 if (readFilterEnabled) {
238                     submit(readEnabledOps);
239                 }
240                 promise.setSuccess();
241             } else {
242                 promise.setFailure(f.cause());
243             }
244         });
245     }
246 
247     @Override
248     protected abstract AbstractKQueueUnsafe newUnsafe();
249 
250     @Override
251     public abstract KQueueChannelConfig config();
252 
253     /**
254      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
255      */
256     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
257         return newDirectBuffer(buf, buf);
258     }
259 
260     /**
261      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
262      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
263      * this method.
264      */
265     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
266         final int readableBytes = buf.readableBytes();
267         if (readableBytes == 0) {
268             ReferenceCountUtil.release(holder);
269             return Unpooled.EMPTY_BUFFER;
270         }
271 
272         final ByteBufAllocator alloc = alloc();
273         if (alloc.isDirectBufferPooled()) {
274             return newDirectBuffer0(holder, buf, alloc, readableBytes);
275         }
276 
277         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
278         if (directBuf == null) {
279             return newDirectBuffer0(holder, buf, alloc, readableBytes);
280         }
281 
282         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
283         ReferenceCountUtil.safeRelease(holder);
284         return directBuf;
285     }
286 
287     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
288         final ByteBuf directBuf = alloc.directBuffer(capacity);
289         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
290         ReferenceCountUtil.safeRelease(holder);
291         return directBuf;
292     }
293 
294     protected static void checkResolvable(InetSocketAddress addr) {
295         if (addr.isUnresolved()) {
296             throw new UnresolvedAddressException();
297         }
298     }
299 
300     /**
301      * Read bytes into the given {@link ByteBuf} and return the amount.
302      */
303     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
304         int writerIndex = byteBuf.writerIndex();
305         int localReadAmount;
306         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
307         if (byteBuf.hasMemoryAddress()) {
308             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
309         } else {
310             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
311             localReadAmount = socket.read(buf, buf.position(), buf.limit());
312         }
313         if (localReadAmount > 0) {
314             byteBuf.writerIndex(writerIndex + localReadAmount);
315         }
316         return localReadAmount;
317     }
318 
319     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
320         if (buf.hasMemoryAddress()) {
321             int localFlushedAmount = socket.writeAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
322             if (localFlushedAmount > 0) {
323                 in.removeBytes(localFlushedAmount);
324                 return 1;
325             }
326         } else {
327             final ByteBuffer nioBuf = buf.nioBufferCount() == 1?
328                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
329             int localFlushedAmount = socket.write(nioBuf, nioBuf.position(), nioBuf.limit());
330             if (localFlushedAmount > 0) {
331                 nioBuf.position(nioBuf.position() + localFlushedAmount);
332                 in.removeBytes(localFlushedAmount);
333                 return 1;
334             }
335         }
336         return WRITE_STATUS_SNDBUF_FULL;
337     }
338 
339     final boolean shouldBreakReadReady(ChannelConfig config) {
340         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
341     }
342 
343     private static boolean isAllowHalfClosure(ChannelConfig config) {
344         if (config instanceof KQueueDomainSocketChannelConfig) {
345             return ((KQueueDomainSocketChannelConfig) config).isAllowHalfClosure();
346         }
347 
348         return config instanceof SocketChannelConfig &&
349                 ((SocketChannelConfig) config).isAllowHalfClosure();
350     }
351 
352     final void clearReadFilter() {
353         // Only clear if registered with an EventLoop as otherwise
354         if (isRegistered()) {
355             final EventLoop loop = eventLoop();
356             final AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) unsafe();
357             if (loop.inEventLoop()) {
358                 unsafe.clearReadFilter0();
359             } else {
360                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
361                 loop.execute(new Runnable() {
362                     @Override
363                     public void run() {
364                         if (!unsafe.readPending && !config().isAutoRead()) {
365                             // Still no read triggered so clear it now
366                             unsafe.clearReadFilter0();
367                         }
368                     }
369                 });
370             }
371         } else  {
372             // The EventLoop is not registered atm so just update the flags so the correct value
373             // will be used once the channel is registered
374             readFilterEnabled = false;
375         }
376     }
377 
378     void readFilter(boolean readFilterEnabled) throws IOException {
379         if (this.readFilterEnabled != readFilterEnabled) {
380             this.readFilterEnabled = readFilterEnabled;
381             submit(readFilterEnabled ? readEnabledOps : readDisabledOps);
382         }
383     }
384 
385     void writeFilter(boolean writeFilterEnabled) throws IOException {
386         if (this.writeFilterEnabled != writeFilterEnabled) {
387             this.writeFilterEnabled = writeFilterEnabled;
388             submit(writeFilterEnabled ? writeEnabledOps : writeDisabledOps);
389         }
390     }
391 
392     @UnstableApi
393     public abstract class AbstractKQueueUnsafe extends AbstractUnsafe implements KQueueIoHandle {
394         boolean readPending;
395         boolean maybeMoreDataToRead;
396         private KQueueRecvByteAllocatorHandle allocHandle;
397         private final Runnable readReadyRunnable = new Runnable() {
398             @Override
399             public void run() {
400                 readReadyRunnablePending = false;
401                 readReady(recvBufAllocHandle());
402             }
403         };
404 
405         Channel channel() {
406             return AbstractKQueueChannel.this;
407         }
408 
409         @Override
410         public int ident() {
411             return fd().intValue();
412         }
413 
414         @Override
415         public void close() {
416             close(voidPromise());
417         }
418 
419         @Override
420         public void handle(IoRegistration registration, IoEvent event) {
421             KQueueIoEvent kqueueEvent = (KQueueIoEvent) event;
422             final short filter = kqueueEvent.filter();
423             final short flags = kqueueEvent.flags();
424             final int fflags = kqueueEvent.fflags();
425             final long data = kqueueEvent.data();
426 
427             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
428             // to read from the file descriptor.
429             if (filter == Native.EVFILT_WRITE) {
430                 writeReady();
431             } else if (filter == Native.EVFILT_READ) {
432                 // Check READ before EOF to ensure all data is read before shutting down the input.
433                 readReady(data);
434             } else if (filter == Native.EVFILT_SOCK && (fflags & Native.NOTE_RDHUP) != 0) {
435                 readEOF();
436             }
437 
438             // Check if EV_EOF was set, this will notify us for connection-reset in which case
439             // we may close the channel directly or try to read more data depending on the state of the
440             // Channel and also depending on the AbstractKQueueChannel subtype.
441             if ((flags & Native.EV_EOF) != 0) {
442                 readEOF();
443             }
444         }
445 
446         private void readReady(long numberBytesPending) {
447             KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
448             allocHandle.numberBytesPending(numberBytesPending);
449             readReady(allocHandle);
450         }
451 
452         abstract void readReady(KQueueRecvByteAllocatorHandle allocHandle);
453 
454         final void readReadyBefore() {
455             maybeMoreDataToRead = false;
456         }
457 
458         final void readReadyFinally(ChannelConfig config) {
459             maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();
460 
461             if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) {
462                 // trigger a read again as there may be something left to read and because of ET we
463                 // will not get notified again until we read everything from the socket
464                 //
465                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
466                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
467                 // to false before every read operation to prevent re-entry into readReady() we will not read from
468                 // the underlying OS again unless the user happens to call read again.
469                 executeReadReadyRunnable(config);
470             } else if (!readPending && !config.isAutoRead()) {
471                 // Check if there is a readPending which was not processed yet.
472                 // This could be for two reasons:
473                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
474                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
475                 //
476                 // See https://github.com/netty/netty/issues/2254
477                 clearReadFilter0();
478             }
479         }
480 
481         final boolean failConnectPromise(Throwable cause) {
482             if (connectPromise != null) {
483                 // SO_ERROR has been shown to return 0 on macOS if detect an error via read() and the write filter was
484                 // not set before calling connect. This means finishConnect will not detect any error and would
485                 // successfully complete the connectPromise and update the channel state to active (which is incorrect).
486                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
487                 AbstractKQueueChannel.this.connectPromise = null;
488                 if (connectPromise.tryFailure((cause instanceof ConnectException) ? cause
489                                 : new ConnectException("failed to connect").initCause(cause))) {
490                     closeIfClosed();
491                     return true;
492                 }
493             }
494             return false;
495         }
496 
497         private void writeReady() {
498             if (connectPromise != null) {
499                 // pending connect which is now complete so handle it.
500                 finishConnect();
501             } else if (!socket.isOutputShutdown()) {
502                 // directly call super.flush0() to force a flush now
503                 super.flush0();
504             }
505         }
506 
507         /**
508          * Shutdown the input side of the channel.
509          */
510         void shutdownInput(boolean readEOF) {
511             // We need to take special care of calling finishConnect() if readEOF is true and we not
512             // fulfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
513             // with a ClosedChannelException as a close() will happen and so the FD is closed before we
514             // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
515             // we observe the correct exception in case of a connect failure.
516             if (readEOF && connectPromise != null) {
517                 finishConnect();
518             }
519             if (!socket.isInputShutdown()) {
520                 if (isAllowHalfClosure(config())) {
521                     try {
522                         socket.shutdown(true, false);
523                     } catch (IOException ignored) {
524                         // We attempted to shutdown and failed, which means the input has already effectively been
525                         // shutdown.
526                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
527                         return;
528                     } catch (NotYetConnectedException ignore) {
529                         // We attempted to shutdown and failed, which means the input has already effectively been
530                         // shutdown.
531                     }
532                     clearReadFilter0();
533                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
534                 } else {
535                     close(voidPromise());
536                 }
537             } else if (!readEOF && !inputClosedSeenErrorOnRead) {
538                 inputClosedSeenErrorOnRead = true;
539                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
540             }
541         }
542 
543         private void readEOF() {
544             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
545             final KQueueRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
546             allocHandle.readEOF();
547 
548             if (isActive()) {
549                 // If it is still active, we need to call readReady as otherwise we may miss to
550                 // read pending data from the underlying file descriptor.
551                 // See https://github.com/netty/netty/issues/3709
552                 readReady(allocHandle);
553             } else {
554                 // Just to be safe make sure the input marked as closed.
555                 shutdownInput(true);
556             }
557 
558             // Clear the RDHUP flag to prevent continuously getting woken up on this event.
559             clearRdHup0();
560         }
561 
562         @Override
563         public KQueueRecvByteAllocatorHandle recvBufAllocHandle() {
564             if (allocHandle == null) {
565                 allocHandle = new KQueueRecvByteAllocatorHandle(
566                         (RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
567             }
568             return allocHandle;
569         }
570 
571         @Override
572         protected final void flush0() {
573             // Flush immediately only when there's no pending flush.
574             // If there's a pending flush operation, event loop will call forceFlush() later,
575             // and thus there's no need to call it now.
576             if (!writeFilterEnabled) {
577                 super.flush0();
578             }
579         }
580 
581         final void executeReadReadyRunnable(ChannelConfig config) {
582             if (readReadyRunnablePending || !isActive() || shouldBreakReadReady(config)) {
583                 return;
584             }
585             readReadyRunnablePending = true;
586             eventLoop().execute(readReadyRunnable);
587         }
588 
589         protected final void clearReadFilter0() {
590             assert eventLoop().inEventLoop();
591             try {
592                 readPending = false;
593                 readFilter(false);
594             } catch (IOException e) {
595                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
596                 // so fire the exception through the pipeline and close the Channel.
597                 pipeline().fireExceptionCaught(e);
598                 unsafe().close(unsafe().voidPromise());
599             }
600         }
601 
602         private void fireEventAndClose(Object evt) {
603             pipeline().fireUserEventTriggered(evt);
604             close(voidPromise());
605         }
606 
607         @Override
608         public void connect(
609                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
610             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
611             // non-blocking io.
612             if (promise.isDone() || !ensureOpen(promise)) {
613                 return;
614             }
615 
616             try {
617                 if (connectPromise != null) {
618                     throw new ConnectionPendingException();
619                 }
620 
621                 boolean wasActive = isActive();
622                 if (doConnect(remoteAddress, localAddress)) {
623                     fulfillConnectPromise(promise, wasActive);
624                 } else {
625                     connectPromise = promise;
626                     requestedRemoteAddress = remoteAddress;
627 
628                     // Schedule connect timeout.
629                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
630                     if (connectTimeoutMillis > 0) {
631                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
632                             @Override
633                             public void run() {
634                                 ChannelPromise connectPromise = AbstractKQueueChannel.this.connectPromise;
635                                 if (connectPromise != null && !connectPromise.isDone()
636                                         && connectPromise.tryFailure(new ConnectTimeoutException(
637                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
638                                                         remoteAddress))) {
639                                     close(voidPromise());
640                                 }
641                             }
642                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
643                     }
644 
645                     promise.addListener(new ChannelFutureListener() {
646                         @Override
647                         public void operationComplete(ChannelFuture future) {
648                             // If the connect future is cancelled we also cancel the timeout and close the
649                             // underlying socket.
650                             if (future.isCancelled()) {
651                                 if (connectTimeoutFuture != null) {
652                                     connectTimeoutFuture.cancel(false);
653                                 }
654                                 connectPromise = null;
655                                 close(voidPromise());
656                             }
657                         }
658                     });
659                 }
660             } catch (Throwable t) {
661                 closeIfClosed();
662                 promise.tryFailure(annotateConnectException(t, remoteAddress));
663             }
664         }
665 
666         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
667             if (promise == null) {
668                 // Closed via cancellation and the promise has been notified already.
669                 return;
670             }
671             active = true;
672 
673             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
674             // We still need to ensure we call fireChannelActive() in this case.
675             boolean active = isActive();
676 
677             // trySuccess() will return false if a user cancelled the connection attempt.
678             boolean promiseSet = promise.trySuccess();
679 
680             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
681             // because what happened is what happened.
682             if (!wasActive && active) {
683                 pipeline().fireChannelActive();
684             }
685 
686             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
687             if (!promiseSet) {
688                 close(voidPromise());
689             }
690         }
691 
692         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
693             if (promise == null) {
694                 // Closed via cancellation and the promise has been notified already.
695                 return;
696             }
697 
698             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
699             promise.tryFailure(cause);
700             closeIfClosed();
701         }
702 
703         private void finishConnect() {
704             // Note this method is invoked by the event loop only if the connection attempt was
705             // neither cancelled nor timed out.
706 
707             assert eventLoop().inEventLoop();
708 
709             boolean connectStillInProgress = false;
710             try {
711                 boolean wasActive = isActive();
712                 if (!doFinishConnect()) {
713                     connectStillInProgress = true;
714                     return;
715                 }
716                 fulfillConnectPromise(connectPromise, wasActive);
717             } catch (Throwable t) {
718                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
719             } finally {
720                 if (!connectStillInProgress) {
721                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
722                     // See https://github.com/netty/netty/issues/1770
723                     if (connectTimeoutFuture != null) {
724                         connectTimeoutFuture.cancel(false);
725                     }
726                     connectPromise = null;
727                 }
728             }
729         }
730 
731         private boolean doFinishConnect() throws Exception {
732             if (socket.finishConnect()) {
733                 writeFilter(false);
734                 if (requestedRemoteAddress instanceof InetSocketAddress) {
735                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
736                 }
737                 requestedRemoteAddress = null;
738                 return true;
739             }
740             writeFilter(true);
741             return false;
742         }
743     }
744 
745     @Override
746     protected void doBind(SocketAddress local) throws Exception {
747         if (local instanceof InetSocketAddress) {
748             checkResolvable((InetSocketAddress) local);
749         }
750         socket.bind(local);
751         this.local = socket.localAddress();
752     }
753 
754     /**
755      * Connect to the remote peer
756      */
757     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
758         if (localAddress instanceof InetSocketAddress) {
759             checkResolvable((InetSocketAddress) localAddress);
760         }
761 
762         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
763                 ? (InetSocketAddress) remoteAddress : null;
764         if (remoteSocketAddr != null) {
765             checkResolvable(remoteSocketAddr);
766         }
767 
768         if (remote != null) {
769             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
770             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
771             // later.
772             throw new AlreadyConnectedException();
773         }
774 
775         if (localAddress != null) {
776             socket.bind(localAddress);
777         }
778 
779         boolean connected = doConnect0(remoteAddress, localAddress);
780         if (connected) {
781             remote = remoteSocketAddr == null?
782                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
783         }
784         // We always need to set the localAddress even if not connected yet as the bind already took place.
785         //
786         // See https://github.com/netty/netty/issues/3463
787         local = socket.localAddress();
788         return connected;
789     }
790 
791     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
792         boolean success = false;
793         try {
794             boolean connected = socket.connect(remoteAddress);
795             if (!connected) {
796                 writeFilter(true);
797             }
798             success = true;
799             return connected;
800         } finally {
801             if (!success) {
802                 doClose();
803             }
804         }
805     }
806 
807     @Override
808     protected SocketAddress localAddress0() {
809         return local;
810     }
811 
812     @Override
813     protected SocketAddress remoteAddress0() {
814         return remote;
815     }
816 }