1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.channel.Channel;
19 import io.netty.channel.DefaultSelectStrategyFactory;
20 import io.netty.channel.IoHandlerContext;
21 import io.netty.channel.IoHandle;
22 import io.netty.channel.IoHandler;
23 import io.netty.channel.IoHandlerFactory;
24 import io.netty.channel.IoOps;
25 import io.netty.channel.IoRegistration;
26 import io.netty.channel.SelectStrategy;
27 import io.netty.channel.SelectStrategyFactory;
28 import io.netty.channel.unix.FileDescriptor;
29 import io.netty.util.IntSupplier;
30 import io.netty.util.collection.IntObjectHashMap;
31 import io.netty.util.collection.IntObjectMap;
32 import io.netty.util.concurrent.ThreadAwareExecutor;
33 import io.netty.util.internal.ObjectUtil;
34 import io.netty.util.internal.StringUtil;
35 import io.netty.util.internal.SystemPropertyUtil;
36 import io.netty.util.internal.UnstableApi;
37 import io.netty.util.internal.logging.InternalLogger;
38 import io.netty.util.internal.logging.InternalLoggerFactory;
39
40 import java.io.IOException;
41 import java.io.UncheckedIOException;
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.concurrent.atomic.AtomicBoolean;
46 import java.util.concurrent.atomic.AtomicLong;
47
48 import static java.lang.Math.min;
49 import static java.lang.System.nanoTime;
50
51
52
53
54 public class EpollIoHandler implements IoHandler {
55 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollIoHandler.class);
56 private static final long EPOLL_WAIT_MILLIS_THRESHOLD =
57 SystemPropertyUtil.getLong("io.netty.channel.epoll.epollWaitThreshold", 10);
58
59 static {
60
61
62 Epoll.ensureAvailability();
63 }
64
65
66 private long prevDeadlineNanos = nanoTime() - 1;
67 private FileDescriptor epollFd;
68 private FileDescriptor eventFd;
69 private FileDescriptor timerFd;
70 private final IntObjectMap<DefaultEpollIoRegistration> registrations = new IntObjectHashMap<>(4096);
71 private final boolean allowGrowing;
72 private final EpollEventArray events;
73 private final NativeArrays nativeArrays;
74
75 private final SelectStrategy selectStrategy;
76 private final IntSupplier selectNowSupplier = new IntSupplier() {
77 @Override
78 public int get() throws Exception {
79 return epollWaitNow();
80 }
81 };
82 private final ThreadAwareExecutor executor;
83
84 private static final long AWAKE = -1L;
85 private static final long NONE = Long.MAX_VALUE;
86
87
88
89
90
91 private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
92 private boolean pendingWakeup;
93
94 private int numChannels;
95
96
97 private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
98
99
100
101
102 public static IoHandlerFactory newFactory() {
103 return newFactory(0, DefaultSelectStrategyFactory.INSTANCE);
104 }
105
106
107
108
109 public static IoHandlerFactory newFactory(final int maxEvents,
110 final SelectStrategyFactory selectStrategyFactory) {
111 ObjectUtil.checkPositiveOrZero(maxEvents, "maxEvents");
112 ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
113 return executor -> new EpollIoHandler(executor, maxEvents, selectStrategyFactory.newSelectStrategy());
114 }
115
116
117 EpollIoHandler(ThreadAwareExecutor executor, int maxEvents, SelectStrategy strategy) {
118 this.executor = ObjectUtil.checkNotNull(executor, "executor");
119 selectStrategy = ObjectUtil.checkNotNull(strategy, "strategy");
120 if (maxEvents == 0) {
121 allowGrowing = true;
122 events = new EpollEventArray(4096);
123 } else {
124 allowGrowing = false;
125 events = new EpollEventArray(maxEvents);
126 }
127 nativeArrays = new NativeArrays();
128 openFileDescriptors();
129 }
130
131 private static EpollIoHandle cast(IoHandle handle) {
132 if (handle instanceof EpollIoHandle) {
133 return (EpollIoHandle) handle;
134 }
135 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
136 }
137
138 private static EpollIoOps cast(IoOps ops) {
139 if (ops instanceof EpollIoOps) {
140 return (EpollIoOps) ops;
141 }
142 throw new IllegalArgumentException("IoOps of type " + StringUtil.simpleClassName(ops) + " not supported");
143 }
144
145
146
147
148
149 @UnstableApi
150 public void openFileDescriptors() {
151 boolean success = false;
152 FileDescriptor epollFd = null;
153 FileDescriptor eventFd = null;
154 FileDescriptor timerFd = null;
155 try {
156 this.epollFd = epollFd = Native.newEpollCreate();
157 this.eventFd = eventFd = Native.newEventFd();
158 try {
159
160
161 Native.epollCtlAdd(epollFd.intValue(), eventFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
162 } catch (IOException e) {
163 throw new IllegalStateException("Unable to add eventFd filedescriptor to epoll", e);
164 }
165 this.timerFd = timerFd = Native.newTimerFd();
166 try {
167
168
169 Native.epollCtlAdd(epollFd.intValue(), timerFd.intValue(), Native.EPOLLIN | Native.EPOLLET);
170 } catch (IOException e) {
171 throw new IllegalStateException("Unable to add timerFd filedescriptor to epoll", e);
172 }
173 success = true;
174 } finally {
175 if (!success) {
176 closeFileDescriptor(epollFd);
177 closeFileDescriptor(eventFd);
178 closeFileDescriptor(timerFd);
179 }
180 }
181 }
182
183 private static void closeFileDescriptor(FileDescriptor fd) {
184 if (fd != null) {
185 try {
186 fd.close();
187 } catch (Exception e) {
188
189 }
190 }
191 }
192
193 @Override
194 public void wakeup() {
195 if (!executor.isExecutorThread(Thread.currentThread()) && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
196
197 Native.eventFdWrite(eventFd.intValue(), 1L);
198 }
199 }
200
201 @Override
202 public void prepareToDestroy() {
203
204
205 DefaultEpollIoRegistration[] copy = registrations.values().toArray(new DefaultEpollIoRegistration[0]);
206
207 for (DefaultEpollIoRegistration reg: copy) {
208 reg.close();
209 }
210 }
211
212 @Override
213 public void destroy() {
214 try {
215 closeFileDescriptors();
216 } finally {
217 nativeArrays.free();
218 events.free();
219 }
220 }
221
222 private final class DefaultEpollIoRegistration implements IoRegistration {
223 private final ThreadAwareExecutor executor;
224 private final AtomicBoolean canceled = new AtomicBoolean();
225 final EpollIoHandle handle;
226
227 DefaultEpollIoRegistration(ThreadAwareExecutor executor, EpollIoHandle handle) {
228 this.executor = executor;
229 this.handle = handle;
230 }
231
232 @SuppressWarnings("unchecked")
233 @Override
234 public <T> T attachment() {
235 return (T) nativeArrays;
236 }
237
238 @Override
239 public long submit(IoOps ops) {
240 EpollIoOps epollIoOps = cast(ops);
241 try {
242 if (!isValid()) {
243 return -1;
244 }
245 Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
246 return epollIoOps.value;
247 } catch (IOException e) {
248 throw new UncheckedIOException(e);
249 }
250 }
251
252 @Override
253 public boolean isValid() {
254 return !canceled.get();
255 }
256
257 @Override
258 public boolean cancel() {
259 if (!canceled.compareAndSet(false, true)) {
260 return false;
261 }
262 if (executor.isExecutorThread(Thread.currentThread())) {
263 cancel0();
264 } else {
265 executor.execute(this::cancel0);
266 }
267 return true;
268 }
269
270 private void cancel0() {
271 int fd = handle.fd().intValue();
272 DefaultEpollIoRegistration old = registrations.remove(fd);
273 if (old != null) {
274 if (old != this) {
275
276 registrations.put(fd, old);
277 return;
278 } else if (old.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
279 numChannels--;
280 }
281 if (handle.fd().isOpen()) {
282 try {
283
284
285 Native.epollCtlDel(epollFd.intValue(), fd);
286 } catch (IOException e) {
287 logger.debug("Unable to remove fd {} from epoll {}", fd, epollFd.intValue());
288 }
289 }
290 }
291 }
292
293 void close() {
294 try {
295 cancel();
296 } catch (Exception e) {
297 logger.debug("Exception during canceling " + this, e);
298 }
299 try {
300 handle.close();
301 } catch (Exception e) {
302 logger.debug("Exception during closing " + handle, e);
303 }
304 }
305
306 void handle(long ev) {
307 handle.handle(this, EpollIoOps.eventOf((int) ev));
308 }
309 }
310
311 @Override
312 public IoRegistration register(IoHandle handle)
313 throws Exception {
314 final EpollIoHandle epollHandle = cast(handle);
315 DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(executor, epollHandle);
316 int fd = epollHandle.fd().intValue();
317 Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
318 DefaultEpollIoRegistration old = registrations.put(fd, registration);
319
320
321
322 assert old == null || !old.isValid();
323
324 if (epollHandle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
325 numChannels++;
326 }
327 return registration;
328 }
329
330 @Override
331 public boolean isCompatible(Class<? extends IoHandle> handleType) {
332 return EpollIoHandle.class.isAssignableFrom(handleType);
333 }
334
335 int numRegisteredChannels() {
336 return numChannels;
337 }
338
339 List<Channel> registeredChannelsList() {
340 IntObjectMap<DefaultEpollIoRegistration> ch = registrations;
341 if (ch.isEmpty()) {
342 return Collections.emptyList();
343 }
344
345 List<Channel> channels = new ArrayList<>(ch.size());
346 for (DefaultEpollIoRegistration registration : ch.values()) {
347 if (registration.handle instanceof AbstractEpollChannel.AbstractEpollUnsafe) {
348 channels.add(((AbstractEpollChannel.AbstractEpollUnsafe) registration.handle).channel());
349 }
350 }
351 return Collections.unmodifiableList(channels);
352 }
353
354 private long epollWait(IoHandlerContext context, long deadlineNanos) throws IOException {
355 if (deadlineNanos == NONE) {
356 return Native.epollWait(epollFd, events, timerFd,
357 Integer.MAX_VALUE, 0, EPOLL_WAIT_MILLIS_THRESHOLD);
358 }
359 long totalDelay = context.delayNanos(System.nanoTime());
360 int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
361 int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
362 return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos, EPOLL_WAIT_MILLIS_THRESHOLD);
363 }
364
365 private int epollWaitNoTimerChange() throws IOException {
366 return Native.epollWait(epollFd, events, false);
367 }
368
369 private int epollWaitNow() throws IOException {
370 return Native.epollWait(epollFd, events, true);
371 }
372
373 private int epollBusyWait() throws IOException {
374 return Native.epollBusyWait(epollFd, events);
375 }
376
377 private int epollWaitTimeboxed() throws IOException {
378
379 return Native.epollWait(epollFd, events, 1000);
380 }
381
382 @Override
383 public int run(IoHandlerContext context) {
384 int handled = 0;
385 try {
386 int strategy = selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock());
387 switch (strategy) {
388 case SelectStrategy.CONTINUE:
389 return 0;
390
391 case SelectStrategy.BUSY_WAIT:
392 strategy = epollBusyWait();
393 break;
394
395 case SelectStrategy.SELECT:
396 if (pendingWakeup) {
397
398
399 strategy = epollWaitTimeboxed();
400 if (strategy != 0) {
401 break;
402 }
403
404
405 logger.warn("Missed eventfd write (not seen after > 1 second)");
406 pendingWakeup = false;
407 if (!context.canBlock()) {
408 break;
409 }
410
411 }
412
413 long curDeadlineNanos = context.deadlineNanos();
414 if (curDeadlineNanos == -1L) {
415 curDeadlineNanos = NONE;
416 }
417 nextWakeupNanos.set(curDeadlineNanos);
418 try {
419 if (context.canBlock()) {
420 if (curDeadlineNanos == prevDeadlineNanos) {
421
422 strategy = epollWaitNoTimerChange();
423 } else {
424
425 long result = epollWait(context, curDeadlineNanos);
426
427
428 strategy = Native.epollReady(result);
429 prevDeadlineNanos = Native.epollTimerWasUsed(result) ? curDeadlineNanos : NONE;
430 }
431 }
432 } finally {
433
434
435 if (nextWakeupNanos.get() == AWAKE || nextWakeupNanos.getAndSet(AWAKE) == AWAKE) {
436 pendingWakeup = true;
437 }
438 }
439
440 default:
441 }
442 if (strategy > 0) {
443 handled = strategy;
444 if (processReady(events, strategy)) {
445 prevDeadlineNanos = NONE;
446 }
447 }
448 if (allowGrowing && strategy == events.length()) {
449
450 events.increase();
451 }
452 } catch (Error e) {
453 throw e;
454 } catch (Throwable t) {
455 handleLoopException(t);
456 }
457 return handled;
458 }
459
460
461
462
463 void handleLoopException(Throwable t) {
464 logger.warn("Unexpected exception in the selector loop.", t);
465
466
467
468 try {
469 Thread.sleep(1000);
470 } catch (InterruptedException e) {
471
472 }
473 }
474
475
476 private boolean processReady(EpollEventArray events, int ready) {
477 boolean timerFired = false;
478 for (int i = 0; i < ready; i ++) {
479 final int fd = events.fd(i);
480 if (fd == eventFd.intValue()) {
481 pendingWakeup = false;
482 } else if (fd == timerFd.intValue()) {
483 timerFired = true;
484 } else {
485 final long ev = events.events(i);
486
487 DefaultEpollIoRegistration registration = registrations.get(fd);
488 if (registration != null) {
489 registration.handle(ev);
490 } else {
491
492 try {
493 Native.epollCtlDel(epollFd.intValue(), fd);
494 } catch (IOException ignore) {
495
496
497
498
499 }
500 }
501 }
502 }
503 return timerFired;
504 }
505
506
507
508
509
510
511
512 @UnstableApi
513 public void closeFileDescriptors() {
514
515 while (pendingWakeup) {
516 try {
517 int count = epollWaitTimeboxed();
518 if (count == 0) {
519
520 break;
521 }
522 for (int i = 0; i < count; i++) {
523 if (events.fd(i) == eventFd.intValue()) {
524 pendingWakeup = false;
525 break;
526 }
527 }
528 } catch (IOException ignore) {
529
530 }
531 }
532 try {
533 eventFd.close();
534 } catch (IOException e) {
535 logger.warn("Failed to close the event fd.", e);
536 }
537 try {
538 timerFd.close();
539 } catch (IOException e) {
540 logger.warn("Failed to close the timer fd.", e);
541 }
542
543 try {
544 epollFd.close();
545 } catch (IOException e) {
546 logger.warn("Failed to close the epoll fd.", e);
547 }
548 }
549 }