View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.kqueue;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.buffer.api.DefaultBufferAllocators;
21  import io.netty5.channel.ChannelOption;
22  import io.netty5.channel.ChannelShutdownDirection;
23  import io.netty5.channel.RecvBufferAllocator;
24  import io.netty5.channel.socket.SocketProtocolFamily;
25  import io.netty5.channel.unix.IntegerUnixChannelOption;
26  import io.netty5.channel.unix.RawUnixChannelOption;
27  import io.netty5.util.Resource;
28  import io.netty5.channel.AbstractChannel;
29  import io.netty5.channel.ChannelException;
30  import io.netty5.channel.ChannelMetadata;
31  import io.netty5.channel.ChannelOutboundBuffer;
32  import io.netty5.channel.EventLoop;
33  import io.netty5.channel.unix.FileDescriptor;
34  import io.netty5.channel.unix.UnixChannel;
35  
36  import java.io.IOException;
37  import java.net.InetSocketAddress;
38  import java.net.SocketAddress;
39  import java.nio.ByteBuffer;
40  import java.nio.channels.UnresolvedAddressException;
41  import java.util.function.Predicate;
42  
43  import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
44  import static io.netty5.channel.unix.Limits.SSIZE_MAX;
45  import static io.netty5.channel.unix.UnixChannelUtil.computeRemoteAddr;
46  import static java.lang.Math.min;
47  import static java.util.Objects.requireNonNull;
48  
49  abstract class AbstractKQueueChannel<P extends UnixChannel>
50          extends AbstractChannel<P, SocketAddress, SocketAddress> implements UnixChannel {
51  
52      final BsdSocket socket;
53  
54      protected volatile boolean active;
55  
56      protected boolean readPending;
57  
58      private long numberBytesPending;
59  
60      /**
61       * kqueue with EV_CLEAR flag set requires that we read until we consume "data" bytes (see kqueue man:
62       * https://www.freebsd.org/cgi/man.cgi?kqueue). However, in order to respect auto read we support reading to stop if
63       * auto read is off. If auto read is on we force reading to continue to avoid a {@link StackOverflowError} between
64       * channelReadComplete and reading from the channel. It is expected that the {@link KQueueSocketChannel}
65       * implementations will track if all data was not read, and will force a EVFILT_READ ready event.
66       * <p>
67       * It is assumed EOF is handled externally by checking {@link #eof}.
68       */
69      private final Predicate<RecvBufferAllocator.Handle> maybeMoreData = h -> {
70          if (h.lastBytesRead() > 0) {
71              numberBytesPending -= h.lastBytesRead();
72          }
73          return numberBytesPending != 0;
74      };
75  
76      private final Runnable readReadyRunnable = new Runnable() {
77          @Override
78          public void run() {
79              readReadyRunnablePending = false;
80              //RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
81              //allocHandle.reset();
82              //allocHandle.attemptedBytesRead((int) Math.max(numberBytesPending, 128));
83              readReady(numberBytesPending);
84          }
85      };
86  
87      private KQueueRegistration registration;
88  
89      private boolean readFilterEnabled;
90      private boolean writeFilterEnabled;
91      private boolean readReadyRunnablePending;
92      private boolean inputClosedSeenErrorOnRead;
93  
94      private boolean maybeMoreDataToRead;
95  
96      private boolean eof;
97  
98      private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
99  
100     private volatile SocketAddress localAddress;
101     private volatile SocketAddress remoteAddress;
102 
103     AbstractKQueueChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
104                           RecvBufferAllocator defaultRecvAllocator, BsdSocket fd, boolean active) {
105         super(parent, eventLoop, metadata, defaultRecvAllocator);
106         socket = requireNonNull(fd, "fd");
107         this.active = active;
108         if (active) {
109             // Directly cache the remote and local addresses
110             // See https://github.com/netty/netty/issues/2359
111             this.localAddress = fd.localAddress();
112             this.remoteAddress = fd.remoteAddress();
113         }
114     }
115 
116     AbstractKQueueChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
117                           RecvBufferAllocator defaultRecvAllocator, BsdSocket fd, SocketAddress remote) {
118         super(parent, eventLoop, metadata, defaultRecvAllocator);
119         socket = requireNonNull(fd, "fd");
120         active = true;
121         // Directly cache the remote and local addresses
122         // See https://github.com/netty/netty/issues/2359
123         this.localAddress = fd.localAddress();
124         this.remoteAddress = remote;
125     }
126 
127     @Override
128     @SuppressWarnings("unchecked")
129     protected  <T> T getExtendedOption(ChannelOption<T> option) {
130         try {
131             if (option instanceof IntegerUnixChannelOption) {
132                 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
133                 return (T) Integer.valueOf(socket.getIntOpt(opt.level(), opt.optname()));
134             }
135             if (option instanceof RawUnixChannelOption) {
136                 RawUnixChannelOption opt = (RawUnixChannelOption) option;
137                 ByteBuffer out = ByteBuffer.allocate(opt.length());
138                 socket.getRawOpt(opt.level(), opt.optname(), out);
139                 return (T) out.flip();
140             }
141         } catch (IOException e) {
142             throw new ChannelException(e);
143         }
144         return super.getExtendedOption(option);
145     }
146 
147     @Override
148     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
149         try {
150             if (option instanceof IntegerUnixChannelOption) {
151                 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
152                 socket.setIntOpt(opt.level(), opt.optname(), (Integer) value);
153                 return;
154             } else if (option instanceof RawUnixChannelOption) {
155                 RawUnixChannelOption opt = (RawUnixChannelOption) option;
156                 socket.setRawOpt(opt.level(), opt.optname(), (ByteBuffer) value);
157                 return;
158             }
159         } catch (IOException e) {
160             throw new ChannelException(e);
161         }
162         super.setExtendedOption(option, value);
163     }
164 
165     @Override
166     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
167         if (option instanceof IntegerUnixChannelOption || option instanceof RawUnixChannelOption) {
168             return true;
169         }
170         return super.isExtendedOptionSupported(option);
171     }
172 
173     protected void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
174         this.maxBytesPerGatheringWrite = min(SSIZE_MAX, maxBytesPerGatheringWrite);
175     }
176 
177     protected long getMaxBytesPerGatheringWrite() {
178         return maxBytesPerGatheringWrite;
179     }
180 
181     @Override
182     protected final void autoReadCleared() {
183         clearReadFilter();
184     }
185 
186     static boolean isSoErrorZero(BsdSocket fd) {
187         try {
188             return fd.getSoError() == 0;
189         } catch (IOException e) {
190             throw new ChannelException(e);
191         }
192     }
193 
194     protected KQueueRegistration registration() {
195         assert registration != null;
196         return registration;
197     }
198 
199     @Override
200     public final FileDescriptor fd() {
201         return socket;
202     }
203 
204     @Override
205     public boolean isActive() {
206         return active;
207     }
208 
209     @Override
210     protected void doClose() throws Exception {
211         active = false;
212         // Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
213         // socket which has not even been connected yet. This has been observed to block during unit tests.
214         inputClosedSeenErrorOnRead = true;
215         socket.close();
216     }
217 
218     @Override
219     protected void doDisconnect() throws Exception {
220         doClose();
221     }
222 
223     @SuppressWarnings("unchecked")
224     final void resetCachedAddresses() {
225         cacheAddresses(localAddress, null);
226         remoteAddress = null;
227     }
228 
229     @Override
230     public final boolean isOpen() {
231         return socket.isOpen();
232     }
233 
234     @Override
235     protected final void doBeginRead() {
236         // Channel.read() or ChannelHandlerContext.read() was called
237         readPending = true;
238 
239         // We must set the read flag here as it is possible the user didn't read in the last read loop, the
240         // executeReadReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
241         // never get data after this.
242         readFilter(true);
243 
244         // If auto read was toggled off on the last read loop then we may not be notified
245         // again if we didn't consume all the data. So we force a read operation here if there maybe more data.
246         if (maybeMoreDataToRead) {
247             executeReadReadyRunnable();
248         }
249     }
250 
251     final void register0(KQueueRegistration registration)  {
252         this.registration = registration;
253         // Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
254         // make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
255         // new EventLoop.
256         readReadyRunnablePending = false;
257 
258         // Add the write event first so we get notified of connection refused on the client side!
259         if (writeFilterEnabled) {
260             evSet0(registration, Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
261         }
262         if (readFilterEnabled) {
263             evSet0(registration, Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
264         }
265         evSet0(registration, Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
266     }
267 
268     final void deregister0() {
269         // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
270         // to false to ensure a consistent state in all cases.
271         readFilterEnabled = false;
272         writeFilterEnabled = false;
273     }
274 
275     final void unregisterFilters() throws Exception {
276         // Make sure we unregister our filters from kqueue!
277         readFilter(false);
278         writeFilter(false);
279 
280         if (registration != null) {
281             evSet0(registration, Native.EVFILT_SOCK, Native.EV_DELETE, 0);
282             registration = null;
283         }
284     }
285 
286     /**
287      * Returns an off-heap copy of, and then closes, the given {@link Buffer}.
288      */
289     protected final Buffer newDirectBuffer(Buffer buf) {
290         return newDirectBuffer(buf, buf);
291     }
292 
293     /**
294      * Returns an off-heap copy of the given {@link Buffer}, and then closes the {@code holder} under the assumption
295      * that it owned (or was itself) the buffer.
296      */
297     protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
298         BufferAllocator allocator = ioBufferAllocator();
299         try (holder) {
300             int readableBytes = buf.readableBytes();
301             Buffer directCopy = allocator.allocate(readableBytes);
302             if (readableBytes > 0) {
303                 directCopy.writeBytes(buf);
304             }
305             return directCopy;
306         }
307     }
308 
309     protected static void checkResolvable(InetSocketAddress addr) {
310         if (addr.isUnresolved()) {
311             throw new UnresolvedAddressException();
312         }
313     }
314 
315     /**
316      * Read bytes into the given {@link Buffer} and return the amount.
317      */
318     protected final int doReadBytes(Buffer buffer) throws Exception {
319         recvBufAllocHandle().attemptedBytesRead(buffer.writableBytes());
320         try (var iterator = buffer.forEachWritable()) {
321             var component = iterator.first();
322             if (component == null) {
323                 recvBufAllocHandle().lastBytesRead(0);
324                 return 0;
325             }
326             long address = component.writableNativeAddress();
327             assert address != 0;
328             int localReadAmount = socket.readAddress(address, 0, component.writableBytes());
329             recvBufAllocHandle().lastBytesRead(localReadAmount);
330             if (localReadAmount > 0) {
331                 component.skipWritableBytes(localReadAmount);
332             }
333             return localReadAmount;
334         }
335     }
336 
337     protected final int doWriteBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
338         int initialReaderOffset = buf.readerOffset();
339         buf.forEachReadable(0, (index, component) -> {
340             long address = component.readableNativeAddress();
341             assert address != 0;
342             int written = socket.writeAddress(address, 0, component.readableBytes());
343             if (written > 0) {
344                 component.skipReadableBytes(written);
345             }
346             return false;
347         });
348         int readerOffset = buf.readerOffset();
349         if (initialReaderOffset < readerOffset) {
350             buf.readerOffset(initialReaderOffset); // Restore read offset for ChannelOutboundBuffer.
351             int bytesWritten = readerOffset - initialReaderOffset;
352             in.removeBytes(bytesWritten);
353             return 1; // Some data was written to the socket.
354         }
355         return WRITE_STATUS_SNDBUF_FULL;
356     }
357 
358     final boolean shouldBreakReadReady() {
359         return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure());
360     }
361 
362     private void clearReadFilter() {
363         // Only clear if registered with an EventLoop as otherwise
364         if (isRegistered()) {
365             final EventLoop loop = executor();
366             if (loop.inEventLoop()) {
367                 clearReadFilter0();
368             } else {
369                 // schedule a task to clear the EPOLLIN as it is not safe to modify it directly
370                 loop.execute(() -> {
371                     if (readPending && !isAutoRead()) {
372                         // Still no read triggered so clear it now
373                         clearReadFilter0();
374                     }
375                 });
376             }
377         } else  {
378             // The EventLoop is not registered atm so just update the flags so the correct value
379             // will be used once the channel is registered
380             readFilterEnabled = false;
381         }
382     }
383 
384     final void readFilter(boolean readFilterEnabled) {
385         if (this.readFilterEnabled != readFilterEnabled) {
386             this.readFilterEnabled = readFilterEnabled;
387             evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
388         }
389     }
390 
391     final void writeFilter(boolean writeFilterEnabled) {
392         if (this.writeFilterEnabled != writeFilterEnabled) {
393             this.writeFilterEnabled = writeFilterEnabled;
394             evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
395         }
396     }
397 
398     private void evSet(short filter, short flags) {
399         if (isRegistered()) {
400             evSet0(registration, filter, flags);
401         }
402     }
403 
404     private void evSet0(KQueueRegistration registration, short filter, short flags) {
405         evSet0(registration, filter, flags, 0);
406     }
407 
408     private void evSet0(KQueueRegistration registration, short filter, short flags, int fflags) {
409         // Only try to add to changeList if the FD is still open, if not we already closed it in the meantime.
410         if (isOpen()) {
411             registration.evSet(filter, flags, fflags);
412         }
413     }
414 
415     final void readReady(long numberBytesPending) {
416         RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
417         allocHandle.reset();
418         this.numberBytesPending = numberBytesPending;
419         allocHandle.attemptedBytesRead((int) Math.min(numberBytesPending, Integer.MAX_VALUE));
420 
421         assert executor().inEventLoop();
422         if (shouldBreakReadReady()) {
423             clearReadFilter0();
424             return;
425         }
426         maybeMoreDataToRead = false;
427 
428         try {
429             readReady(allocHandle, ioBufferAllocator(), maybeMoreData);
430         } finally {
431             maybeMoreDataToRead = this.numberBytesPending != 0;
432 
433             if (eof || readPending && maybeMoreDataToRead) {
434                 // trigger a read again as there may be something left to read and because of ET we
435                 // will not get notified again until we read everything from the socket
436                 //
437                 // It is possible the last fireChannelRead call could cause the user to call read() again, or if
438                 // autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
439                 // to false before every read operation to prevent re-entry into readReady() we will not read from
440                 // the underlying OS again unless the user happens to call read again.
441                 executeReadReadyRunnable();
442             } else if (!readPending && !isAutoRead()) {
443                 // Check if there is a readPending which was not processed yet.
444                 // This could be for two reasons:
445                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
446                 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
447                 //
448                 // See https://github.com/netty/netty/issues/2254
449                 clearReadFilter0();
450             }
451         }
452     }
453 
454     abstract void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
455                             Predicate<RecvBufferAllocator.Handle> maybeMoreData);
456 
457     final void writeReady() {
458         if (isConnectPending()) {
459             // pending connect which is now complete so handle it.
460             finishConnect();
461         } else if (!socket.isOutputShutdown()) {
462             // directly call super.flush0() to force a flush now
463             super.writeFlushed();
464         }
465     }
466 
467     /**
468      * Shutdown the input side of the channel.
469      */
470     final void shutdownInput(boolean readEOF) {
471         // We need to take special care of calling finishConnect() if readEOF is true and we not
472         // fullfilled the connectPromise yet. If we fail to do so the connectPromise will be failed
473         // with a ClosedChannelException as a close() will happen and so the FD is closed before we
474         // have a chance to call finishConnect() later on. Calling finishConnect() here will ensure
475         // we observe the correct exception in case of a connect failure.
476         if (readEOF && isConnectPending()) {
477             finishConnect();
478         }
479         if (!socket.isInputShutdown()) {
480             if (isAllowHalfClosure()) {
481                 shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
482             } else {
483                 closeTransport(newPromise());
484             }
485         } else if (!readEOF) {
486             inputClosedSeenErrorOnRead = true;
487         }
488     }
489 
490     final void readEOF() {
491         // This must happen before we attempt to read. This will ensure reading continues until an error occurs.
492         final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
493         eof = true;
494 
495         if (isActive()) {
496             // If it is still active, we need to call readReady as otherwise we may miss to
497             // read pending data from the underlying file descriptor.
498             // See https://github.com/netty/netty/issues/3709
499             readReady(allocHandle, ioBufferAllocator(), maybeMoreData);
500         } else {
501             // Just to be safe make sure the input marked as closed.
502             shutdownInput(true);
503         }
504     }
505 
506     @Override
507     protected final void writeFlushed() {
508         // Flush immediately only when there's no pending flush.
509         // If there's a pending flush operation, event loop will call forceFlush() later,
510         // and thus there's no need to call it now.
511         if (!writeFilterEnabled) {
512             super.writeFlushed();
513         }
514     }
515 
516     private void executeReadReadyRunnable() {
517         if (readReadyRunnablePending || !isActive() || shouldBreakReadReady()) {
518             return;
519         }
520         readReadyRunnablePending = true;
521         executor().execute(readReadyRunnable);
522     }
523 
524     private void clearReadFilter0() {
525         assert executor().inEventLoop();
526         try {
527             readPending = false;
528             readFilter(false);
529         } catch (Throwable e) {
530             // When this happens there is something completely wrong with either the filedescriptor or epoll,
531             // so fire the exception through the pipeline and close the Channel.
532             pipeline().fireChannelExceptionCaught(e);
533             closeTransport(newPromise());
534         }
535     }
536 
537     @Override
538     protected final boolean doFinishConnect(SocketAddress requestedRemoteAddress) throws Exception {
539         if (socket.finishConnect()) {
540             active = true;
541             writeFilter(false);
542             if (requestedRemoteAddress instanceof InetSocketAddress) {
543                 remoteAddress = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
544             } else {
545                 remoteAddress = requestedRemoteAddress;
546             }
547             return true;
548         }
549         writeFilter(true);
550         return false;
551     }
552 
553     @SuppressWarnings("unchecked")
554     @Override
555     protected void doBind(SocketAddress local) throws Exception {
556         if (local instanceof InetSocketAddress) {
557             checkResolvable((InetSocketAddress) local);
558         }
559         socket.bind(local);
560         if (fetchLocalAddress()) {
561             this.localAddress = socket.localAddress();
562         } else {
563             this.localAddress = local;
564         }
565     }
566 
567     protected boolean fetchLocalAddress() {
568         return socket.protocolFamily() != SocketProtocolFamily.UNIX;
569     }
570 
571     /**
572      * Connect to the remote peer
573      */
574     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
575         if (localAddress instanceof InetSocketAddress) {
576             checkResolvable((InetSocketAddress) localAddress);
577         }
578 
579         InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
580                 ? (InetSocketAddress) remoteAddress : null;
581         if (remoteSocketAddr != null) {
582             checkResolvable(remoteSocketAddr);
583         }
584 
585         if (localAddress != null) {
586             doBind(localAddress);
587         }
588 
589         boolean connected = doConnect0(remoteAddress, localAddress);
590         if (connected) {
591             active = true;
592             this.remoteAddress = remoteSocketAddr == null?
593                     remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
594         }
595 
596         if (fetchLocalAddress()) {
597             // We always need to set the localAddress even if not connected yet as the bind already took place.
598             //
599             // See https://github.com/netty/netty/issues/3463
600             this.localAddress = socket.localAddress();
601         }
602         return connected;
603     }
604 
605     protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
606         boolean success = false;
607         try {
608             boolean connected = socket.connect(remoteAddress);
609             if (!connected) {
610                 writeFilter(true);
611             }
612             success = true;
613             return connected;
614         } finally {
615             if (!success) {
616                 doClose();
617             }
618         }
619     }
620 
621     @Override
622     protected SocketAddress localAddress0() {
623         return localAddress;
624     }
625 
626     @Override
627     protected SocketAddress remoteAddress0() {
628         return remoteAddress;
629     }
630 
631     final void closeTransportNow() {
632         closeTransport(newPromise());
633     }
634 
635     private BufferAllocator ioBufferAllocator() {
636         BufferAllocator alloc = bufferAllocator();
637         // We need to ensure we always allocate a direct Buffer as we can only use a direct buffer to read via JNI.
638         if (!alloc.getAllocationType().isDirect()) {
639             return DefaultBufferAllocators.offHeapAllocator();
640         }
641         return alloc;
642     }
643 }