View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.Channel;
19  import io.netty.channel.EventLoop;
20  import io.netty.channel.EventLoopGroup;
21  import io.netty.channel.EventLoopTaskQueueFactory;
22  import io.netty.channel.SelectStrategy;
23  import io.netty.channel.SingleThreadEventLoop;
24  import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
25  import io.netty.channel.unix.FileDescriptor;
26  import io.netty.channel.unix.IovArray;
27  import io.netty.util.IntSupplier;
28  import io.netty.util.collection.IntObjectHashMap;
29  import io.netty.util.collection.IntObjectMap;
30  import io.netty.util.concurrent.RejectedExecutionHandler;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.util.Iterator;
38  import java.util.Queue;
39  import java.util.concurrent.Executor;
40  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41  
42  import static java.lang.Math.min;
43  
44  /**
45   * {@link EventLoop} which uses kqueue under the covers. Only works on BSD!
46   */
47  final class KQueueEventLoop extends SingleThreadEventLoop {
48      private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
49      private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
50              AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
51      private static final int KQUEUE_WAKE_UP_IDENT = 0;
52      // `kqueue()` may return EINVAL when a large number such as Integer.MAX_VALUE is specified as timeout.
53      // 24 hours would be a large enough value.
54      // https://man.freebsd.org/cgi/man.cgi?query=kevent&apropos=0&sektion=0&manpath=FreeBSD+6.1-RELEASE&format=html#end
55      private static final int KQUEUE_MAX_TIMEOUT_SECONDS = 86399; // 24 hours - 1 second
56  
57      static {
58          // Ensure JNI is initialized by the time this class is loaded by this time!
59          // We use unix-common methods in this class which are backed by JNI methods.
60          KQueue.ensureAvailability();
61      }
62  
63      private final boolean allowGrowing;
64      private final FileDescriptor kqueueFd;
65      private final KQueueEventArray changeList;
66      private final KQueueEventArray eventList;
67      private final SelectStrategy selectStrategy;
68      private final IovArray iovArray = new IovArray();
69      private final IntSupplier selectNowSupplier = new IntSupplier() {
70          @Override
71          public int get() throws Exception {
72              return kqueueWaitNow();
73          }
74      };
75      private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
76  
77      private volatile int wakenUp;
78      private volatile int ioRatio = 50;
79  
80      KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
81                      SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
82                      EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
83          super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
84                  rejectedExecutionHandler);
85          this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
86          this.kqueueFd = Native.newKQueue();
87          if (maxEvents == 0) {
88              allowGrowing = true;
89              maxEvents = 4096;
90          } else {
91              allowGrowing = false;
92          }
93          this.changeList = new KQueueEventArray(maxEvents);
94          this.eventList = new KQueueEventArray(maxEvents);
95          int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
96          if (result < 0) {
97              cleanup();
98              throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
99          }
100     }
101 
102     private static Queue<Runnable> newTaskQueue(
103             EventLoopTaskQueueFactory queueFactory) {
104         if (queueFactory == null) {
105             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
106         }
107         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
108     }
109 
110     void add(AbstractKQueueChannel ch) {
111         assert inEventLoop();
112         AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
113         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
114         // closed.
115         assert old == null || !old.isOpen();
116     }
117 
118     void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
119         assert inEventLoop();
120         changeList.evSet(ch, filter, flags, fflags);
121     }
122 
123     void remove(AbstractKQueueChannel ch) throws Exception {
124         assert inEventLoop();
125         int fd = ch.fd().intValue();
126 
127         AbstractKQueueChannel old = channels.remove(fd);
128         if (old != null && old != ch) {
129             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
130             channels.put(fd, old);
131 
132             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
133             assert !ch.isOpen();
134         } else if (ch.isOpen()) {
135             // Remove the filters. This is only needed if it's still open as otherwise it will be automatically
136             // removed once the file-descriptor is closed.
137             //
138             // See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
139             ch.unregisterFilters();
140         }
141     }
142 
143     /**
144      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
145      */
146     IovArray cleanArray() {
147         iovArray.clear();
148         return iovArray;
149     }
150 
151     @Override
152     protected void wakeup(boolean inEventLoop) {
153         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
154             wakeup();
155         }
156     }
157 
158     private void wakeup() {
159         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
160         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
161         // So it is not very practical to assert the return value is always >= 0.
162     }
163 
164     private int kqueueWait(boolean oldWakeup) throws IOException {
165         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
166         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
167         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
168         // in pipeline.
169         if (oldWakeup && hasTasks()) {
170             return kqueueWaitNow();
171         }
172 
173         long totalDelay = delayNanos(System.nanoTime());
174         int delaySeconds = (int) min(totalDelay / 1000000000L, KQUEUE_MAX_TIMEOUT_SECONDS);
175         int delayNanos = (int) (totalDelay % 1000000000L);
176         return kqueueWait(delaySeconds, delayNanos);
177     }
178 
179     private int kqueueWaitNow() throws IOException {
180         return kqueueWait(0, 0);
181     }
182 
183     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
184         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
185         changeList.clear();
186         return numEvents;
187     }
188 
189     private void processReady(int ready) {
190         for (int i = 0; i < ready; ++i) {
191             final short filter = eventList.filter(i);
192             final short flags = eventList.flags(i);
193             final int fd = eventList.fd(i);
194             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
195                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
196                 // we later attempt to delete the filters from kqueue.
197                 assert filter != Native.EVFILT_USER ||
198                         (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
199                 continue;
200             }
201 
202             AbstractKQueueChannel channel = channels.get(fd);
203             if (channel == null) {
204                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
205                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
206                 // thus removed from kqueue FD.
207                 logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
208                 continue;
209             }
210 
211             AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
212             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
213             // to read from the file descriptor.
214             if (filter == Native.EVFILT_WRITE) {
215                 unsafe.writeReady();
216             } else if (filter == Native.EVFILT_READ) {
217                 // Check READ before EOF to ensure all data is read before shutting down the input.
218                 unsafe.readReady(eventList.data(i));
219             } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
220                 unsafe.readEOF();
221             }
222 
223             // Check if EV_EOF was set, this will notify us for connection-reset in which case
224             // we may close the channel directly or try to read more data depending on the state of the
225             // Channel and also depending on the AbstractKQueueChannel subtype.
226             if ((flags & Native.EV_EOF) != 0) {
227                 unsafe.readEOF();
228             }
229         }
230     }
231 
232     @Override
233     protected void run() {
234         for (;;) {
235             try {
236                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
237                 switch (strategy) {
238                     case SelectStrategy.CONTINUE:
239                         continue;
240 
241                     case SelectStrategy.BUSY_WAIT:
242                         // fall-through to SELECT since the busy-wait is not supported with kqueue
243 
244                     case SelectStrategy.SELECT:
245                         strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
246 
247                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
248                         // before calling 'selector.wakeup()' to reduce the wake-up
249                         // overhead. (Selector.wakeup() is an expensive operation.)
250                         //
251                         // However, there is a race condition in this approach.
252                         // The race condition is triggered when 'wakenUp' is set to
253                         // true too early.
254                         //
255                         // 'wakenUp' is set to true too early if:
256                         // 1) Selector is waken up between 'wakenUp.set(false)' and
257                         //    'selector.select(...)'. (BAD)
258                         // 2) Selector is waken up between 'selector.select(...)' and
259                         //    'if (wakenUp.get()) { ... }'. (OK)
260                         //
261                         // In the first case, 'wakenUp' is set to true and the
262                         // following 'selector.select(...)' will wake up immediately.
263                         // Until 'wakenUp' is set to false again in the next round,
264                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
265                         // any attempt to wake up the Selector will fail, too, causing
266                         // the following 'selector.select(...)' call to block
267                         // unnecessarily.
268                         //
269                         // To fix this problem, we wake up the selector again if wakenUp
270                         // is true immediately after selector.select(...).
271                         // It is inefficient in that it wakes up the selector for both
272                         // the first case (BAD - wake-up required) and the second case
273                         // (OK - no wake-up required).
274 
275                         if (wakenUp == 1) {
276                             wakeup();
277                         }
278                         // fallthrough
279                     default:
280                 }
281 
282                 final int ioRatio = this.ioRatio;
283                 if (ioRatio == 100) {
284                     try {
285                         if (strategy > 0) {
286                             processReady(strategy);
287                         }
288                     } finally {
289                         runAllTasks();
290                     }
291                 } else {
292                     final long ioStartTime = System.nanoTime();
293 
294                     try {
295                         if (strategy > 0) {
296                             processReady(strategy);
297                         }
298                     } finally {
299                         final long ioTime = System.nanoTime() - ioStartTime;
300                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
301                     }
302                 }
303                 if (allowGrowing && strategy == eventList.capacity()) {
304                     //increase the size of the array as we needed the whole space for the events
305                     eventList.realloc(false);
306                 }
307             } catch (Error e) {
308                 throw e;
309             } catch (Throwable t) {
310                 handleLoopException(t);
311             } finally {
312                 // Always handle shutdown even if the loop processing threw an exception.
313                 try {
314                     if (isShuttingDown()) {
315                         closeAll();
316                         if (confirmShutdown()) {
317                             break;
318                         }
319                     }
320                 } catch (Error e) {
321                     throw e;
322                 } catch (Throwable t) {
323                     handleLoopException(t);
324                 }
325             }
326         }
327     }
328 
329     @Override
330     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
331         return newTaskQueue0(maxPendingTasks);
332     }
333 
334     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
335         // This event loop never calls takeTask()
336         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
337                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
338     }
339 
340     /**
341      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
342      */
343     public int getIoRatio() {
344         return ioRatio;
345     }
346 
347     /**
348      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
349      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
350      */
351     public void setIoRatio(int ioRatio) {
352         if (ioRatio <= 0 || ioRatio > 100) {
353             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
354         }
355         this.ioRatio = ioRatio;
356     }
357 
358     @Override
359     public int registeredChannels() {
360         return channels.size();
361     }
362 
363     @Override
364     public Iterator<Channel> registeredChannelsIterator() {
365         assert inEventLoop();
366         IntObjectMap<AbstractKQueueChannel> ch = channels;
367         if (ch.isEmpty()) {
368             return ChannelsReadOnlyIterator.empty();
369         }
370         return new ChannelsReadOnlyIterator<AbstractKQueueChannel>(ch.values());
371     }
372 
373     @Override
374     protected void cleanup() {
375         try {
376             try {
377                 kqueueFd.close();
378             } catch (IOException e) {
379                 logger.warn("Failed to close the kqueue fd.", e);
380             }
381         } finally {
382             // Cleanup all native memory!
383             changeList.free();
384             eventList.free();
385         }
386     }
387 
388     private void closeAll() {
389         try {
390             kqueueWaitNow();
391         } catch (IOException e) {
392             // ignore on close
393         }
394 
395         // Using the intermediate collection to prevent ConcurrentModificationException.
396         // In the `close()` method, the channel is deleted from `channels` map.
397         AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
398 
399         for (AbstractKQueueChannel ch: localChannels) {
400             ch.unsafe().close(ch.unsafe().voidPromise());
401         }
402     }
403 
404     private static void handleLoopException(Throwable t) {
405         logger.warn("Unexpected exception in the selector loop.", t);
406 
407         // Prevent possible consecutive immediate failures that lead to
408         // excessive CPU consumption.
409         try {
410             Thread.sleep(1000);
411         } catch (InterruptedException e) {
412             // Ignore.
413         }
414     }
415 }