View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.kqueue;
17  
18  import io.netty5.channel.DefaultSelectStrategyFactory;
19  import io.netty5.channel.IoExecutionContext;
20  import io.netty5.channel.IoHandle;
21  import io.netty5.channel.IoHandler;
22  import io.netty5.channel.IoHandlerFactory;
23  import io.netty5.channel.SelectStrategy;
24  import io.netty5.channel.SelectStrategyFactory;
25  import io.netty5.channel.unix.FileDescriptor;
26  import io.netty5.channel.unix.IovArray;
27  import io.netty5.util.collection.IntObjectHashMap;
28  import io.netty5.util.collection.IntObjectMap;
29  import io.netty5.util.internal.StringUtil;
30  import io.netty5.util.internal.UnstableApi;
31  import io.netty5.util.internal.logging.InternalLogger;
32  import io.netty5.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.io.UncheckedIOException;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  import java.util.function.IntSupplier;
38  
39  import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;
40  import static java.lang.Math.min;
41  import static java.util.Objects.requireNonNull;
42  
43  /**
44   * {@link IoHandler} which uses kqueue under the covers. Only works on BSD!
45   */
46  @UnstableApi
47  public final class KQueueHandler implements IoHandler {
48      private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueHandler.class);
49      private static final AtomicIntegerFieldUpdater<KQueueHandler> WAKEN_UP_UPDATER =
50              AtomicIntegerFieldUpdater.newUpdater(KQueueHandler.class, "wakenUp");
51      private static final int KQUEUE_WAKE_UP_IDENT = 0;
52  
53      static {
54          // Ensure JNI is initialized by the time this class is loaded by this time!
55          // We use unix-common methods in this class which are backed by JNI methods.
56          KQueue.ensureAvailability();
57      }
58  
59      private final boolean allowGrowing;
60      private final FileDescriptor kqueueFd;
61      private final KQueueEventArray changeList;
62      private final KQueueEventArray eventList;
63      private final SelectStrategy selectStrategy;
64      private final IovArray iovArray = new IovArray();
65      private final IntSupplier selectNowSupplier = () -> {
66          try {
67              return kqueueWaitNow();
68          } catch (IOException e) {
69              throw new UncheckedIOException(e);
70          }
71      };
72  
73      private final IntObjectMap<AbstractKQueueChannel<?>> channels = new IntObjectHashMap<>(4096);
74  
75      private volatile int wakenUp;
76  
77      private static AbstractKQueueChannel<?> cast(IoHandle handle) {
78          if (handle instanceof AbstractKQueueChannel) {
79              return (AbstractKQueueChannel<?>) handle;
80          }
81          throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
82      }
83  
84      private KQueueHandler() {
85          this(0, DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy());
86      }
87  
88      private KQueueHandler(int maxEvents, SelectStrategy strategy) {
89          selectStrategy = requireNonNull(strategy, "strategy");
90          this.kqueueFd = Native.newKQueue();
91          if (maxEvents == 0) {
92              allowGrowing = true;
93              maxEvents = 4096;
94          } else {
95              allowGrowing = false;
96          }
97          changeList = new KQueueEventArray(maxEvents);
98          eventList = new KQueueEventArray(maxEvents);
99          int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
100         if (result < 0) {
101             destroy();
102             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
103         }
104     }
105 
106     /**
107      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueHandler} instances.
108      */
109     public static IoHandlerFactory newFactory() {
110         return KQueueHandler::new;
111     }
112 
113     /**
114      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueHandler} instances.
115      */
116     public static IoHandlerFactory newFactory(final int maxEvents,
117                                               final SelectStrategyFactory selectStrategyFactory) {
118         checkPositiveOrZero(maxEvents, "maxEvents");
119         requireNonNull(selectStrategyFactory, "selectStrategyFactory");
120         return () -> new KQueueHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
121     }
122 
123     @Override
124     public void register(IoHandle handle) {
125         final AbstractKQueueChannel<?> kQueueChannel = cast(handle);
126         final int id = kQueueChannel.fd().intValue();
127         AbstractKQueueChannel<?> old = channels.put(id, kQueueChannel);
128         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
129         // closed.
130         assert old == null || !old.isOpen();
131 
132         kQueueChannel.register0(new KQueueRegistration() {
133             @Override
134             public void evSet(short filter, short flags, int fflags) {
135                 KQueueHandler.this.evSet(kQueueChannel, filter, flags, fflags);
136             }
137 
138             @Override
139             public IovArray cleanArray() {
140                 return KQueueHandler.this.cleanArray();
141             }
142         });
143     }
144 
145     @Override
146     public void deregister(IoHandle handle) throws Exception {
147         AbstractKQueueChannel<?> kQueueChannel = cast(handle);
148         int fd = kQueueChannel.fd().intValue();
149 
150         AbstractKQueueChannel<?> old = channels.remove(fd);
151         if (old != null && old != kQueueChannel) {
152             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
153             channels.put(fd, old);
154 
155             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
156             assert !kQueueChannel.isOpen();
157         } else if (kQueueChannel.isOpen()) {
158             // Remove the filters. This is only needed if it's still open as otherwise it will be automatically
159             // removed once the file-descriptor is closed.
160             //
161             // See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
162             kQueueChannel.unregisterFilters();
163         }
164 
165         kQueueChannel.deregister0();
166     }
167 
168     private void evSet(AbstractKQueueChannel<?> ch, short filter, short flags, int fflags) {
169         changeList.evSet(ch, filter, flags, fflags);
170     }
171 
172     private IovArray cleanArray() {
173         iovArray.clear();
174         return iovArray;
175     }
176 
177     @Override
178     public void wakeup(boolean inEventLoop) {
179         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
180             wakeup();
181         }
182     }
183 
184     private void wakeup() {
185         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
186         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
187         // So it is not very practical to assert the return value is always >= 0.
188     }
189 
190     private int kqueueWait(IoExecutionContext context, boolean oldWakeup) throws IOException {
191         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
192         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
193         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
194         // in pipeline.
195         if (oldWakeup && !context.canBlock()) {
196             return kqueueWaitNow();
197         }
198 
199         long totalDelay = context.delayNanos(System.nanoTime());
200         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
201         return kqueueWait(delaySeconds, (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
202     }
203 
204     private int kqueueWaitNow() throws IOException {
205         return kqueueWait(0, 0);
206     }
207 
208     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
209         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
210         changeList.clear();
211         return numEvents;
212     }
213 
214     private void processReady(int ready) {
215         for (int i = 0; i < ready; ++i) {
216             final short filter = eventList.filter(i);
217             final short flags = eventList.flags(i);
218             final int fd = eventList.fd(i);
219             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
220                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
221                 // we later attempt to delete the filters from kqueue.
222                 assert filter != Native.EVFILT_USER ||
223                         (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
224                 continue;
225             }
226 
227             AbstractKQueueChannel<?> channel = channels.get(fd);
228             if (channel == null) {
229                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
230                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
231                 // thus removed from kqueue FD.
232                 logger.warn("events[{}]=[{}, {}] had no channel!", i, fd, filter);
233                 continue;
234             }
235 
236             // First check for EPOLLOUT as we may need to fail the connect Promise before try
237             // to read from the file descriptor.
238             if (filter == Native.EVFILT_WRITE) {
239                 channel.writeReady();
240             } else if (filter == Native.EVFILT_READ) {
241                 // Check READ before EOF to ensure all data is read before shutting down the input.
242                 channel.readReady(eventList.data(i));
243             } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
244                 channel.readEOF();
245             }
246 
247             // Check if EV_EOF was set, this will notify us for connection-reset in which case
248             // we may close the channel directly or try to read more data depending on the state of the
249             // Channel and also depending on the AbstractKQueueChannel subtype.
250             if ((flags & Native.EV_EOF) != 0) {
251                 channel.readEOF();
252             }
253         }
254     }
255 
256     @Override
257     public int run(IoExecutionContext context) {
258         int handled = 0;
259         try {
260             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
261             switch (strategy) {
262                 case SelectStrategy.CONTINUE:
263                     return 0;
264 
265                 case SelectStrategy.BUSY_WAIT:
266                     // fall-through to SELECT since the busy-wait is not supported with kqueue
267 
268                 case SelectStrategy.SELECT:
269                     strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
270 
271                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
272                     // before calling 'selector.wakeup()' to reduce the wake-up
273                     // overhead. (Selector.wakeup() is an expensive operation.)
274                     //
275                     // However, there is a race condition in this approach.
276                     // The race condition is triggered when 'wakenUp' is set to
277                     // true too early.
278                     //
279                     // 'wakenUp' is set to true too early if:
280                     // 1) Selector is waken up between 'wakenUp.set(false)' and
281                     //    'selector.select(...)'. (BAD)
282                     // 2) Selector is waken up between 'selector.select(...)' and
283                     //    'if (wakenUp.get()) { ... }'. (OK)
284                     //
285                     // In the first case, 'wakenUp' is set to true and the
286                     // following 'selector.select(...)' will wake up immediately.
287                     // Until 'wakenUp' is set to false again in the next round,
288                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
289                     // any attempt to wake up the Selector will fail, too, causing
290                     // the following 'selector.select(...)' call to block
291                     // unnecessarily.
292                     //
293                     // To fix this problem, we wake up the selector again if wakenUp
294                     // is true immediately after selector.select(...).
295                     // It is inefficient in that it wakes up the selector for both
296                     // the first case (BAD - wake-up required) and the second case
297                     // (OK - no wake-up required).
298 
299                     if (wakenUp == 1) {
300                         wakeup();
301                     }
302                     // fallthrough
303                 default:
304             }
305 
306             if (strategy > 0) {
307                 handled = strategy;
308                 processReady(strategy);
309             }
310             if (allowGrowing && strategy == eventList.capacity()) {
311                 //increase the size of the array as we needed the whole space for the events
312                 eventList.realloc(false);
313             }
314         } catch (Error e) {
315             throw e;
316         } catch (Throwable t) {
317             handleLoopException(t);
318         }
319         return handled;
320     }
321 
322     @Override
323     public void destroy() {
324         try {
325             try {
326                 kqueueFd.close();
327             } catch (IOException e) {
328                 logger.warn("Failed to close the kqueue fd.", e);
329             }
330         } finally {
331             // Cleanup all native memory!
332             changeList.free();
333             eventList.free();
334         }
335     }
336 
337     @Override
338     public void prepareToDestroy() {
339         try {
340             kqueueWaitNow();
341         } catch (IOException e) {
342             // ignore on close
343         }
344 
345         // Using the intermediate collection to prevent ConcurrentModificationException.
346         // In the `close()` method, the channel is deleted from `channels` map.
347         AbstractKQueueChannel<?>[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
348 
349         for (AbstractKQueueChannel<?> ch: localChannels) {
350             ch.closeTransportNow();
351         }
352     }
353 
354     @Override
355     public boolean isCompatible(Class<? extends IoHandle> handleType) {
356         return AbstractKQueueChannel.class.isAssignableFrom(handleType);
357     }
358 
359     private static void handleLoopException(Throwable t) {
360         logger.warn("Unexpected exception in the selector loop.", t);
361 
362         // Prevent possible consecutive immediate failures that lead to
363         // excessive CPU consumption.
364         try {
365             Thread.sleep(1000);
366         } catch (InterruptedException e) {
367             // Ignore.
368         }
369     }
370 }