View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelException;
26  import io.netty.channel.ChannelFuture;
27  import io.netty.channel.ChannelFutureListener;
28  import io.netty.channel.ChannelMetadata;
29  import io.netty.channel.ChannelOutboundBuffer;
30  import io.netty.channel.ChannelPromise;
31  import io.netty.channel.ConnectTimeoutException;
32  import io.netty.channel.EventLoop;
33  import io.netty.channel.IoEvent;
34  import io.netty.channel.IoEventLoop;
35  import io.netty.channel.IoRegistration;
36  import io.netty.channel.RecvByteBufAllocator;
37  import io.netty.channel.socket.ChannelInputShutdownEvent;
38  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39  import io.netty.channel.socket.SocketChannelConfig;
40  import io.netty.channel.unix.FileDescriptor;
41  import io.netty.channel.unix.IovArray;
42  import io.netty.channel.unix.Socket;
43  import io.netty.channel.unix.UnixChannel;
44  import io.netty.util.ReferenceCountUtil;
45  import io.netty.util.concurrent.Future;
46  
47  import java.io.IOException;
48  import java.io.UncheckedIOException;
49  import java.net.InetSocketAddress;
50  import java.net.SocketAddress;
51  import java.nio.ByteBuffer;
52  import java.nio.channels.AlreadyConnectedException;
53  import java.nio.channels.ClosedChannelException;
54  import java.nio.channels.ConnectionPendingException;
55  import java.nio.channels.NotYetConnectedException;
56  import java.nio.channels.UnresolvedAddressException;
57  import java.util.concurrent.TimeUnit;
58  
59  import static io.netty.channel.epoll.EpollIoOps.EPOLL_ERR_IN_MASK;
60  import static io.netty.channel.epoll.EpollIoOps.EPOLL_ERR_OUT_MASK;
61  import static io.netty.channel.epoll.EpollIoOps.EPOLL_RDHUP_MASK;
62  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
63  import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
64  import static io.netty.util.internal.ObjectUtil.checkNotNull;
65  
66  abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
67      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
68      protected final LinuxSocket socket;
69      private final EpollIoOps initial;
70  
71      /**
72       * The future of the current connection attempt.  If not null, subsequent
73       * connection attempts will fail.
74       */
75      private ChannelPromise connectPromise;
76      private Future<?> connectTimeoutFuture;
77      private SocketAddress requestedRemoteAddress;
78      private volatile SocketAddress local;
79      private volatile SocketAddress remote;
80  
81      private IoRegistration registration;
82      boolean inputClosedSeenErrorOnRead;
83      private EpollIoOps ops;
84  
85      protected volatile boolean active;
86  
87      AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
88          super(parent);
89          this.socket = checkNotNull(fd, "fd");
90          this.active = active;
91          if (active) {
92              // Directly cache the remote and local addresses
93              // See https://github.com/netty/netty/issues/2359
94              this.local = fd.localAddress();
95              this.remote = fd.remoteAddress();
96          }
97          this.initial = initialOps;
98          this.ops = initialOps;
99      }
100 
101     AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
102         super(parent);
103         this.socket = checkNotNull(fd, "fd");
104         this.active = true;
105         // Directly cache the remote and local addresses
106         // See https://github.com/netty/netty/issues/2359
107         this.remote = remote;
108         this.local = fd.localAddress();
109         this.initial = initialOps;
110         this.ops = initialOps;
111     }
112 
113     static boolean isSoErrorZero(Socket fd) {
114         try {
115             return fd.getSoError() == 0;
116         } catch (IOException e) {
117             throw new ChannelException(e);
118         }
119     }
120 
121     protected void setFlag(int flag) throws IOException {
122         if (ops.contains(flag)) {
123             // we can save a syscall if the ops did not change
124             return;
125         }
126         ops = ops.with(EpollIoOps.valueOf(flag));
127         if (isRegistered()) {
128             IoRegistration registration = registration();
129             registration.submit(ops);
130         } else {
131             ops = ops.with(EpollIoOps.valueOf(flag));
132         }
133     }
134 
135     void clearFlag(int flag) throws IOException {
136         IoRegistration registration = registration();
137         if (!ops.contains(flag)) {
138             // we can save a syscall if the ops did not change
139             return;
140         }
141         ops = ops.without(EpollIoOps.valueOf(flag));
142         registration.submit(ops);
143     }
144 
145     protected final IoRegistration registration() {
146         assert registration != null;
147         return registration;
148     }
149 
150     boolean isFlagSet(int flag) {
151         return (ops.value & flag) != 0;
152     }
153 
154     @Override
155     public final FileDescriptor fd() {
156         return socket;
157     }
158 
159     @Override
160     public abstract EpollChannelConfig config();
161 
162     @Override
163     public boolean isActive() {
164         return active;
165     }
166 
167     @Override
168     public ChannelMetadata metadata() {
169         return METADATA;
170     }
171 
172     @Override
173     protected void doClose() throws Exception {
174         active = false;
175         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
176         // socket which has not even been connected yet. This has been observed to block during unit tests.
177         inputClosedSeenErrorOnRead = true;
178         try {
179             ChannelPromise promise = connectPromise;
180             if (promise != null) {
181                 // Use tryFailure() instead of setFailure() to avoid the race against cancel().
182                 promise.tryFailure(new ClosedChannelException());
183                 connectPromise = null;
184             }
185 
186             Future<?> future = connectTimeoutFuture;
187             if (future != null) {
188                 future.cancel(false);
189                 connectTimeoutFuture = null;
190             }
191 
192             if (isRegistered()) {
193                 // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
194                 // if SO_LINGER is used.
195                 //
196                 // See https://github.com/netty/netty/issues/7159
197                 EventLoop loop = eventLoop();
198                 if (loop.inEventLoop()) {
199                     doDeregister();
200                 } else {
201                     loop.execute(new Runnable() {
202                         @Override
203                         public void run() {
204                             try {
205                                 doDeregister();
206                             } catch (Throwable cause) {
207                                 pipeline().fireExceptionCaught(cause);
208                             }
209                         }
210                     });
211                 }
212             }
213         } finally {
214             socket.close();
215         }
216     }
217 
218     void resetCachedAddresses() {
219         local = socket.localAddress();
220         remote = socket.remoteAddress();
221     }
222 
223     @Override
224     protected void doDisconnect() throws Exception {
225         doClose();
226     }
227 
228     @Override
229     public boolean isOpen() {
230         return socket.isOpen();
231     }
232 
233     @Override
234     protected void doDeregister() throws Exception {
235         IoRegistration registration = this.registration;
236         if (registration != null) {
237             ops = initial;
238             registration.cancel();
239         }
240     }
241 
242     @Override
243     protected boolean isCompatible(EventLoop loop) {
244         return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
245     }
246 
247     @Override
248     protected void doBeginRead() throws Exception {
249         // Channel.read() or ChannelHandlerContext.read() was called
250         final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
251         unsafe.readPending = true;
252 
253         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
254         // executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
255         // never get data after this.
256         setFlag(Native.EPOLLIN);
257     }
258 
259     final boolean shouldBreakEpollInReady(ChannelConfig config) {
260         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
261     }
262 
263     private static boolean isAllowHalfClosure(ChannelConfig config) {
264         if (config instanceof EpollDomainSocketChannelConfig) {
265             return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
266         }
267         return config instanceof SocketChannelConfig &&
268                 ((SocketChannelConfig) config).isAllowHalfClosure();
269     }
270 
271     final void clearEpollIn() {
272         // Only clear if registered with an EventLoop as otherwise
273         if (isRegistered()) {
274             final EventLoop loop = eventLoop();
275             final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
276             if (loop.inEventLoop()) {
277                 unsafe.clearEpollIn0();
278             } else {
279                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
280                 loop.execute(new Runnable() {
281                     @Override
282                     public void run() {
283                         if (!unsafe.readPending && !config().isAutoRead()) {
284                             // Still no read triggered so clear it now
285                             unsafe.clearEpollIn0();
286                         }
287                     }
288                 });
289             }
290         } else  {
291             // The EventLoop is not registered atm so just update the flags so the correct value
292             // will be used once the channel is registered
293             ops = ops.without(EpollIoOps.EPOLLIN);
294         }
295     }
296 
297     @Override
298     protected void doRegister(ChannelPromise promise) {
299         ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
300             if (f.isSuccess()) {
301                 registration = (IoRegistration) f.getNow();
302                 if (isActive()) {
303                     // The channel is active, register with current ops now as we are ready to start receiving events.
304                     submitCurrentOps();
305                 }
306                 promise.setSuccess();
307             } else {
308                 promise.setFailure(f.cause());
309             }
310         });
311     }
312 
313     @Override
314     protected abstract AbstractEpollUnsafe newUnsafe();
315 
316     /**
317      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
318      */
319     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
320         return newDirectBuffer(buf, buf);
321     }
322 
323     /**
324      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
325      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
326      * this method.
327      */
328     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
329         final int readableBytes = buf.readableBytes();
330         if (readableBytes == 0) {
331             ReferenceCountUtil.release(holder);
332             return Unpooled.EMPTY_BUFFER;
333         }
334 
335         final ByteBufAllocator alloc = alloc();
336         if (alloc.isDirectBufferPooled()) {
337             return newDirectBuffer0(holder, buf, alloc, readableBytes);
338         }
339 
340         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
341         if (directBuf == null) {
342             return newDirectBuffer0(holder, buf, alloc, readableBytes);
343         }
344 
345         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
346         ReferenceCountUtil.safeRelease(holder);
347         return directBuf;
348     }
349 
350     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
351         final ByteBuf directBuf = alloc.directBuffer(capacity);
352         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
353         ReferenceCountUtil.safeRelease(holder);
354         return directBuf;
355     }
356 
357     protected static void checkResolvable(InetSocketAddress addr) {
358         if (addr.isUnresolved()) {
359             throw new UnresolvedAddressException();
360         }
361     }
362 
363     /**
364      * Read bytes into the given {@link ByteBuf} and return the amount.
365      */
366     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
367         int writerIndex = byteBuf.writerIndex();
368         int localReadAmount;
369         unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
370         if (byteBuf.hasMemoryAddress()) {
371             localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
372         } else {
373             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
374             localReadAmount = socket.recv(buf, buf.position(), buf.limit());
375         }
376         if (localReadAmount > 0) {
377             byteBuf.writerIndex(writerIndex + localReadAmount);
378         }
379         return localReadAmount;
380     }
381 
382     protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
383         if (buf.hasMemoryAddress()) {
384             int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
385             if (localFlushedAmount > 0) {
386                 in.removeBytes(localFlushedAmount);
387                 return 1;
388             }
389         } else {
390             final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
391                     buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
392             int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
393             if (localFlushedAmount > 0) {
394                 nioBuf.position(nioBuf.position() + localFlushedAmount);
395                 in.removeBytes(localFlushedAmount);
396                 return 1;
397             }
398         }
399         return WRITE_STATUS_SNDBUF_FULL;
400     }
401 
402     /**
403      * Write bytes to the socket, with or without a remote address.
404      * Used for datagram and TCP client fast open writes.
405      */
406     final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
407             throws IOException {
408         assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
409         if (data.hasMemoryAddress()) {
410             long memoryAddress = data.memoryAddress();
411             if (remoteAddress == null) {
412                 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
413             }
414             return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
415                     remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
416         }
417 
418         if (data.nioBufferCount() > 1) {
419             IovArray array = ((NativeArrays) registration.attachment()).cleanIovArray();
420             array.add(data, data.readerIndex(), data.readableBytes());
421             int cnt = array.count();
422             assert cnt != 0;
423 
424             if (remoteAddress == null) {
425                 return socket.writevAddresses(array.memoryAddress(0), cnt);
426             }
427             return socket.sendToAddresses(array.memoryAddress(0), cnt,
428                     remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
429         }
430 
431         ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
432         if (remoteAddress == null) {
433             return socket.send(nioData, nioData.position(), nioData.limit());
434         }
435         return socket.sendTo(nioData, nioData.position(), nioData.limit(),
436                 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
437     }
438 
439     protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
440         boolean readPending;
441         private EpollRecvByteAllocatorHandle allocHandle;
442 
443         Channel channel() {
444             return AbstractEpollChannel.this;
445         }
446 
447         @Override
448         public FileDescriptor fd() {
449             return AbstractEpollChannel.this.fd();
450         }
451 
452         @Override
453         public void close() {
454             close(voidPromise());
455         }
456 
457         @Override
458         public void handle(IoRegistration registration, IoEvent event) {
459             EpollIoEvent epollEvent = (EpollIoEvent) event;
460             int ops = epollEvent.ops().value;
461 
462             // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
463             // sure about it!
464             // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
465             // past.
466 
467             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
468             // to read from the file descriptor.
469             // See https://github.com/netty/netty/issues/3785
470             //
471             // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
472             // In either case epollOutReady() will do the correct thing (finish connecting, or fail
473             // the connection).
474             // See https://github.com/netty/netty/issues/3848
475             if ((ops & EPOLL_ERR_OUT_MASK) != 0) {
476                 // Force flush of data as the epoll is writable again
477                 epollOutReady();
478             }
479 
480             // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
481             // See https://github.com/netty/netty/issues/4317.
482             //
483             // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
484             // try to read from the underlying file descriptor and so notify the user about the error.
485             if ((ops & EPOLL_ERR_IN_MASK) != 0) {
486                 // The Channel is still open and there is something to read. Do it now.
487                 epollInReady();
488             }
489 
490             // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
491             // we may close the channel directly or try to read more data depending on the state of the
492             // Channel and als depending on the AbstractEpollChannel subtype.
493             if ((ops & EPOLL_RDHUP_MASK) != 0) {
494                 epollRdHupReady();
495             }
496         }
497 
498         /**
499          * Called once EPOLLIN event is ready to be processed
500          */
501         abstract void epollInReady();
502 
503         final boolean shouldStopReading(ChannelConfig config) {
504             // Check if there is a readPending which was not processed yet.
505             // This could be for two reasons:
506             // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
507             // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
508             //
509             // See https://github.com/netty/netty/issues/2254
510             return !readPending && !config.isAutoRead();
511         }
512 
513         /**
514          * Called once EPOLLRDHUP event is ready to be processed
515          */
516         final void epollRdHupReady() {
517             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
518             recvBufAllocHandle().receivedRdHup();
519 
520             if (isActive()) {
521                 // If it is still active, we need to call epollInReady as otherwise we may miss to
522                 // read pending data from the underlying file descriptor.
523                 // See https://github.com/netty/netty/issues/3709
524                 epollInReady();
525             } else {
526                 // Just to be safe make sure the input marked as closed.
527                 shutdownInput(false);
528             }
529 
530             // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
531             clearEpollRdHup();
532         }
533 
534         /**
535          * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
536          */
537         private void clearEpollRdHup() {
538             try {
539                 clearFlag(Native.EPOLLRDHUP);
540             } catch (IOException e) {
541                 pipeline().fireExceptionCaught(e);
542                 close(voidPromise());
543             }
544         }
545 
546         /**
547          * Shutdown the input side of the channel.
548          */
549         void shutdownInput(boolean allDataRead) {
550             if (!socket.isInputShutdown()) {
551                 if (isAllowHalfClosure(config())) {
552                     try {
553                         socket.shutdown(true, false);
554                     } catch (IOException ignored) {
555                         // We attempted to shutdown and failed, which means the input has already effectively been
556                         // shutdown.
557                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
558                         return;
559                     } catch (NotYetConnectedException ignore) {
560                         // We attempted to shutdown and failed, which means the input has already effectively been
561                         // shutdown.
562                     }
563                     if (shouldStopReading(config())) {
564                         clearEpollIn0();
565                     }
566                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
567                 } else {
568                     close(voidPromise());
569                     return;
570                 }
571             }
572 
573             if (allDataRead && !inputClosedSeenErrorOnRead) {
574                 inputClosedSeenErrorOnRead = true;
575                 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
576             }
577         }
578 
579         private void fireEventAndClose(Object evt) {
580             pipeline().fireUserEventTriggered(evt);
581             close(voidPromise());
582         }
583 
584         @Override
585         public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
586             if (allocHandle == null) {
587                 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
588             }
589             return allocHandle;
590         }
591 
592         /**
593          * Create a new {@link EpollRecvByteAllocatorHandle} instance.
594          * @param handle The handle to wrap with EPOLL specific logic.
595          */
596         EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
597             return new EpollRecvByteAllocatorHandle(handle);
598         }
599 
600         @Override
601         protected final void flush0() {
602             // Flush immediately only when there's no pending flush.
603             // If there's a pending flush operation, event loop will call forceFlush() later,
604             // and thus there's no need to call it now.
605             if (!isFlagSet(Native.EPOLLOUT)) {
606                 super.flush0();
607             }
608         }
609 
610         /**
611          * Called once a EPOLLOUT event is ready to be processed
612          */
613         final void epollOutReady() {
614             if (connectPromise != null) {
615                 // pending connect which is now complete so handle it.
616                 finishConnect();
617             } else if (!socket.isOutputShutdown()) {
618                 // directly call super.flush0() to force a flush now
619                 super.flush0();
620             }
621         }
622 
623         protected final void clearEpollIn0() {
624             assert eventLoop().inEventLoop();
625             try {
626                 readPending = false;
627                 if (!ops.contains(EpollIoOps.EPOLLIN)) {
628                     return;
629                 }
630                 ops = ops.without(EpollIoOps.EPOLLIN);
631                 IoRegistration registration = registration();
632                 registration.submit(ops);
633             } catch (UncheckedIOException e) {
634                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
635                 // so fire the exception through the pipeline and close the Channel.
636                 pipeline().fireExceptionCaught(e);
637                 unsafe().close(unsafe().voidPromise());
638             }
639         }
640 
641         @Override
642         public void connect(
643                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
644             // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
645             // non-blocking io.
646             if (promise.isDone() || !ensureOpen(promise)) {
647                 return;
648             }
649 
650             try {
651                 if (connectPromise != null) {
652                     throw new ConnectionPendingException();
653                 }
654 
655                 boolean wasActive = isActive();
656                 if (doConnect(remoteAddress, localAddress)) {
657                     fulfillConnectPromise(promise, wasActive);
658                 } else {
659                     connectPromise = promise;
660                     requestedRemoteAddress = remoteAddress;
661 
662                     // Schedule connect timeout.
663                     final int connectTimeoutMillis = config().getConnectTimeoutMillis();
664                     if (connectTimeoutMillis > 0) {
665                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
666                             @Override
667                             public void run() {
668                                 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
669                                 if (connectPromise != null && !connectPromise.isDone()
670                                         && connectPromise.tryFailure(new ConnectTimeoutException(
671                                                 "connection timed out after " + connectTimeoutMillis + " ms: " +
672                                                         remoteAddress))) {
673                                     close(voidPromise());
674                                 }
675                             }
676                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
677                     }
678 
679                     promise.addListener(new ChannelFutureListener() {
680                         @Override
681                         public void operationComplete(ChannelFuture future) {
682                             // If the connect future is cancelled we also cancel the timeout and close the
683                             // underlying socket.
684                             if (future.isCancelled()) {
685                                 if (connectTimeoutFuture != null) {
686                                     connectTimeoutFuture.cancel(false);
687                                 }
688                                 connectPromise = null;
689                                 close(voidPromise());
690                             }
691                         }
692                     });
693                 }
694             } catch (Throwable t) {
695                 closeIfClosed();
696                 promise.tryFailure(annotateConnectException(t, remoteAddress));
697             }
698         }
699 
700         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
701             if (promise == null) {
702                 // Closed via cancellation and the promise has been notified already.
703                 return;
704             }
705             active = true;
706 
707             // The channel is active, register with current ops now as we are ready to start receiving events.
708             submitCurrentOps();
709 
710             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
711             // We still need to ensure we call fireChannelActive() in this case.
712             boolean active = isActive();
713 
714             // trySuccess() will return false if a user cancelled the connection attempt.
715             boolean promiseSet = promise.trySuccess();
716 
717             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
718             // because what happened is what happened.
719             if (!wasActive && active) {
720                 pipeline().fireChannelActive();
721             }
722 
723             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
724             if (!promiseSet) {
725                 close(voidPromise());
726             }
727         }
728 
729         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
730             if (promise == null) {
731                 // Closed via cancellation and the promise has been notified already.
732                 return;
733             }
734 
735             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
736             promise.tryFailure(cause);
737             closeIfClosed();
738         }
739 
740         private void finishConnect() {
741             // Note this method is invoked by the event loop only if the connection attempt was
742             // neither cancelled nor timed out.
743 
744             assert eventLoop().inEventLoop();
745 
746             boolean connectStillInProgress = false;
747             try {
748                 boolean wasActive = isActive();
749                 if (!doFinishConnect()) {
750                     connectStillInProgress = true;
751                     return;
752                 }
753                 fulfillConnectPromise(connectPromise, wasActive);
754             } catch (Throwable t) {
755                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
756             } finally {
757                 if (!connectStillInProgress) {
758                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
759                     // See https://github.com/netty/netty/issues/1770
760                     if (connectTimeoutFuture != null) {
761                         connectTimeoutFuture.cancel(false);
762                     }
763                     connectPromise = null;
764                 }
765             }
766         }
767 
768         /**
769          * Finish the connect
770          */
771         private boolean doFinishConnect() throws Exception {
772             if (socket.finishConnect()) {
773                 clearFlag(Native.EPOLLOUT);
774                 if (requestedRemoteAddress instanceof InetSocketAddress) {
775                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
776                 }
777                 requestedRemoteAddress = null;
778 
779                 return true;
780             }
781             setFlag(Native.EPOLLOUT);
782             return false;
783         }
784     }
785 
786     @Override
787     protected void doBind(SocketAddress local) throws Exception {
788         if (local instanceof InetSocketAddress) {
789             checkResolvable((InetSocketAddress) local);
790         }
791         socket.bind(local);
792         this.local = socket.localAddress();
793     }
794 
795     /**
796      * Connect to the remote peer
797      */
798     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
799         if (localAddress instanceof InetSocketAddress) {
800             checkResolvable((InetSocketAddress) localAddress);
801         }
802 
803         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
804                 ? (InetSocketAddress) remoteAddress : null;
805         if (remoteSocketAddr != null) {
806             checkResolvable(remoteSocketAddr);
807         }
808 
809         if (remote != null) {
810             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
811             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
812             // later.
813             throw new AlreadyConnectedException();
814         }
815 
816         if (localAddress != null) {
817             socket.bind(localAddress);
818         }
819 
820         boolean connected = doConnect0(remoteAddress);
821         if (connected) {
822             remote = remoteSocketAddr == null ?
823                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
824         }
825         // We always need to set the localAddress even if not connected yet as the bind already took place.
826         //
827         // See https://github.com/netty/netty/issues/3463
828         local = socket.localAddress();
829         return connected;
830     }
831 
832     boolean doConnect0(SocketAddress remote) throws Exception {
833         boolean success = false;
834         try {
835             boolean connected = socket.connect(remote);
836             if (!connected) {
837                 setFlag(Native.EPOLLOUT);
838             }
839             success = true;
840             return connected;
841         } finally {
842             if (!success) {
843                 doClose();
844             }
845         }
846     }
847 
848     final void submitCurrentOps() {
849         IoRegistration registration = registration();
850         registration.submit(ops);
851     }
852 
853     @Override
854     protected SocketAddress localAddress0() {
855         return local;
856     }
857 
858     @Override
859     protected SocketAddress remoteAddress0() {
860         return remote;
861     }
862 }