View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.EventLoop;
19  import io.netty.channel.EventLoopGroup;
20  import io.netty.channel.EventLoopTaskQueueFactory;
21  import io.netty.channel.SelectStrategy;
22  import io.netty.channel.SingleThreadEventLoop;
23  import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
24  import io.netty.channel.unix.FileDescriptor;
25  import io.netty.channel.unix.IovArray;
26  import io.netty.util.IntSupplier;
27  import io.netty.util.collection.IntObjectHashMap;
28  import io.netty.util.collection.IntObjectMap;
29  import io.netty.util.concurrent.RejectedExecutionHandler;
30  import io.netty.util.internal.ObjectUtil;
31  import io.netty.util.internal.PlatformDependent;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.IOException;
36  import java.util.Queue;
37  import java.util.concurrent.Executor;
38  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
39  
40  import static java.lang.Math.min;
41  
42  /**
43   * {@link EventLoop} which uses kqueue under the covers. Only works on BSD!
44   */
45  final class KQueueEventLoop extends SingleThreadEventLoop {
46      private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
47      private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
48              AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
49      private static final int KQUEUE_WAKE_UP_IDENT = 0;
50  
51      static {
52          // Ensure JNI is initialized by the time this class is loaded by this time!
53          // We use unix-common methods in this class which are backed by JNI methods.
54          KQueue.ensureAvailability();
55      }
56  
57      private final boolean allowGrowing;
58      private final FileDescriptor kqueueFd;
59      private final KQueueEventArray changeList;
60      private final KQueueEventArray eventList;
61      private final SelectStrategy selectStrategy;
62      private final IovArray iovArray = new IovArray();
63      private final IntSupplier selectNowSupplier = new IntSupplier() {
64          @Override
65          public int get() throws Exception {
66              return kqueueWaitNow();
67          }
68      };
69      private final IntObjectMap<AbstractKQueueChannel> channels = new IntObjectHashMap<AbstractKQueueChannel>(4096);
70  
71      private volatile int wakenUp;
72      private volatile int ioRatio = 50;
73  
74      KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
75                      SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
76                      EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
77          super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
78                  rejectedExecutionHandler);
79          this.selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
80          this.kqueueFd = Native.newKQueue();
81          if (maxEvents == 0) {
82              allowGrowing = true;
83              maxEvents = 4096;
84          } else {
85              allowGrowing = false;
86          }
87          this.changeList = new KQueueEventArray(maxEvents);
88          this.eventList = new KQueueEventArray(maxEvents);
89          int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
90          if (result < 0) {
91              cleanup();
92              throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
93          }
94      }
95  
96      private static Queue<Runnable> newTaskQueue(
97              EventLoopTaskQueueFactory queueFactory) {
98          if (queueFactory == null) {
99              return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
100         }
101         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
102     }
103 
104     void add(AbstractKQueueChannel ch) {
105         assert inEventLoop();
106         AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
107         // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
108         // closed.
109         assert old == null || !old.isOpen();
110     }
111 
112     void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
113         assert inEventLoop();
114         changeList.evSet(ch, filter, flags, fflags);
115     }
116 
117     void remove(AbstractKQueueChannel ch) throws Exception {
118         assert inEventLoop();
119         int fd = ch.fd().intValue();
120 
121         AbstractKQueueChannel old = channels.remove(fd);
122         if (old != null && old != ch) {
123             // The Channel mapping was already replaced due FD reuse, put back the stored Channel.
124             channels.put(fd, old);
125 
126             // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
127             assert !ch.isOpen();
128         } else if (ch.isOpen()) {
129             // Remove the filters. This is only needed if it's still open as otherwise it will be automatically
130             // removed once the file-descriptor is closed.
131             //
132             // See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
133             ch.unregisterFilters();
134         }
135     }
136 
137     /**
138      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
139      */
140     IovArray cleanArray() {
141         iovArray.clear();
142         return iovArray;
143     }
144 
145     @Override
146     protected void wakeup(boolean inEventLoop) {
147         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
148             wakeup();
149         }
150     }
151 
152     private void wakeup() {
153         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
154         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
155         // So it is not very practical to assert the return value is always >= 0.
156     }
157 
158     private int kqueueWait(boolean oldWakeup) throws IOException {
159         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
160         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
161         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
162         // in pipeline.
163         if (oldWakeup && hasTasks()) {
164             return kqueueWaitNow();
165         }
166 
167         long totalDelay = delayNanos(System.nanoTime());
168         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
169         return kqueueWait(delaySeconds, (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
170     }
171 
172     private int kqueueWaitNow() throws IOException {
173         return kqueueWait(0, 0);
174     }
175 
176     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
177         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
178         changeList.clear();
179         return numEvents;
180     }
181 
182     private void processReady(int ready) {
183         for (int i = 0; i < ready; ++i) {
184             final short filter = eventList.filter(i);
185             final short flags = eventList.flags(i);
186             final int fd = eventList.fd(i);
187             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
188                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
189                 // we later attempt to delete the filters from kqueue.
190                 assert filter != Native.EVFILT_USER ||
191                         (filter == Native.EVFILT_USER && fd == KQUEUE_WAKE_UP_IDENT);
192                 continue;
193             }
194 
195             AbstractKQueueChannel channel = channels.get(fd);
196             if (channel == null) {
197                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
198                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
199                 // thus removed from kqueue FD.
200                 logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
201                 continue;
202             }
203 
204             AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
205             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
206             // to read from the file descriptor.
207             if (filter == Native.EVFILT_WRITE) {
208                 unsafe.writeReady();
209             } else if (filter == Native.EVFILT_READ) {
210                 // Check READ before EOF to ensure all data is read before shutting down the input.
211                 unsafe.readReady(eventList.data(i));
212             } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
213                 unsafe.readEOF();
214             }
215 
216             // Check if EV_EOF was set, this will notify us for connection-reset in which case
217             // we may close the channel directly or try to read more data depending on the state of the
218             // Channel and also depending on the AbstractKQueueChannel subtype.
219             if ((flags & Native.EV_EOF) != 0) {
220                 unsafe.readEOF();
221             }
222         }
223     }
224 
225     @Override
226     protected void run() {
227         for (;;) {
228             try {
229                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
230                 switch (strategy) {
231                     case SelectStrategy.CONTINUE:
232                         continue;
233 
234                     case SelectStrategy.BUSY_WAIT:
235                         // fall-through to SELECT since the busy-wait is not supported with kqueue
236 
237                     case SelectStrategy.SELECT:
238                         strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
239 
240                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
241                         // before calling 'selector.wakeup()' to reduce the wake-up
242                         // overhead. (Selector.wakeup() is an expensive operation.)
243                         //
244                         // However, there is a race condition in this approach.
245                         // The race condition is triggered when 'wakenUp' is set to
246                         // true too early.
247                         //
248                         // 'wakenUp' is set to true too early if:
249                         // 1) Selector is waken up between 'wakenUp.set(false)' and
250                         //    'selector.select(...)'. (BAD)
251                         // 2) Selector is waken up between 'selector.select(...)' and
252                         //    'if (wakenUp.get()) { ... }'. (OK)
253                         //
254                         // In the first case, 'wakenUp' is set to true and the
255                         // following 'selector.select(...)' will wake up immediately.
256                         // Until 'wakenUp' is set to false again in the next round,
257                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
258                         // any attempt to wake up the Selector will fail, too, causing
259                         // the following 'selector.select(...)' call to block
260                         // unnecessarily.
261                         //
262                         // To fix this problem, we wake up the selector again if wakenUp
263                         // is true immediately after selector.select(...).
264                         // It is inefficient in that it wakes up the selector for both
265                         // the first case (BAD - wake-up required) and the second case
266                         // (OK - no wake-up required).
267 
268                         if (wakenUp == 1) {
269                             wakeup();
270                         }
271                         // fallthrough
272                     default:
273                 }
274 
275                 final int ioRatio = this.ioRatio;
276                 if (ioRatio == 100) {
277                     try {
278                         if (strategy > 0) {
279                             processReady(strategy);
280                         }
281                     } finally {
282                         runAllTasks();
283                     }
284                 } else {
285                     final long ioStartTime = System.nanoTime();
286 
287                     try {
288                         if (strategy > 0) {
289                             processReady(strategy);
290                         }
291                     } finally {
292                         final long ioTime = System.nanoTime() - ioStartTime;
293                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
294                     }
295                 }
296                 if (allowGrowing && strategy == eventList.capacity()) {
297                     //increase the size of the array as we needed the whole space for the events
298                     eventList.realloc(false);
299                 }
300             } catch (Error e) {
301                 throw e;
302             } catch (Throwable t) {
303                 handleLoopException(t);
304             } finally {
305                 // Always handle shutdown even if the loop processing threw an exception.
306                 try {
307                     if (isShuttingDown()) {
308                         closeAll();
309                         if (confirmShutdown()) {
310                             break;
311                         }
312                     }
313                 } catch (Error e) {
314                     throw e;
315                 } catch (Throwable t) {
316                     handleLoopException(t);
317                 }
318             }
319         }
320     }
321 
322     @Override
323     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
324         return newTaskQueue0(maxPendingTasks);
325     }
326 
327     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
328         // This event loop never calls takeTask()
329         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
330                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
331     }
332 
333     /**
334      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
335      */
336     public int getIoRatio() {
337         return ioRatio;
338     }
339 
340     /**
341      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
342      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
343      */
344     public void setIoRatio(int ioRatio) {
345         if (ioRatio <= 0 || ioRatio > 100) {
346             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
347         }
348         this.ioRatio = ioRatio;
349     }
350 
351     @Override
352     public int registeredChannels() {
353         return channels.size();
354     }
355 
356     @Override
357     protected void cleanup() {
358         try {
359             try {
360                 kqueueFd.close();
361             } catch (IOException e) {
362                 logger.warn("Failed to close the kqueue fd.", e);
363             }
364         } finally {
365             // Cleanup all native memory!
366             changeList.free();
367             eventList.free();
368         }
369     }
370 
371     private void closeAll() {
372         try {
373             kqueueWaitNow();
374         } catch (IOException e) {
375             // ignore on close
376         }
377 
378         // Using the intermediate collection to prevent ConcurrentModificationException.
379         // In the `close()` method, the channel is deleted from `channels` map.
380         AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
381 
382         for (AbstractKQueueChannel ch: localChannels) {
383             ch.unsafe().close(ch.unsafe().voidPromise());
384         }
385     }
386 
387     private static void handleLoopException(Throwable t) {
388         logger.warn("Unexpected exception in the selector loop.", t);
389 
390         // Prevent possible consecutive immediate failures that lead to
391         // excessive CPU consumption.
392         try {
393             Thread.sleep(1000);
394         } catch (InterruptedException e) {
395             // Ignore.
396         }
397     }
398 }