View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.epoll;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelMetadata;
28  import io.netty.channel.ChannelOption;
29  import io.netty.channel.ChannelPromise;
30  import io.netty.channel.ConnectTimeoutException;
31  import io.netty.channel.EventLoop;
32  import io.netty.channel.socket.ChannelInputShutdownEvent;
33  import io.netty.channel.unix.Socket;
34  import io.netty.channel.unix.UnixChannel;
35  import io.netty.util.ReferenceCountUtil;
36  import io.netty.util.internal.ThrowableUtil;
37  
38  import java.io.IOException;
39  import java.net.InetSocketAddress;
40  import java.net.SocketAddress;
41  import java.nio.ByteBuffer;
42  import java.nio.channels.AlreadyConnectedException;
43  import java.nio.channels.ClosedChannelException;
44  import java.nio.channels.ConnectionPendingException;
45  import java.nio.channels.NotYetConnectedException;
46  import java.nio.channels.UnresolvedAddressException;
47  import java.util.concurrent.ScheduledFuture;
48  import java.util.concurrent.TimeUnit;
49  
50  import static io.netty.util.internal.ObjectUtil.checkNotNull;
51  import static io.netty.channel.epoll.UnixChannelUtil.computeRemoteAddr;
52  
53  abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
54      private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
55              new ClosedChannelException(), AbstractEpollChannel.class, "doClose()");
56      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57      private final int readFlag;
58      private final Socket socket;
59      /**
60       * The future of the current connection attempt.  If not null, subsequent
61       * connection attempts will fail.
62       */
63      private ChannelPromise connectPromise;
64      private ScheduledFuture<?> connectTimeoutFuture;
65      private SocketAddress requestedRemoteAddress;
66  
67      private volatile SocketAddress local;
68      private volatile SocketAddress remote;
69  
70      protected int flags = Native.EPOLLET;
71  
72      protected volatile boolean active;
73  
74      AbstractEpollChannel(Socket fd, int flag) {
75          this(null, fd, flag, false);
76      }
77  
78      AbstractEpollChannel(Channel parent, Socket fd, int flag, boolean active) {
79          super(parent);
80          socket = checkNotNull(fd, "fd");
81          readFlag = flag;
82          flags |= flag;
83          this.active = active;
84          if (active) {
85              // Directly cache the remote and local addresses
86              // See https://github.com/netty/netty/issues/2359
87              local = fd.localAddress();
88              remote = fd.remoteAddress();
89          }
90      }
91  
92      AbstractEpollChannel(Channel parent, Socket fd, int flag, SocketAddress remote) {
93          super(parent);
94          socket = checkNotNull(fd, "fd");
95          readFlag = flag;
96          flags |= flag;
97          active = true;
98          // Directly cache the remote and local addresses
99          // See https://github.com/netty/netty/issues/2359
100         this.remote = remote;
101         local = fd.localAddress();
102     }
103 
104     static boolean isSoErrorZero(Socket fd) {
105         try {
106             return fd.getSoError() == 0;
107         } catch (IOException e) {
108             throw new ChannelException(e);
109         }
110     }
111 
112     void setFlag(int flag) throws IOException {
113         if (!isFlagSet(flag)) {
114             flags |= flag;
115             modifyEvents();
116         }
117     }
118 
119     void clearFlag(int flag) throws IOException {
120         if (isFlagSet(flag)) {
121             flags &= ~flag;
122             modifyEvents();
123         }
124     }
125 
126     boolean isFlagSet(int flag) {
127         return (flags & flag) != 0;
128     }
129 
130     @Override
131     public final Socket fd() {
132         return socket;
133     }
134 
135     @Override
136     public abstract EpollChannelConfig config();
137 
138     @Override
139     public boolean isActive() {
140         return active;
141     }
142 
143     @Override
144     public ChannelMetadata metadata() {
145         return METADATA;
146     }
147 
148     @Override
149     protected void doClose() throws Exception {
150         active = false;
151         try {
152             ChannelPromise promise = connectPromise;
153             if (promise != null) {
154                 // Use tryFailure() instead of setFailure() to avoid the race against cancel().
155                 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
156                 connectPromise = null;
157             }
158 
159             ScheduledFuture<?> future = connectTimeoutFuture;
160             if (future != null) {
161                 future.cancel(false);
162                 connectTimeoutFuture = null;
163             }
164 
165             if (isRegistered()) {
166                 // Need to check if we are on the EventLoop as doClose() may be triggered by the GlobalEventExecutor
167                 // if SO_LINGER is used.
168                 //
169                 // See https://github.com/netty/netty/issues/7159
170                 EventLoop loop = eventLoop();
171                 if (loop.inEventLoop()) {
172                     doDeregister();
173                 } else {
174                     loop.execute(new Runnable() {
175                         @Override
176                         public void run() {
177                             try {
178                                 doDeregister();
179                             } catch (Throwable cause) {
180                                 pipeline().fireExceptionCaught(cause);
181                             }
182                         }
183                     });
184                 }
185             }
186         } finally {
187             socket.close();
188         }
189     }
190 
191     @Override
192     protected void doDisconnect() throws Exception {
193         doClose();
194     }
195 
196     @Override
197     protected boolean isCompatible(EventLoop loop) {
198         return loop instanceof EpollEventLoop;
199     }
200 
201     @Override
202     public boolean isOpen() {
203         return socket.isOpen();
204     }
205 
206     @Override
207     protected void doDeregister() throws Exception {
208         ((EpollEventLoop) eventLoop()).remove(this);
209     }
210 
211     @Override
212     protected void doBeginRead() throws Exception {
213         // Channel.read() or ChannelHandlerContext.read() was called
214         ((AbstractEpollUnsafe) unsafe()).readPending = true;
215 
216         setFlag(readFlag);
217     }
218 
219     final void clearEpollIn() {
220         // Only clear if registered with an EventLoop as otherwise
221         if (isRegistered()) {
222             final EventLoop loop = eventLoop();
223             final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
224             if (loop.inEventLoop()) {
225                 unsafe.clearEpollIn0();
226             } else {
227                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
228                 loop.execute(new Runnable() {
229                     @Override
230                     public void run() {
231                         if (!config().isAutoRead() && !unsafe.readPending) {
232                             // Still no read triggered so clear it now
233                             unsafe.clearEpollIn0();
234                         }
235                     }
236                 });
237             }
238         } else  {
239             // The EventLoop is not registered atm so just update the flags so the correct value
240             // will be used once the channel is registered
241             flags &= ~readFlag;
242         }
243     }
244 
245     private void modifyEvents() throws IOException {
246         if (isOpen() && isRegistered()) {
247             ((EpollEventLoop) eventLoop()).modify(this);
248         }
249     }
250 
251     @Override
252     protected void doRegister() throws Exception {
253         EpollEventLoop loop = (EpollEventLoop) eventLoop();
254         loop.add(this);
255     }
256 
257     @Override
258     protected abstract AbstractEpollUnsafe newUnsafe();
259 
260     /**
261      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
262      */
263     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
264         return newDirectBuffer(buf, buf);
265     }
266 
267     /**
268      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
269      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
270      * this method.
271      */
272     protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
273         final int readableBytes = buf.readableBytes();
274         if (readableBytes == 0) {
275             ReferenceCountUtil.release(holder);
276             return Unpooled.EMPTY_BUFFER;
277         }
278 
279         final ByteBufAllocator alloc = alloc();
280         if (alloc.isDirectBufferPooled()) {
281             return newDirectBuffer0(holder, buf, alloc, readableBytes);
282         }
283 
284         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
285         if (directBuf == null) {
286             return newDirectBuffer0(holder, buf, alloc, readableBytes);
287         }
288 
289         directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
290         ReferenceCountUtil.safeRelease(holder);
291         return directBuf;
292     }
293 
294     private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
295         final ByteBuf directBuf = alloc.directBuffer(capacity);
296         directBuf.writeBytes(buf, buf.readerIndex(), capacity);
297         ReferenceCountUtil.safeRelease(holder);
298         return directBuf;
299     }
300 
301     protected static void checkResolvable(InetSocketAddress addr) {
302         if (addr.isUnresolved()) {
303             throw new UnresolvedAddressException();
304         }
305     }
306 
307     /**
308      * Read bytes into the given {@link ByteBuf} and return the amount.
309      */
310     protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
311         int writerIndex = byteBuf.writerIndex();
312         int localReadAmount;
313         if (byteBuf.hasMemoryAddress()) {
314             localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
315         } else {
316             ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
317             localReadAmount = socket.read(buf, buf.position(), buf.limit());
318         }
319         if (localReadAmount > 0) {
320             byteBuf.writerIndex(writerIndex + localReadAmount);
321         }
322         return localReadAmount;
323     }
324 
325     protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception {
326         int readableBytes = buf.readableBytes();
327         int writtenBytes = 0;
328         if (buf.hasMemoryAddress()) {
329             long memoryAddress = buf.memoryAddress();
330             int readerIndex = buf.readerIndex();
331             int writerIndex = buf.writerIndex();
332             for (int i = writeSpinCount - 1; i >= 0; i--) {
333                 int localFlushedAmount = socket.writeAddress(memoryAddress, readerIndex, writerIndex);
334                 if (localFlushedAmount > 0) {
335                     writtenBytes += localFlushedAmount;
336                     if (writtenBytes == readableBytes) {
337                         return writtenBytes;
338                     }
339                     readerIndex += localFlushedAmount;
340                 } else {
341                     break;
342                 }
343             }
344         } else {
345             ByteBuffer nioBuf;
346             if (buf.nioBufferCount() == 1) {
347                 nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
348             } else {
349                 nioBuf = buf.nioBuffer();
350             }
351             for (int i = writeSpinCount - 1; i >= 0; i--) {
352                 int pos = nioBuf.position();
353                 int limit = nioBuf.limit();
354                 int localFlushedAmount = socket.write(nioBuf, pos, limit);
355                 if (localFlushedAmount > 0) {
356                     nioBuf.position(pos + localFlushedAmount);
357                     writtenBytes += localFlushedAmount;
358                     if (writtenBytes == readableBytes) {
359                         return writtenBytes;
360                     }
361                 } else {
362                     break;
363                 }
364             }
365         }
366         if (writtenBytes < readableBytes) {
367             // Returned EAGAIN need to set EPOLLOUT
368             setFlag(Native.EPOLLOUT);
369         }
370         return writtenBytes;
371     }
372 
373     protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
374         protected boolean readPending;
375         private boolean rdHup;
376 
377         /**
378          * Called once EPOLLIN event is ready to be processed
379          */
380         abstract void epollInReady();
381 
382         public final boolean isRdHup() {
383             return rdHup;
384         }
385 
386         /**
387          * Called once EPOLLRDHUP event is ready to be processed
388          */
389         final void epollRdHupReady() {
390             // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
391             rdHup = true;
392 
393             if (isActive()) {
394                 // If it is still active, we need to call epollInReady as otherwise we may miss to
395                 // read pending data from the underlying file descriptor.
396                 // See https://github.com/netty/netty/issues/3709
397                 epollInReady();
398 
399                 // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
400                 clearEpollRdHup();
401             }
402 
403             // epollInReady may call this, but we should ensure that it gets called.
404             shutdownInput();
405         }
406 
407         /**
408          * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
409          */
410         private void clearEpollRdHup() {
411             try {
412                 clearFlag(Native.EPOLLRDHUP);
413             } catch (IOException e) {
414                 pipeline().fireExceptionCaught(e);
415                 close(voidPromise());
416             }
417         }
418 
419         /**
420          * Shutdown the input side of the channel.
421          */
422         void shutdownInput() {
423             if (!fd().isInputShutdown()) {
424                 if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
425                     try {
426                         fd().shutdown(true, false);
427                         clearEpollIn0();
428                     } catch (IOException ignored) {
429                         // We attempted to shutdown and failed, which means the input has already effectively been
430                         // shutdown.
431                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
432                         return;
433                     } catch (NotYetConnectedException ignore) {
434                         // We attempted to shutdown and failed, which means the input has already effectively been
435                         // shutdown.
436                         fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
437                         return;
438                     }
439                     pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
440                 } else {
441                     close(voidPromise());
442                 }
443             }
444         }
445 
446         private void fireEventAndClose(Object evt) {
447             pipeline().fireUserEventTriggered(evt);
448             close(voidPromise());
449         }
450 
451         @Override
452         protected void flush0() {
453             // Flush immediately only when there's no pending flush.
454             // If there's a pending flush operation, event loop will call forceFlush() later,
455             // and thus there's no need to call it now.
456             if (isFlagSet(Native.EPOLLOUT)) {
457                 return;
458             }
459             super.flush0();
460         }
461 
462         /**
463          * Called once a EPOLLOUT event is ready to be processed
464          */
465         final void epollOutReady() {
466             if (connectPromise != null) {
467                 // pending connect which is now complete so handle it.
468                 finishConnect();
469             } else if (!fd().isOutputShutdown()) {
470                 // directly call super.flush0() to force a flush now
471                 super.flush0();
472             }
473         }
474 
475         protected final void clearEpollIn0() {
476             assert eventLoop().inEventLoop();
477             try {
478                 clearFlag(readFlag);
479             } catch (IOException e) {
480                 // When this happens there is something completely wrong with either the filedescriptor or epoll,
481                 // so fire the exception through the pipeline and close the Channel.
482                 pipeline().fireExceptionCaught(e);
483                 unsafe().close(unsafe().voidPromise());
484             }
485         }
486 
487         @Override
488         public void connect(
489                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
490             if (!promise.setUncancellable() || !ensureOpen(promise)) {
491                 return;
492             }
493 
494             try {
495                 if (connectPromise != null) {
496                     throw new ConnectionPendingException();
497                 }
498 
499                 boolean wasActive = isActive();
500                 if (doConnect(remoteAddress, localAddress)) {
501                     fulfillConnectPromise(promise, wasActive);
502                 } else {
503                     connectPromise = promise;
504                     requestedRemoteAddress = remoteAddress;
505 
506                     // Schedule connect timeout.
507                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
508                     if (connectTimeoutMillis > 0) {
509                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
510                             @Override
511                             public void run() {
512                                 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
513                                 ConnectTimeoutException cause =
514                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
515                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
516                                     close(voidPromise());
517                                 }
518                             }
519                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
520                     }
521 
522                     promise.addListener(new ChannelFutureListener() {
523                         @Override
524                         public void operationComplete(ChannelFuture future) throws Exception {
525                             if (future.isCancelled()) {
526                                 if (connectTimeoutFuture != null) {
527                                     connectTimeoutFuture.cancel(false);
528                                 }
529                                 connectPromise = null;
530                                 close(voidPromise());
531                             }
532                         }
533                     });
534                 }
535             } catch (Throwable t) {
536                 closeIfClosed();
537                 promise.tryFailure(annotateConnectException(t, remoteAddress));
538             }
539         }
540 
541         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
542             if (promise == null) {
543                 // Closed via cancellation and the promise has been notified already.
544                 return;
545             }
546             active = true;
547 
548             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
549             // We still need to ensure we call fireChannelActive() in this case.
550             boolean active = isActive();
551 
552             // trySuccess() will return false if a user cancelled the connection attempt.
553             boolean promiseSet = promise.trySuccess();
554 
555             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
556             // because what happened is what happened.
557             if (!wasActive && active) {
558                 pipeline().fireChannelActive();
559             }
560 
561             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
562             if (!promiseSet) {
563                 close(voidPromise());
564             }
565         }
566 
567         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
568             if (promise == null) {
569                 // Closed via cancellation and the promise has been notified already.
570                 return;
571             }
572 
573             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
574             promise.tryFailure(cause);
575             closeIfClosed();
576         }
577 
578         private void finishConnect() {
579             // Note this method is invoked by the event loop only if the connection attempt was
580             // neither cancelled nor timed out.
581 
582             assert eventLoop().inEventLoop();
583 
584             boolean connectStillInProgress = false;
585             try {
586                 boolean wasActive = isActive();
587                 if (!doFinishConnect()) {
588                     connectStillInProgress = true;
589                     return;
590                 }
591                 fulfillConnectPromise(connectPromise, wasActive);
592             } catch (Throwable t) {
593                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
594             } finally {
595                 if (!connectStillInProgress) {
596                     // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
597                     // See https://github.com/netty/netty/issues/1770
598                     if (connectTimeoutFuture != null) {
599                         connectTimeoutFuture.cancel(false);
600                     }
601                     connectPromise = null;
602                 }
603             }
604         }
605 
606         /**
607          * Finish the connect
608          */
609         private boolean doFinishConnect() throws Exception {
610             if (socket.finishConnect()) {
611                 clearFlag(Native.EPOLLOUT);
612                 if (requestedRemoteAddress instanceof InetSocketAddress) {
613                     remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress,
614                             socket.remoteAddress());
615                 }
616                 requestedRemoteAddress = null;
617 
618                 return true;
619             }
620             setFlag(Native.EPOLLOUT);
621             return false;
622         }
623     }
624 
625     @Override
626     protected void doBind(SocketAddress local) throws Exception {
627         if (local instanceof InetSocketAddress) {
628             checkResolvable((InetSocketAddress) local);
629         }
630         socket.bind(local);
631         this.local = socket.localAddress();
632     }
633 
634     /**
635      * Connect to the remote peer
636      */
637     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
638         if (localAddress instanceof InetSocketAddress) {
639             checkResolvable((InetSocketAddress) localAddress);
640         }
641 
642         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
643                 ? (InetSocketAddress) remoteAddress : null;
644         if (remoteSocketAddr != null) {
645             checkResolvable(remoteSocketAddr);
646         }
647 
648         if (remote != null) {
649             // Check if already connected before trying to connect. This is needed as connect(...) will not return -1
650             // and set errno to EISCONN if a previous connect(...) attempt was setting errno to EINPROGRESS and finished
651             // later.
652             throw new AlreadyConnectedException();
653         }
654 
655         if (localAddress != null) {
656             socket.bind(localAddress);
657         }
658 
659         boolean connected = doConnect0(remoteAddress);
660         if (connected) {
661             remote = remoteSocketAddr == null ?
662                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
663         }
664         // We always need to set the localAddress even if not connected yet as the bind already took place.
665         //
666         // See https://github.com/netty/netty/issues/3463
667         local = socket.localAddress();
668         return connected;
669     }
670 
671     private boolean doConnect0(SocketAddress remote) throws Exception {
672         boolean success = false;
673         try {
674             boolean connected = socket.connect(remote);
675             if (!connected) {
676                 setFlag(Native.EPOLLOUT);
677             }
678             success = true;
679             return connected;
680         } finally {
681             if (!success) {
682                 doClose();
683             }
684         }
685     }
686 
687     @Override
688     protected SocketAddress localAddress0() {
689         return local;
690     }
691 
692     @Override
693     protected SocketAddress remoteAddress0() {
694         return remote;
695     }
696 }