1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19 import io.netty.util.concurrent.DefaultPromise;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.Future;
22 import io.netty.util.concurrent.GlobalEventExecutor;
23 import io.netty.util.concurrent.Promise;
24 import io.netty.util.concurrent.Ticker;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.PlatformDependent;
27 import io.netty.util.internal.ThreadExecutorMap;
28
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Objects;
32 import java.util.Queue;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40
41
42
43
44
45
46
47
48
49
50 public final class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
51 private static final int ST_STARTED = 0;
52 private static final int ST_SHUTTING_DOWN = 1;
53 private static final int ST_SHUTDOWN = 2;
54 private static final int ST_TERMINATED = 3;
55
56 private final AtomicInteger state;
57 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
58 private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
59 private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
60 @Override
61 public boolean canBlock() {
62 assert inEventLoop();
63 return false;
64 }
65
66 @Override
67 public long delayNanos(long currentTimeNanos) {
68 assert inEventLoop();
69 return 0;
70 }
71
72 @Override
73 public long deadlineNanos() {
74 assert inEventLoop();
75 return -1;
76 }
77 };
78 private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
79 private final IoEventLoopGroup parent;
80 private final AtomicReference<Thread> owningThread;
81 private final IoHandler handler;
82 private final Ticker ticker;
83
84 private volatile long gracefulShutdownQuietPeriod;
85 private volatile long gracefulShutdownTimeout;
86 private long gracefulShutdownStartTime;
87 private long lastExecutionTime;
88 private boolean initialized;
89
90
91
92
93
94
95
96
97
98
99
100
101 public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
102 this(null, owningThread, factory);
103 }
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118 public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
119 this(parent, owningThread, factory, Ticker.systemTicker());
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory, Ticker ticker) {
139 this.parent = parent;
140 this.owningThread = new AtomicReference<>(owningThread);
141 this.handler = factory.newHandler(this);
142 this.ticker = Objects.requireNonNull(ticker, "ticker");
143 state = new AtomicInteger(ST_STARTED);
144 }
145
146 @Override
147 public Ticker ticker() {
148 return ticker;
149 }
150
151
152
153
154
155
156
157 public int runNonBlockingTasks(long timeoutNanos) {
158 return runAllTasks(timeoutNanos, true);
159 }
160
161 private int runAllTasks(long timeoutNanos, boolean setCurrentExecutor) {
162 assert inEventLoop();
163 final Queue<Runnable> taskQueue = this.taskQueue;
164
165 boolean alwaysTrue = fetchFromScheduledTaskQueue(taskQueue);
166 assert alwaysTrue;
167 Runnable task = taskQueue.poll();
168 if (task == null) {
169 return 0;
170 }
171 EventExecutor old = setCurrentExecutor? ThreadExecutorMap.setCurrentExecutor(this) : null;
172 try {
173 final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
174 int runTasks = 0;
175 long lastExecutionTime;
176 final Ticker ticker = this.ticker;
177 for (;;) {
178 safeExecute(task);
179
180 runTasks++;
181
182 if (timeoutNanos > 0) {
183 lastExecutionTime = ticker.nanoTime();
184 if ((lastExecutionTime - deadline) >= 0) {
185 break;
186 }
187 }
188
189 task = taskQueue.poll();
190 if (task == null) {
191 lastExecutionTime = ticker.nanoTime();
192 break;
193 }
194 }
195 this.lastExecutionTime = lastExecutionTime;
196 return runTasks;
197 } finally {
198 if (setCurrentExecutor) {
199 ThreadExecutorMap.setCurrentExecutor(old);
200 }
201 }
202 }
203
204 private int run(IoHandlerContext context, long runAllTasksTimeoutNanos) {
205 if (!initialized) {
206 if (owningThread.get() == null) {
207 throw new IllegalStateException("Owning thread not set");
208 }
209 initialized = true;
210 handler.initialize();
211 }
212 EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
213 try {
214 if (isShuttingDown()) {
215 if (terminationFuture.isDone()) {
216
217 return 0;
218 }
219 return runAllTasksBeforeDestroy();
220 }
221 final int ioTasks = handler.run(context);
222
223 if (runAllTasksTimeoutNanos < 0) {
224 return ioTasks;
225 }
226 assert runAllTasksTimeoutNanos >= 0;
227 return ioTasks + runAllTasks(runAllTasksTimeoutNanos, false);
228 } finally {
229 ThreadExecutorMap.setCurrentExecutor(old);
230 }
231 }
232
233 private int runAllTasksBeforeDestroy() {
234
235 int run = runAllTasks(-1, false);
236 handler.prepareToDestroy();
237 if (confirmShutdown()) {
238
239 try {
240 handler.destroy();
241 for (;;) {
242 int r = runAllTasks(-1, false);
243 run += r;
244 if (r == 0) {
245 break;
246 }
247 }
248 } finally {
249 state.set(ST_TERMINATED);
250 terminationFuture.setSuccess(null);
251 }
252 }
253 return run;
254 }
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 public int runNow(long runAllTasksTimeoutNanos) {
270 checkCurrentThread();
271 return run(nonBlockingContext, runAllTasksTimeoutNanos);
272 }
273
274
275
276
277
278
279
280
281
282
283 public int runNow() {
284 checkCurrentThread();
285 return run(nonBlockingContext, 0);
286 }
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302 public int run(long waitNanos, long runAllTasksTimeoutNanos) {
303 checkCurrentThread();
304
305 final IoHandlerContext context;
306 if (waitNanos < 0) {
307 context = nonBlockingContext;
308 } else {
309 context = blockingContext;
310 blockingContext.maxBlockingNanos = waitNanos == 0 ? Long.MAX_VALUE : waitNanos;
311 }
312 return run(context, runAllTasksTimeoutNanos);
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326
327 public int run(long waitNanos) {
328 return run(waitNanos, 0);
329 }
330
331 private void checkCurrentThread() {
332 if (!inEventLoop(Thread.currentThread())) {
333 throw new IllegalStateException();
334 }
335 }
336
337
338
339
340 public void wakeup() {
341 if (isShuttingDown()) {
342 return;
343 }
344 handler.wakeup();
345 }
346
347 @Override
348 public ManualIoEventLoop next() {
349 return this;
350 }
351
352 @Override
353 public IoEventLoopGroup parent() {
354 return parent;
355 }
356
357 @Deprecated
358 @Override
359 public ChannelFuture register(Channel channel) {
360 return register(new DefaultChannelPromise(channel, this));
361 }
362
363 @Deprecated
364 @Override
365 public ChannelFuture register(final ChannelPromise promise) {
366 ObjectUtil.checkNotNull(promise, "promise");
367 promise.channel().unsafe().register(this, promise);
368 return promise;
369 }
370
371 @Override
372 public Future<IoRegistration> register(final IoHandle handle) {
373 Promise<IoRegistration> promise = newPromise();
374 if (inEventLoop()) {
375 registerForIo0(handle, promise);
376 } else {
377 execute(() -> registerForIo0(handle, promise));
378 }
379
380 return promise;
381 }
382
383 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
384 assert inEventLoop();
385 final IoRegistration registration;
386 try {
387 registration = handler.register(handle);
388 } catch (Exception e) {
389 promise.setFailure(e);
390 return;
391 }
392 promise.setSuccess(registration);
393 }
394
395 @Deprecated
396 @Override
397 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
398 ObjectUtil.checkNotNull(promise, "promise");
399 ObjectUtil.checkNotNull(channel, "channel");
400 channel.unsafe().register(this, promise);
401 return promise;
402 }
403
404 @Override
405 public boolean isCompatible(Class<? extends IoHandle> handleType) {
406 return handler.isCompatible(handleType);
407 }
408
409 @Override
410 public boolean isIoType(Class<? extends IoHandler> handlerType) {
411 return handler.getClass().equals(handlerType);
412 }
413
414 @Override
415 public boolean inEventLoop(Thread thread) {
416 return this.owningThread.get() == thread;
417 }
418
419
420
421
422
423
424
425 public void setOwningThread(Thread owningThread) {
426 Objects.requireNonNull(owningThread, "owningThread");
427 if (!this.owningThread.compareAndSet(null, owningThread)) {
428 throw new IllegalStateException("Owning thread already set");
429 }
430 }
431
432 private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
433 boolean inEventLoop = inEventLoop();
434 boolean wakeup;
435 int oldState;
436 for (;;) {
437 if (isShuttingDown()) {
438 return;
439 }
440 int newState;
441 wakeup = true;
442 oldState = state.get();
443 if (inEventLoop) {
444 newState = shutdownState;
445 } else if (oldState == ST_STARTED) {
446 newState = shutdownState;
447 } else {
448 newState = oldState;
449 wakeup = false;
450 }
451
452 if (state.compareAndSet(oldState, newState)) {
453 break;
454 }
455 }
456 if (quietPeriod != -1) {
457 gracefulShutdownQuietPeriod = quietPeriod;
458 }
459 if (timeout != -1) {
460 gracefulShutdownTimeout = timeout;
461 }
462
463 if (wakeup) {
464 handler.wakeup();
465 }
466 }
467
468 @Override
469 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
470 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
471 if (timeout < quietPeriod) {
472 throw new IllegalArgumentException(
473 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
474 }
475 ObjectUtil.checkNotNull(unit, "unit");
476
477 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
478 return terminationFuture();
479 }
480
481 @Override
482 @Deprecated
483 public void shutdown() {
484 shutdown0(-1, -1, ST_SHUTDOWN);
485 }
486
487 @Override
488 public Future<?> terminationFuture() {
489 return terminationFuture;
490 }
491
492 @Override
493 public boolean isShuttingDown() {
494 return state.get() >= ST_SHUTTING_DOWN;
495 }
496
497 @Override
498 public boolean isShutdown() {
499 return state.get() >= ST_SHUTDOWN;
500 }
501
502 @Override
503 public boolean isTerminated() {
504 return state.get() == ST_TERMINATED;
505 }
506
507 @Override
508 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
509 return terminationFuture.await(timeout, unit);
510 }
511
512 @Override
513 public void execute(Runnable command) {
514 Objects.requireNonNull(command, "command");
515 boolean inEventLoop = inEventLoop();
516 if (inEventLoop) {
517 if (isShutdown()) {
518 throw new RejectedExecutionException("event executor terminated");
519 }
520 }
521 taskQueue.add(command);
522 if (!inEventLoop) {
523 if (isShutdown()) {
524 boolean reject = false;
525 try {
526 if (taskQueue.remove(command)) {
527 reject = true;
528 }
529 } catch (UnsupportedOperationException e) {
530
531
532
533 }
534 if (reject) {
535 throw new RejectedExecutionException("event executor terminated");
536 }
537 }
538 handler.wakeup();
539 }
540 }
541
542 private boolean hasTasks() {
543 return !taskQueue.isEmpty();
544 }
545
546 private boolean confirmShutdown() {
547 if (!isShuttingDown()) {
548 return false;
549 }
550
551 if (!inEventLoop()) {
552 throw new IllegalStateException("must be invoked from an event loop");
553 }
554
555 cancelScheduledTasks();
556
557 if (gracefulShutdownStartTime == 0) {
558 gracefulShutdownStartTime = ticker.nanoTime();
559 }
560
561 if (runAllTasks(-1, false) > 0) {
562 if (isShutdown()) {
563
564 return true;
565 }
566
567
568
569
570 if (gracefulShutdownQuietPeriod == 0) {
571 return true;
572 }
573 return false;
574 }
575
576 final long nanoTime = ticker.nanoTime();
577
578 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
579 return true;
580 }
581
582 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
583 try {
584 Thread.sleep(100);
585 } catch (InterruptedException e) {
586
587 }
588
589 return false;
590 }
591
592
593
594 return true;
595 }
596
597 @Override
598 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
599
600 throwIfInEventLoop("invokeAny");
601 return super.invokeAny(tasks);
602 }
603
604 @Override
605 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
606 throws InterruptedException, ExecutionException, TimeoutException {
607
608 throwIfInEventLoop("invokeAny");
609 return super.invokeAny(tasks, timeout, unit);
610 }
611
612 @Override
613 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
614 throws InterruptedException {
615
616 throwIfInEventLoop("invokeAll");
617 return super.invokeAll(tasks);
618 }
619
620 @Override
621 public <T> List<java.util.concurrent.Future<T>> invokeAll(
622 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
623
624 throwIfInEventLoop("invokeAll");
625 return super.invokeAll(tasks, timeout, unit);
626 }
627
628 private void throwIfInEventLoop(String method) {
629 if (inEventLoop()) {
630 throw new RejectedExecutionException(
631 "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
632 }
633 }
634
635 private final class BlockingIoHandlerContext implements IoHandlerContext {
636 long maxBlockingNanos = Long.MAX_VALUE;
637
638 @Override
639 public boolean canBlock() {
640 assert inEventLoop();
641 return !hasTasks() && !hasScheduledTasks();
642 }
643
644 @Override
645 public long delayNanos(long currentTimeNanos) {
646 assert inEventLoop();
647 return Math.min(maxBlockingNanos, ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos));
648 }
649
650 @Override
651 public long deadlineNanos() {
652 assert inEventLoop();
653 long next = nextScheduledTaskDeadlineNanos();
654 long maxDeadlineNanos = ticker.nanoTime() + maxBlockingNanos;
655 if (next == -1) {
656 return maxDeadlineNanos;
657 }
658 return Math.min(next, maxDeadlineNanos);
659 }
660 };
661 }