View Javadoc
/*
 * Copyright 2014 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.epoll;
17  
18  import io.netty.channel.EventLoop;
19  import io.netty.channel.EventLoopGroup;
20  import io.netty.channel.SingleThreadEventLoop;
21  import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
22  import io.netty.util.collection.IntObjectHashMap;
23  import io.netty.util.collection.IntObjectMap;
24  import io.netty.util.internal.PlatformDependent;
25  import io.netty.util.internal.logging.InternalLogger;
26  import io.netty.util.internal.logging.InternalLoggerFactory;
27  
28  import java.io.IOException;
29  import java.util.ArrayList;
30  import java.util.Collection;
31  import java.util.Queue;
32  import java.util.concurrent.Executor;
33  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
34  
35  /**
36   * A {@link SingleThreadEventLoop} implementation which uses <a href="http://en.wikipedia.org/wiki/Epoll">epoll</a>
37   * under the covers. This {@link EventLoop} works only on Linux systems!
38   */
39  final class EpollEventLoop extends SingleThreadEventLoop {
40      private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
41      private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER;
42  
43      static {
44          AtomicIntegerFieldUpdater<EpollEventLoop> updater =
45                  PlatformDependent.newAtomicIntegerFieldUpdater(EpollEventLoop.class, "wakenUp");
46          if (updater == null) {
47              updater = AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
48          }
49          WAKEN_UP_UPDATER = updater;
50      }
51  
52      private final int epollFd;
53      private final int eventFd;
54      private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
55      private final boolean allowGrowing;
56      private final EpollEventArray events;
57  
58      @SuppressWarnings("unused")
59      private volatile int wakenUp;
60      private volatile int ioRatio = 50;
61  
62      EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents) {
63          super(parent, executor, false);
64          if (maxEvents == 0) {
65              allowGrowing = true;
66              events = new EpollEventArray(4096);
67          } else {
68              allowGrowing = false;
69              events = new EpollEventArray(maxEvents);
70          }
71          boolean success = false;
72          int epollFd = -1;
73          int eventFd = -1;
74          try {
75              this.epollFd = epollFd = Native.epollCreate();
76              this.eventFd = eventFd = Native.eventFd();
77              Native.epollCtlAdd(epollFd, eventFd, Native.EPOLLIN);
78              success = true;
79          } finally {
80              if (!success) {
81                  if (epollFd != -1) {
82                      try {
83                          Native.close(epollFd);
84                      } catch (Exception e) {
85                          // ignore
86                      }
87                  }
88                  if (eventFd != -1) {
89                      try {
90                          Native.close(eventFd);
91                      } catch (Exception e) {
92                          // ignore
93                      }
94                  }
95              }
96          }
97      }
98  
99      @Override
100     protected void wakeup(boolean inEventLoop) {
101         if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
102             // write to the evfd which will then wake-up epoll_wait(...)
103             Native.eventFdWrite(eventFd, 1L);
104         }
105     }
106 
107     /**
108      * Register the given epoll with this {@link io.netty.channel.EventLoop}.
109      */
110     void add(AbstractEpollChannel ch) {
111         assert inEventLoop();
112         int fd = ch.fd().intValue();
113         Native.epollCtlAdd(epollFd, fd, ch.flags);
114         channels.put(fd, ch);
115     }
116 
117     /**
118      * The flags of the given epoll was modified so update the registration
119      */
120     void modify(AbstractEpollChannel ch) {
121         assert inEventLoop();
122         Native.epollCtlMod(epollFd, ch.fd().intValue(), ch.flags);
123     }
124 
125     /**
126      * Deregister the given epoll from this {@link io.netty.channel.EventLoop}.
127      */
128     void remove(AbstractEpollChannel ch) {
129         assert inEventLoop();
130 
131         if (ch.isOpen()) {
132             int fd = ch.fd().intValue();
133             if (channels.remove(fd) != null) {
134                 // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
135                 // removed once the file-descriptor is closed.
136                 Native.epollCtlDel(epollFd, ch.fd().intValue());
137             }
138         }
139     }
140 
141     @Override
142     protected Queue<Runnable> newTaskQueue() {
143         // This event loop never calls takeTask()
144         return PlatformDependent.newMpscQueue();
145     }
146 
147     /**
148      * Returns the percentage of the desired amount of time spent for I/O in the event loop.
149      */
150     public int getIoRatio() {
151         return ioRatio;
152     }
153 
154     /**
155      * Sets the percentage of the desired amount of time spent for I/O in the event loop.  The default value is
156      * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
157      */
158     public void setIoRatio(int ioRatio) {
159         if (ioRatio <= 0 || ioRatio > 100) {
160             throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
161         }
162         this.ioRatio = ioRatio;
163     }
164 
165     private int epollWait(boolean oldWakenUp) throws IOException {
166         int selectCnt = 0;
167         long currentTimeNanos = System.nanoTime();
168         long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
169         for (;;) {
170             long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
171             if (timeoutMillis <= 0) {
172                 if (selectCnt == 0) {
173                     int ready = Native.epollWait(epollFd, events, 0);
174                     if (ready > 0) {
175                         return ready;
176                     }
177                 }
178                 break;
179             }
180 
181             int selectedKeys = Native.epollWait(epollFd, events, (int) timeoutMillis);
182             selectCnt ++;
183 
184             if (selectedKeys != 0 || oldWakenUp || wakenUp == 1 || hasTasks() || hasScheduledTasks()) {
185                 // - Selected something,
186                 // - waken up by user, or
187                 // - the task queue has a pending task.
188                 // - a scheduled task is ready for processing
189                 return selectedKeys;
190             }
191             currentTimeNanos = System.nanoTime();
192         }
193         return 0;
194     }
195 
196     @Override
197     protected void run() {
198         boolean oldWakenUp = WAKEN_UP_UPDATER.getAndSet(this, 0) == 1;
199         try {
200             int ready;
201             if (hasTasks()) {
202                 // Non blocking just return what is ready directly without block
203                 ready = Native.epollWait(epollFd, events, 0);
204             } else {
205                 ready = epollWait(oldWakenUp);
206 
207                 // 'wakenUp.compareAndSet(false, true)' is always evaluated
208                 // before calling 'selector.wakeup()' to reduce the wake-up
209                 // overhead. (Selector.wakeup() is an expensive operation.)
210                 //
211                 // However, there is a race condition in this approach.
212                 // The race condition is triggered when 'wakenUp' is set to
213                 // true too early.
214                 //
215                 // 'wakenUp' is set to true too early if:
216                 // 1) Selector is waken up between 'wakenUp.set(false)' and
217                 //    'selector.select(...)'. (BAD)
218                 // 2) Selector is waken up between 'selector.select(...)' and
219                 //    'if (wakenUp.get()) { ... }'. (OK)
220                 //
221                 // In the first case, 'wakenUp' is set to true and the
222                 // following 'selector.select(...)' will wake up immediately.
223                 // Until 'wakenUp' is set to false again in the next round,
224                 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
225                 // any attempt to wake up the Selector will fail, too, causing
226                 // the following 'selector.select(...)' call to block
227                 // unnecessarily.
228                 //
229                 // To fix this problem, we wake up the selector again if wakenUp
230                 // is true immediately after selector.select(...).
231                 // It is inefficient in that it wakes up the selector for both
232                 // the first case (BAD - wake-up required) and the second case
233                 // (OK - no wake-up required).
234 
235                 if (wakenUp == 1) {
236                     Native.eventFdWrite(eventFd, 1L);
237                 }
238             }
239 
240             final int ioRatio = this.ioRatio;
241             if (ioRatio == 100) {
242                 if (ready > 0) {
243                     processReady(events, ready);
244                 }
245                 runAllTasks();
246             } else {
247                 final long ioStartTime = System.nanoTime();
248 
249                 if (ready > 0) {
250                     processReady(events, ready);
251                 }
252 
253                 final long ioTime = System.nanoTime() - ioStartTime;
254                 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
255             }
256             if (allowGrowing && ready == events.length()) {
257                 //increase the size of the array as we needed the whole space for the events
258                 events.increase();
259             }
260             if (isShuttingDown()) {
261                 closeAll();
262                 if (confirmShutdown()) {
263                     cleanupAndTerminate(true);
264                     return;
265                 }
266             }
267         } catch (Throwable t) {
268             logger.warn("Unexpected exception in the selector loop.", t);
269 
270             // Prevent possible consecutive immediate failures that lead to
271             // excessive CPU consumption.
272             try {
273                 Thread.sleep(1000);
274             } catch (InterruptedException e) {
275                 // Ignore.
276             }
277         }
278 
279         scheduleExecution();
280     }
281 
282     private void closeAll() {
283         try {
284             Native.epollWait(epollFd, events, 0);
285         } catch (IOException ignore) {
286             // ignore on close
287         }
288         Collection<AbstractEpollChannel> array = new ArrayList<AbstractEpollChannel>(channels.size());
289 
290         for (IntObjectMap.Entry<AbstractEpollChannel> entry: channels.entries()) {
291             array.add(entry.value());
292         }
293 
294         for (AbstractEpollChannel ch: array) {
295             ch.unsafe().close(ch.unsafe().voidPromise());
296         }
297     }
298 
299     private void processReady(EpollEventArray events, int ready) {
300         for (int i = 0; i < ready; i ++) {
301             final int fd = events.fd(i);
302             if (fd == eventFd) {
303                 // consume wakeup event
304                 Native.eventFdRead(eventFd);
305             } else {
306                 final long ev = events.events(i);
307 
308                 AbstractEpollChannel ch = channels.get(fd);
309                 if (ch != null && ch.isOpen()) {
310                     boolean close = (ev & Native.EPOLLRDHUP) != 0;
311                     boolean read = (ev & Native.EPOLLIN) != 0;
312                     boolean write = (ev & Native.EPOLLOUT) != 0;
313 
314                     AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
315 
316                     if (close) {
317                         unsafe.epollRdHupReady();
318                     }
319 
320                     // We need to check if the channel is still open before try to trigger the
321                     // callbacks as otherwise we may trigger an IllegalStateException when try
322                     // to access the file descriptor.
323                     //
324                     // See https://github.com/netty/netty/issues/3443
325                     if (write && ch.isOpen()) {
326                         // force flush of data as the epoll is writable again
327                         unsafe.epollOutReady();
328                     }
329                     if (read && ch.isOpen()) {
330                         // Something is ready to read, so consume it now
331                         unsafe.epollInReady();
332                     }
333                 } else {
334                     // We received an event for an fd which we not use anymore. Remove it from the epoll_event set.
335                     Native.epollCtlDel(epollFd, fd);
336                 }
337             }
338         }
339     }
340 
341     @Override
342     protected void cleanup() {
343         try {
344             try {
345                 Native.close(epollFd);
346             } catch (IOException e) {
347                 logger.warn("Failed to close the epoll fd.", e);
348             }
349             try {
350                 Native.close(eventFd);
351             } catch (IOException e) {
352                 logger.warn("Failed to close the event fd.", e);
353             }
354         } finally {
355             // release native memory
356             events.free();
357         }
358     }
359 }