View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.epoll;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.buffer.api.DefaultBufferAllocators;
21  import io.netty5.channel.ChannelOption;
22  import io.netty5.channel.ChannelShutdownDirection;
23  import io.netty5.channel.RecvBufferAllocator;
24  import io.netty5.channel.socket.DomainSocketAddress;
25  import io.netty5.channel.socket.SocketProtocolFamily;
26  import io.netty5.channel.unix.IntegerUnixChannelOption;
27  import io.netty5.channel.unix.RawUnixChannelOption;
28  import io.netty5.util.Resource;
29  import io.netty5.channel.AbstractChannel;
30  import io.netty5.channel.ChannelException;
31  import io.netty5.channel.ChannelMetadata;
32  import io.netty5.channel.ChannelOutboundBuffer;
33  import io.netty5.channel.EventLoop;
34  import io.netty5.channel.unix.FileDescriptor;
35  import io.netty5.channel.unix.IovArray;
36  import io.netty5.channel.unix.Socket;
37  import io.netty5.channel.unix.UnixChannel;
38  
39  import java.io.IOException;
40  import java.net.InetSocketAddress;
41  import java.net.SocketAddress;
42  import java.nio.ByteBuffer;
43  import java.nio.channels.UnresolvedAddressException;
44  
45  import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
46  import static io.netty5.channel.unix.UnixChannelUtil.computeRemoteAddr;
47  import static io.netty5.util.CharsetUtil.UTF_8;
48  import static java.util.Objects.requireNonNull;
49  
50  abstract class AbstractEpollChannel<P extends UnixChannel>
51          extends AbstractChannel<P, SocketAddress, SocketAddress> implements UnixChannel {
52      protected final LinuxSocket socket;
53  
54      private final Runnable epollInReadyRunnable = new Runnable() {
55          @Override
56          public void run() {
57              epollInReadyRunnablePending = false;
58              epollInReady();
59          }
60      };
61  
62      protected volatile boolean active;
63  
64      boolean readPending;
65  
66      private EpollRegistration registration;
67  
68      private int flags = Native.EPOLLET;
69      private boolean inputClosedSeenErrorOnRead;
70      private boolean epollInReadyRunnablePending;
71      private boolean maybeMoreDataToRead;
72  
73      private boolean receivedRdHup;
74      private volatile SocketAddress localAddress;
75      private volatile SocketAddress remoteAddress;
76  
77      AbstractEpollChannel(EventLoop eventLoop, ChannelMetadata metadata, int initialFlag,
78                           RecvBufferAllocator defaultRecvAllocator, LinuxSocket fd) {
79          this(null, eventLoop, metadata, initialFlag, defaultRecvAllocator, fd, false);
80      }
81  
82      @SuppressWarnings("unchecked")
83      AbstractEpollChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata, int initialFlag,
84                           RecvBufferAllocator defaultRecvAllocator, LinuxSocket fd, boolean active) {
85          super(parent, eventLoop, metadata, defaultRecvAllocator);
86          flags |= initialFlag;
87          socket = requireNonNull(fd, "fd");
88          this.active = active;
89          if (active) {
90              // Directly cache the remote and local addresses
91              // See https://github.com/netty/netty/issues/2359
92              localAddress = fd.localAddress();
93              remoteAddress = fd.remoteAddress();
94          }
95      }
96  
97      @SuppressWarnings("unchecked")
98      AbstractEpollChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata, int initialFlag,
99                           RecvBufferAllocator defaultRecvAllocator, LinuxSocket fd, SocketAddress remote) {
100         super(parent, eventLoop, metadata, defaultRecvAllocator);
101         flags |= initialFlag;
102         socket = requireNonNull(fd, "fd");
103         active = true;
104         // Directly cache the remote and local addresses
105         // See https://github.com/netty/netty/issues/2359
106         remoteAddress =  remote;
107         localAddress = fd.localAddress();
108     }
109 
110     protected final boolean fetchLocalAddress() {
111         return socket.protocolFamily() != SocketProtocolFamily.UNIX;
112     }
113 
114     protected static boolean isSoErrorZero(Socket fd) {
115         try {
116             return fd.getSoError() == 0;
117         } catch (IOException e) {
118             throw new ChannelException(e);
119         }
120     }
121 
122     protected final void setFlag(int flag) throws IOException {
123         if (!isFlagSet(flag)) {
124             flags |= flag;
125             modifyEvents();
126         }
127     }
128 
129     protected final void clearFlag(int flag) throws IOException {
130         if (isFlagSet(flag)) {
131             flags &= ~flag;
132             modifyEvents();
133         }
134     }
135 
136     protected final EpollRegistration registration() {
137         assert registration != null;
138         return registration;
139     }
140 
141     private boolean isFlagSet(int flag) {
142         return (flags & flag) != 0;
143     }
144 
145     final int flags() {
146         return flags;
147     }
148 
149     @Override
150     public final FileDescriptor fd() {
151         return socket;
152     }
153 
154     @Override
155     public boolean isActive() {
156         return active;
157     }
158 
159     @Override
160     protected void doClose() throws Exception {
161         active = false;
162         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
163         // socket which has not even been connected yet. This has been observed to block during unit tests.
164         inputClosedSeenErrorOnRead = true;
165         socket.close();
166     }
167 
168     final void resetCachedAddresses() {
169         cacheAddresses(localAddress, null);
170         remoteAddress = null;
171     }
172 
173     @Override
174     protected void doDisconnect() throws Exception {
175         doClose();
176     }
177 
178     @Override
179     public final boolean isOpen() {
180         return socket.isOpen();
181     }
182 
183     final void register0(EpollRegistration registration) {
184         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
185         // make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
186         // new EventLoop.
187         epollInReadyRunnablePending = false;
188         this.registration = registration;
189     }
190 
191     final void deregister0() throws Exception {
192         if (registration != null) {
193             registration.remove();
194         }
195     }
196 
197     @Override
198     protected final void doBeginRead() throws Exception {
199         // Channel.read() or ChannelHandlerContext.read() was called
200         readPending = true;
201 
202         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
203         // executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
204         // never get data after this.
205         setFlag(Native.EPOLLIN);
206 
207         // If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
208         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
209         if (maybeMoreDataToRead) {
210             executeEpollInReadyRunnable();
211         }
212     }
213 
214     final boolean shouldBreakEpollInReady() {
215         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || isAllowHalfClosure());
216     }
217 
218     private void clearEpollIn() {
219         // Only clear if registered with an EventLoop as otherwise
220         if (isRegistered()) {
221             final EventLoop loop = executor();
222             if (loop.inEventLoop()) {
223                 clearEpollIn0();
224             } else {
225                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
226                 loop.execute(() -> {
227                     if (!readPending && !isAutoRead()) {
228                         // Still no read triggered so clear it now
229                         clearEpollIn0();
230                     }
231                 });
232             }
233         } else  {
234             // The EventLoop is not registered atm so just update the flags so the correct value
235             // will be used once the channel is registered
236             flags &= ~Native.EPOLLIN;
237         }
238     }
239 
240     private void modifyEvents() throws IOException {
241         if (isOpen() && isRegistered() && registration != null) {
242             registration.update();
243         }
244     }
245 
246     /**
247      * Returns an off-heap copy of, and then closes, the given {@link Buffer}.
248      */
249     protected final Buffer newDirectBuffer(Buffer buf) {
250         return newDirectBuffer(buf, buf);
251     }
252 
253     /**
254      * Returns an off-heap copy of the given {@link Buffer}, and then closes the {@code holder} under the assumption
255      * that it owned (or was itself) the buffer.
256      */
257     protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
258         BufferAllocator allocator = ioBufferAllocator();
259         try (holder) {
260             int readableBytes = buf.readableBytes();
261             Buffer directCopy = allocator.allocate(readableBytes);
262             if (readableBytes > 0) {
263                 directCopy.writeBytes(buf);
264             }
265             return directCopy;
266         }
267     }
268 
269     protected static void checkResolvable(InetSocketAddress addr) {
270         if (addr.isUnresolved()) {
271             throw new UnresolvedAddressException();
272         }
273     }
274 
275     /**
276      * Read bytes into the given {@link Buffer} and return the amount.
277      */
278     protected final void doReadBytes(Buffer buffer) throws Exception {
279         recvBufAllocHandle().attemptedBytesRead(buffer.writableBytes());
280         buffer.forEachWritable(0, (index, component) -> {
281             long address = component.writableNativeAddress();
282             assert address != 0;
283             int localReadAmount = socket.readAddress(address, 0, component.writableBytes());
284             recvBufAllocHandle().lastBytesRead(localReadAmount);
285             if (localReadAmount > 0) {
286                 component.skipWritableBytes(localReadAmount);
287             }
288             return false;
289         });
290     }
291 
292     protected final int doWriteBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
293         int initialReaderOffset = buf.readerOffset();
294         buf.forEachReadable(0, (index, component) -> {
295             long address = component.readableNativeAddress();
296             assert address != 0;
297             int written = socket.writeAddress(address, 0, component.readableBytes());
298             if (written > 0) {
299                 component.skipReadableBytes(written);
300             }
301             return false;
302         });
303         int readerOffset = buf.readerOffset();
304         if (initialReaderOffset < readerOffset) {
305             buf.readerOffset(initialReaderOffset); // Restore read offset for ChannelOutboundBuffer.
306             int bytesWritten = readerOffset - initialReaderOffset;
307             in.removeBytes(bytesWritten);
308             return 1; // Some data was written to the socket.
309         }
310         return WRITE_STATUS_SNDBUF_FULL;
311     }
312 
313     /**
314      * Write bytes to the socket, with or without a remote address.
315      * Used for datagram and TCP client fast open writes.
316      */
317     protected final long doWriteOrSendBytes(Buffer data, SocketAddress remoteAddress, boolean fastOpen)
318             throws IOException {
319         assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
320 
321         IovArray array = registration().cleanIovArray();
322         data.forEachReadable(0, array);
323         int count = array.count();
324         assert count != 0;
325         if (remoteAddress == null) {
326             return socket.writevAddresses(array.memoryAddress(0), count);
327         }
328         if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
329              return socket.sendToAddressesDomainSocket(
330                     array.memoryAddress(0), count, ((DomainSocketAddress) remoteAddress)
331                             .path().getBytes(UTF_8));
332         } else {
333             InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
334             return socket.sendToAddresses(array.memoryAddress(0), count,
335                     inetSocketAddress.getAddress(), inetSocketAddress.getPort(), fastOpen);
336         }
337     }
338 
339     final void epollInReady() {
340         if (shouldBreakEpollInReady()) {
341             clearEpollIn0();
342             return;
343         }
344         maybeMoreDataToRead = false;
345         RecvBufferAllocator.Handle handle = recvBufAllocHandle();
346         handle.reset();
347 
348         try {
349             epollInReady(handle, ioBufferAllocator(), receivedRdHup);
350         } finally {
351             this.maybeMoreDataToRead = maybeMoreDataToRead(handle) || receivedRdHup;
352 
353             if (receivedRdHup || readPending && maybeMoreDataToRead) {
354                 // trigger a read again as there may be something left to read and because of epoll ET we
355                 // will not get notified again until we read everything from the socket
356                 //
357                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
358                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
359                 // to false before every read operation to prevent re-entry into epollInReady() we will not read from
360                 // the underlying OS again unless the user happens to call read again.
361                 executeEpollInReadyRunnable();
362             } else if (!readPending && !isAutoRead()) {
363                 // Check if there is a readPending which was not processed yet.
364                 // This could be for two reasons:
365                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
366                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
367                 //
368                 // See https://github.com/netty/netty/issues/2254
369                 clearEpollIn();
370             }
371         }
372     }
373 
374     /**
375      * Called once EPOLLIN event is ready to be processed
376      */
377     protected abstract void epollInReady(RecvBufferAllocator.Handle handle, BufferAllocator recvBufferAllocator,
378                                          boolean receivedRdHup);
379 
380     protected abstract boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle);
381 
382     private void executeEpollInReadyRunnable() {
383         if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady()) {
384             return;
385         }
386         epollInReadyRunnablePending = true;
387         executor().execute(epollInReadyRunnable);
388     }
389 
390     /**
391      * Called once EPOLLRDHUP event is ready to be processed
392      */
393     final void epollRdHupReady() {
394         // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
395         receivedRdHup = true;
396 
397         if (isActive()) {
398             // If it is still active, we need to call epollInReady as otherwise we may miss to
399             // read pending data from the underlying file descriptor.
400             // See https://github.com/netty/netty/issues/3709
401             epollInReady();
402         } else {
403             // Just to be safe make sure the input marked as closed.
404             shutdownInput(true);
405         }
406 
407         // Clear the EPOLLRDHUP flag to prevent continuously getting woken up on this event.
408         clearEpollRdHup();
409     }
410 
411     /**
412      * Clear the {@link Native#EPOLLRDHUP} flag from EPOLL, and close on failure.
413      */
414     private void clearEpollRdHup() {
415         try {
416             clearFlag(Native.EPOLLRDHUP);
417         } catch (IOException e) {
418             pipeline().fireChannelExceptionCaught(e);
419             closeTransport(newPromise());
420         }
421     }
422 
423     /**
424      * Shutdown the input side of the channel.
425      */
426     protected final void shutdownInput(boolean rdHup) {
427         if (!socket.isInputShutdown()) {
428             if (isAllowHalfClosure()) {
429                 clearEpollIn();
430                 shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
431             } else {
432                 closeTransport(newPromise());
433             }
434         } else if (!rdHup) {
435             inputClosedSeenErrorOnRead = true;
436         }
437     }
438 
439     @Override
440     protected final void writeFlushed() {
441         // Flush immediately only when there's no pending flush.
442         // If there's a pending flush operation, event loop will call forceFlush() later,
443         // and thus there's no need to call it now.
444         if (!isFlagSet(Native.EPOLLOUT)) {
445             super.writeFlushed();
446         }
447     }
448 
449     /**
450      * Called once a EPOLLOUT event is ready to be processed
451      */
452     final void epollOutReady() {
453         if (isConnectPending()) {
454             // pending connect which is now complete so handle it.
455             finishConnect();
456         } else if (!socket.isOutputShutdown()) {
457             // directly call super.flush0() to force a flush now
458             super.writeFlushed();
459         }
460     }
461 
462     private void clearEpollIn0() {
463         assert executor().inEventLoop();
464         try {
465             readPending = false;
466             clearFlag(Native.EPOLLIN);
467         } catch (IOException e) {
468             // When this happens there is something completely wrong with either the filedescriptor or epoll,
469             // so fire the exception through the pipeline and close the Channel.
470             pipeline().fireChannelExceptionCaught(e);
471             closeTransport(newPromise());
472         }
473     }
474 
475     @Override
476     protected boolean doFinishConnect(SocketAddress requestedRemoteAddress) throws Exception {
477         if (socket.finishConnect()) {
478             active = true;
479             clearFlag(Native.EPOLLOUT);
480             if (requestedRemoteAddress instanceof InetSocketAddress) {
481                 remoteAddress = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
482             } else {
483                 remoteAddress = requestedRemoteAddress;
484             }
485             return true;
486         }
487         setFlag(Native.EPOLLOUT);
488         return false;
489     }
490 
491     @Override
492     protected void doBind(SocketAddress local) throws Exception {
493         if (local instanceof InetSocketAddress) {
494             checkResolvable((InetSocketAddress) local);
495         }
496         socket.bind(local);
497         if (fetchLocalAddress()) {
498             this.localAddress = socket.localAddress();
499         } else {
500             this.localAddress = local;
501         }
502     }
503 
504     /**
505      * Connect to the remote peer
506      */
507     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
508         if (localAddress instanceof InetSocketAddress) {
509             checkResolvable((InetSocketAddress) localAddress);
510         }
511 
512         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
513                 ? (InetSocketAddress) remoteAddress : null;
514         if (remoteSocketAddr != null) {
515             checkResolvable(remoteSocketAddr);
516         }
517 
518         if (localAddress != null) {
519             socket.bind(localAddress);
520         }
521 
522         boolean connected = doConnect0(remoteAddress);
523         if (connected) {
524             this.remoteAddress = remoteSocketAddr == null ?
525                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
526             active = true;
527         }
528         if (fetchLocalAddress()) {
529             // We always need to set the localAddress even if not connected yet as the bind already took place.
530             //
531             // See https://github.com/netty/netty/issues/3463
532             this.localAddress = socket.localAddress();
533         }
534         return connected;
535     }
536 
537     protected boolean doConnect0(SocketAddress remote) throws Exception {
538         boolean success = false;
539         try {
540             boolean connected = socket.connect(remote);
541             if (!connected) {
542                 setFlag(Native.EPOLLOUT);
543             }
544             success = true;
545             return connected;
546         } finally {
547             if (!success) {
548                 doClose();
549             }
550         }
551     }
552 
553     @Override
554     protected final SocketAddress localAddress0() {
555         return localAddress;
556     }
557 
558     @Override
559     protected final SocketAddress remoteAddress0() {
560         return remoteAddress;
561     }
562 
563     final void closeTransportNow() {
564         closeTransport(newPromise());
565     }
566 
567     @SuppressWarnings("unchecked")
568     @Override
569     protected <T> T getExtendedOption(ChannelOption<T> option) {
570         try {
571             if (option instanceof IntegerUnixChannelOption) {
572                 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
573                 return (T) Integer.valueOf(socket.getIntOpt(opt.level(), opt.optname()));
574             }
575             if (option instanceof RawUnixChannelOption) {
576                 RawUnixChannelOption opt = (RawUnixChannelOption) option;
577                 ByteBuffer out = ByteBuffer.allocate(opt.length());
578                 socket.getRawOpt(opt.level(), opt.optname(), out);
579                 return (T) out.flip();
580             }
581         } catch (IOException e) {
582             throw new ChannelException(e);
583         }
584         return super.getExtendedOption(option);
585     }
586 
587     @Override
588     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
589         try {
590             if (option instanceof IntegerUnixChannelOption) {
591                 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
592                 socket.setIntOpt(opt.level(), opt.optname(), (Integer) value);
593                 return;
594             } else if (option instanceof RawUnixChannelOption) {
595                 RawUnixChannelOption opt = (RawUnixChannelOption) option;
596                 socket.setRawOpt(opt.level(), opt.optname(), (ByteBuffer) value);
597                 return;
598             }
599         } catch (IOException e) {
600             throw new ChannelException(e);
601         }
602         super.setExtendedOption(option, value);
603     }
604 
605     @Override
606     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
607         if (option instanceof IntegerUnixChannelOption || option instanceof RawUnixChannelOption) {
608             return true;
609         }
610         return super.isExtendedOptionSupported(option);
611     }
612 
613     @Override
614     protected final void autoReadCleared() {
615         clearEpollIn();
616     }
617 
618     private BufferAllocator ioBufferAllocator() {
619         BufferAllocator alloc =  bufferAllocator();
620         // We need to ensure we always allocate a direct Buffer as we can only use a direct buffer to read via JNI.
621         if (!alloc.getAllocationType().isDirect()) {
622             return DefaultBufferAllocators.offHeapAllocator();
623         }
624         return alloc;
625     }
626 }