View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.DefaultSelectStrategyFactory;
20  import io.netty.channel.EventLoop;
21  import io.netty.channel.IoEventLoop;
22  import io.netty.channel.IoExecutionContext;
23  import io.netty.channel.IoHandle;
24  import io.netty.channel.IoHandler;
25  import io.netty.channel.IoHandlerFactory;
26  import io.netty.channel.IoOps;
27  import io.netty.channel.SelectStrategy;
28  import io.netty.channel.SelectStrategyFactory;
29  import io.netty.channel.unix.FileDescriptor;
30  import io.netty.channel.unix.IovArray;
31  import io.netty.util.IntSupplier;
32  import io.netty.util.collection.IntObjectHashMap;
33  import io.netty.util.collection.IntObjectMap;
34  import io.netty.util.concurrent.Future;
35  import io.netty.util.concurrent.Promise;
36  import io.netty.util.internal.ObjectUtil;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.logging.InternalLogger;
39  import io.netty.util.internal.logging.InternalLoggerFactory;
40  
41  import java.io.IOException;
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
46  
47  import static java.lang.Math.min;
48  
49  /**
50   * {@link IoHandler} which uses kqueue under the covers. Only works on BSD!
51   */
52  public final class KQueueIoHandler implements IoHandler {
53      private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueIoHandler.class);
54      private static final AtomicIntegerFieldUpdater<KQueueIoHandler> WAKEN_UP_UPDATER =
55              AtomicIntegerFieldUpdater.newUpdater(KQueueIoHandler.class, "wakenUp");
56      private static final int KQUEUE_WAKE_UP_IDENT = 0;
57      // `kqueue()` may return EINVAL when a large number such as Integer.MAX_VALUE is specified as timeout.
58      // 24 hours would be a large enough value.
59      // https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
60      private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399; // 24 hours - 1 second
61  
62      static {
63          // Ensure JNI is initialized by the time this class is loaded by this time!
64          // We use unix-common methods in this class which are backed by JNI methods.
65          KQueue.ensureAvailability();
66      }
67  
68      private final boolean allowGrowing;
69      private final FileDescriptor kqueueFd;
70      private final KQueueEventArray changeList;
71      private final KQueueEventArray eventList;
72      private final SelectStrategy selectStrategy;
73      private final IovArray iovArray = new IovArray();
74      private final IntSupplier selectNowSupplier = new IntSupplier() {
75          @Override
76          public int get() throws Exception {
77              return kqueueWaitNow();
78          }
79      };
80      private final IntObjectMap<DefaultKqueueIoRegistration> registrations = new IntObjectHashMap<>(4096);
81      private int numChannels;
82  
83      private volatile int wakenUp;
84  
85      /**
86       * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
87       */
88      public static IoHandlerFactory newFactory() {
89          return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
90      }
91  
92      /**
93       * Returns a new {@link IoHandlerFactory} that creates {@link KQueueIoHandler} instances.
94       */
95      public static IoHandlerFactory newFactory(final int maxEvents,
96                                                final SelectStrategyFactory selectStrategyFactory) {
97          ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
98          ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
99          return new IoHandlerFactory() {
100             @Override
101             public IoHandler newHandler() {
102                 return new KQueueIoHandler(maxEvents, selectStrategyFactory.newSelectStrategy());
103             }
104         };
105     }
106 
107     private KQueueIoHandler(int maxEvents, SelectStrategy strategy) {
108         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
109         this.kqueueFd = Native.newKQueue();
110         if (maxEvents == 0) {
111             allowGrowing = true;
112             maxEvents = 4096;
113         } else {
114             allowGrowing = false;
115         }
116         this.changeList = new KQueueEventArray(maxEvents);
117         this.eventList = new KQueueEventArray(maxEvents);
118         int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
119         if (result < 0) {
120             destroy();
121             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
122         }
123     }
124 
125     /**
126      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
127      */
128     IovArray cleanArray() {
129         iovArray.clear();
130         return iovArray;
131     }
132 
133     @Override
134     public void wakeup(IoEventLoop eventLoop) {
135         if (!eventLoop.inEventLoop() && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
136             wakeup();
137         }
138     }
139 
140     private void wakeup() {
141         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
142         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
143         // So it is not very practical to assert the return value is always >= 0.
144     }
145 
146     private int kqueueWait(IoExecutionContext context, boolean oldWakeup) throws IOException {
147         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
148         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
149         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
150         // in pipeline.
151         if (oldWakeup && !context.canBlock()) {
152             return kqueueWaitNow();
153         }
154 
155         long totalDelay = context.delayNanos(System.nanoTime());
156         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
157         int delayNanos = (int) (totalDelay % 1000000000L);
158         return kqueueWait(delaySeconds, delayNanos);
159     }
160 
161     private int kqueueWaitNow() throws IOException {
162         return kqueueWait(0, 0);
163     }
164 
165     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
166         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
167         changeList.clear();
168         return numEvents;
169     }
170 
171     private void processReady(int ready) {
172         for (int i = 0; i < ready; ++i) {
173             final short filter = eventList.filter(i);
174             final short flags = eventList.flags(i);
175             final int ident = eventList.ident(i);
176             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
177                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
178                 // we later attempt to delete the filters from kqueue.
179                 assert filter != Native.EVFILT_USER ||
180                         (filter == Native.EVFILT_USER && ident == KQUEUE_WAKE_UP_IDENT);
181                 continue;
182             }
183 
184             DefaultKqueueIoRegistration registration = registrations.get(ident);
185             if (registration == null) {
186                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
187                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
188                 // thus removed from kqueue FD.
189                 logger.warn("events[{}]=[{}, {}] had no registration!", i, ident, filter);
190                 continue;
191             }
192             registration.handle(ident, filter, flags, eventList.fflags(i), eventList.data(i));
193         }
194     }
195 
196     @Override
197     public int run(IoExecutionContext context) {
198         int handled = 0;
199         try {
200             int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
201             switch (strategy) {
202                 case SelectStrategy.CONTINUE:
203                     return 0;
204 
205                 case SelectStrategy.BUSY_WAIT:
206                     // fall-through to SELECT since the busy-wait is not supported with kqueue
207 
208                 case SelectStrategy.SELECT:
209                     strategy = kqueueWait(context, WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
210 
211                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
212                     // before calling 'selector.wakeup()' to reduce the wake-up
213                     // overhead. (Selector.wakeup() is an expensive operation.)
214                     //
215                     // However, there is a race condition in this approach.
216                     // The race condition is triggered when 'wakenUp' is set to
217                     // true too early.
218                     //
219                     // 'wakenUp' is set to true too early if:
220                     // 1) Selector is waken up between 'wakenUp.set(false)' and
221                     //    'selector.select(...)'. (BAD)
222                     // 2) Selector is waken up between 'selector.select(...)' and
223                     //    'if (wakenUp.get()) { ... }'. (OK)
224                     //
225                     // In the first case, 'wakenUp' is set to true and the
226                     // following 'selector.select(...)' will wake up immediately.
227                     // Until 'wakenUp' is set to false again in the next round,
228                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
229                     // any attempt to wake up the Selector will fail, too, causing
230                     // the following 'selector.select(...)' call to block
231                     // unnecessarily.
232                     //
233                     // To fix this problem, we wake up the selector again if wakenUp
234                     // is true immediately after selector.select(...).
235                     // It is inefficient in that it wakes up the selector for both
236                     // the first case (BAD - wake-up required) and the second case
237                     // (OK - no wake-up required).
238 
239                     if (wakenUp == 1) {
240                         wakeup();
241                     }
242                     // fall-through
243                 default:
244             }
245 
246             if (strategy > 0) {
247                 handled = strategy;
248                 processReady(strategy);
249             }
250 
251             if (allowGrowing && strategy == eventList.capacity()) {
252                 //increase the size of the array as we needed the whole space for the events
253                 eventList.realloc(false);
254             }
255         } catch (Error e) {
256             throw e;
257         } catch (Throwable t) {
258             handleLoopException(t);
259         }
260         return handled;
261     }
262 
263     int numRegisteredChannels() {
264         return numChannels;
265     }
266 
267     List<Channel> registeredChannelsList() {
268         IntObjectMap<DefaultKqueueIoRegistration> ch = registrations;
269         if (ch.isEmpty()) {
270             return Collections.emptyList();
271         }
272 
273         List<Channel> channels = new ArrayList<>(ch.size());
274 
275         for (DefaultKqueueIoRegistration registration : ch.values()) {
276             if (registration.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
277                 channels.add(((AbstractKQueueChannel.AbstractKQueueUnsafe) registration.handle).channel());
278             }
279         }
280         return Collections.unmodifiableList(channels);
281     }
282 
283     private static void handleLoopException(Throwable t) {
284         logger.warn("Unexpected exception in the selector loop.", t);
285 
286         // Prevent possible consecutive immediate failures that lead to
287         // excessive CPU consumption.
288         try {
289             Thread.sleep(1000);
290         } catch (InterruptedException e) {
291             // Ignore.
292         }
293     }
294 
295     @Override
296     public void prepareToDestroy() {
297         try {
298             kqueueWaitNow();
299         } catch (IOException e) {
300             // ignore on close
301         }
302 
303         // Using the intermediate collection to prevent ConcurrentModificationException.
304         // In the `close()` method, the channel is deleted from `channels` map.
305         DefaultKqueueIoRegistration[] copy = registrations.values().toArray(new DefaultKqueueIoRegistration[0]);
306 
307         for (DefaultKqueueIoRegistration reg: copy) {
308             reg.close();
309         }
310     }
311 
312     @Override
313     public void destroy() {
314         try {
315             try {
316                 kqueueFd.close();
317             } catch (IOException e) {
318                 logger.warn("Failed to close the kqueue fd.", e);
319             }
320         } finally {
321             // Cleanup all native memory!
322             changeList.free();
323             eventList.free();
324         }
325     }
326 
327     @Override
328     public KQueueIoRegistration register(IoEventLoop eventLoop, IoHandle handle) {
329         final KQueueIoHandle kqueueHandle = cast(handle);
330         if (kqueueHandle.ident() == KQUEUE_WAKE_UP_IDENT) {
331             throw new IllegalArgumentException("ident " + KQUEUE_WAKE_UP_IDENT + " is reserved for internal usage");
332         }
333 
334         DefaultKqueueIoRegistration registration = new DefaultKqueueIoRegistration(
335                 eventLoop, kqueueHandle);
336         DefaultKqueueIoRegistration old = registrations.put(kqueueHandle.ident(), registration);
337         if (old != null) {
338             // restore old mapping and throw exception
339             registrations.put(kqueueHandle.ident(), old);
340             throw new IllegalStateException("registration for the KQueueIoHandle.ident() already exists");
341         }
342 
343         if (kqueueHandle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
344             numChannels++;
345         }
346         return registration;
347     }
348 
349     private static KQueueIoHandle cast(IoHandle handle) {
350         if (handle instanceof KQueueIoHandle) {
351             return (KQueueIoHandle) handle;
352         }
353         throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
354     }
355 
356     private static KQueueIoOps cast(IoOps ops) {
357         if (ops instanceof KQueueIoOps) {
358             return (KQueueIoOps) ops;
359         }
360         throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
361     }
362 
363     @Override
364     public boolean isCompatible(Class<? extends IoHandle> handleType) {
365         return KQueueIoHandle.class.isAssignableFrom(handleType);
366     }
367 
368     private final class DefaultKqueueIoRegistration implements KQueueIoRegistration {
369         private final Promise<?> cancellationPromise;
370         private final KQueueIoEvent event = new KQueueIoEvent();
371 
372         final KQueueIoHandle handle;
373 
374         private final IoEventLoop eventLoop;
375 
376         DefaultKqueueIoRegistration(IoEventLoop eventLoop, KQueueIoHandle handle) {
377             this.eventLoop = eventLoop;
378             this.handle = handle;
379             this.cancellationPromise = eventLoop.newPromise();
380         }
381 
382         @Override
383         public long submit(IoOps ops) {
384             KQueueIoOps kQueueIoOps = cast(ops);
385             if (!isValid()) {
386                 return -1;
387             }
388             short filter = kQueueIoOps.filter();
389             short flags = kQueueIoOps.flags();
390             int fflags = kQueueIoOps.fflags();
391             if (eventLoop.inEventLoop()) {
392                 evSet(filter, flags, fflags);
393             } else {
394                 eventLoop.execute(() -> evSet(filter, flags, fflags));
395             }
396             return 0;
397         }
398 
399         @Override
400         public KQueueIoHandler ioHandler() {
401             return KQueueIoHandler.this;
402         }
403 
404         void handle(int ident, short filter, short flags, int fflags, long data) {
405             event.update(ident, filter, flags, fflags, data);
406             handle.handle(this, event);
407         }
408 
409         private void evSet(short filter, short flags, int fflags) {
410             changeList.evSet(handle.ident(), filter, flags, fflags);
411         }
412 
413         @Override
414         public void cancel() {
415             if (!cancellationPromise.trySuccess(null)) {
416                 return;
417             }
418             if (eventLoop.inEventLoop()) {
419                 cancel0();
420             } else {
421                 eventLoop.execute(this::cancel0);
422             }
423         }
424 
425         @Override
426         public Future<?> cancelFuture() {
427             return cancellationPromise;
428         }
429 
430         private void cancel0() {
431             int ident = handle.ident();
432             DefaultKqueueIoRegistration old = registrations.remove(ident);
433             if (old != null) {
434                 if (old != this) {
435                     // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
436                     registrations.put(ident, old);
437                 } else if (old.handle instanceof AbstractKQueueChannel.AbstractKQueueUnsafe) {
438                     numChannels--;
439                 }
440             }
441         }
442 
443         void close() {
444             cancel();
445             try {
446                 handle.close();
447             } catch (Exception e) {
448                 logger.debug("Exception during closing " + handle, e);
449             }
450         }
451     }
452 }