1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19 import io.netty.util.concurrent.DefaultPromise;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.Future;
22 import io.netty.util.concurrent.GlobalEventExecutor;
23 import io.netty.util.concurrent.Promise;
24 import io.netty.util.concurrent.Ticker;
25 import io.netty.util.internal.ObjectUtil;
26 import io.netty.util.internal.PlatformDependent;
27 import io.netty.util.internal.ThreadExecutorMap;
28
29 import java.util.Collection;
30 import java.util.List;
31 import java.util.Objects;
32 import java.util.Queue;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.ExecutionException;
35 import java.util.concurrent.RejectedExecutionException;
36 import java.util.concurrent.TimeUnit;
37 import java.util.concurrent.TimeoutException;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import java.util.concurrent.atomic.AtomicReference;
40
41
42
43
44
45
46
47
48
49
50 public final class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
// Lifecycle states stored in 'state'. Their numeric order is significant: isShuttingDown() and
// isShutdown() compare with '>=', so every later phase must have a larger value.
private static final int ST_STARTED = 0;
private static final int ST_SHUTTING_DOWN = 1;
private static final int ST_SHUTDOWN = 2;
private static final int ST_TERMINATED = 3;

// Current lifecycle state; always one of the ST_* constants above.
private final AtomicInteger state;
// Completed by run(IoHandlerContext) after the handler was destroyed during shutdown.
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
// Tasks submitted through execute(Runnable), drained by runAllTasks().
private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
// Context used by runNow() and by run(long) with a negative wait: it never permits blocking.
private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
    @Override
    public boolean canBlock() {
        assert inEventLoop();
        return false;
    }

    @Override
    public long delayNanos(long currentTimeNanos) {
        assert inEventLoop();
        return 0;
    }

    @Override
    public long deadlineNanos() {
        assert inEventLoop();
        return -1;
    }
};
// Context used by run(long) with a non-negative wait; run(long) updates its maxBlockingNanos before use.
private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
// Group returned by parent(); null when constructed through the two-argument constructor.
private final IoEventLoopGroup parent;
// Thread allowed to drive this loop; may start out null and is then set once by setOwningThread(Thread).
private final AtomicReference<Thread> owningThread;
// Handler that performs the actual I/O work; created by the factory passed to the constructor.
private final IoHandler handler;
// Time source used for task execution timestamps and shutdown timing.
private final Ticker ticker;

// Quiet period and timeout in nanoseconds, written by shutdown0(...) and read by confirmShutdown().
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
// Ticker time at which confirmShutdown() first ran; 0 means "not recorded yet".
private long gracefulShutdownStartTime;
// Ticker time of the most recent runAllTasks() call that executed at least one task.
private long lastExecutionTime;
// Set on the first run(IoHandlerContext) call, right before handler.initialize() is invoked.
private boolean initialized;
89
90
91
92
93
94
95
96
97
98
99
100
/**
 * Creates a new instance without a parent group, delegating to
 * {@link #ManualIoEventLoop(IoEventLoopGroup, Thread, IoHandlerFactory)} with a {@code null} parent.
 *
 * @param owningThread the thread that will drive this loop
 * @param factory      the factory used to create the {@link IoHandler}
 */
public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
    this(null, owningThread, factory);
}
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
 * Creates a new instance that uses {@link Ticker#systemTicker()} as its time source.
 *
 * @param parent       the group returned by {@link #parent()}
 * @param owningThread the thread that will drive this loop
 * @param factory      the factory used to create the {@link IoHandler}
 */
public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
    this(parent, owningThread, factory, Ticker.systemTicker());
}
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/**
 * Creates a new instance.
 *
 * @param parent       the group returned by {@link #parent()}; may be {@code null}
 * @param owningThread the thread that will drive this loop, or {@code null} if it is provided later
 *                     through {@link #setOwningThread(Thread)}
 * @param factory      the factory used to create the {@link IoHandler}
 * @param ticker       the time source used by this loop
 * @throws NullPointerException if {@code factory} or {@code ticker} is {@code null}
 */
public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory, Ticker ticker) {
    // Validate all arguments and assign every other field BEFORE the handler is created:
    //  - 'this' escapes into factory.newHandler(...), so the handler must not observe a loop whose
    //    ticker/state are still unset;
    //  - a handler created ahead of a failing null-check would never be destroyed.
    Objects.requireNonNull(factory, "factory");
    this.ticker = Objects.requireNonNull(ticker, "ticker");
    this.parent = parent;
    this.owningThread = new AtomicReference<>(owningThread);
    state = new AtomicInteger(ST_STARTED);
    this.handler = factory.newHandler(this);
}
145
/**
 * Returns the {@link Ticker} that was passed to the constructor.
 */
@Override
public Ticker ticker() {
    return ticker;
}
150
151 private int runAllTasks() {
152 assert inEventLoop();
153 int numRun = 0;
154 boolean fetchedAll;
155 do {
156 fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
157 for (;;) {
158 Runnable task = taskQueue.poll();
159 if (task == null) {
160 break;
161 }
162 safeExecute(task);
163 numRun++;
164 }
165 } while (!fetchedAll);
166 if (numRun > 0) {
167 lastExecutionTime = ticker.nanoTime();
168 }
169 return numRun;
170 }
171
/**
 * Performs one iteration of this loop using the given context.
 * <p>
 * The first invocation initializes the handler. While the loop is not shutting down, the handler is run
 * and afterwards all queued tasks are executed. While shutting down, queued tasks are executed, the
 * handler is asked to prepare for destruction and, once {@link #confirmShutdown()} agrees, the handler
 * is destroyed, remaining tasks are drained and the termination future is completed.
 *
 * @param context the context handed to {@code handler.run(...)}
 * @return the value returned by the handler plus the number of tasks executed; during shutdown the number
 *         of tasks executed directly by this method (tasks run inside {@link #confirmShutdown()} are not
 *         counted), or {@code 0} if the loop has already terminated
 * @throws IllegalStateException if no owning thread has been set before the first run
 */
private int run(IoHandlerContext context) {
    if (!initialized) {
        if (owningThread.get() == null) {
            throw new IllegalStateException("Owning thread not set");
        }
        // The flag is set before initialize() is called, so initialization is attempted only once
        // even if it throws.
        initialized = true;
        handler.initialize();
    }
    // Install this loop as the current executor for the duration of the run; restored in 'finally'.
    EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
    try {
        if (isShuttingDown()) {
            if (terminationFuture.isDone()) {
                // Already fully terminated, nothing left to do.
                return 0;
            }
            // Run everything that is queued before asking the handler to prepare for destruction.
            int run = runAllTasks();
            handler.prepareToDestroy();
            if (confirmShutdown()) {
                // Shutdown confirmed: destroy the handler, then drain tasks until none are left.
                try {
                    handler.destroy();
                    for (;;) {
                        int r = runAllTasks();
                        run += r;
                        if (r == 0) {
                            break;
                        }
                    }
                } finally {
                    // Mark the loop terminated even if destroying the handler or a task failed.
                    state.set(ST_TERMINATED);
                    terminationFuture.setSuccess(null);
                }
            }
            return run;
        }
        int run = handler.run(context);
        // Handler work is done, now run all queued tasks.
        return run + runAllTasks();
    } finally {
        ThreadExecutorMap.setCurrentExecutor(old);
    }
}
215
216
217
218
219
220
221
222
223
224
/**
 * Runs one iteration of this loop without allowing the handler to block.
 *
 * @return the value returned by {@link #run(IoHandlerContext)} for the non-blocking context
 * @throws IllegalStateException if called from a thread other than the owning thread
 */
public int runNow() {
    checkCurrentThread();
    return run(nonBlockingContext);
}
229
230
231
232
233
234
235
236
237
238
239
240
241
242 public int run(long waitNanos) {
243 checkCurrentThread();
244
245 final IoHandlerContext context;
246 if (waitNanos < 0) {
247 context = nonBlockingContext;
248 } else {
249 context = blockingContext;
250 blockingContext.maxBlockingNanos = waitNanos == 0 ? Long.MAX_VALUE : waitNanos;
251 }
252 return run(context);
253 }
254
255 private void checkCurrentThread() {
256 if (!inEventLoop(Thread.currentThread())) {
257 throw new IllegalStateException();
258 }
259 }
260
261
262
263
264 public void wakeup() {
265 if (isShuttingDown()) {
266 return;
267 }
268 handler.wakeup();
269 }
270
/**
 * Returns this instance.
 */
@Override
public ManualIoEventLoop next() {
    return this;
}
275
/**
 * Returns the group passed to the constructor, which may be {@code null}.
 */
@Override
public IoEventLoopGroup parent() {
    return parent;
}
280
/**
 * Registers the channel with this loop by creating a {@link DefaultChannelPromise} for it and
 * delegating to {@link #register(ChannelPromise)}.
 *
 * @param channel the channel to register
 * @return the promise created for the registration
 */
@Deprecated
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}
286
287 @Deprecated
288 @Override
289 public ChannelFuture register(final ChannelPromise promise) {
290 ObjectUtil.checkNotNull(promise, "promise");
291 promise.channel().unsafe().register(this, promise);
292 return promise;
293 }
294
295 @Override
296 public Future<IoRegistration> register(final IoHandle handle) {
297 Promise<IoRegistration> promise = newPromise();
298 if (inEventLoop()) {
299 registerForIo0(handle, promise);
300 } else {
301 execute(() -> registerForIo0(handle, promise));
302 }
303
304 return promise;
305 }
306
307 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
308 assert inEventLoop();
309 final IoRegistration registration;
310 try {
311 registration = handler.register(handle);
312 } catch (Exception e) {
313 promise.setFailure(e);
314 return;
315 }
316 promise.setSuccess(registration);
317 }
318
/**
 * Registers the given channel with this loop, using the given promise.
 *
 * @param channel the channel to register
 * @param promise the promise passed on to the channel's {@code unsafe().register(...)} call
 * @return the given promise
 */
@Deprecated
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    // The promise is validated first, then the channel.
    ObjectUtil.checkNotNull(promise, "promise");
    ObjectUtil.checkNotNull(channel, "channel");
    channel.unsafe().register(this, promise);
    return promise;
}
327
/**
 * Returns whatever the handler of this loop reports for the given handle type.
 *
 * @param handleType the handle type to check
 */
@Override
public boolean isCompatible(Class<? extends IoHandle> handleType) {
    return handler.isCompatible(handleType);
}
332
/**
 * Returns {@code true} if the runtime class of this loop's handler is exactly the given type
 * (subclasses of {@code handlerType} do not match).
 *
 * @param handlerType the handler type to compare against
 */
@Override
public boolean isIoType(Class<? extends IoHandler> handlerType) {
    return handler.getClass().equals(handlerType);
}
337
/**
 * Returns {@code true} if the given thread is the owning thread of this loop (identity comparison).
 *
 * @param thread the thread to check
 */
@Override
public boolean inEventLoop(Thread thread) {
    return this.owningThread.get() == thread;
}
342
343
344
345
346
347
348
349 public void setOwningThread(Thread owningThread) {
350 Objects.requireNonNull(owningThread, "owningThread");
351 if (!this.owningThread.compareAndSet(null, owningThread)) {
352 throw new IllegalStateException("Owning thread already set");
353 }
354 }
355
356 private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
357 boolean inEventLoop = inEventLoop();
358 boolean wakeup;
359 int oldState;
360 for (;;) {
361 if (isShuttingDown()) {
362 return;
363 }
364 int newState;
365 wakeup = true;
366 oldState = state.get();
367 if (inEventLoop) {
368 newState = shutdownState;
369 } else if (oldState == ST_STARTED) {
370 newState = shutdownState;
371 } else {
372 newState = oldState;
373 wakeup = false;
374 }
375
376 if (state.compareAndSet(oldState, newState)) {
377 break;
378 }
379 }
380 if (quietPeriod != -1) {
381 gracefulShutdownQuietPeriod = quietPeriod;
382 }
383 if (timeout != -1) {
384 gracefulShutdownTimeout = timeout;
385 }
386
387 if (wakeup) {
388 handler.wakeup();
389 }
390 }
391
392 @Override
393 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
394 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
395 if (timeout < quietPeriod) {
396 throw new IllegalArgumentException(
397 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
398 }
399 ObjectUtil.checkNotNull(unit, "unit");
400
401 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
402 return terminationFuture();
403 }
404
/**
 * Requests the {@code ST_SHUTDOWN} state without changing the configured quiet period or timeout.
 * Has no effect when the loop is already shutting down, because {@code shutdown0} returns early then.
 */
@Override
@Deprecated
public void shutdown() {
    shutdown0(-1, -1, ST_SHUTDOWN);
}
410
/**
 * Returns the future that is completed once this loop has terminated.
 */
@Override
public Future<?> terminationFuture() {
    return terminationFuture;
}
415
/**
 * Returns {@code true} once the state is {@code ST_SHUTTING_DOWN} or any later state.
 */
@Override
public boolean isShuttingDown() {
    return state.get() >= ST_SHUTTING_DOWN;
}
420
/**
 * Returns {@code true} once the state is {@code ST_SHUTDOWN} or {@code ST_TERMINATED}.
 */
@Override
public boolean isShutdown() {
    return state.get() >= ST_SHUTDOWN;
}
425
/**
 * Returns {@code true} if the state is {@code ST_TERMINATED}.
 */
@Override
public boolean isTerminated() {
    return state.get() == ST_TERMINATED;
}
430
/**
 * Waits on the termination future for at most the given time.
 *
 * @param timeout the maximum time to wait
 * @param unit    the unit of {@code timeout}
 * @return the result of {@code terminationFuture.await(timeout, unit)}
 * @throws InterruptedException if the wait is interrupted
 */
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return terminationFuture.await(timeout, unit);
}
435
/**
 * Adds the given task to the task queue. When called from a thread other than the owning thread the
 * handler is woken up afterwards.
 *
 * @param command the task to queue
 * @throws NullPointerException       if {@code command} is {@code null}
 * @throws RejectedExecutionException if this loop is shut down; for callers outside the owning thread
 *                                    only if the task could be removed from the queue again
 */
@Override
public void execute(Runnable command) {
    Objects.requireNonNull(command, "command");
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        // Called from the owning thread: reject up front, before anything is queued.
        if (isShutdown()) {
            throw new RejectedExecutionException("event executor terminated");
        }
    }
    taskQueue.add(command);
    if (!inEventLoop) {
        // Called from another thread: the state is checked only AFTER the task was queued, and the
        // task is taken back out again if the loop turned out to be shut down.
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (taskQueue.remove(command)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The queue does not support removal. Deliberately ignored: the task stays queued
                // and the call is not rejected.
            }
            if (reject) {
                throw new RejectedExecutionException("event executor terminated");
            }
        }
        handler.wakeup();
    }
}
465
/**
 * Returns {@code true} if the task queue contains at least one task.
 */
private boolean hasTasks() {
    return !taskQueue.isEmpty();
}
469
/**
 * Decides whether this loop may terminate now. Cancels scheduled tasks, runs all queued tasks and then
 * evaluates the shutdown state, the quiet period and the shutdown timeout.
 *
 * @return {@code true} if the loop may terminate, {@code false} if it is not shutting down or has to
 *         keep running for now
 * @throws IllegalStateException if called from a thread other than the owning thread while shutting down
 */
private boolean confirmShutdown() {
    if (!isShuttingDown()) {
        return false;
    }

    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }

    cancelScheduledTasks();

    // Remember when the shutdown started; 0 is used as the "not recorded yet" marker.
    if (gracefulShutdownStartTime == 0) {
        gracefulShutdownStartTime = ticker.nanoTime();
    }

    if (runAllTasks() > 0) {
        if (isShutdown()) {
            // Loop is shut down: terminate right away.
            return true;
        }

        // Tasks were still queued. Terminate only if the quiet period is 0; otherwise keep running
        // so that the quiet period can be observed on a later call.
        if (gracefulShutdownQuietPeriod == 0) {
            return true;
        }
        return false;
    }

    final long nanoTime = ticker.nanoTime();

    // No task ran: terminate if shut down or if the shutdown timeout has been exceeded.
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }

    // Still within the quiet period since the last executed task: wait a little and report
    // "not yet" so the caller checks again.
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignored.
            // NOTE(review): the interrupt status of the owning thread is cleared and not restored
            // here - confirm this is intended for a loop driven by a caller-supplied thread.
        }

        return false;
    }

    // No task was executed during the last quiet period, so the loop may terminate.
    return true;
}
520
/**
 * Delegates to the superclass implementation after verifying that the caller is not the owning thread.
 *
 * @throws RejectedExecutionException if called from the owning thread
 */
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
    // Refused on the owning thread: see the message built by throwIfInEventLoop (would deadlock).
    throwIfInEventLoop("invokeAny");
    return super.invokeAny(tasks);
}
527
/**
 * Delegates to the superclass implementation after verifying that the caller is not the owning thread.
 *
 * @throws RejectedExecutionException if called from the owning thread
 */
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    // Refused on the owning thread: see the message built by throwIfInEventLoop (would deadlock).
    throwIfInEventLoop("invokeAny");
    return super.invokeAny(tasks, timeout, unit);
}
535
/**
 * Delegates to the superclass implementation after verifying that the caller is not the owning thread.
 *
 * @throws RejectedExecutionException if called from the owning thread
 */
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    // Refused on the owning thread: see the message built by throwIfInEventLoop (would deadlock).
    throwIfInEventLoop("invokeAll");
    return super.invokeAll(tasks);
}
543
/**
 * Delegates to the superclass implementation after verifying that the caller is not the owning thread.
 *
 * @throws RejectedExecutionException if called from the owning thread
 */
@Override
public <T> List<java.util.concurrent.Future<T>> invokeAll(
        Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
    // Refused on the owning thread: see the message built by throwIfInEventLoop (would deadlock).
    throwIfInEventLoop("invokeAll");
    return super.invokeAll(tasks, timeout, unit);
}
551
552 private void throwIfInEventLoop(String method) {
553 if (inEventLoop()) {
554 throw new RejectedExecutionException(
555 "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
556 }
557 }
558
559 private final class BlockingIoHandlerContext implements IoHandlerContext {
560 long maxBlockingNanos = Long.MAX_VALUE;
561
562 @Override
563 public boolean canBlock() {
564 assert inEventLoop();
565 return !hasTasks() && !hasScheduledTasks();
566 }
567
568 @Override
569 public long delayNanos(long currentTimeNanos) {
570 assert inEventLoop();
571 return Math.min(maxBlockingNanos, ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos));
572 }
573
574 @Override
575 public long deadlineNanos() {
576 assert inEventLoop();
577 long next = nextScheduledTaskDeadlineNanos();
578 long maxDeadlineNanos = ticker.nanoTime() + maxBlockingNanos;
579 if (next == -1) {
580 return maxDeadlineNanos;
581 }
582 return Math.min(next, maxDeadlineNanos);
583 }
584 };
585 }