View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.channel.EventLoop;
19  import io.netty.channel.EventLoopGroup;
20  import io.netty.channel.SelectStrategy;
21  import io.netty.channel.SingleThreadEventLoop;
22  import io.netty.channel.kqueue.AbstractKQueueChannel.AbstractKQueueUnsafe;
23  import io.netty.channel.unix.FileDescriptor;
24  import io.netty.channel.unix.IovArray;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.concurrent.RejectedExecutionHandler;
27  import io.netty.util.internal.ObjectUtil;
28  import io.netty.util.internal.PlatformDependent;
29  import io.netty.util.internal.logging.InternalLogger;
30  import io.netty.util.internal.logging.InternalLoggerFactory;
31  
32  import java.io.IOException;
33  import java.util.Queue;
34  import java.util.concurrent.Callable;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
37  
38  import static io.netty.channel.kqueue.KQueueEventArray.deleteGlobalRefs;
39  import static java.lang.Math.min;
40  
41  /**
42   * {@link EventLoop} which uses kqueue under the covers. Only works on BSD!
43   */
44  final class KQueueEventLoop extends SingleThreadEventLoop {
    // Logger shared by all instances of this class.
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueEventLoop.class);
    // Atomic accessor for the volatile "wakenUp" instance field (looked up by name, so keep both in sync).
    private static final AtomicIntegerFieldUpdater<KQueueEventLoop> WAKEN_UP_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(KQueueEventLoop.class, "wakenUp");
    // Identifier of the user event that the constructor registers and wakeup() triggers.
    private static final int KQUEUE_WAKE_UP_IDENT = 0;
49  
    static {
        // Ensure JNI is initialized by the time this class is loaded!
        // We use unix-common methods in this class which are backed by JNI methods.
        KQueue.ensureAvailability();
    }
55  
    // JNI self pointers of removed channels; collected in remove(...) and released before each kevent wait.
    private final NativeLongArray jniChannelPointers;
    // True when the constructor was given maxEvents == 0; run() then grows the event list when it fills up.
    private final boolean allowGrowing;
    // The kqueue descriptor obtained in the constructor and closed in cleanup().
    private final FileDescriptor kqueueFd;
    // Changes queued through evSet(...); passed to the kevent wait and cleared afterwards.
    private final KQueueEventArray changeList;
    // Receives the events reported by the kevent wait; read by processReady(...).
    private final KQueueEventArray eventList;
    // Consulted at the start of every iteration of run().
    private final SelectStrategy selectStrategy;
    // Single array instance handed out (cleared) by cleanArray().
    private final IovArray iovArray = new IovArray();
    // Lets the select strategy perform a non-blocking kevent poll.
    private final IntSupplier selectNowSupplier = new IntSupplier() {
        @Override
        public int get() throws Exception {
            return kqueueWaitNow();
        }
    };

    // 0/1 flag: set to 1 by wakeup(boolean) via WAKEN_UP_UPDATER, reset to 0 by run() before waiting.
    private volatile int wakenUp;
    // Percentage of loop time intended for I/O; see setIoRatio(int).
    private volatile int ioRatio = 50;
72  
73      KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
74                      SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
75          super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
76          selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
77          this.kqueueFd = Native.newKQueue();
78          if (maxEvents == 0) {
79              allowGrowing = true;
80              maxEvents = 4096;
81          } else {
82              allowGrowing = false;
83          }
84          changeList = new KQueueEventArray(maxEvents);
85          eventList = new KQueueEventArray(maxEvents);
86          jniChannelPointers = new NativeLongArray(4096);
87          int result = Native.keventAddUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
88          if (result < 0) {
89              cleanup();
90              throw new IllegalStateException("kevent failed to add user event with errno: " + (-result));
91          }
92      }
93  
94      void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
95          changeList.evSet(ch, filter, flags, fflags);
96      }
97  
98      void remove(AbstractKQueueChannel ch) throws IOException {
99          assert inEventLoop();
100         if (ch.jniSelfPtr == 0) {
101             return;
102         }
103 
104         jniChannelPointers.add(ch.jniSelfPtr);
105         ch.jniSelfPtr = 0;
106     }
107 
108     /**
109      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
110      */
111     IovArray cleanArray() {
112         iovArray.clear();
113         return iovArray;
114     }
115 
116     @Override
117     protected void wakeup(boolean inEventLoop) {
118         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
119             wakeup();
120         }
121     }
122 
123     private void wakeup() {
124         Native.keventTriggerUserEvent(kqueueFd.intValue(), KQUEUE_WAKE_UP_IDENT);
125         // Note that the result may return an error (e.g. errno = EBADF after the event loop has been shutdown).
126         // So it is not very practical to assert the return value is always >= 0.
127     }
128 
129     private int kqueueWait(boolean oldWakeup) throws IOException {
130         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
131         // So we need to check task queue again before calling kqueueWait. If we don't, the task might be pended
132         // until kqueueWait was timed out. It might be pended until idle timeout if IdleStateHandler existed
133         // in pipeline.
134         if (oldWakeup && hasTasks()) {
135             return kqueueWaitNow();
136         }
137 
138         long totalDelay = delayNanos(System.nanoTime());
139         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
140         return kqueueWait(delaySeconds, (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
141     }
142 
143     private int kqueueWaitNow() throws IOException {
144         return kqueueWait(0, 0);
145     }
146 
147     private int kqueueWait(int timeoutSec, int timeoutNs) throws IOException {
148         deleteJniChannelPointers();
149         int numEvents = Native.keventWait(kqueueFd.intValue(), changeList, eventList, timeoutSec, timeoutNs);
150         changeList.clear();
151         return numEvents;
152     }
153 
154     private void deleteJniChannelPointers() {
155         if (!jniChannelPointers.isEmpty()) {
156             deleteGlobalRefs(jniChannelPointers.memoryAddress(), jniChannelPointers.memoryAddressEnd());
157             jniChannelPointers.clear();
158         }
159     }
160 
161     private void processReady(int ready) {
162         for (int i = 0; i < ready; ++i) {
163             final short filter = eventList.filter(i);
164             final short flags = eventList.flags(i);
165             if (filter == Native.EVFILT_USER || (flags & Native.EV_ERROR) != 0) {
166                 // EV_ERROR is returned if the FD is closed synchronously (which removes from kqueue) and then
167                 // we later attempt to delete the filters from kqueue.
168                 assert filter != Native.EVFILT_USER ||
169                         (filter == Native.EVFILT_USER && eventList.fd(i) == KQUEUE_WAKE_UP_IDENT);
170                 continue;
171             }
172 
173             AbstractKQueueChannel channel = eventList.channel(i);
174             if (channel == null) {
175                 // This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
176                 // We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
177                 // thus removed from kqueue FD.
178                 logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
179                 continue;
180             }
181 
182             AbstractKQueueUnsafe unsafe = (AbstractKQueueUnsafe) channel.unsafe();
183             // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
184             // to read from the file descriptor.
185             if (filter == Native.EVFILT_WRITE) {
186                 unsafe.writeReady();
187             } else if (filter == Native.EVFILT_READ) {
188                 // Check READ before EOF to ensure all data is read before shutting down the input.
189                 unsafe.readReady(eventList.data(i));
190             } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
191                 unsafe.readEOF();
192             }
193 
194             // Check if EV_EOF was set, this will notify us for connection-reset in which case
195             // we may close the channel directly or try to read more data depending on the state of the
196             // Channel and also depending on the AbstractKQueueChannel subtype.
197             if ((flags & Native.EV_EOF) != 0) {
198                 unsafe.readEOF();
199             }
200         }
201     }
202 
203     @Override
204     protected void run() {
205         for (;;) {
206             try {
207                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
208                 switch (strategy) {
209                     case SelectStrategy.CONTINUE:
210                         continue;
211                     case SelectStrategy.SELECT:
212                         strategy = kqueueWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
213 
214                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
215                         // before calling 'selector.wakeup()' to reduce the wake-up
216                         // overhead. (Selector.wakeup() is an expensive operation.)
217                         //
218                         // However, there is a race condition in this approach.
219                         // The race condition is triggered when 'wakenUp' is set to
220                         // true too early.
221                         //
222                         // 'wakenUp' is set to true too early if:
223                         // 1) Selector is waken up between 'wakenUp.set(false)' and
224                         //    'selector.select(...)'. (BAD)
225                         // 2) Selector is waken up between 'selector.select(...)' and
226                         //    'if (wakenUp.get()) { ... }'. (OK)
227                         //
228                         // In the first case, 'wakenUp' is set to true and the
229                         // following 'selector.select(...)' will wake up immediately.
230                         // Until 'wakenUp' is set to false again in the next round,
231                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
232                         // any attempt to wake up the Selector will fail, too, causing
233                         // the following 'selector.select(...)' call to block
234                         // unnecessarily.
235                         //
236                         // To fix this problem, we wake up the selector again if wakenUp
237                         // is true immediately after selector.select(...).
238                         // It is inefficient in that it wakes up the selector for both
239                         // the first case (BAD - wake-up required) and the second case
240                         // (OK - no wake-up required).
241 
242                         if (wakenUp == 1) {
243                             wakeup();
244                         }
245                         // fallthrough
246                     default:
247                 }
248 
249                 final int ioRatio = this.ioRatio;
250                 if (ioRatio == 100) {
251                     try {
252                         if (strategy > 0) {
253                             processReady(strategy);
254                         }
255                     } finally {
256                         runAllTasks();
257                     }
258                 } else {
259                     final long ioStartTime = System.nanoTime();
260 
261                     try {
262                         if (strategy > 0) {
263                             processReady(strategy);
264                         }
265                     } finally {
266                         final long ioTime = System.nanoTime() - ioStartTime;
267                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
268                     }
269                 }
270                 if (allowGrowing && strategy == eventList.capacity()) {
271                     //increase the size of the array as we needed the whole space for the events
272                     eventList.realloc(false);
273                 }
274             } catch (Throwable t) {
275                 handleLoopException(t);
276             }
277             // Always handle shutdown even if the loop processing threw an exception.
278             try {
279                 if (isShuttingDown()) {
280                     closeAll();
281                     if (confirmShutdown()) {
282                         break;
283                     }
284                 }
285             } catch (Throwable t) {
286                 handleLoopException(t);
287             }
288         }
289     }
290 
291     @Override
292     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
293         // This event loop never calls takeTask()
294         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
295                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
296     }
297 
298     /**
299      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
300      */
301     public int getIoRatio() {
302         return ioRatio;
303     }
304 
305     /**
306      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
307      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
308      */
309     public void setIoRatio(int ioRatio) {
310         if (ioRatio <= 0 || ioRatio > 100) {
311             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
312         }
313         this.ioRatio = ioRatio;
314     }
315 
316     @Override
317     protected void cleanup() {
318         try {
319             try {
320                 kqueueFd.close();
321             } catch (IOException e) {
322                 logger.warn("Failed to close the kqueue fd.", e);
323             }
324         } finally {
325             // Cleanup all native memory!
326 
327             // The JNI channel pointers should already be deleted because we should wait on kevent before this method,
328             // but lets just be sure we cleanup native memory.
329             deleteJniChannelPointers();
330             jniChannelPointers.free();
331 
332             changeList.free();
333             eventList.free();
334         }
335     }
336 
337     private void closeAll() {
338         try {
339             kqueueWaitNow();
340         } catch (IOException e) {
341             // ignore on close
342         }
343     }
344 
345     private static void handleLoopException(Throwable t) {
346         logger.warn("Unexpected exception in the selector loop.", t);
347 
348         // Prevent possible consecutive immediate failures that lead to
349         // excessive CPU consumption.
350         try {
351             Thread.sleep(1000);
352         } catch (InterruptedException e) {
353             // Ignore.
354         }
355     }
356 }