View Javadoc
/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.EventLoop;
19  import io.netty.channel.EventLoopGroup;
20  import io.netty.channel.SelectStrategy;
21  import io.netty.channel.SingleThreadEventLoop;
22  import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
23  import io.netty.channel.unix.FileDescriptor;
24  import io.netty.channel.unix.IovArray;
25  import io.netty.util.IntSupplier;
26  import io.netty.util.collection.IntObjectHashMap;
27  import io.netty.util.collection.IntObjectMap;
28  import io.netty.util.concurrent.RejectedExecutionHandler;
29  import io.netty.util.internal.ObjectUtil;
30  import io.netty.util.internal.PlatformDependent;
31  import io.netty.util.internal.logging.InternalLogger;
32  import io.netty.util.internal.logging.InternalLoggerFactory;
33  
34  import java.io.IOException;
35  import java.util.ArrayList;
36  import java.util.Collection;
37  import java.util.Queue;
38  import java.util.concurrent.Callable;
39  import java.util.concurrent.Executor;
40  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
41  
42  import static java.lang.Math.min;
43  
44  /**
45   * {@link EventLoop} which uses epoll under the covers. Only works on Linux!
46   */
47  final class EpollEventLoop extends SingleThreadEventLoop {
48      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
49      private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
50              AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
51  
52      static {
53          // Ensure JNI is initialized by the time this class is loaded by this time!
54          // We use unix-common methods in this class which are backed by JNI methods.
55          Epoll.ensureAvailability();
56      }
57  
58      private final FileDescriptor epollFd;
59      private final FileDescriptor eventFd;
60      private final FileDescriptor timerFd;
61      private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
62      private final boolean allowGrowing;
63      private final EpollEventArray events;
64  
65      // These are initialized on first use
66      private IovArray iovArray;
67      private NativeDatagramPacketArray datagramPacketArray;
68  
69      private final SelectStrategy selectStrategy;
70      private final IntSupplier selectNowSupplier = new IntSupplier() {
71          @Override
72          public int get() throws Exception {
73              return epollWaitNow();
74          }
75      };
76      private volatile int wakenUp;
77      private volatile int ioRatio = 50;
78  
79      // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
80      private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
81  
82      EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
83                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
84          super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
85          selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
86          if (maxEvents == 0) {
87              allowGrowing = true;
88              events = new EpollEventArray(4096);
89          } else {
90              allowGrowing = false;
91              events = new EpollEventArray(maxEvents);
92          }
93          boolean success = false;
94          FileDescriptor epollFd = null;
95          FileDescriptor eventFd = null;
96          FileDescriptor timerFd = null;
97          try {
98              this.epollFd = epollFd = Native.newEpollCreate();
99              this.eventFd = eventFd = Native.newEventFd();
100             try {
101                 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
102             } catch (IOException e) {
103                 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
104             }
105             this.timerFd = timerFd = Native.newTimerFd();
106             try {
107                 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
108             } catch (IOException e) {
109                 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
110             }
111             success = true;
112         } finally {
113             if (!success) {
114                 if (epollFd != null) {
115                     try {
116                         epollFd.close();
117                     } catch (Exception e) {
118                         // ignore
119                     }
120                 }
121                 if (eventFd != null) {
122                     try {
123                         eventFd.close();
124                     } catch (Exception e) {
125                         // ignore
126                     }
127                 }
128                 if (timerFd != null) {
129                     try {
130                         timerFd.close();
131                     } catch (Exception e) {
132                         // ignore
133                     }
134                 }
135             }
136         }
137     }
138 
139     /**
140      * Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
141      */
142     IovArray cleanIovArray() {
143         if (iovArray == null) {
144             iovArray = new IovArray();
145         } else {
146             iovArray.clear();
147         }
148         return iovArray;
149     }
150 
151     /**
152      * Return a cleared {@link NativeDatagramPacketArray} that can be used for writes in this {@link EventLoop}.
153      */
154     NativeDatagramPacketArray cleanDatagramPacketArray() {
155         if (datagramPacketArray == null) {
156             datagramPacketArray = new NativeDatagramPacketArray();
157         } else {
158             datagramPacketArray.clear();
159         }
160         return datagramPacketArray;
161     }
162 
163     @Override
164     protected void wakeup(boolean inEventLoop) {
165         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
166             // write to the evfd which will then wake-up epoll_wait(...)
167             Native.eventFdWrite(eventFd.intValue(), 1L);
168         }
169     }
170 
171     /**
172      * Register the given epoll with this {@link EventLoop}.
173      */
174     void add(AbstractEpollChannel ch) throws IOException {
175         assert inEventLoop();
176         int fd = ch.socket.intValue();
177         Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
178         channels.put(fd, ch);
179     }
180 
181     /**
182      * The flags of the given epoll was modified so update the registration
183      */
184     void modify(AbstractEpollChannel ch) throws IOException {
185         assert inEventLoop();
186         Native.epollCtlMod(epollFd.intValue(), ch.socket.intValue(), ch.flags);
187     }
188 
189     /**
190      * Deregister the given epoll from this {@link EventLoop}.
191      */
192     void remove(AbstractEpollChannel ch) throws IOException {
193         assert inEventLoop();
194 
195         if (ch.isOpen()) {
196             int fd = ch.socket.intValue();
197             if (channels.remove(fd) != null) {
198                 // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
199                 // removed once the file-descriptor is closed.
200                 Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue());
201             }
202         }
203     }
204 
205     @Override
206     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
207         // This event loop never calls takeTask()
208         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
209                                                     : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
210     }
211 
212     /**
213      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
214      */
215     public int getIoRatio() {
216         return ioRatio;
217     }
218 
219     /**
220      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
221      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
222      */
223     public void setIoRatio(int ioRatio) {
224         if (ioRatio <= 0 || ioRatio > 100) {
225             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
226         }
227         this.ioRatio = ioRatio;
228     }
229 
230     private int epollWait(boolean oldWakeup) throws IOException {
231         // If a task was submitted when wakenUp value was 1, the task didn't get a chance to produce wakeup event.
232         // So we need to check task queue again before calling epoll_wait. If we don't, the task might be pended
233         // until epoll_wait was timed out. It might be pended until idle timeout if IdleStateHandler existed
234         // in pipeline.
235         if (oldWakeup && hasTasks()) {
236             return epollWaitNow();
237         }
238 
239         long totalDelay = delayNanos(System.nanoTime());
240         int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
241         return Native.epollWait(epollFd, events, timerFd, delaySeconds,
242                 (int) min(MAX_SCHEDULED_TIMERFD_NS, totalDelay - delaySeconds * 1000000000L));
243     }
244 
245     private int epollWaitNow() throws IOException {
246         return Native.epollWait(epollFd, events, timerFd, 0, 0);
247     }
248 
249     @Override
250     protected void run() {
251         for (;;) {
252             try {
253                 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
254                 switch (strategy) {
255                     case SelectStrategy.CONTINUE:
256                         continue;
257                     case SelectStrategy.SELECT:
258                         strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
259 
260                         // 'wakenUp.compareAndSet(false, true)' is always evaluated
261                         // before calling 'selector.wakeup()' to reduce the wake-up
262                         // overhead. (Selector.wakeup() is an expensive operation.)
263                         //
264                         // However, there is a race condition in this approach.
265                         // The race condition is triggered when 'wakenUp' is set to
266                         // true too early.
267                         //
268                         // 'wakenUp' is set to true too early if:
269                         // 1) Selector is waken up between 'wakenUp.set(false)' and
270                         //    'selector.select(...)'. (BAD)
271                         // 2) Selector is waken up between 'selector.select(...)' and
272                         //    'if (wakenUp.get()) { ... }'. (OK)
273                         //
274                         // In the first case, 'wakenUp' is set to true and the
275                         // following 'selector.select(...)' will wake up immediately.
276                         // Until 'wakenUp' is set to false again in the next round,
277                         // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
278                         // any attempt to wake up the Selector will fail, too, causing
279                         // the following 'selector.select(...)' call to block
280                         // unnecessarily.
281                         //
282                         // To fix this problem, we wake up the selector again if wakenUp
283                         // is true immediately after selector.select(...).
284                         // It is inefficient in that it wakes up the selector for both
285                         // the first case (BAD - wake-up required) and the second case
286                         // (OK - no wake-up required).
287 
288                         if (wakenUp == 1) {
289                             Native.eventFdWrite(eventFd.intValue(), 1L);
290                         }
291                         // fallthrough
292                     default:
293                 }
294 
295                 final int ioRatio = this.ioRatio;
296                 if (ioRatio == 100) {
297                     try {
298                         if (strategy > 0) {
299                             processReady(events, strategy);
300                         }
301                     } finally {
302                         // Ensure we always run tasks.
303                         runAllTasks();
304                     }
305                 } else {
306                     final long ioStartTime = System.nanoTime();
307 
308                     try {
309                         if (strategy > 0) {
310                             processReady(events, strategy);
311                         }
312                     } finally {
313                         // Ensure we always run tasks.
314                         final long ioTime = System.nanoTime() - ioStartTime;
315                         runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
316                     }
317                 }
318                 if (allowGrowing && strategy == events.length()) {
319                     //increase the size of the array as we needed the whole space for the events
320                     events.increase();
321                 }
322             } catch (Throwable t) {
323                 handleLoopException(t);
324             }
325             // Always handle shutdown even if the loop processing threw an exception.
326             try {
327                 if (isShuttingDown()) {
328                     closeAll();
329                     if (confirmShutdown()) {
330                         break;
331                     }
332                 }
333             } catch (Throwable t) {
334                 handleLoopException(t);
335             }
336         }
337     }
338 
339     private static void handleLoopException(Throwable t) {
340         logger.warn("Unexpected exception in the selector loop.", t);
341 
342         // Prevent possible consecutive immediate failures that lead to
343         // excessive CPU consumption.
344         try {
345             Thread.sleep(1000);
346         } catch (InterruptedException e) {
347             // Ignore.
348         }
349     }
350 
351     private void closeAll() {
352         try {
353             epollWaitNow();
354         } catch (IOException ignore) {
355             // ignore on close
356         }
357         // Using the intermediate collection to prevent ConcurrentModificationException.
358         // In the `close()` method, the channel is deleted from `channels` map.
359         Collection<AbstractEpollChannel> array = new ArrayList<AbstractEpollChannel>(channels.size());
360 
361         for (AbstractEpollChannel channel: channels.values()) {
362             array.add(channel);
363         }
364 
365         for (AbstractEpollChannel ch: array) {
366             ch.unsafe().close(ch.unsafe().voidPromise());
367         }
368     }
369 
370     private void processReady(EpollEventArray events, int ready) {
371         for (int i = 0; i < ready; i ++) {
372             final int fd = events.fd(i);
373             if (fd == eventFd.intValue()) {
374                 // consume wakeup event.
375                 Native.eventFdRead(fd);
376             } else if (fd == timerFd.intValue()) {
377                 // consume wakeup event, necessary because the timer is added with ET mode.
378                 Native.timerFdRead(fd);
379             } else {
380                 final long ev = events.events(i);
381 
382                 AbstractEpollChannel ch = channels.get(fd);
383                 if (ch != null) {
384                     // Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
385                     // sure about it!
386                     // Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
387                     // past.
388                     AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
389 
390                     // First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
391                     // to read from the file descriptor.
392                     // See https://github.com/netty/netty/issues/3785
393                     //
394                     // It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
395                     // In either case epollOutReady() will do the correct thing (finish connecting, or fail
396                     // the connection).
397                     // See https://github.com/netty/netty/issues/3848
398                     if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
399                         // Force flush of data as the epoll is writable again
400                         unsafe.epollOutReady();
401                     }
402 
403                     // Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
404                     // See https://github.com/netty/netty/issues/4317.
405                     //
406                     // If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
407                     // try to read from the underlying file descriptor and so notify the user about the error.
408                     if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
409                         // The Channel is still open and there is something to read. Do it now.
410                         unsafe.epollInReady();
411                     }
412 
413                     // Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
414                     // we may close the channel directly or try to read more data depending on the state of the
415                     // Channel and als depending on the AbstractEpollChannel subtype.
416                     if ((ev & Native.EPOLLRDHUP) != 0) {
417                         unsafe.epollRdHupReady();
418                     }
419                 } else {
420                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
421                     try {
422                         Native.epollCtlDel(epollFd.intValue(), fd);
423                     } catch (IOException ignore) {
424                         // This can happen but is nothing we need to worry about as we only try to delete
425                         // the fd from the epoll set as we not found it in our mappings. So this call to
426                         // epollCtlDel(...) is just to ensure we cleanup stuff and so may fail if it was
427                         // deleted before or the file descriptor was closed before.
428                     }
429                 }
430             }
431         }
432     }
433 
434     @Override
435     protected void cleanup() {
436         try {
437             try {
438                 epollFd.close();
439             } catch (IOException e) {
440                 logger.warn("Failed to close the epoll fd.", e);
441             }
442             try {
443                 eventFd.close();
444             } catch (IOException e) {
445                 logger.warn("Failed to close the event fd.", e);
446             }
447             try {
448                 timerFd.close();
449             } catch (IOException e) {
450                 logger.warn("Failed to close the timer fd.", e);
451             }
452         } finally {
453             // release native memory
454             if (iovArray != null) {
455                 iovArray.release();
456                 iovArray = null;
457             }
458             if (datagramPacketArray != null) {
459                 datagramPacketArray.release();
460                 datagramPacketArray = null;
461             }
462             events.free();
463         }
464     }
465 }