View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.LongObjectHashMap;
31  import io.netty.util.collection.LongObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.io.IOException;
39  import java.util.ArrayDeque;
40  import java.util.ArrayList;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Queue;
44  import java.util.concurrent.atomic.AtomicBoolean;
45  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46  
47  import static java.lang.Math.min;
48  
49  /**
50   * {@link IoHandler} which uses kqueue under the covers. Only works on BSD!
51   */
52  public final class KQueueIoHandler implements IoHandler {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
    // Atomic accessor for the volatile 'wakenUp' field, which is used as a 0/1 flag.
    private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
    // Ident of the user event that is added in the constructor and triggered by wakeup0().
    // It is reserved: generateNextId() never returns it and register(...) rejects handles that use it as ident.
    private static final int KQUEUE_WAKE_UP_IDENT = 0;
    // The kevent wait may fail with EINVAL when a large number such as Integer.MAX_VALUE is specified as timeout.
    // 24 hours would be a large enough value.
    // https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
    private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399; // 24 hours - 1 second
61  
    static {
        // Ensure JNI is initialized by the time this class is loaded.
        // We use unix-common methods in this class which are backed by JNI methods.
        KQueue.ensureAvailability();
    }
67  
    // True when the handler was created with maxEvents == 0; run() then grows eventList whenever a wait fills it.
    private final boolean allowGrowing;
    // The kqueue file descriptor; created in the constructor and closed in destroy().
    private final FileDescriptor kqueueFd;
    // Pending changes recorded by the registrations; handed to the next kevent wait and cleared afterwards.
    private final KQueueEventArray changeList;
    // Receives the events returned by the kevent wait; read by processReady(...).
    private final KQueueEventArray eventList;
    private final SelectStrategy selectStrategy;
    // Exposed to handles as the attachment() of every registration; freed in destroy().
    private final NativeArrays nativeArrays;
    // Non-blocking wait that is passed to the select strategy in run(...).
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return kqueueWaitNow();
        }
    };
    private final ThreadAwareExecutor executor;
    // Registrations that were cancelled but are still present in 'registrations'; drained by
    // processCancelledRegistrations().
    private final Queue<DefaultKqueueIoRegistration> cancelledRegistrations = new ArrayDeque<>();
    // Registration id -> registration. The id is what is stored as udata of the submitted changes.
    private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
    // Number of current registrations whose handle is an AbstractKQueueChannel.AbstractKQueueUnsafe.
    private int numChannels;
    // The id most recently produced by generateNextId().
    private long nextId;

    // 0/1 flag: set to 1 by wakeup() before the user event is triggered, reset to 0 by run(...) before waiting.
    private volatile int wakenUp;
87  
88      private long generateNextId() {
89          boolean reset = false;
90          for (;;) {
91              if (nextId == Long.MAX_VALUE) {
92                  if (reset) {
93                      throw new IllegalStateException("All possible ids in use");
94                  }
95                  reset = true;
96              }
97              nextId++;
98              if (nextId == KQUEUE_WAKE_UP_IDENT) {
99                  continue;
100             }
101             if (!registrations.containsKey(nextId)) {
102                 return nextId;
103             }
104         }
105     }
106 
107     /**
108      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
109      */
110     public static IoHandlerFactory newFactory() {
111         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
112     }
113 
114     /**
115      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
116      */
117     public static IoHandlerFactory newFactory(final int maxEvents,
118                                               final SelectStrategyFactory selectStrategyFactory) {
119         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
120         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
121         return executor -> new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
122     }
123 
124     private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
125         this.executor = ObjectUtil.checkNotNull(executor, "executor");
126         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
127         this.kqueueFd = Native.newKQueue();
128         if (maxEvents == 0) {
129             allowGrowing = true;
130             maxEvents = 4096;
131         } else {
132             allowGrowing = false;
133         }
134         this.changeList = new KQueueEventArray(maxEvents);
135         this.eventList = new KQueueEventArray(maxEvents);
136         nativeArrays = new NativeArrays();
137         int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
138         if (result < 0) {
139             destroy();
140             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
141         }
142     }
143 
144     @Override
145     public void wakeup() {
146         if (!executor.isExecutorThread(Thread.currentThread())
147                 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
148             wakeup0();
149         }
150     }
151 
152     private void wakeup0() {
153         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
154         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
155         // So it is not very practical to assert the return value is always >= 0.
156     }
157 
158     private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
159         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
160         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
161         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
162         // in pipeline.
163         if (oldWakeup && !context.canBlock()) {
164             return kqueueWaitNow();
165         }
166 
167         long totalDelay = context.delayNanos(System.nanoTime());
168         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
169         int delayNanos = (int) (totalDelay % 1000000000L);
170         return kqueueWait(delaySeconds, delayNanos);
171     }
172 
173     private int kqueueWaitNow() throws IOException {
174         return kqueueWait(0, 0);
175     }
176 
177     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
178         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
179         changeList.clear();
180         return numEvents;
181     }
182 
183     private void processReady(int ready) {
184         for (int i = 0; i < ready; ++i) {
185             final short filter = eventList.filter(i);
186             final short flags = eventList.flags(i);
187             final int ident = eventList.ident(i);
188             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
189                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
190                 // we later attempt to delete the filters from kqueue.
191                 assert filter != Native.EVFILT_USER ||
192                         (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
193                 continue;
194             }
195 
196             long id = eventList.udata(i);
197             DefaultKqueueIoRegistration registration = registrations.get(id);
198             if (registration == null) {
199                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
200                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
201                 // thus removed from kqueue FD.
202                 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
203                 continue;
204             }
205             registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
206         }
207     }
208 
    /**
     * Runs one round of I/O processing: asks the {@link SelectStrategy} what to do, waits on the kqueue if
     * required, dispatches the returned events and finally removes all registrations that were cancelled.
     * An {@link Error} is rethrown; any other {@link Throwable} is passed to {@code handleLoopException}.
     *
     * @param context supplies whether blocking is allowed and for how long a blocking wait may last.
     * @return the number of events that were passed to {@code processReady}, or {@code 0} if none were.
     */
    @Override
    public int run(IoHandlerContext context) {
        int handled = 0;
        try {
            int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
            switch (strategy) {
                case SelectStrategy.CONTINUE:
                    return 0;

                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with kqueue

                case SelectStrategy.SELECT:
                    // Reset 'wakenUp' to 0 and tell the wait whether it was 1 before.
                    strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);

                    // wakeup() only triggers the user event when it manages to flip 'wakenUp'
                    // from 0 to 1, which reduces the wake-up overhead.
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // 1 too early.
                    //
                    // 'wakenUp' is set to 1 too early if:
                    // 1) wakeup() is called between resetting 'wakenUp' to 0 and
                    //    the kevent wait. (BAD)
                    // 2) wakeup() is called between the kevent wait and
                    //    'if (wakenUp == 1) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to 1 and the
                    // following wait will return immediately.
                    // Until 'wakenUp' is reset to 0 again in the next round,
                    // the compare-and-set in wakeup() will fail, and therefore
                    // any attempt to wake up the wait will fail, too, causing
                    // the following wait to block unnecessarily.
                    //
                    // To fix this problem, we trigger the user event again if 'wakenUp'
                    // is 1 immediately after the wait.
                    // It is inefficient in that it triggers a wake-up for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).

                    if (wakenUp == 1) {
                        wakeup0();
                    }
                    // fall-through
                default:
            }

            // At this point a positive 'strategy' is treated as the number of ready events in eventList.
            if (strategy > 0) {
                handled = strategy;
                processReady(strategy);
            }

            if (allowGrowing && strategy == eventList.capacity()) {
                // Increase the size of the array as we needed the whole space for the events.
                eventList.realloc(false);
            }
        } catch (Error e) {
            throw e;
        } catch (Throwable t) {
            handleLoopException(t);
        } finally {
            // Runs on every exit path, including the early return for CONTINUE.
            processCancelledRegistrations();
        }
        return handled;
    }
277 
278     // Process all previous cannceld registrations and remove them from the registration map.
279     private void processCancelledRegistrations() {
280         for (;;) {
281             DefaultKqueueIoRegistration cancelledRegistration = cancelledRegistrations.poll();
282             if (cancelledRegistration == null) {
283                 return;
284             }
285             DefaultKqueueIoRegistration removed = registrations.remove(cancelledRegistration.id);
286             assert removed == cancelledRegistration;
287             if (removed.isHandleForChannel()) {
288                 numChannels--;
289             }
290         }
291     }
292 
293     int numRegisteredChannels() {
294         return numChannels;
295     }
296 
297     List<Channel> registeredChannelsList() {
298         LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
299         if (ch.isEmpty()) {
300             return Collections.emptyList();
301         }
302 
303         List<Channel> channels = new ArrayList<>(ch.size());
304 
305         for (DefaultKqueueIoRegistration registration : ch.values()) {
306             if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
307                 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
308             }
309         }
310         return Collections.unmodifiableList(channels);
311     }
312 
313     private static void handleLoopException(Throwable t) {
314         logger.warn("Unexpected exception in the selector loop.", t);
315 
316         // Prevent possible consecutive immediate failures that lead to
317         // excessive CPU consumption.
318         try {
319             Thread.sleep(1000);
320         } catch (InterruptedException e) {
321             // Ignore.
322         }
323     }
324 
325     @Override
326     public void prepareToDestroy() {
327         try {
328             kqueueWaitNow();
329         } catch (IOException e) {
330             // ignore on close
331         }
332 
333         // Using the intermediate collection to prevent ConcurrentModificationException.
334         // In the `close()` method, the channel is deleted from `channels` map.
335         DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
336 
337         for (DefaultKqueueIoRegistration reg: copy) {
338             reg.close();
339         }
340 
341         processCancelledRegistrations();
342     }
343 
344     @Override
345     public void destroy() {
346         try {
347             try {
348                 kqueueFd.close();
349             } catch (IOException e) {
350                 logger.warn("Failed to close the kqueue fd.", e);
351             }
352         } finally {
353             // Cleanup all native memory!
354             nativeArrays.free();
355             changeList.free();
356             eventList.free();
357         }
358     }
359 
360     @Override
361     public IoRegistration register(IoHandle handle) {
362         final KQueueIoHandle kqueueHandle = cast(handle);
363         if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
364             throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
365         }
366 
367         DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
368                 executor, kqueueHandle);
369         DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
370         if (old != null) {
371             // This should never happen but just in case.
372             registrations.put(old.id, old);
373             throw new IllegalStateException();
374         }
375 
376         if (registration.isHandleForChannel()) {
377             numChannels++;
378         }
379         return registration;
380     }
381 
382     private static KQueueIoHandle cast(IoHandle handle) {
383         if (handle instanceof KQueueIoHandle) {
384             return (KQueueIoHandle) handle;
385         }
386         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
387     }
388 
389     private static KQueueIoOps cast(IoOps ops) {
390         if (ops instanceof KQueueIoOps) {
391             return (KQueueIoOps) ops;
392         }
393         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
394     }
395 
396     @Override
397     public boolean isCompatible(Class<? extends IoHandle> handleType) {
398         return KQueueIoHandle.class.isAssignableFrom(handleType);
399     }
400 
401     private final class DefaultKqueueIoRegistration implements IoRegistration {
402         private boolean cancellationPending;
403         private final AtomicBoolean canceled = new AtomicBoolean();
404         private final KQueueIoEvent event = new KQueueIoEvent();
405 
406         final KQueueIoHandle handle;
407         final long id;
408         private final ThreadAwareExecutor executor;
409 
410         DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
411             this.executor = executor;
412             this.handle = handle;
413             id = generateNextId();
414         }
415 
416         boolean isHandleForChannel() {
417             return handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe;
418         }
419 
420         @SuppressWarnings("unchecked")
421         @Override
422         public <T> T attachment() {
423             return (T) nativeArrays;
424         }
425 
426         @Override
427         public long submit(IoOps ops) {
428             KQueueIoOps kQueueIoOps = cast(ops);
429             if (!isValid()) {
430                 return -1;
431             }
432             short filter = kQueueIoOps.filter();
433             short flags = kQueueIoOps.flags();
434             int fflags = kQueueIoOps.fflags();
435             if (executor.isExecutorThread(Thread.currentThread())) {
436                 evSet(filter, flags, fflags);
437             } else {
438                 executor.execute(() -> evSet(filter, flags, fflags));
439             }
440             return 0;
441         }
442 
443         void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
444             if (cancellationPending) {
445                 // This registration was already cancelled but not removed from the map yet, just ignore.
446                 return;
447             }
448             event.update(ident, filter, flags, fflags, data, udata);
449             handle.handle(this, event);
450         }
451 
452         private void evSet(short filter, short flags, int fflags) {
453             if (cancellationPending) {
454                 // This registration was already cancelled but not removed from the map yet, just ignore.
455                 return;
456             }
457             changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
458         }
459 
460         @Override
461         public boolean isValid() {
462             return !canceled.get();
463         }
464 
465         @Override
466         public boolean cancel() {
467             if (!canceled.compareAndSet(false, true)) {
468                 return false;
469             }
470             if (executor.isExecutorThread(Thread.currentThread())) {
471                 cancel0();
472             } else {
473                 executor.execute(this::cancel0);
474             }
475             return true;
476         }
477 
478         private void cancel0() {
479             // Let's add the registration to our cancelledRegistrations queue so we will process it
480             // after we processed all events. This is needed as otherwise we might end up removing it
481             // from the registration map while we still have some unprocessed events.
482             cancellationPending = true;
483             cancelledRegistrations.offer(this);
484         }
485 
486         void close() {
487             cancel();
488             try {
489                 handle.close();
490             } catch (Exception e) {
491                 logger.debug("Exception during closing " + handle, e);
492             }
493         }
494     }
495 }