View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.EventLoop;
20  import io.netty.channel.EventLoopGroup;
21  import io.netty.channel.EventLoopTaskQueueFactory;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.channel.unix.IovArray;
27  import io.netty.util.IntSupplier;
28  import io.netty.util.collection.LongObjectHashMap;
29  import io.netty.util.collection.LongObjectMap;
30  import io.netty.util.concurrent.RejectedExecutionHandler;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.util.ArrayDeque;
38  import java.util.Collection;
39  import java.util.Iterator;
40  import java.util.Queue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
43  
44  import static java.lang.Math.min;
45  
46  /**
47   * {@link EventLoop} which uses kqueue under the covers. Only works on BSD!
48   */
49  final class KQueueEventLoop extends SingleThreadEventLoop {
    // Logger used for warnings raised by this event loop.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
    // Atomic access to the wakenUp field: wakeup(boolean) flips it 0 -> 1, run() resets it to 0 before waiting.
    private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
    // Ident of the user event that the constructor adds to the kqueue and that wakeup() triggers.
    private static final int KQUEUE_WAKE_UP_IDENT = 0;
    // `kqueue()` may return EINVAL when a large number such as Integer.MAX_VALUE is specified as timeout.
    // 24 hours would be a large enough value.
    // https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
    private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399; // 24 hours - 1 second

    static {
        // Ensure JNI is initialized by the time this class is loaded.
        // We use unix-common methods in this class which are backed by JNI methods.
        KQueue.ensureAvailability();
    }

    // True when the constructor was given maxEvents == 0; run() then grows eventList once it fills up.
    private final boolean allowGrowing;
    // The kqueue file descriptor obtained from Native.newKQueue(); closed in cleanup().
    private final FileDescriptor kqueueFd;
    // Changes queued through KQueueRegistration.evSet(); handed to Native.keventWait() and cleared afterwards.
    private final KQueueEventArray changeList;
    // Events returned by Native.keventWait(); read by processReady().
    private final KQueueEventArray eventList;
    // Strategy consulted at the top of every run() iteration.
    private final SelectStrategy selectStrategy;
    // Reusable array handed out (cleared) by cleanArray(); released in cleanup().
    private final IovArray iovArray = new IovArray();
    // Supplier passed to the select strategy; it performs a wait with a zero timeout.
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return kqueueWaitNow();
        }
    };
    // Registrations keyed by the id produced by generateNextId().
    private final LongObjectMap<KQueueRegistration> registrations = new LongObjectHashMap<KQueueRegistration>(4096);
    // Registrations marked as removed that still have to be dropped from the registrations map.
    private final Queue<KQueueRegistration> cancelledRegistrations = new ArrayDeque<KQueueRegistration>();
    // Last id handed out by generateNextId().
    private long nextId;
    // 1 while a wake-up has been requested and not yet consumed by run(), otherwise 0.
    private volatile int wakenUp;
    // Percentage of loop time intended for I/O; validated by setIoRatio() to be in (0, 100].
    private volatile int ioRatio = 50;
82  
83      KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
84                      SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
85                      EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
86          super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
87                  rejectedExecutionHandler);
88          this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
89          this.kqueueFd = Native.newKQueue();
90          if (maxEvents == 0) {
91              allowGrowing = true;
92              maxEvents = 4096;
93          } else {
94              allowGrowing = false;
95          }
96          this.changeList = new KQueueEventArray(maxEvents);
97          this.eventList = new KQueueEventArray(maxEvents);
98          int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
99          if (result < 0) {
100             cleanup();
101             throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
102         }
103     }
104 
105     final class KQueueRegistration {
106         private final AbstractKQueueChannel channel;
107         private final long id;
108         private boolean removed;
109 
110         KQueueRegistration(AbstractKQueueChannel channel, long id) {
111             this.channel = channel;
112             this.id = id;
113         }
114 
115         void evSet(short filter, short flags, int fflags, long data) {
116             assert inEventLoop();
117             if (removed) {
118                 return;
119             }
120             changeList.evSet(channel.fd().intValue(), filter, flags, fflags, data, id);
121         }
122 
123         void remove() throws Exception {
124             assert inEventLoop();
125 
126             KQueueRegistration old = registrations.get(id);
127             if (old == null) {
128                 return;
129             }
130             assert old == this;
131             if (!old.removed) {
132                 cancelledRegistrations.offer(old);
133             }
134             if (channel.isOpen()) {
135                 // Remove the filters. This is only needed if it's still open as otherwise it will be automatically
136                 // removed once the file-descriptor is closed.
137                 //
138                 // See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
139                 channel.unregisterFilters();
140             }
141             old.removed = true;
142         }
143     }
144 
145     private static Queue<Runnable> newTaskQueue(
146             EventLoopTaskQueueFactory queueFactory) {
147         if (queueFactory == null) {
148             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
149         }
150         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
151     }
152 
153     private long generateNextId() {
154         boolean reset = false;
155         for (;;) {
156             if (nextId == Long.MAX_VALUE) {
157                 if (reset) {
158                     throw new IllegalStateException("All possible ids in use");
159                 }
160                 reset = true;
161             }
162             nextId++;
163             if (nextId == KQUEUE_WAKE_UP_IDENT) {
164                 continue;
165             }
166             if (!registrations.containsKey(nextId)) {
167                 return nextId;
168             }
169         }
170     }
171 
172     // Process all previous cannceld registrations and remove them from the registration map.Add commentMore actions
173     private void processCancelledRegistrations() {
174         for (;;) {
175             KQueueRegistration cancelledRegistration = cancelledRegistrations.poll();
176             if (cancelledRegistration == null) {
177                 return;
178             }
179             KQueueRegistration removed = registrations.remove(cancelledRegistration.id);
180             assert removed == cancelledRegistration;
181         }
182     }
183 
184     KQueueRegistration add(AbstractKQueueChannel ch) {
185         assert inEventLoop();
186 
187         long id = generateNextId();
188         KQueueRegistration registration = new KQueueRegistration(ch, id);
189         KQueueRegistration old = registrations.put(id, registration);
190 
191         assert old == null;
192         return registration;
193     }
194 
195     /**
196      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
197      */
198     IovArray cleanArray() {
199         iovArray.clear();
200         return iovArray;
201     }
202 
203     @Override
204     protected void wakeup(boolean inEventLoop) {
205         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
206             wakeup();
207         }
208     }
209 
210     private void wakeup() {
211         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
212         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
213         // So it is not very practical to assert the return value is always >= 0.
214     }
215 
216     private int kqueueWait(boolean oldWakeup) throws IOException {
217         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
218         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
219         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
220         // in pipeline.
221         if (oldWakeup && hasTasks()) {
222             return kqueueWaitNow();
223         }
224 
225         long totalDelay = delayNanos(System.nanoTime());
226         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
227         int delayNanos = (int) (totalDelay % 1000000000L);
228         return kqueueWait(delaySeconds, delayNanos);
229     }
230 
231     private int kqueueWaitNow() throws IOException {
232         return kqueueWait(0, 0);
233     }
234 
235     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
236         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
237         changeList.clear();
238         return numEvents;
239     }
240 
241     private void processReady(int ready) {
242         for (int i = 0; i < ready; ++i) {
243             final short filter = eventList.filter(i);
244             final short flags = eventList.flags(i);
245             final int fd = eventList.fd(i);
246             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
247                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
248                 // we later attempt to delete the filters from kqueue.
249                 assert filter != Native.EVFILT_USER ||
250                         (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
251                 continue;
252             }
253 
254             long id = eventList.udata(i);
255             KQueueRegistration registration = registrations.get(id);
256             if (registration == null) {
257                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
258                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
259                 // thus removed from kqueue FD.
260                 logger.warn("events[{}]=[{}, {}, {}] had no registration!", i, fd, id, filter);
261                 continue;
262             }
263             if (registration.removed) {
264                 // just ignore
265                 continue;
266             }
267             AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) registration.channel.unsafe();
268             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
269             // to read from the file descriptor.
270             if (filter == Native.EVFILT_WRITE) {
271                 unsafe.writeReady();
272             } else if (filter == Native.EVFILT_READ) {
273                 // Check READ before EOF to ensure all data is read before shutting down the input.
274                 unsafe.readReady(eventList.data(i));
275             } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
276                 unsafe.readEOF();
277             }
278 
279             // Check if EV_EOF was set, this will notify us for connection-reset in which case
280             // we may close the channel directly or try to read more data depending on the state of the
281             // Channel and also depending on the AbstractKQueueChannel subtype.
282             if ((flags & Native.EV_EOF) != 0) {
283                 unsafe.readEOF();
284             }
285         }
286     }
287 
288     @Override
289     protected void run() {
290         for (;;) {
291             try {
292                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
293                 switch (strategy) {
294                     case SelectStrategy.CONTINUE:
295                         continue;
296 
297                     case SelectStrategy.BUSY_WAIT:
298                         // fall-through to SELECT since the busy-wait is not supported with kqueue
299 
300                     case SelectStrategy.SELECT:
301                         strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
302 
303                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
304                         // before calling 'selector.wakeup()' to reduce the wake-up
305                         // overhead. (Selector.wakeup() is an expensive operation.)
306                         //
307                         // However, there is a race condition in this approach.
308                         // The race condition is triggered when 'wakenUp' is set to
309                         // true too early.
310                         //
311                         // 'wakenUp' is set to true too early if:
312                         // 1) Selector is waken up between 'wakenUp.set(false)' and
313                         //    'selector.select(...)'. (BAD)
314                         // 2) Selector is waken up between 'selector.select(...)' and
315                         //    'if (wakenUp.get()) { ... }'. (OK)
316                         //
317                         // In the first case, 'wakenUp' is set to true and the
318                         // following 'selector.select(...)' will wake up immediately.
319                         // Until 'wakenUp' is set to false again in the next round,
320                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
321                         // any attempt to wake up the Selector will fail, too, causing
322                         // the following 'selector.select(...)' call to block
323                         // unnecessarily.
324                         //
325                         // To fix this problem, we wake up the selector again if wakenUp
326                         // is true immediately after selector.select(...).
327                         // It is inefficient in that it wakes up the selector for both
328                         // the first case (BAD - wake-up required) and the second case
329                         // (OK - no wake-up required).
330 
331                         if (wakenUp == 1) {
332                             wakeup();
333                         }
334                         // fallthrough
335                     default:
336                 }
337 
338                 final int ioRatio = this.ioRatio;
339                 if (ioRatio == 100) {
340                     try {
341                         if (strategy > 0) {
342                             processReady(strategy);
343                         }
344                     } finally {
345                         runAllTasks();
346                     }
347                 } else {
348                     final long ioStartTime = System.nanoTime();
349 
350                     try {
351                         if (strategy > 0) {
352                             processReady(strategy);
353                         }
354                     } finally {
355                         final long ioTime = System.nanoTime() - ioStartTime;
356                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
357                     }
358                 }
359                 if (allowGrowing && strategy == eventList.capacity()) {
360                     //increase the size of the array as we needed the whole space for the events
361                     eventList.realloc(false);
362                 }
363             } catch (Error e) {
364                 throw e;
365             } catch (Throwable t) {
366                 handleLoopException(t);
367             } finally {
368                 processCancelledRegistrations();
369                 // Always handle shutdown even if the loop processing threw an exception.
370                 try {
371                     if (isShuttingDown()) {
372                         closeAll();
373                         if (confirmShutdown()) {
374                             break;
375                         }
376                     }
377                 } catch (Error e) {
378                     throw e;
379                 } catch (Throwable t) {
380                     handleLoopException(t);
381                 }
382             }
383         }
384     }
385 
386     @Override
387     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
388         return newTaskQueue0(maxPendingTasks);
389     }
390 
391     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
392         // This event loop never calls takeTask()
393         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
394                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
395     }
396 
397     /**
398      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
399      */
400     public int getIoRatio() {
401         return ioRatio;
402     }
403 
404     /**
405      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
406      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
407      */
408     public void setIoRatio(int ioRatio) {
409         if (ioRatio <= 0 || ioRatio > 100) {
410             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
411         }
412         this.ioRatio = ioRatio;
413     }
414 
415     @Override
416     public int registeredChannels() {
417         return registrations.size();
418     }
419 
420     @Override
421     public Iterator<Channel> registeredChannelsIterator() {
422         assert inEventLoop();
423         final LongObjectMap<KQueueRegistration> ch = registrations;
424         if (ch.isEmpty()) {
425             return ChannelsReadOnlyIterator.empty();
426         }
427         return new ChannelsReadOnlyIterator<AbstractKQueueChannel>(new Iterable<AbstractKQueueChannel>() {
428             private final Collection<KQueueRegistration> registrations = ch.values();
429             @Override
430             public Iterator<AbstractKQueueChannel> iterator() {
431                 final Iterator<KQueueRegistration> registrationIterator = registrations.iterator();
432                 return new Iterator<AbstractKQueueChannel>() {
433                     @Override
434                     public boolean hasNext() {
435                         return registrationIterator.hasNext();
436                     }
437 
438                     @Override
439                     public AbstractKQueueChannel next() {
440                         return registrationIterator.next().channel;
441                     }
442                 };
443             }
444         });
445     }
446 
447     @Override
448     protected void cleanup() {
449         try {
450             try {
451                 kqueueFd.close();
452             } catch (IOException e) {
453                 logger.warn("Failed to close the kqueue fd.", e);
454             }
455         } finally {
456             // Cleanup all native memory!
457             iovArray.release();
458             changeList.free();
459             eventList.free();
460         }
461     }
462 
463     private void closeAll() {
464         try {
465             kqueueWaitNow();
466         } catch (IOException e) {
467             // ignore on close
468         }
469 
470         // Using the intermediate collection to prevent ConcurrentModificationException.
471         // In the `close()` method, the channel is deleted from `channels` map.
472         KQueueRegistration[] localRegistrations = registrations.values().toArray(new KQueueRegistration[0]);
473 
474         for (KQueueRegistration reg: localRegistrations) {
475             reg.channel.unsafe().close(reg.channel.unsafe().voidPromise());
476         }
477         processCancelledRegistrations();
478     }
479 
480     private static void handleLoopException(Throwable t) {
481         logger.warn("Unexpected exception in the selector loop.", t);
482 
483         // Prevent possible consecutive immediate failures that lead to
484         // excessive CPU consumption.
485         try {
486             Thread.sleep(1000);
487         } catch (InterruptedException e) {
488             // Ignore.
489         }
490     }
491 }