View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.EventLoop;
19  import io.netty.channel.EventLoopGroup;
20  import io.netty.channel.SelectStrategy;
21  import io.netty.channel.SingleThreadEventLoop;
22  import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
23  import io.netty.channel.unix.FileDescriptor;
24  import io.netty.channel.unix.IovArray;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.io.IOException;
33  import java.util.Queue;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  
38  import static io.netty.channel.kqueue.KQueueEventArray.deleteGlobalRefs;
39  import static java.lang.Math.min;
40  
41  /**
42   * {@link EventLoop} which uses kqueue under the covers. Only works on BSD!
43   */
44  final class KQueueEventLoop extends SingleThreadEventLoop {
45      private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
46      private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
47              AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
48      private static final int KQUEUE_WAKE_UP_IDENT = 0;
49  
50      static {
51          // Ensure JNI is initialized by the time this class is loaded by this time!
52          // We use unix-common methods in this class which are backed by JNI methods.
53          KQueue.ensureAvailability();
54      }
55  
56      private final NativeLongArray jniChannelPointers;
57      private final boolean allowGrowing;
58      private final FileDescriptor kqueueFd;
59      private final KQueueEventArray changeList;
60      private final KQueueEventArray eventList;
61      private final SelectStrategy selectStrategy;
62      private final IovArray iovArray = new IovArray();
63      private final IntSupplier selectNowSupplier = new IntSupplier() {
64          @Override
65          public int get() throws Exception {
66              return kqueueWaitNow();
67          }
68      };
69      private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
70          @Override
71          public Integer call() throws Exception {
72              return KQueueEventLoop.super.pendingTasks();
73          }
74      };
75  
76      private volatile int wakenUp;
77      private volatile int ioRatio = 50;
78  
79      KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
80                      SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
81          super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
82          selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
83          this.kqueueFd = Native.newKQueue();
84          if (maxEvents == 0) {
85              allowGrowing = true;
86              maxEvents = 4096;
87          } else {
88              allowGrowing = false;
89          }
90          changeList = new KQueueEventArray(maxEvents);
91          eventList = new KQueueEventArray(maxEvents);
92          jniChannelPointers = new NativeLongArray(4096);
93          int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
94          if (result < 0) {
95              cleanup();
96              throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
97          }
98      }
99  
100     void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
101         changeList.evSet(ch, filter, flags, fflags);
102     }
103 
104     void remove(AbstractKQueueChannel ch) throws IOException {
105         assert inEventLoop();
106         if (ch.jniSelfPtr == 0) {
107             return;
108         }
109 
110         jniChannelPointers.add(ch.jniSelfPtr);
111         ch.jniSelfPtr = 0;
112     }
113 
114     /**
115      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
116      */
117     IovArray cleanArray() {
118         iovArray.clear();
119         return iovArray;
120     }
121 
122     @Override
123     protected void wakeup(boolean inEventLoop) {
124         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
125             wakeup();
126         }
127     }
128 
129     private void wakeup() {
130         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
131         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
132         // So it is not very practical to assert the return value is always >= 0.
133     }
134 
135     private int kqueueWait(boolean oldWakeup) throws IOException {
136         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
137         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
138         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
139         // in pipeline.
140         if (oldWakeup && hasTasks()) {
141             return kqueueWaitNow();
142         }
143 
144         long totalDelay = delayNanos(System.nanoTime());
145         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
146         return kqueueWait(delaySeconds, (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
147     }
148 
149     private int kqueueWaitNow() throws IOException {
150         return kqueueWait(0, 0);
151     }
152 
153     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
154         deleteJniChannelPointers();
155         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
156         changeList.clear();
157         return numEvents;
158     }
159 
160     private void deleteJniChannelPointers() {
161         if (!jniChannelPointers.isEmpty()) {
162             deleteGlobalRefs(jniChannelPointers.memoryAddress(), jniChannelPointers.memoryAddressEnd());
163             jniChannelPointers.clear();
164         }
165     }
166 
167     private void processReady(int ready) {
168         for (int i = 0; i < ready; ++i) {
169             final short filter = eventList.filter(i);
170             final short flags = eventList.flags(i);
171             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
172                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
173                 // we later attempt to delete the filters from kqueue.
174                 assert filter != Native.EVFILT_USER ||
175                         (filter == Native.EVFILT_USER && eventList.fd(i) == KQUEUE_WAKE_UP_IDENT);
176                 continue;
177             }
178 
179             AbstractKQueueChannel channel = eventList.channel(i);
180             if (channel == null) {
181                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
182                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
183                 // thus removed from kqueue FD.
184                 logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
185                 continue;
186             }
187 
188             AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
189             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
190             // to read from the file descriptor.
191             if (filter == Native.EVFILT_WRITE) {
192                 unsafe.writeReady();
193             } else if (filter == Native.EVFILT_READ) {
194                 // Check READ before EOF to ensure all data is read before shutting down the input.
195                 unsafe.readReady(eventList.data(i));
196             } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
197                 unsafe.readEOF();
198             }
199 
200             // Check if EV_EOF was set, this will notify us for connection-reset in which case
201             // we may close the channel directly or try to read more data depending on the state of the
202             // Channel and also depending on the AbstractKQueueChannel subtype.
203             if ((flags & Native.EV_EOF) != 0) {
204                 unsafe.readEOF();
205             }
206         }
207     }
208 
209     @Override
210     protected void run() {
211         for (;;) {
212             try {
213                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
214                 switch (strategy) {
215                     case SelectStrategy.CONTINUE:
216                         continue;
217                     case SelectStrategy.SELECT:
218                         strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
219 
220                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
221                         // before calling 'selector.wakeup()' to reduce the wake-up
222                         // overhead. (Selector.wakeup() is an expensive operation.)
223                         //
224                         // However, there is a race condition in this approach.
225                         // The race condition is triggered when 'wakenUp' is set to
226                         // true too early.
227                         //
228                         // 'wakenUp' is set to true too early if:
229                         // 1) Selector is waken up between 'wakenUp.set(false)' and
230                         //    'selector.select(...)'. (BAD)
231                         // 2) Selector is waken up between 'selector.select(...)' and
232                         //    'if (wakenUp.get()) { ... }'. (OK)
233                         //
234                         // In the first case, 'wakenUp' is set to true and the
235                         // following 'selector.select(...)' will wake up immediately.
236                         // Until 'wakenUp' is set to false again in the next round,
237                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
238                         // any attempt to wake up the Selector will fail, too, causing
239                         // the following 'selector.select(...)' call to block
240                         // unnecessarily.
241                         //
242                         // To fix this problem, we wake up the selector again if wakenUp
243                         // is true immediately after selector.select(...).
244                         // It is inefficient in that it wakes up the selector for both
245                         // the first case (BAD - wake-up required) and the second case
246                         // (OK - no wake-up required).
247 
248                         if (wakenUp == 1) {
249                             wakeup();
250                         }
251                         // fallthrough
252                     default:
253                 }
254 
255                 final int ioRatio = this.ioRatio;
256                 if (ioRatio == 100) {
257                     try {
258                         if (strategy > 0) {
259                             processReady(strategy);
260                         }
261                     } finally {
262                         runAllTasks();
263                     }
264                 } else {
265                     final long ioStartTime = System.nanoTime();
266 
267                     try {
268                         if (strategy > 0) {
269                             processReady(strategy);
270                         }
271                     } finally {
272                         final long ioTime = System.nanoTime() - ioStartTime;
273                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
274                     }
275                 }
276                 if (allowGrowing && strategy == eventList.capacity()) {
277                     //increase the size of the array as we needed the whole space for the events
278                     eventList.realloc(false);
279                 }
280             } catch (Throwable t) {
281                 handleLoopException(t);
282             }
283             // Always handle shutdown even if the loop processing threw an exception.
284             try {
285                 if (isShuttingDown()) {
286                     closeAll();
287                     if (confirmShutdown()) {
288                         break;
289                     }
290                 }
291             } catch (Throwable t) {
292                 handleLoopException(t);
293             }
294         }
295     }
296 
297     @Override
298     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
299         // This event loop never calls takeTask()
300         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
301                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
302     }
303 
304     @Override
305     public int pendingTasks() {
306         // As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
307         // otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
308         // See https://github.com/netty/netty/issues/5297
309         return inEventLoop() ? super.pendingTasks() : submit(pendingTasksCallable).syncUninterruptibly().getNow();
310     }
311 
312     /**
313      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
314      */
315     public int getIoRatio() {
316         return ioRatio;
317     }
318 
319     /**
320      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
321      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
322      */
323     public void setIoRatio(int ioRatio) {
324         if (ioRatio <= 0 || ioRatio > 100) {
325             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
326         }
327         this.ioRatio = ioRatio;
328     }
329 
330     @Override
331     protected void cleanup() {
332         try {
333             try {
334                 kqueueFd.close();
335             } catch (IOException e) {
336                 logger.warn("Failed to close the kqueue fd.", e);
337             }
338         } finally {
339             // Cleanup all native memory!
340 
341             // The JNI channel pointers should already be deleted because we should wait on kevent before this method,
342             // but lets just be sure we cleanup native memory.
343             deleteJniChannelPointers();
344             jniChannelPointers.free();
345 
346             changeList.free();
347             eventList.free();
348         }
349     }
350 
351     private void closeAll() {
352         try {
353             kqueueWaitNow();
354         } catch (IOException e) {
355             // ignore on close
356         }
357     }
358 
359     private static void handleLoopException(Throwable t) {
360         logger.warn("Unexpected exception in the selector loop.", t);
361 
362         // Prevent possible consecutive immediate failures that lead to
363         // excessive CPU consumption.
364         try {
365             Thread.sleep(1000);
366         } catch (InterruptedException e) {
367             // Ignore.
368         }
369     }
370 }