View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.IoHandlerContext;
21  import io.netty.channel.IoHandle;
22  import io.netty.channel.IoHandler;
23  import io.netty.channel.IoHandlerFactory;
24  import io.netty.channel.IoOps;
25  import io.netty.channel.IoRegistration;
26  import io.netty.channel.SelectStrategy;
27  import io.netty.channel.SelectStrategyFactory;
28  import io.netty.channel.unix.FileDescriptor;
29  import io.netty.util.IntSupplier;
30  import io.netty.util.collection.LongObjectHashMap;
31  import io.netty.util.collection.LongObjectMap;
32  import io.netty.util.concurrent.ThreadAwareExecutor;
33  import io.netty.util.internal.ObjectUtil;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.logging.InternalLogger;
36  import io.netty.util.internal.logging.InternalLoggerFactory;
37  
38  import java.io.IOException;
39  import java.util.ArrayList;
40  import java.util.Collections;
41  import java.util.List;
42  import java.util.concurrent.atomic.AtomicBoolean;
43  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
44  
45  import static java.lang.Math.min;
46  
47  /**
48   * {@link IoHandler} which uses kqueue under the covers. Only works on BSD!
49   */
50  public final class KQueueIoHandler implements IoHandler {
    // Logger used for all diagnostics emitted by this handler and its registrations.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
    // Atomic accessor for the volatile 'wakenUp' field (0 = no wake-up requested, 1 = wake-up requested).
    private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
    // ident of the user event that is added in the constructor and triggered by wakeup0().
    // register() rejects handles that report this ident.
    private static final int KQUEUE_WAKE_UP_IDENT = 0;
    // `kevent()` may return EINVAL when a large number such as Integer.MAX_VALUE is specified as timeout.
    // 24 hours would be a large enough value.
    // https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
    private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399; // 24 hours - 1 second
59  
    static {
        // Ensure JNI is initialized by the time this class is loaded.
        // We use unix-common methods in this class which are backed by JNI methods.
        KQueue.ensureAvailability();
    }
65  
66      private final boolean allowGrowing;
67      private final FileDescriptor kqueueFd;
68      private final KQueueEventArray changeList;
69      private final KQueueEventArray eventList;
70      private final SelectStrategy selectStrategy;
71      private final NativeArrays nativeArrays;
72      private final IntSupplier selectNowSupplier = new IntSupplier() {
73          @Override
74          public int get() throws Exception {
75              return kqueueWaitNow();
76          }
77      };
78      private final ThreadAwareExecutor executor;
79      private final LongObjectMap<DefaultKqueueIoRegistration> registrations = new LongObjectHashMap<>(4096);
80      private int numChannels;
81      private long nextId;
82  
83      private volatile int wakenUp;
84  
85      private long generateNextId() {
86          boolean reset = false;
87          for (;;) {
88              if (nextId == Long.MAX_VALUE) {
89                  if (reset) {
90                      throw new IllegalStateException("All possible ids in use");
91                  }
92                  reset = true;
93              }
94              nextId++;
95              if (nextId == KQUEUE_WAKE_UP_IDENT) {
96                  continue;
97              }
98              if (!registrations.containsKey(nextId)) {
99                  return nextId;
100             }
101         }
102     }
103 
104     /**
105      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
106      */
107     public static IoHandlerFactory newFactory() {
108         return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
109     }
110 
111     /**
112      * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
113      */
114     public static IoHandlerFactory newFactory(final int maxEvents,
115                                               final SelectStrategyFactory selectStrategyFactory) {
116         ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
117         ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
118         return executor -> new KQueueIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
119     }
120 
121     private KQueueIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
122         this.executor = ObjectUtil.checkNotNull(executor, "executor");
123         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
124         this.kqueueFd = Native.newKQueue();
125         if (maxEvents == 0) {
126             allowGrowing = true;
127             maxEvents = 4096;
128         } else {
129             allowGrowing = false;
130         }
131         this.changeList = new KQueueEventArray(maxEvents);
132         this.eventList = new KQueueEventArray(maxEvents);
133         nativeArrays = new NativeArrays();
134         int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
135         if (result < 0) {
136             destroy();
137             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
138         }
139     }
140 
141     @Override
142     public void wakeup() {
143         if (!executor.isExecutorThread(Thread.currentThread())
144                 && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
145             wakeup0();
146         }
147     }
148 
149     private void wakeup0() {
150         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
151         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
152         // So it is not very practical to assert the return value is always >= 0.
153     }
154 
155     private int kqueueWait(IoHandlerContext context, boolean oldWakeup) throws IOException {
156         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
157         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
158         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
159         // in pipeline.
160         if (oldWakeup && !context.canBlock()) {
161             return kqueueWaitNow();
162         }
163 
164         long totalDelay = context.delayNanos(System.nanoTime());
165         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
166         int delayNanos = (int) (totalDelay % 1000000000L);
167         return kqueueWait(delaySeconds, delayNanos);
168     }
169 
    /**
     * Performs a kqueue wait with a zero timeout (0 seconds, 0 nanoseconds).
     *
     * @return the number of events reported by the wait
     * @throws IOException if the native wait fails
     */
    private int kqueueWaitNow() throws IOException {
        return kqueueWait(0, 0);
    }
173 
174     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
175         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
176         changeList.clear();
177         return numEvents;
178     }
179 
180     private void processReady(int ready) {
181         for (int i = 0; i < ready; ++i) {
182             final short filter = eventList.filter(i);
183             final short flags = eventList.flags(i);
184             final int ident = eventList.ident(i);
185             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
186                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
187                 // we later attempt to delete the filters from kqueue.
188                 assert filter != Native.EVFILT_USER ||
189                         (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
190                 continue;
191             }
192 
193             long id = eventList.udata(i);
194             DefaultKqueueIoRegistration registration = registrations.get(id);
195             if (registration == null) {
196                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
197                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
198                 // thus removed from kqueue FD.
199                 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, ident, id, filter);
200                 continue;
201             }
202             registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i), id);
203         }
204     }
205 
206     @Override
207     public int run(IoHandlerContext context) {
208         int handled = 0;
209         try {
210             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
211             switch (strategy) {
212                 case SelectStrategy.CONTINUE:
213                     return 0;
214 
215                 case SelectStrategy.BUSY_WAIT:
216                     // fall-through to SELECT since the busy-wait is not supported with kqueue
217 
218                 case SelectStrategy.SELECT:
219                     strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
220 
221                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
222                     // before calling 'selector.wakeup()' to reduce the wake-up
223                     // overhead. (Selector.wakeup() is an expensive operation.)
224                     //
225                     // However, there is a race condition in this approach.
226                     // The race condition is triggered when 'wakenUp' is set to
227                     // true too early.
228                     //
229                     // 'wakenUp' is set to true too early if:
230                     // 1) Selector is waken up between 'wakenUp.set(false)' and
231                     //    'selector.select(...)'. (BAD)
232                     // 2) Selector is waken up between 'selector.select(...)' and
233                     //    'if (wakenUp.get()) { ... }'. (OK)
234                     //
235                     // In the first case, 'wakenUp' is set to true and the
236                     // following 'selector.select(...)' will wake up immediately.
237                     // Until 'wakenUp' is set to false again in the next round,
238                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
239                     // any attempt to wake up the Selector will fail, too, causing
240                     // the following 'selector.select(...)' call to block
241                     // unnecessarily.
242                     //
243                     // To fix this problem, we wake up the selector again if wakenUp
244                     // is true immediately after selector.select(...).
245                     // It is inefficient in that it wakes up the selector for both
246                     // the first case (BAD - wake-up required) and the second case
247                     // (OK - no wake-up required).
248 
249                     if (wakenUp == 1) {
250                         wakeup0();
251                     }
252                     // fall-through
253                 default:
254             }
255 
256             if (strategy > 0) {
257                 handled = strategy;
258                 processReady(strategy);
259             }
260 
261             if (allowGrowing && strategy == eventList.capacity()) {
262                 //increase the size of the array as we needed the whole space for the events
263                 eventList.realloc(false);
264             }
265         } catch (Error e) {
266             throw e;
267         } catch (Throwable t) {
268             handleLoopException(t);
269         }
270         return handled;
271     }
272 
    /**
     * Returns the current value of the {@code numChannels} counter, which {@code register} increments and
     * cancellation decrements for handles that are {@code AbstractKQueueChannel.AbstractKQueueUnsafe}.
     */
    int numRegisteredChannels() {
        return numChannels;
    }
276 
277     List<Channel> registeredChannelsList() {
278         LongObjectMap<DefaultKqueueIoRegistration> ch = registrations;
279         if (ch.isEmpty()) {
280             return Collections.emptyList();
281         }
282 
283         List<Channel> channels = new ArrayList<>(ch.size());
284 
285         for (DefaultKqueueIoRegistration registration : ch.values()) {
286             if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
287                 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
288             }
289         }
290         return Collections.unmodifiableList(channels);
291     }
292 
293     private static void handleLoopException(Throwable t) {
294         logger.warn("Unexpected exception in the selector loop.", t);
295 
296         // Prevent possible consecutive immediate failures that lead to
297         // excessive CPU consumption.
298         try {
299             Thread.sleep(1000);
300         } catch (InterruptedException e) {
301             // Ignore.
302         }
303     }
304 
305     @Override
306     public void prepareToDestroy() {
307         try {
308             kqueueWaitNow();
309         } catch (IOException e) {
310             // ignore on close
311         }
312 
313         // Using the intermediate collection to prevent ConcurrentModificationException.
314         // In the `close()` method, the channel is deleted from `channels` map.
315         DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
316 
317         for (DefaultKqueueIoRegistration reg: copy) {
318             reg.close();
319         }
320     }
321 
322     @Override
323     public void destroy() {
324         try {
325             try {
326                 kqueueFd.close();
327             } catch (IOException e) {
328                 logger.warn("Failed to close the kqueue fd.", e);
329             }
330         } finally {
331             // Cleanup all native memory!
332             nativeArrays.free();
333             changeList.free();
334             eventList.free();
335         }
336     }
337 
338     @Override
339     public IoRegistration register(IoHandle handle) {
340         final KQueueIoHandle kqueueHandle = cast(handle);
341         if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
342             throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
343         }
344 
345         DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
346                 executor, kqueueHandle);
347         DefaultKqueueIoRegistration old = registrations.put(registration.id, registration);
348         if (old != null) {
349             // This should never happen but just in case.
350             registrations.put(old.id, old);
351             throw new IllegalStateException();
352         }
353 
354         if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
355             numChannels++;
356         }
357         return registration;
358     }
359 
360     private static KQueueIoHandle cast(IoHandle handle) {
361         if (handle instanceof KQueueIoHandle) {
362             return (KQueueIoHandle) handle;
363         }
364         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
365     }
366 
367     private static KQueueIoOps cast(IoOps ops) {
368         if (ops instanceof KQueueIoOps) {
369             return (KQueueIoOps) ops;
370         }
371         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
372     }
373 
    /**
     * Returns {@code true} if the given handle type is {@code KQueueIoHandle} or a subtype of it.
     */
    @Override
    public boolean isCompatible(Class<? extends IoHandle> handleType) {
        return KQueueIoHandle.class.isAssignableFrom(handleType);
    }
378 
379     private final class DefaultKqueueIoRegistration implements IoRegistration {
380         private final AtomicBoolean canceled = new AtomicBoolean();
381         private final KQueueIoEvent event = new KQueueIoEvent();
382 
383         final KQueueIoHandle handle;
384         final long id;
385         private final ThreadAwareExecutor executor;
386 
387         DefaultKqueueIoRegistration(ThreadAwareExecutor executor, KQueueIoHandle handle) {
388             this.executor = executor;
389             this.handle = handle;
390             id = generateNextId();
391         }
392 
393         @SuppressWarnings("unchecked")
394         @Override
395         public <T> T attachment() {
396             return (T) nativeArrays;
397         }
398 
399         @Override
400         public long submit(IoOps ops) {
401             KQueueIoOps kQueueIoOps = cast(ops);
402             if (!isValid()) {
403                 return -1;
404             }
405             short filter = kQueueIoOps.filter();
406             short flags = kQueueIoOps.flags();
407             int fflags = kQueueIoOps.fflags();
408             if (executor.isExecutorThread(Thread.currentThread())) {
409                 evSet(filter, flags, fflags);
410             } else {
411                 executor.execute(() -> evSet(filter, flags, fflags));
412             }
413             return 0;
414         }
415 
416         void handle(int ident, short filter, short flags, int fflags, long data, long udata) {
417             event.update(ident, filter, flags, fflags, data, udata);
418             handle.handle(this, event);
419         }
420 
421         private void evSet(short filter, short flags, int fflags) {
422             changeList.evSet(handle.ident(), filter, flags, fflags, 0, id);
423         }
424 
425         @Override
426         public boolean isValid() {
427             return !canceled.get();
428         }
429 
430         @Override
431         public boolean cancel() {
432             if (!canceled.compareAndSet(false, true)) {
433                 return false;
434             }
435             if (executor.isExecutorThread(Thread.currentThread())) {
436                 cancel0();
437             } else {
438                 executor.execute(this::cancel0);
439             }
440             return true;
441         }
442 
443         private void cancel0() {
444             DefaultKqueueIoRegistration old = registrations.remove(id);
445             if (old != null) {
446                 if (old != this) {
447                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
448                     registrations.put(id, old);
449                 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
450                     numChannels--;
451                 }
452             }
453         }
454 
455         void close() {
456             cancel();
457             try {
458                 handle.close();
459             } catch (Exception e) {
460                 logger.debug("Exception during closing " + handle, e);
461             }
462         }
463     }
464 }