1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19 import io.netty.util.concurrent.DefaultPromise;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.Future;
22 import io.netty.util.concurrent.GlobalEventExecutor;
23 import io.netty.util.concurrent.Promise;
24 import io.netty.util.concurrent.Ticker;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.PlatformDependent;
27 import io.netty.util.internal.ThreadExecutorMap;
28
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Objects;
32 import java.util.Queue;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40
41
42
43
44
45
46
47
48
49
50 public class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
51 private static final Runnable WAKEUP_TASK = () -> {
52
53 };
54 private static final int ST_STARTED = 0;
55 private static final int ST_SHUTTING_DOWN = 1;
56 private static final int ST_SHUTDOWN = 2;
57 private static final int ST_TERMINATED = 3;
58
59 private final AtomicInteger state;
60 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
61 private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
62 private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
63 @Override
64 public boolean canBlock() {
65 assert inEventLoop();
66 return false;
67 }
68
69 @Override
70 public long delayNanos(long currentTimeNanos) {
71 assert inEventLoop();
72 return 0;
73 }
74
75 @Override
76 public long deadlineNanos() {
77 assert inEventLoop();
78 return -1;
79 }
80 };
81 private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
82 private final IoEventLoopGroup parent;
83 private final AtomicReference<Thread> owningThread;
84 private final IoHandler handler;
85 private final Ticker ticker;
86
87 private volatile long gracefulShutdownQuietPeriod;
88 private volatile long gracefulShutdownTimeout;
89 private long gracefulShutdownStartTime;
90 private long lastExecutionTime;
91 private boolean initialized;
92
93
94
95
96
97 protected boolean canBlock() {
98 return true;
99 }
100
101
102
103
104
105
106
107
108
109
110
111
/**
 * Creates a loop without a parent group.
 *
 * @param owningThread the thread that will drive this loop, or {@code null} if it is supplied later
 *                     through {@link #setOwningThread(Thread)}
 * @param factory      factory used to create the {@link IoHandler} of this loop
 */
public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
    this((IoEventLoopGroup) null, owningThread, factory);
}
115
116
117
118
119
120
121
122
123
124
125
126
127
128
/**
 * Creates a loop that reports {@code parent} as its group and measures time with
 * {@link Ticker#systemTicker()}.
 *
 * @param parent       the group returned by {@link #parent()}; may be {@code null}
 * @param owningThread the thread that will drive this loop, or {@code null} if it is supplied later
 *                     through {@link #setOwningThread(Thread)}
 * @param factory      factory used to create the {@link IoHandler} of this loop
 */
public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
    this(parent, owningThread, factory, Ticker.systemTicker());
}
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
 * Creates a loop with an explicit time source.
 *
 * @param parent       the group returned by {@link #parent()}; may be {@code null}
 * @param owningThread the thread that will drive this loop, or {@code null} if it is supplied later
 *                     through {@link #setOwningThread(Thread)}
 * @param factory      factory used to create the {@link IoHandler} of this loop
 * @param ticker       time source used for deadlines and shutdown bookkeeping
 * @throws NullPointerException if {@code factory} or {@code ticker} is {@code null}
 */
public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory, Ticker ticker) {
    // Validate every argument BEFORE the handler is created: otherwise a null ticker would
    // fail only after the factory already produced a handler that nobody would ever destroy.
    Objects.requireNonNull(factory, "factory");
    this.ticker = Objects.requireNonNull(ticker, "ticker");
    this.parent = parent;
    this.owningThread = new AtomicReference<>(owningThread);
    state = new AtomicInteger(ST_STARTED);
    // Create the handler last so that "this" escapes to the factory only once the state,
    // ticker and owning thread it may query are already in place.
    this.handler = factory.newHandler(this);
}
156
157 @Override
158 public final Ticker ticker() {
159 return ticker;
160 }
161
162
163
164
165
166
167
168 public final int runNonBlockingTasks(long timeoutNanos) {
169 return runAllTasks(timeoutNanos, true);
170 }
171
172 private int runAllTasks(long timeoutNanos, boolean setCurrentExecutor) {
173 assert inEventLoop();
174 final Queue<Runnable> taskQueue = this.taskQueue;
175
176 boolean alwaysTrue = fetchFromScheduledTaskQueue(taskQueue);
177 assert alwaysTrue;
178 Runnable task = taskQueue.poll();
179 if (task == null) {
180 return 0;
181 }
182 EventExecutor old = setCurrentExecutor? ThreadExecutorMap.setCurrentExecutor(this) : null;
183 try {
184 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
185 int runTasks = 0;
186 long lastExecutionTime;
187 final Ticker ticker = this.ticker;
188 for (;;) {
189 safeExecute(task);
190
191 runTasks++;
192
193 if (timeoutNanos > 0) {
194 lastExecutionTime = ticker.nanoTime();
195 if ((lastExecutionTime - deadline) >= 0) {
196 break;
197 }
198 }
199
200 task = taskQueue.poll();
201 if (task == null) {
202 lastExecutionTime = ticker.nanoTime();
203 break;
204 }
205 }
206 this.lastExecutionTime = lastExecutionTime;
207 return runTasks;
208 } finally {
209 if (setCurrentExecutor) {
210 ThreadExecutorMap.setCurrentExecutor(old);
211 }
212 }
213 }
214
215 private int run(IoHandlerContext context, long runAllTasksTimeoutNanos) {
216 if (!initialized) {
217 if (owningThread.get() == null) {
218 throw new IllegalStateException("Owning thread not set");
219 }
220 initialized = true;
221 handler.initialize();
222 }
223 EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
224 try {
225 if (isShuttingDown()) {
226 if (terminationFuture.isDone()) {
227
228 return 0;
229 }
230 return runAllTasksBeforeDestroy();
231 }
232 final int ioTasks = handler.run(context);
233
234 if (runAllTasksTimeoutNanos < 0) {
235 return ioTasks;
236 }
237 assert runAllTasksTimeoutNanos >= 0;
238 return ioTasks + runAllTasks(runAllTasksTimeoutNanos, false);
239 } finally {
240 ThreadExecutorMap.setCurrentExecutor(old);
241 }
242 }
243
244 private int runAllTasksBeforeDestroy() {
245
246 int run = runAllTasks(-1, false);
247 handler.prepareToDestroy();
248 if (confirmShutdown()) {
249
250 try {
251 handler.destroy();
252 for (;;) {
253 int r = runAllTasks(-1, false);
254 run += r;
255 if (r == 0) {
256 break;
257 }
258 }
259 } finally {
260 state.set(ST_TERMINATED);
261 terminationFuture.setSuccess(null);
262 }
263 }
264 return run;
265 }
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280 public final int runNow(long runAllTasksTimeoutNanos) {
281 checkCurrentThread();
282 return run(nonBlockingContext, runAllTasksTimeoutNanos);
283 }
284
285
286
287
288
289
290
291
292
293
294 public final int runNow() {
295 checkCurrentThread();
296 return run(nonBlockingContext, 0);
297 }
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 public final int run(long waitNanos, long runAllTasksTimeoutNanos) {
314 checkCurrentThread();
315
316 final IoHandlerContext context;
317 if (waitNanos < 0) {
318 context = nonBlockingContext;
319 } else {
320 context = blockingContext;
321 blockingContext.maxBlockingNanos = waitNanos == 0 ? Long.MAX_VALUE : waitNanos;
322 }
323 return run(context, runAllTasksTimeoutNanos);
324 }
325
326
327
328
329
330
331
332
333
334
335
336
337
338 public final int run(long waitNanos) {
339 return run(waitNanos, 0);
340 }
341
342 private void checkCurrentThread() {
343 if (!inEventLoop(Thread.currentThread())) {
344 throw new IllegalStateException();
345 }
346 }
347
348
349
350
351 public final void wakeup() {
352 if (isShuttingDown()) {
353 return;
354 }
355 handler.wakeup();
356 }
357
358 @Override
359 public final ManualIoEventLoop next() {
360 return this;
361 }
362
363 @Override
364 public final IoEventLoopGroup parent() {
365 return parent;
366 }
367
368 @Deprecated
369 @Override
370 public final ChannelFuture register(Channel channel) {
371 return register(new DefaultChannelPromise(channel, this));
372 }
373
374 @Deprecated
375 @Override
376 public final ChannelFuture register(final ChannelPromise promise) {
377 ObjectUtil.checkNotNull(promise, "promise");
378 promise.channel().unsafe().register(this, promise);
379 return promise;
380 }
381
382 @Override
383 public final Future<IoRegistration> register(final IoHandle handle) {
384 Promise<IoRegistration> promise = newPromise();
385 if (inEventLoop()) {
386 registerForIo0(handle, promise);
387 } else {
388 execute(() -> registerForIo0(handle, promise));
389 }
390
391 return promise;
392 }
393
394 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
395 assert inEventLoop();
396 final IoRegistration registration;
397 try {
398 registration = handler.register(handle);
399 } catch (Exception e) {
400 promise.setFailure(e);
401 return;
402 }
403 promise.setSuccess(registration);
404 }
405
406 @Deprecated
407 @Override
408 public final ChannelFuture register(final Channel channel, final ChannelPromise promise) {
409 ObjectUtil.checkNotNull(promise, "promise");
410 ObjectUtil.checkNotNull(channel, "channel");
411 channel.unsafe().register(this, promise);
412 return promise;
413 }
414
415 @Override
416 public final boolean isCompatible(Class<? extends IoHandle> handleType) {
417 return handler.isCompatible(handleType);
418 }
419
420 @Override
421 public final boolean isIoType(Class<? extends IoHandler> handlerType) {
422 return handler.getClass().equals(handlerType);
423 }
424
425 @Override
426 public final boolean inEventLoop(Thread thread) {
427 return this.owningThread.get() == thread;
428 }
429
430
431
432
433
434
435
436 public final void setOwningThread(Thread owningThread) {
437 Objects.requireNonNull(owningThread, "owningThread");
438 if (!this.owningThread.compareAndSet(null, owningThread)) {
439 throw new IllegalStateException("Owning thread already set");
440 }
441 }
442
443 private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
444 boolean inEventLoop = inEventLoop();
445 boolean wakeup;
446 int oldState;
447 for (;;) {
448 if (isShuttingDown()) {
449 return;
450 }
451 int newState;
452 wakeup = true;
453 oldState = state.get();
454 if (inEventLoop) {
455 newState = shutdownState;
456 } else if (oldState == ST_STARTED) {
457 newState = shutdownState;
458 } else {
459 newState = oldState;
460 wakeup = false;
461 }
462
463 if (state.compareAndSet(oldState, newState)) {
464 break;
465 }
466 }
467 if (quietPeriod != -1) {
468 gracefulShutdownQuietPeriod = quietPeriod;
469 }
470 if (timeout != -1) {
471 gracefulShutdownTimeout = timeout;
472 }
473
474 if (wakeup) {
475
476 taskQueue.offer(WAKEUP_TASK);
477 handler.wakeup();
478 }
479 }
480
481 @Override
482 public final Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
483 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
484 if (timeout < quietPeriod) {
485 throw new IllegalArgumentException(
486 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
487 }
488 ObjectUtil.checkNotNull(unit, "unit");
489
490 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
491 return terminationFuture();
492 }
493
494 @Override
495 @Deprecated
496 public final void shutdown() {
497 shutdown0(-1, -1, ST_SHUTDOWN);
498 }
499
500 @Override
501 public final Future<?> terminationFuture() {
502 return terminationFuture;
503 }
504
505 @Override
506 public final boolean isShuttingDown() {
507 return state.get() >= ST_SHUTTING_DOWN;
508 }
509
510 @Override
511 public final boolean isShutdown() {
512 return state.get() >= ST_SHUTDOWN;
513 }
514
515 @Override
516 public final boolean isTerminated() {
517 return state.get() == ST_TERMINATED;
518 }
519
520 @Override
521 public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
522 return terminationFuture.await(timeout, unit);
523 }
524
525 @Override
526 public final void execute(Runnable command) {
527 Objects.requireNonNull(command, "command");
528 boolean inEventLoop = inEventLoop();
529 if (inEventLoop) {
530 if (isShutdown()) {
531 throw new RejectedExecutionException("event executor terminated");
532 }
533 }
534 taskQueue.add(command);
535 if (!inEventLoop) {
536 if (isShutdown()) {
537 boolean reject = false;
538 try {
539 if (taskQueue.remove(command)) {
540 reject = true;
541 }
542 } catch (UnsupportedOperationException e) {
543
544
545
546 }
547 if (reject) {
548 throw new RejectedExecutionException("event executor terminated");
549 }
550 }
551 handler.wakeup();
552 }
553 }
554
555 private boolean hasTasks() {
556 return !taskQueue.isEmpty();
557 }
558
559 private boolean confirmShutdown() {
560 if (!isShuttingDown()) {
561 return false;
562 }
563
564 if (!inEventLoop()) {
565 throw new IllegalStateException("must be invoked from an event loop");
566 }
567
568 cancelScheduledTasks();
569
570 if (gracefulShutdownStartTime == 0) {
571 gracefulShutdownStartTime = ticker.nanoTime();
572 }
573
574 if (runAllTasks(-1, false) > 0) {
575 if (isShutdown()) {
576
577 return true;
578 }
579
580
581
582
583 if (gracefulShutdownQuietPeriod == 0) {
584 return true;
585 }
586 return false;
587 }
588
589 final long nanoTime = ticker.nanoTime();
590
591 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
592 return true;
593 }
594
595 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
596 try {
597 Thread.sleep(100);
598 } catch (InterruptedException e) {
599
600 }
601
602 return false;
603 }
604
605
606
607 return true;
608 }
609
610 @Override
611 public final <T> T invokeAny(Collection<? extends Callable<T>> tasks)
612 throws InterruptedException, ExecutionException {
613
614 throwIfInEventLoop("invokeAny");
615 return super.invokeAny(tasks);
616 }
617
618 @Override
619 public final <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
620 throws InterruptedException, ExecutionException, TimeoutException {
621
622 throwIfInEventLoop("invokeAny");
623 return super.invokeAny(tasks, timeout, unit);
624 }
625
626 @Override
627 public final <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
628 throws InterruptedException {
629
630 throwIfInEventLoop("invokeAll");
631 return super.invokeAll(tasks);
632 }
633
634 @Override
635 public final <T> List<java.util.concurrent.Future<T>> invokeAll(
636 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
637
638 throwIfInEventLoop("invokeAll");
639 return super.invokeAll(tasks, timeout, unit);
640 }
641
642 private void throwIfInEventLoop(String method) {
643 if (inEventLoop()) {
644 throw new RejectedExecutionException(
645 "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
646 }
647 }
648
649 private class BlockingIoHandlerContext implements IoHandlerContext {
650
651 long maxBlockingNanos = Long.MAX_VALUE;
652
653 @Override
654 public boolean canBlock() {
655 assert inEventLoop();
656 return !hasTasks() && !hasScheduledTasks() && ManualIoEventLoop.this.canBlock();
657 }
658
659 @Override
660 public long delayNanos(long currentTimeNanos) {
661 assert inEventLoop();
662 return Math.min(maxBlockingNanos, ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos));
663 }
664
665 @Override
666 public long deadlineNanos() {
667 assert inEventLoop();
668 long next = nextScheduledTaskDeadlineNanos();
669 if (maxBlockingNanos == Long.MAX_VALUE) {
670
671 return next;
672 }
673 long now = ticker.nanoTime();
674
675 if (next == -1 || next - now > maxBlockingNanos) {
676 return now + maxBlockingNanos;
677 }
678 return next;
679 }
680 }
681 }