View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.IoEventLoop;
22  import io.netty.channel.IoExecutionContext;
23  import io.netty.channel.IoHandle;
24  import io.netty.channel.IoHandler;
25  import io.netty.channel.IoHandlerFactory;
26  import io.netty.channel.IoOps;
27  import io.netty.channel.SelectStrategy;
28  import io.netty.channel.SelectStrategyFactory;
29  import io.netty.channel.unix.FileDescriptor;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.util.IntSupplier;
32  import io.netty.util.collection.IntObjectHashMap;
33  import io.netty.util.collection.IntObjectMap;
34  import io.netty.util.internal.ObjectUtil;
35  import io.netty.util.internal.StringUtil;
36  import io.netty.util.internal.logging.InternalLogger;
37  import io.netty.util.internal.logging.InternalLoggerFactory;
38  
39  import java.io.IOException;
40  import java.util.ArrayList;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.concurrent.atomic.AtomicBoolean;
44  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
45  
46  import static java.lang.Math.min;
47  
48  /**
49   * {@link IoHandler} which uses kqueue under the covers. Only works on BSD!
50   */
51  public final class KQueueIoHandler implements IoHandler {
52      private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
53      private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
54              AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
55      private static final int KQUEUE_WAKE_UP_IDENT = 0;
56      // `kqueue()` may return EINVAL when a large number such as Integer.MAX_VALUE is specified as timeout.
57      // 24 hours would be a large enough value.
58      // https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
59      private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399; // 24 hours - 1 second
60  
61      static {
62          // Ensure JNI is initialized by the time this class is loaded by this time!
63          // We use unix-common methods in this class which are backed by JNI methods.
64          KQueue.ensureAvailability();
65      }
66  
67      private final boolean allowGrowing;
68      private final FileDescriptor kqueueFd;
69      private final KQueueEventArray changeList;
70      private final KQueueEventArray eventList;
71      private final SelectStrategy selectStrategy;
72      private final IovArray iovArray = new IovArray();
73      private final IntSupplier selectNowSupplier = new IntSupplier() {
74          @Override
75          public int get() throws Exception {
76              return kqueueWaitNow();
77          }
78      };
79      private final IntObjectMap<DefaultKqueueIoRegistration> registrations = new IntObjectHashMap<>(4096);
80      private int numChannels;
81  
82      private volatile int wakenUp;
83  
84      /**
85       * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
86       */
87      public static IoHandlerFactory newFactory() {
88          return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
89      }
90  
91      /**
92       * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
93       */
94      public static IoHandlerFactory newFactory(final int maxEvents,
95                                                final SelectStrategyFactory selectStrategyFactory) {
96          ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
97          ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
98          return new IoHandlerFactory() {
99              @Override
100             public IoHandler newHandler() {
101                 return new KQueueIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
102             }
103         };
104     }
105 
106     private KQueueIoHandler(int maxEvents, SelectStrategy strategy) {
107         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
108         this.kqueueFd = Native.newKQueue();
109         if (maxEvents == 0) {
110             allowGrowing = true;
111             maxEvents = 4096;
112         } else {
113             allowGrowing = false;
114         }
115         this.changeList = new KQueueEventArray(maxEvents);
116         this.eventList = new KQueueEventArray(maxEvents);
117         int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
118         if (result < 0) {
119             destroy();
120             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
121         }
122     }
123 
124     /**
125      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
126      */
127     IovArray cleanArray() {
128         iovArray.clear();
129         return iovArray;
130     }
131 
132     @Override
133     public void wakeup(IoEventLoop eventLoop) {
134         if (!eventLoop.inEventLoop() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
135             wakeup();
136         }
137     }
138 
139     private void wakeup() {
140         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
141         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
142         // So it is not very practical to assert the return value is always >= 0.
143     }
144 
145     private int kqueueWait(IoExecutionContext context, boolean oldWakeup) throws IOException {
146         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
147         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
148         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
149         // in pipeline.
150         if (oldWakeup && !context.canBlock()) {
151             return kqueueWaitNow();
152         }
153 
154         long totalDelay = context.delayNanos(System.nanoTime());
155         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
156         int delayNanos = (int) (totalDelay % 1000000000L);
157         return kqueueWait(delaySeconds, delayNanos);
158     }
159 
160     private int kqueueWaitNow() throws IOException {
161         return kqueueWait(0, 0);
162     }
163 
164     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
165         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
166         changeList.clear();
167         return numEvents;
168     }
169 
170     private void processReady(int ready) {
171         for (int i = 0; i < ready; ++i) {
172             final short filter = eventList.filter(i);
173             final short flags = eventList.flags(i);
174             final int ident = eventList.ident(i);
175             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
176                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
177                 // we later attempt to delete the filters from kqueue.
178                 assert filter != Native.EVFILT_USER ||
179                         (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
180                 continue;
181             }
182 
183             DefaultKqueueIoRegistration registration = registrations.get(ident);
184             if (registration == null) {
185                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
186                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
187                 // thus removed from kqueue FD.
188                 logger.warn("events[{}]=[{}, {}] had no registration!", i, ident, filter);
189                 continue;
190             }
191             registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i));
192         }
193     }
194 
195     @Override
196     public int run(IoExecutionContext context) {
197         int handled = 0;
198         try {
199             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
200             switch (strategy) {
201                 case SelectStrategy.CONTINUE:
202                     return 0;
203 
204                 case SelectStrategy.BUSY_WAIT:
205                     // fall-through to SELECT since the busy-wait is not supported with kqueue
206 
207                 case SelectStrategy.SELECT:
208                     strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
209 
210                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
211                     // before calling 'selector.wakeup()' to reduce the wake-up
212                     // overhead. (Selector.wakeup() is an expensive operation.)
213                     //
214                     // However, there is a race condition in this approach.
215                     // The race condition is triggered when 'wakenUp' is set to
216                     // true too early.
217                     //
218                     // 'wakenUp' is set to true too early if:
219                     // 1) Selector is waken up between 'wakenUp.set(false)' and
220                     //    'selector.select(...)'. (BAD)
221                     // 2) Selector is waken up between 'selector.select(...)' and
222                     //    'if (wakenUp.get()) { ... }'. (OK)
223                     //
224                     // In the first case, 'wakenUp' is set to true and the
225                     // following 'selector.select(...)' will wake up immediately.
226                     // Until 'wakenUp' is set to false again in the next round,
227                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
228                     // any attempt to wake up the Selector will fail, too, causing
229                     // the following 'selector.select(...)' call to block
230                     // unnecessarily.
231                     //
232                     // To fix this problem, we wake up the selector again if wakenUp
233                     // is true immediately after selector.select(...).
234                     // It is inefficient in that it wakes up the selector for both
235                     // the first case (BAD - wake-up required) and the second case
236                     // (OK - no wake-up required).
237 
238                     if (wakenUp == 1) {
239                         wakeup();
240                     }
241                     // fall-through
242                 default:
243             }
244 
245             if (strategy > 0) {
246                 handled = strategy;
247                 processReady(strategy);
248             }
249 
250             if (allowGrowing && strategy == eventList.capacity()) {
251                 //increase the size of the array as we needed the whole space for the events
252                 eventList.realloc(false);
253             }
254         } catch (Error e) {
255             throw e;
256         } catch (Throwable t) {
257             handleLoopException(t);
258         }
259         return handled;
260     }
261 
262     int numRegisteredChannels() {
263         return numChannels;
264     }
265 
266     List<Channel> registeredChannelsList() {
267         IntObjectMap<DefaultKqueueIoRegistration> ch = registrations;
268         if (ch.isEmpty()) {
269             return Collections.emptyList();
270         }
271 
272         List<Channel> channels = new ArrayList<>(ch.size());
273 
274         for (DefaultKqueueIoRegistration registration : ch.values()) {
275             if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
276                 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
277             }
278         }
279         return Collections.unmodifiableList(channels);
280     }
281 
282     private static void handleLoopException(Throwable t) {
283         logger.warn("Unexpected exception in the selector loop.", t);
284 
285         // Prevent possible consecutive immediate failures that lead to
286         // excessive CPU consumption.
287         try {
288             Thread.sleep(1000);
289         } catch (InterruptedException e) {
290             // Ignore.
291         }
292     }
293 
294     @Override
295     public void prepareToDestroy() {
296         try {
297             kqueueWaitNow();
298         } catch (IOException e) {
299             // ignore on close
300         }
301 
302         // Using the intermediate collection to prevent ConcurrentModificationException.
303         // In the `close()` method, the channel is deleted from `channels` map.
304         DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
305 
306         for (DefaultKqueueIoRegistration reg: copy) {
307             reg.close();
308         }
309     }
310 
311     @Override
312     public void destroy() {
313         try {
314             try {
315                 kqueueFd.close();
316             } catch (IOException e) {
317                 logger.warn("Failed to close the kqueue fd.", e);
318             }
319         } finally {
320             // Cleanup all native memory!
321             changeList.free();
322             eventList.free();
323         }
324     }
325 
326     @Override
327     public KQueueIoRegistration register(IoEventLoop eventLoop, IoHandle handle) {
328         final KQueueIoHandle kqueueHandle = cast(handle);
329         if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
330             throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
331         }
332 
333         DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
334                 eventLoop, kqueueHandle);
335         DefaultKqueueIoRegistration old = registrations.put(kqueueHandle.ident(), registration);
336         if (old != null) {
337             // restore old mapping and throw exception
338             registrations.put(kqueueHandle.ident(), old);
339             throw new IllegalStateException("registration for the KQueueIoHandle.ident() already exists");
340         }
341 
342         if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
343             numChannels++;
344         }
345         return registration;
346     }
347 
348     private static KQueueIoHandle cast(IoHandle handle) {
349         if (handle instanceof KQueueIoHandle) {
350             return (KQueueIoHandle) handle;
351         }
352         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
353     }
354 
355     private static KQueueIoOps cast(IoOps ops) {
356         if (ops instanceof KQueueIoOps) {
357             return (KQueueIoOps) ops;
358         }
359         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
360     }
361 
362     @Override
363     public boolean isCompatible(Class<? extends IoHandle> handleType) {
364         return KQueueIoHandle.class.isAssignableFrom(handleType);
365     }
366 
367     private final class DefaultKqueueIoRegistration extends AtomicBoolean implements KQueueIoRegistration {
368         private final KQueueIoEvent event = new KQueueIoEvent();
369 
370         final KQueueIoHandle handle;
371 
372         private final IoEventLoop eventLoop;
373 
374         DefaultKqueueIoRegistration(IoEventLoop eventLoop, KQueueIoHandle handle) {
375             this.eventLoop = eventLoop;
376             this.handle = handle;
377         }
378 
379         @Override
380         public long submit(IoOps ops) {
381             KQueueIoOps kQueueIoOps = cast(ops);
382             if (!isValid()) {
383                 return -1;
384             }
385             short filter = kQueueIoOps.filter();
386             short flags = kQueueIoOps.flags();
387             int fflags = kQueueIoOps.fflags();
388             if (eventLoop.inEventLoop()) {
389                 evSet(filter, flags, fflags);
390             } else {
391                 eventLoop.execute(() -> evSet(filter, flags, fflags));
392             }
393             return 0;
394         }
395 
396         @Override
397         public KQueueIoHandler ioHandler() {
398             return KQueueIoHandler.this;
399         }
400 
401         void handle(int ident, short filter, short flags, int fflags, long data) {
402             event.update(ident, filter, flags, fflags, data);
403             handle.handle(this, event);
404         }
405 
406         private void evSet(short filter, short flags, int fflags) {
407             changeList.evSet(handle.ident(), filter, flags, fflags);
408         }
409 
410         @Override
411         public boolean isValid() {
412             return !get();
413         }
414 
415         @Override
416         public void cancel() {
417             if (getAndSet(true)) {
418                 return;
419             }
420             if (eventLoop.inEventLoop()) {
421                 cancel0();
422             } else {
423                 eventLoop.execute(this::cancel0);
424             }
425         }
426 
427         private void cancel0() {
428             int ident = handle.ident();
429             DefaultKqueueIoRegistration old = registrations.remove(ident);
430             if (old != null) {
431                 if (old != this) {
432                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
433                     registrations.put(ident, old);
434                 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
435                     numChannels--;
436                 }
437             }
438         }
439 
440         void close() {
441             cancel();
442             try {
443                 handle.close();
444             } catch (Exception e) {
445                 logger.debug("Exception during closing " + handle, e);
446             }
447         }
448     }
449 }