1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.EventLoop;
19 import io.netty.channel.EventLoopGroup;
20 import io.netty.channel.SelectStrategy;
21 import io.netty.channel.SingleThreadEventLoop;
22 import io.netty.channel.epoll.AbstractEpollChannel.AbstractEpollUnsafe;
23 import io.netty.channel.unix.FileDescriptor;
24 import io.netty.util.IntSupplier;
25 import io.netty.util.collection.IntObjectHashMap;
26 import io.netty.util.collection.IntObjectMap;
27 import io.netty.util.concurrent.RejectedExecutionHandler;
28 import io.netty.util.internal.ObjectUtil;
29 import io.netty.util.internal.PlatformDependent;
30 import io.netty.util.internal.logging.InternalLogger;
31 import io.netty.util.internal.logging.InternalLoggerFactory;
32
33 import java.io.IOException;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Queue;
37 import java.util.concurrent.ThreadFactory;
38 import java.util.concurrent.Callable;
39 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
40
41 import static java.lang.Math.min;
42
43
44
45
46 final class EpollEventLoop extends SingleThreadEventLoop {
47 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
48 private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
49 AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");
50
51 private final FileDescriptor epollFd;
52 private final FileDescriptor eventFd;
53 private final FileDescriptor timerFd;
54 private final IntObjectMap<AbstractEpollChannel> channels = new IntObjectHashMap<AbstractEpollChannel>(4096);
55 private final boolean allowGrowing;
56 private final EpollEventArray events;
57 private final IovArray iovArray = new IovArray();
58 private final SelectStrategy selectStrategy;
59 private final IntSupplier selectNowSupplier = new IntSupplier() {
60 @Override
61 public int get() throws Exception {
62 return epollWaitNow();
63 }
64 };
65
66 private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
67 @Override
68 public Integer call() throws Exception {
69 return EpollEventLoop.super.pendingTasks();
70 }
71 };
72
73 @SuppressWarnings("unused")
74 private volatile int wakenUp;
75 private volatile int ioRatio = 50;
76
77 EpollEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, int maxEvents,
78 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
79 super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
80 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
81 if (maxEvents == 0) {
82 allowGrowing = true;
83 events = new EpollEventArray(4096);
84 } else {
85 allowGrowing = false;
86 events = new EpollEventArray(maxEvents);
87 }
88 boolean success = false;
89 FileDescriptor epollFd = null;
90 FileDescriptor eventFd = null;
91 FileDescriptor timerFd = null;
92 try {
93 this.epollFd = epollFd = Native.newEpollCreate();
94 this.eventFd = eventFd = Native.newEventFd();
95 try {
96 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN);
97 } catch (IOException e) {
98 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
99 }
100 this.timerFd = timerFd = Native.newTimerFd();
101 try {
102 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
103 } catch (IOException e) {
104 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
105 }
106 success = true;
107 } finally {
108 if (!success) {
109 if (epollFd != null) {
110 try {
111 epollFd.close();
112 } catch (Exception e) {
113
114 }
115 }
116 if (eventFd != null) {
117 try {
118 eventFd.close();
119 } catch (Exception e) {
120
121 }
122 }
123 if (timerFd != null) {
124 try {
125 timerFd.close();
126 } catch (Exception e) {
127
128 }
129 }
130 }
131 }
132 }
133
134
135
136
137 IovArray cleanArray() {
138 iovArray.clear();
139 return iovArray;
140 }
141
142 @Override
143 protected void wakeup(boolean inEventLoop) {
144 if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {
145
146 Native.eventFdWrite(eventFd.intValue(), 1L);
147 }
148 }
149
150
151
152
153 void add(AbstractEpollChannel ch) throws IOException {
154 assert inEventLoop();
155 int fd = ch.fd().intValue();
156 Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
157 channels.put(fd, ch);
158 }
159
160
161
162
163 void modify(AbstractEpollChannel ch) throws IOException {
164 assert inEventLoop();
165 Native.epollCtlMod(epollFd.intValue(), ch.fd().intValue(), ch.flags);
166 }
167
168
169
170
171 void remove(AbstractEpollChannel ch) throws IOException {
172 assert inEventLoop();
173
174 if (ch.isOpen()) {
175 int fd = ch.fd().intValue();
176 if (channels.remove(fd) != null) {
177
178
179 Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue());
180 }
181 }
182 }
183
184 @Override
185 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
186
187 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
188 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
189 }
190
191 @Override
192 public int pendingTasks() {
193
194
195
196 if (inEventLoop()) {
197 return super.pendingTasks();
198 } else {
199 return submit(pendingTasksCallable).syncUninterruptibly().getNow();
200 }
201 }
202
203
204
205 public int getIoRatio() {
206 return ioRatio;
207 }
208
209
210
211
212
213 public void setIoRatio(int ioRatio) {
214 if (ioRatio <= 0 || ioRatio > 100) {
215 throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
216 }
217 this.ioRatio = ioRatio;
218 }
219
220 private int epollWait(boolean oldWakeup) throws IOException {
221
222
223
224
225 if (oldWakeup && hasTasks()) {
226 return epollWaitNow();
227 }
228
229 long totalDelay = delayNanos(System.nanoTime());
230 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
231 return Native.epollWait(epollFd, events, timerFd, delaySeconds,
232 (int) min(totalDelay - delaySeconds * 1000000000L, Integer.MAX_VALUE));
233 }
234
235 private int epollWaitNow() throws IOException {
236 return Native.epollWait(epollFd, events, timerFd, 0, 0);
237 }
238
239 @Override
240 protected void run() {
241 for (;;) {
242 try {
243 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
244 switch (strategy) {
245 case SelectStrategy.CONTINUE:
246 continue;
247 case SelectStrategy.SELECT:
248 strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278 if (wakenUp == 1) {
279 Native.eventFdWrite(eventFd.intValue(), 1L);
280 }
281
282 default:
283 }
284
285 final int ioRatio = this.ioRatio;
286 if (ioRatio == 100) {
287 try {
288 if (strategy > 0) {
289 processReady(events, strategy);
290 }
291 } finally {
292
293 runAllTasks();
294 }
295 } else {
296 final long ioStartTime = System.nanoTime();
297
298 try {
299 if (strategy > 0) {
300 processReady(events, strategy);
301 }
302 } finally {
303
304 final long ioTime = System.nanoTime() - ioStartTime;
305 runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
306 }
307 }
308 if (allowGrowing && strategy == events.length()) {
309
310 events.increase();
311 }
312 } catch (Throwable t) {
313 handleLoopException(t);
314 }
315
316 try {
317 if (isShuttingDown()) {
318 closeAll();
319 if (confirmShutdown()) {
320 break;
321 }
322 }
323 } catch (Throwable t) {
324 handleLoopException(t);
325 }
326 }
327 }
328
329 private static void handleLoopException(Throwable t) {
330 logger.warn("Unexpected exception in the selector loop.", t);
331
332
333
334 try {
335 Thread.sleep(1000);
336 } catch (InterruptedException e) {
337
338 }
339 }
340
341 private void closeAll() {
342 try {
343 epollWaitNow();
344 } catch (IOException ignore) {
345
346 }
347
348
349 Collection<AbstractEpollChannel> array = new ArrayList<AbstractEpollChannel>(channels.size());
350
351 for (IntObjectMap.Entry<AbstractEpollChannel> entry: channels.entries()) {
352 array.add(entry.value());
353 }
354
355 for (AbstractEpollChannel ch: array) {
356 ch.unsafe().close(ch.unsafe().voidPromise());
357 }
358 }
359
360 private void processReady(EpollEventArray events, int ready) {
361 for (int i = 0; i < ready; i ++) {
362 final int fd = events.fd(i);
363 if (fd == eventFd.intValue()) {
364
365 Native.eventFdRead(fd);
366 } else if (fd == timerFd.intValue()) {
367
368 Native.timerFdRead(fd);
369 } else {
370 final long ev = events.events(i);
371
372 AbstractEpollChannel ch = channels.get(fd);
373 if (ch != null) {
374
375
376
377
378 AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) ch.unsafe();
379
380
381
382
383
384
385
386
387
388 if ((ev & (Native.EPOLLERR | Native.EPOLLOUT)) != 0) {
389
390 unsafe.epollOutReady();
391 }
392
393
394
395
396
397
398 if ((ev & (Native.EPOLLERR | Native.EPOLLIN)) != 0) {
399
400 unsafe.epollInReady();
401 }
402
403
404
405
406 if ((ev & Native.EPOLLRDHUP) != 0) {
407 unsafe.epollRdHupReady();
408 }
409 } else {
410
411 try {
412 Native.epollCtlDel(epollFd.intValue(), fd);
413 } catch (IOException ignore) {
414
415
416
417
418 }
419 }
420 }
421 }
422 }
423
424 @Override
425 protected void cleanup() {
426 try {
427 try {
428 epollFd.close();
429 } catch (IOException e) {
430 logger.warn("Failed to close the epoll fd.", e);
431 }
432 try {
433 eventFd.close();
434 } catch (IOException e) {
435 logger.warn("Failed to close the event fd.", e);
436 }
437 try {
438 timerFd.close();
439 } catch (IOException e) {
440 logger.warn("Failed to close the timer fd.", e);
441 }
442 } finally {
443
444 iovArray.release();
445 events.free();
446 }
447 }
448 }