1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19 import io.netty.util.concurrent.DefaultPromise;
20 import io.netty.util.concurrent.EventExecutor;
21 import io.netty.util.concurrent.Future;
22 import io.netty.util.concurrent.GlobalEventExecutor;
23 import io.netty.util.concurrent.Promise;
24 import io.netty.util.internal.ObjectUtil;
25 import io.netty.util.internal.PlatformDependent;
26 import io.netty.util.internal.ThreadExecutorMap;
27
28 import java.util.Collection;
29 import java.util.List;
30 import java.util.Objects;
31 import java.util.Queue;
32 import java.util.concurrent.Callable;
33 import java.util.concurrent.ExecutionException;
34 import java.util.concurrent.RejectedExecutionException;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.TimeoutException;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39
40
41
42
43
44
45
46
47
48 public final class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
49 private static final int ST_STARTED = 0;
50 private static final int ST_SHUTTING_DOWN = 1;
51 private static final int ST_SHUTDOWN = 2;
52 private static final int ST_TERMINATED = 3;
53
54 private final AtomicInteger state;
55 private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
56 private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
57 private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
58 @Override
59 public boolean canBlock() {
60 assert inEventLoop();
61 return false;
62 }
63
64 @Override
65 public long delayNanos(long currentTimeNanos) {
66 assert inEventLoop();
67 return 0;
68 }
69
70 @Override
71 public long deadlineNanos() {
72 assert inEventLoop();
73 return -1;
74 }
75 };
76 private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
77 private final IoEventLoopGroup parent;
78 private final Thread owningThread;
79 private final IoHandler handler;
80
81 private volatile long gracefulShutdownQuietPeriod;
82 private volatile long gracefulShutdownTimeout;
83 private long gracefulShutdownStartTime;
84 private long lastExecutionTime;
85 private boolean initialized;
86
87
88
89
90
91
92
93
94
95
96
97
/**
 * Creates a new loop that does not belong to any {@link IoEventLoopGroup} (its {@link #parent()} is
 * {@code null}).
 *
 * @param owningThread the thread that will drive this loop; must not be {@code null}.
 * @param factory      the factory used to create the {@link IoHandler} of this loop.
 */
public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
    // The cast only spells out which overload is targeted; the parent is simply absent.
    this((IoEventLoopGroup) null, owningThread, factory);
}
101
102
103
104
105
106
107
108
109
110
111
112
113
/**
 * Creates a new loop.
 *
 * @param parent       the group this loop belongs to, or {@code null} if it is standalone.
 * @param owningThread the thread that will drive this loop; must not be {@code null}.
 * @param factory      the factory used to create the {@link IoHandler} of this loop; must not be
 *                     {@code null}.
 * @throws NullPointerException if {@code owningThread} or {@code factory} is {@code null}.
 */
public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
    this.parent = parent;
    this.owningThread = Objects.requireNonNull(owningThread, "owningThread");
    // Fail with a descriptive message instead of an anonymous NPE on the call below.
    Objects.requireNonNull(factory, "factory");
    // The lifecycle state has to exist before "this" escapes to the factory: a handler that
    // queries the loop (e.g. isShuttingDown()) during its construction would otherwise hit a
    // null "state".
    state = new AtomicInteger(ST_STARTED);
    this.handler = factory.newHandler(this);
}
120
121 private int runAllTasks() {
122 assert inEventLoop();
123 int numRun = 0;
124 boolean fetchedAll;
125 do {
126 fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
127 for (;;) {
128 Runnable task = taskQueue.poll();
129 if (task == null) {
130 break;
131 }
132 safeExecute(task);
133 numRun++;
134 }
135 } while (!fetchedAll);
136 if (numRun > 0) {
137 lastExecutionTime = getCurrentTimeNanos();
138 }
139 return numRun;
140 }
141
142 private int run(IoHandlerContext context) {
143 if (!initialized) {
144 initialized = true;
145 handler.initialize();
146 }
147 EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
148 try {
149 if (isShuttingDown()) {
150 if (terminationFuture.isDone()) {
151
152 return 0;
153 }
154
155 int run = runAllTasks();
156 handler.prepareToDestroy();
157 if (confirmShutdown()) {
158
159 try {
160 handler.destroy();
161 for (;;) {
162 int r = runAllTasks();
163 run += r;
164 if (r == 0) {
165 break;
166 }
167 }
168 } finally {
169 state.set(ST_TERMINATED);
170 terminationFuture.setSuccess(null);
171 }
172 }
173 return run;
174 }
175 int run = handler.run(context);
176
177 return run + runAllTasks();
178 } finally {
179 ThreadExecutorMap.setCurrentExecutor(old);
180 }
181 }
182
183
184
185
186
187
188
189
190
191
192 public int runNow() {
193 checkCurrentThread();
194 return run(nonBlockingContext);
195 }
196
197
198
199
200
201
202
203
204
205
206
207 public int run(long waitNanos) {
208 checkCurrentThread();
209 blockingContext.maxBlockingNanos = waitNanos;
210 return run(blockingContext);
211 }
212
213 private void checkCurrentThread() {
214 if (!inEventLoop(Thread.currentThread())) {
215 throw new IllegalStateException();
216 }
217 }
218
219
220
221
222 public void wakeup() {
223 if (isShuttingDown()) {
224 return;
225 }
226 handler.wakeup();
227 }
228
229 @Override
230 public ManualIoEventLoop next() {
231 return this;
232 }
233
234 @Override
235 public IoEventLoopGroup parent() {
236 return parent;
237 }
238
239 @Deprecated
240 @Override
241 public ChannelFuture register(Channel channel) {
242 return register(new DefaultChannelPromise(channel, this));
243 }
244
245 @Deprecated
246 @Override
247 public ChannelFuture register(final ChannelPromise promise) {
248 ObjectUtil.checkNotNull(promise, "promise");
249 promise.channel().unsafe().register(this, promise);
250 return promise;
251 }
252
253 @Override
254 public Future<IoRegistration> register(final IoHandle handle) {
255 Promise<IoRegistration> promise = newPromise();
256 if (inEventLoop()) {
257 registerForIo0(handle, promise);
258 } else {
259 execute(() -> registerForIo0(handle, promise));
260 }
261
262 return promise;
263 }
264
265 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
266 assert inEventLoop();
267 final IoRegistration registration;
268 try {
269 registration = handler.register(handle);
270 } catch (Exception e) {
271 promise.setFailure(e);
272 return;
273 }
274 promise.setSuccess(registration);
275 }
276
277 @Deprecated
278 @Override
279 public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
280 ObjectUtil.checkNotNull(promise, "promise");
281 ObjectUtil.checkNotNull(channel, "channel");
282 channel.unsafe().register(this, promise);
283 return promise;
284 }
285
286 @Override
287 public boolean isCompatible(Class<? extends IoHandle> handleType) {
288 return handler.isCompatible(handleType);
289 }
290
291 @Override
292 public boolean isIoType(Class<? extends IoHandler> handlerType) {
293 return handler.getClass().equals(handlerType);
294 }
295
296 @Override
297 public boolean inEventLoop(Thread thread) {
298 return this.owningThread == thread;
299 }
300
301 private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
302 boolean inEventLoop = inEventLoop();
303 boolean wakeup;
304 int oldState;
305 for (;;) {
306 if (isShuttingDown()) {
307 return;
308 }
309 int newState;
310 wakeup = true;
311 oldState = state.get();
312 if (inEventLoop) {
313 newState = shutdownState;
314 } else if (oldState == ST_STARTED) {
315 newState = shutdownState;
316 } else {
317 newState = oldState;
318 wakeup = false;
319 }
320
321 if (state.compareAndSet(oldState, newState)) {
322 break;
323 }
324 }
325 if (quietPeriod != -1) {
326 gracefulShutdownQuietPeriod = quietPeriod;
327 }
328 if (timeout != -1) {
329 gracefulShutdownTimeout = timeout;
330 }
331
332 if (wakeup) {
333 handler.wakeup();
334 }
335 }
336
337 @Override
338 public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
339 ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
340 if (timeout < quietPeriod) {
341 throw new IllegalArgumentException(
342 "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
343 }
344 ObjectUtil.checkNotNull(unit, "unit");
345
346 shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
347 return terminationFuture();
348 }
349
350 @Override
351 @Deprecated
352 public void shutdown() {
353 shutdown0(-1, -1, ST_SHUTDOWN);
354 }
355
356 @Override
357 public Future<?> terminationFuture() {
358 return terminationFuture;
359 }
360
361 @Override
362 public boolean isShuttingDown() {
363 return state.get() >= ST_SHUTTING_DOWN;
364 }
365
366 @Override
367 public boolean isShutdown() {
368 return state.get() >= ST_SHUTDOWN;
369 }
370
371 @Override
372 public boolean isTerminated() {
373 return state.get() == ST_TERMINATED;
374 }
375
376 @Override
377 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
378 return terminationFuture.await(timeout, unit);
379 }
380
381 @Override
382 public void execute(Runnable command) {
383 Objects.requireNonNull(command, "command");
384 boolean inEventLoop = inEventLoop();
385 if (inEventLoop) {
386 if (isShutdown()) {
387 throw new RejectedExecutionException("event executor terminated");
388 }
389 }
390 taskQueue.add(command);
391 if (!inEventLoop) {
392 if (isShutdown()) {
393 boolean reject = false;
394 try {
395 if (taskQueue.remove(command)) {
396 reject = true;
397 }
398 } catch (UnsupportedOperationException e) {
399
400
401
402 }
403 if (reject) {
404 throw new RejectedExecutionException("event executor terminated");
405 }
406 }
407 handler.wakeup();
408 }
409 }
410
411 private boolean hasTasks() {
412 return !taskQueue.isEmpty();
413 }
414
415 private boolean confirmShutdown() {
416 if (!isShuttingDown()) {
417 return false;
418 }
419
420 if (!inEventLoop()) {
421 throw new IllegalStateException("must be invoked from an event loop");
422 }
423
424 cancelScheduledTasks();
425
426 if (gracefulShutdownStartTime == 0) {
427 gracefulShutdownStartTime = getCurrentTimeNanos();
428 }
429
430 if (runAllTasks() > 0) {
431 if (isShutdown()) {
432
433 return true;
434 }
435
436
437
438
439 if (gracefulShutdownQuietPeriod == 0) {
440 return true;
441 }
442 return false;
443 }
444
445 final long nanoTime = getCurrentTimeNanos();
446
447 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
448 return true;
449 }
450
451 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
452 try {
453 Thread.sleep(100);
454 } catch (InterruptedException e) {
455
456 }
457
458 return false;
459 }
460
461
462
463 return true;
464 }
465
466 @Override
467 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
468
469 throwIfInEventLoop("invokeAny");
470 return super.invokeAny(tasks);
471 }
472
473 @Override
474 public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
475 throws InterruptedException, ExecutionException, TimeoutException {
476
477 throwIfInEventLoop("invokeAny");
478 return super.invokeAny(tasks, timeout, unit);
479 }
480
481 @Override
482 public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
483 throws InterruptedException {
484
485 throwIfInEventLoop("invokeAll");
486 return super.invokeAll(tasks);
487 }
488
489 @Override
490 public <T> List<java.util.concurrent.Future<T>> invokeAll(
491 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
492
493 throwIfInEventLoop("invokeAll");
494 return super.invokeAll(tasks, timeout, unit);
495 }
496
497 private void throwIfInEventLoop(String method) {
498 if (inEventLoop()) {
499 throw new RejectedExecutionException(
500 "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
501 }
502 }
503
504 private final class BlockingIoHandlerContext implements IoHandlerContext {
505 long maxBlockingNanos = Long.MAX_VALUE;
506
507 @Override
508 public boolean canBlock() {
509 assert inEventLoop();
510 return !hasTasks() && !hasScheduledTasks();
511 }
512
513 @Override
514 public long delayNanos(long currentTimeNanos) {
515 assert inEventLoop();
516 return ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos);
517 }
518
519 @Override
520 public long deadlineNanos() {
521 assert inEventLoop();
522 long next = nextScheduledTaskDeadlineNanos();
523 if (next == -1) {
524 return maxBlockingNanos;
525 }
526 return Math.min(next, maxBlockingNanos);
527 }
528 };
529 }