View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandle;
21  import io.netty.channel.IoHandler;
22  import io.netty.channel.IoHandlerContext;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.LongObjectHashMap;
31  import io.netty.util.collection.LongObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.io.IOException;
39  import java.util.ArrayDeque;
40  import java.util.ArrayList;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Queue;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46  
47  import static java.lang.Math.min;
48  
49  /**
50   * {@link IoHandler} which uses kqueue under the covers. Only works on BSD!
51   */
52  public final class KQueueIoHandler implements IoHandler {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);

// Atomic access to the volatile 'wakenUp' flag: 0 means no wake-up is pending, 1 means one was requested.
private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");

// Ident of the user event that is added to the kqueue at construction time and triggered by wakeup0().
// Handles using this ident are rejected by register(...).
private static final int KQUEUE_WAKE_UP_IDENT = 0;

// `kevent()` may return EINVAL when a very large timeout such as Integer.MAX_VALUE is specified,
// so the seconds part of every wait is capped. 24 hours (minus one second) is large enough.
// https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 24 * 60 * 60 - 1;

// Runs before the field initializers below, so nothing is created when kqueue is not available.
{
    KQueue.ensureAvailability();
}

// true when the event list may be reallocated after a wait filled it completely.
private final boolean allowGrowing;
private final FileDescriptor kqueueFd;
// Pending changes; handed to the next kevent wait and cleared afterwards.
private final KQueueEventArray changeList;
// Receives the events produced by a kevent wait.
private final KQueueEventArray eventList;
private final SelectStrategy selectStrategy;
// Exposed to handles through IoRegistration.attachment().
private final NativeArrays nativeArrays;
// Non-blocking wait, passed to the SelectStrategy in run(...).
private final IntSupplier selectNowSupplier = this::kqueueWaitNow;
private final ThreadAwareExecutor executor;
// Registrations that were cancelled but are still present in 'registrations' until the current run completes.
private final Queue<DefaultKqueueIoRegistration> cancelledRegistrations = new ArrayDeque<>();
// All registrations, keyed by the id that is stored as udata of their kevents.
private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
// Number of registrations whose handle belongs to a channel.
private int numChannels;
// Last id handed out by generateNextId().
private long nextId;

private volatile int wakenUp;
85  
86      private long generateNextId() {
87          boolean reset = false;
88          for (;;) {
89              if (nextId == Long.MAX_VALUE) {
90                  if (reset) {
91                      throw new IllegalStateException("All possible ids in use");
92                  }
93                  reset = true;
94              }
95              nextId++;
96              if (nextId == KQUEUE_WAKE_UP_IDENT) {
97                  continue;
98              }
99              if (!registrations.containsKey(nextId)) {
100                 return nextId;
101             }
102         }
103     }
104 
105     /**
106      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
107      */
108     public static IoHandlerFactory newFactory() {
109         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
110     }
111 
112     /**
113      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
114      */
115     public static IoHandlerFactory newFactory(final int maxEvents,
116                                               final SelectStrategyFactory selectStrategyFactory) {
117         KQueue.ensureAvailability();
118         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
119         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
120         return new IoHandlerFactory() {
121             @Override
122             public IoHandler newHandler(ThreadAwareExecutor executor) {
123                 return new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
124             }
125 
126             @Override
127             public boolean isChangingThreadSupported() {
128                 return true;
129             }
130         };
131     }
132 
133     private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
134         this.executor = ObjectUtil.checkNotNull(executor, "executor");
135         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
136         this.kqueueFd = Native.newKQueue();
137         if (maxEvents == 0) {
138             allowGrowing = true;
139             maxEvents = 4096;
140         } else {
141             allowGrowing = false;
142         }
143         this.changeList = new KQueueEventArray(maxEvents);
144         this.eventList = new KQueueEventArray(maxEvents);
145         nativeArrays = new NativeArrays();
146         int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
147         if (result < 0) {
148             destroy();
149             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
150         }
151     }
152 
153     @Override
154     public void wakeup() {
155         if (!executor.isExecutorThread(Thread.currentThread())
156                 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
157             wakeup0();
158         }
159     }
160 
161     private void wakeup0() {
162         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
163         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
164         // So it is not very practical to assert the return value is always >= 0.
165     }
166 
167     private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
168         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
169         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
170         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
171         // in pipeline.
172         if (oldWakeup && !context.canBlock()) {
173             return kqueueWaitNow();
174         }
175 
176         long totalDelay = context.delayNanos(System.nanoTime());
177         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
178         int delayNanos = (int) (totalDelay % 1000000000L);
179         return kqueueWait(delaySeconds, delayNanos);
180     }
181 
182     private int kqueueWaitNow() throws IOException {
183         return kqueueWait(0, 0);
184     }
185 
186     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
187         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
188         changeList.clear();
189         return numEvents;
190     }
191 
192     private void processReady(int ready) {
193         for (int i = 0; i < ready; ++i) {
194             final short filter = eventList.filter(i);
195             final short flags = eventList.flags(i);
196             final int ident = eventList.ident(i);
197             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
198                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
199                 // we later attempt to delete the filters from kqueue.
200                 assert filter != Native.EVFILT_USER ||
201                         (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
202                 continue;
203             }
204 
205             long id = eventList.udata(i);
206             DefaultKqueueIoRegistration registration = registrations.get(id);
207             if (registration == null) {
208                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
209                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
210                 // thus removed from kqueue FD.
211                 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
212                 continue;
213             }
214             registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
215         }
216     }
217 
218     @Override
219     public int run(IoHandlerContext context) {
220         int handled = 0;
221         try {
222             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
223             switch (strategy) {
224                 case SelectStrategy.CONTINUE:
225                     if (context.shouldReportActiveIoTime()) {
226                         context.reportActiveIoTime(0); // Report zero as we did no I/O.
227                     }
228                     return 0;
229 
230                 case SelectStrategy.BUSY_WAIT:
231                     // fall-through to SELECT since the busy-wait is not supported with kqueue
232 
233                 case SelectStrategy.SELECT:
234                     strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
235 
236                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
237                     // before calling 'selector.wakeup()' to reduce the wake-up
238                     // overhead. (Selector.wakeup() is an expensive operation.)
239                     //
240                     // However, there is a race condition in this approach.
241                     // The race condition is triggered when 'wakenUp' is set to
242                     // true too early.
243                     //
244                     // 'wakenUp' is set to true too early if:
245                     // 1) Selector is waken up between 'wakenUp.set(false)' and
246                     //    'selector.select(...)'. (BAD)
247                     // 2) Selector is waken up between 'selector.select(...)' and
248                     //    'if (wakenUp.get()) { ... }'. (OK)
249                     //
250                     // In the first case, 'wakenUp' is set to true and the
251                     // following 'selector.select(...)' will wake up immediately.
252                     // Until 'wakenUp' is set to false again in the next round,
253                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
254                     // any attempt to wake up the Selector will fail, too, causing
255                     // the following 'selector.select(...)' call to block
256                     // unnecessarily.
257                     //
258                     // To fix this problem, we wake up the selector again if wakenUp
259                     // is true immediately after selector.select(...).
260                     // It is inefficient in that it wakes up the selector for both
261                     // the first case (BAD - wake-up required) and the second case
262                     // (OK - no wake-up required).
263 
264                     if (wakenUp == 1) {
265                         wakeup0();
266                     }
267                     // fall-through
268                 default:
269             }
270 
271             if (strategy > 0) {
272                 handled = strategy;
273                 if (context.shouldReportActiveIoTime()) {
274                     // The Timer starts after the blocking kqueueWait() call returns with events.
275                     long activeIoStartTimeNanos = System.nanoTime();
276                     processReady(strategy);
277                     long activeIoEndTimeNanos = System.nanoTime();
278                     context.reportActiveIoTime(activeIoEndTimeNanos - activeIoStartTimeNanos);
279                 } else {
280                     processReady(strategy);
281                 }
282             } else if (context.shouldReportActiveIoTime()) {
283                 context.reportActiveIoTime(0);
284             }
285 
286             if (allowGrowing && strategy == eventList.capacity()) {
287                 //increase the size of the array as we needed the whole space for the events
288                 eventList.realloc(false);
289             }
290         } catch (Error e) {
291             throw e;
292         } catch (Throwable t) {
293             handleLoopException(t);
294         } finally {
295             processCancelledRegistrations();
296         }
297         return handled;
298     }
299 
300     // Process all previous cannceld registrations and remove them from the registration map.
301     private void processCancelledRegistrations() {
302         for (;;) {
303             DefaultKqueueIoRegistration cancelledRegistration = cancelledRegistrations.poll();
304             if (cancelledRegistration == null) {
305                 return;
306             }
307             DefaultKqueueIoRegistration removed = registrations.remove(cancelledRegistration.id);
308             assert removed == cancelledRegistration;
309             if (removed.isHandleForChannel()) {
310                 numChannels--;
311             }
312             removed.handle.unregistered();
313         }
314     }
315 
316     int numRegisteredChannels() {
317         return numChannels;
318     }
319 
320     List<Channel> registeredChannelsList() {
321         LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
322         if (ch.isEmpty()) {
323             return Collections.emptyList();
324         }
325 
326         List<Channel> channels = new ArrayList<>(ch.size());
327 
328         for (DefaultKqueueIoRegistration registration : ch.values()) {
329             if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
330                 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
331             }
332         }
333         return Collections.unmodifiableList(channels);
334     }
335 
336     private static void handleLoopException(Throwable t) {
337         logger.warn("Unexpected exception in the selector loop.", t);
338 
339         // Prevent possible consecutive immediate failures that lead to
340         // excessive CPU consumption.
341         try {
342             Thread.sleep(1000);
343         } catch (InterruptedException e) {
344             // Ignore.
345         }
346     }
347 
348     @Override
349     public void prepareToDestroy() {
350         try {
351             kqueueWaitNow();
352         } catch (IOException e) {
353             // ignore on close
354         }
355 
356         // Using the intermediate collection to prevent ConcurrentModificationException.
357         // In the `close()` method, the channel is deleted from `channels` map.
358         DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
359 
360         for (DefaultKqueueIoRegistration reg: copy) {
361             reg.close();
362         }
363 
364         processCancelledRegistrations();
365     }
366 
367     @Override
368     public void destroy() {
369         try {
370             try {
371                 kqueueFd.close();
372             } catch (IOException e) {
373                 logger.warn("Failed to close the kqueue fd.", e);
374             }
375         } finally {
376             // Cleanup all native memory!
377             nativeArrays.free();
378             changeList.free();
379             eventList.free();
380         }
381     }
382 
383     @Override
384     public IoRegistration register(IoHandle handle) {
385         final KQueueIoHandle kqueueHandle = cast(handle);
386         if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
387             throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
388         }
389 
390         DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
391                 executor, kqueueHandle);
392         DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
393         if (old != null) {
394             // This should never happen but just in case.
395             registrations.put(old.id, old);
396             throw new IllegalStateException();
397         }
398         if (registration.isHandleForChannel()) {
399             numChannels++;
400         }
401         handle.registered();
402         return registration;
403     }
404 
405     private static KQueueIoHandle cast(IoHandle handle) {
406         if (handle instanceof KQueueIoHandle) {
407             return (KQueueIoHandle) handle;
408         }
409         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
410     }
411 
412     private static KQueueIoOps cast(IoOps ops) {
413         if (ops instanceof KQueueIoOps) {
414             return (KQueueIoOps) ops;
415         }
416         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
417     }
418 
419     @Override
420     public boolean isCompatible(Class<? extends IoHandle> handleType) {
421         return KQueueIoHandle.class.isAssignableFrom(handleType);
422     }
423 
424     private final class DefaultKqueueIoRegistration implements IoRegistration {
425         private boolean cancellationPending;
426         private final AtomicBoolean canceled = new AtomicBoolean();
427         private final KQueueIoEvent event = new KQueueIoEvent();
428 
429         final KQueueIoHandle handle;
430         final long id;
431         private final ThreadAwareExecutor executor;
432 
433         DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
434             this.executor = executor;
435             this.handle = handle;
436             id = generateNextId();
437         }
438 
439         boolean isHandleForChannel() {
440             return handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe;
441         }
442 
443         @SuppressWarnings("unchecked")
444         @Override
445         public <T> T attachment() {
446             return (T) nativeArrays;
447         }
448 
449         @Override
450         public long submit(IoOps ops) {
451             KQueueIoOps kQueueIoOps = cast(ops);
452             if (!isValid()) {
453                 return -1;
454             }
455             short filter = kQueueIoOps.filter();
456             short flags = kQueueIoOps.flags();
457             int fflags = kQueueIoOps.fflags();
458             if (executor.isExecutorThread(Thread.currentThread())) {
459                 evSet(filter, flags, fflags);
460             } else {
461                 executor.execute(() -> evSet(filter, flags, fflags));
462             }
463             return 0;
464         }
465 
466         void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
467             if (cancellationPending) {
468                 // This registration was already cancelled but not removed from the map yet, just ignore.
469                 return;
470             }
471             event.update(ident, filter, flags, fflags, data, udata);
472             handle.handle(this, event);
473         }
474 
475         private void evSet(short filter, short flags, int fflags) {
476             if (cancellationPending) {
477                 // This registration was already cancelled but not removed from the map yet, just ignore.
478                 return;
479             }
480             changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
481         }
482 
483         @Override
484         public boolean isValid() {
485             return !canceled.get();
486         }
487 
488         @Override
489         public boolean cancel() {
490             if (!canceled.compareAndSet(false, true)) {
491                 return false;
492             }
493             if (executor.isExecutorThread(Thread.currentThread())) {
494                 cancel0();
495             } else {
496                 executor.execute(this::cancel0);
497             }
498             return true;
499         }
500 
501         private void cancel0() {
502             // Let's add the registration to our cancelledRegistrations queue so we will process it
503             // after we processed all events. This is needed as otherwise we might end up removing it
504             // from the registration map while we still have some unprocessed events.
505             cancellationPending = true;
506             cancelledRegistrations.offer(this);
507         }
508 
509         void close() {
510             cancel();
511             try {
512                 handle.close();
513             } catch (Exception e) {
514                 logger.debug("Exception during closing " + handle, e);
515             }
516         }
517     }
518 }