View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19  import io.netty.util.concurrent.DefaultPromise;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.Future;
22  import io.netty.util.concurrent.GlobalEventExecutor;
23  import io.netty.util.concurrent.Promise;
24  import io.netty.util.concurrent.Ticker;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.ThreadExecutorMap;
28  
29  import java.util.Collection;
30  import java.util.List;
31  import java.util.Objects;
32  import java.util.Queue;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  /**
42   * {@link IoEventLoop} implementation that is owned by the user and so needs to be driven by the user manually with the
43   * given {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or {@link #run(long)}
44   * to execute IO and tasks that were submitted to this {@link IoEventLoop}.
45   * <p>
46   * This is for <strong>advanced use-cases only</strong>, where the user wants to own the {@link Thread} that drives the
 * {@link IoEventLoop} to also do other work. Care must be taken that the {@link #runNow()} or
 * {@link #run(long)} methods are called in a timely fashion.
49   */
50  public final class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
    // Lifecycle states. The numeric order matters: isShuttingDown() and isShutdown() compare with >=.
    private static final int ST_STARTED = 0;
    private static final int ST_SHUTTING_DOWN = 1;
    private static final int ST_SHUTDOWN = 2;
    private static final int ST_TERMINATED = 3;

    // Current lifecycle state, one of the ST_* constants above.
    private final AtomicInteger state;
    // Completed with success by runAllTasksBeforeDestroy() once the handler was destroyed.
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
    // Tasks submitted through execute(Runnable); drained by runAllTasks(...).
    private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
    // Context handed to the IoHandler when the caller asked not to block: never allows blocking,
    // reports a delay of 0 and a deadline of -1.
    private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
        @Override
        public boolean canBlock() {
            assert inEventLoop();
            return false;
        }

        @Override
        public long delayNanos(long currentTimeNanos) {
            assert inEventLoop();
            return 0;
        }

        @Override
        public long deadlineNanos() {
            assert inEventLoop();
            return -1;
        }
    };
    // Context handed to the IoHandler when the caller allows blocking; its maxBlockingNanos is updated by
    // run(long, long) before each use.
    private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
    // Parent group, may be null.
    private final IoEventLoopGroup parent;
    // The thread allowed to drive this loop; may start out as null and be set once via setOwningThread(Thread).
    private final AtomicReference<Thread> owningThread;
    private final IoHandler handler;
    private final Ticker ticker;

    // Written by shutdown0(...), read by confirmShutdown(); both are in nanoseconds.
    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    // Set lazily by confirmShutdown() on its first invocation (0 means "not yet recorded").
    private long gracefulShutdownStartTime;
    // Ticker time recorded by runAllTasks(...) after the last batch of executed tasks.
    private long lastExecutionTime;
    // Whether handler.initialize() was already called by run(IoHandlerContext, long).
    private boolean initialized;
89  
    /**
     * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
     * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
     * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
     * <p>
     * The created event loop has no parent and uses {@link Ticker#systemTicker()}.
     *
     * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
     *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
     *                          make progress. If {@code null}, must be set later using
     *                          {@link #setOwningThread(Thread)}.
     * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
     *                          used by this {@link IoEventLoop}.
     */
    public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
        this(null, owningThread, factory);
    }
104 
    /**
     * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
     * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
     * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
     * <p>
     * The created event loop uses {@link Ticker#systemTicker()}.
     *
     * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
     * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
     *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
     *                          make progress. If {@code null}, must be set later using
     *                          {@link #setOwningThread(Thread)}.
     * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
     *                          used by this {@link IoEventLoop}.
     */
    public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
        this(parent, owningThread, factory, Ticker.systemTicker());
    }
121 
122     /**
123      * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
124      * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
125      * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
126      *
127      * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
128      * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
129      *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
130      *                          make progress. If {@code null}, must be set later using
131      *                          {@link #setOwningThread(Thread)}.
132      * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
133      *                          used by this {@link IoEventLoop}.
134      * @param ticker            The {@link #ticker()} to use for this event loop. Note that the {@link IoHandler} does
135      *                          not use the ticker, so if the ticker advances faster than system time, you may have to
136      *                          {@link #wakeup()} this event loop manually.
137      */
138     public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory, Ticker ticker) {
139         this.parent = parent;
140         this.owningThread = new AtomicReference<>(owningThread);
141         this.handler = factory.newHandler(this);
142         this.ticker = Objects.requireNonNull(ticker, "ticker");
143         state = new AtomicInteger(ST_STARTED);
144     }
145 
    /**
     * Returns the {@link Ticker} that was passed to (or defaulted by) the constructor.
     */
    @Override
    public Ticker ticker() {
        return ticker;
    }
150 
151     /**
152      * Poll and run tasks from the task queue, until the task queue is empty or the given deadline is exceeded.<br>
153      * If {@code timeoutNanos} is less or equals 0, no deadline is applied.
154      *
155      * @param timeoutNanos the maximum time in nanoseconds to run tasks.
156      */
157     public int runNonBlockingTasks(long timeoutNanos) {
158         return runAllTasks(timeoutNanos, true);
159     }
160 
161     private int runAllTasks(long timeoutNanos, boolean setCurrentExecutor) {
162         assert inEventLoop();
163         final Queue<Runnable> taskQueue = this.taskQueue;
164         // since the taskQueue is unbounded we don't need to keep on calling this while draining it.
165         boolean alwaysTrue = fetchFromScheduledTaskQueue(taskQueue);
166         assert alwaysTrue;
167         Runnable task = taskQueue.poll();
168         if (task == null) {
169             return 0;
170         }
171         EventExecutor old = setCurrentExecutor? ThreadExecutorMap.setCurrentExecutor(this) : null;
172         try {
173             final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
174             int runTasks = 0;
175             long lastExecutionTime;
176             final Ticker ticker = this.ticker;
177             for (;;) {
178                 safeExecute(task);
179 
180                 runTasks++;
181 
182                if (timeoutNanos > 0) {
183                     lastExecutionTime = ticker.nanoTime();
184                     if ((lastExecutionTime - deadline) >= 0) {
185                         break;
186                     }
187                 }
188 
189                 task = taskQueue.poll();
190                 if (task == null) {
191                     lastExecutionTime = ticker.nanoTime();
192                     break;
193                 }
194             }
195             this.lastExecutionTime = lastExecutionTime;
196             return runTasks;
197         } finally {
198             if (setCurrentExecutor) {
199                 ThreadExecutorMap.setCurrentExecutor(old);
200             }
201         }
202     }
203 
204     private int run(IoHandlerContext context, long runAllTasksTimeoutNanos) {
205         if (!initialized) {
206             if (owningThread.get() == null) {
207                 throw new IllegalStateException("Owning thread not set");
208             }
209             initialized = true;
210             handler.initialize();
211         }
212         EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
213         try {
214             if (isShuttingDown()) {
215                 if (terminationFuture.isDone()) {
216                     // Already completely terminated
217                     return 0;
218                 }
219                 return runAllTasksBeforeDestroy();
220             }
221             final int ioTasks = handler.run(context);
222             // Now run all tasks.
223             if (runAllTasksTimeoutNanos < 0) {
224                 return ioTasks;
225             }
226             assert runAllTasksTimeoutNanos >= 0;
227             return ioTasks + runAllTasks(runAllTasksTimeoutNanos, false);
228         } finally {
229             ThreadExecutorMap.setCurrentExecutor(old);
230         }
231     }
232 
233     private int runAllTasksBeforeDestroy() {
234         // Run all tasks before prepare to destroy.
235         int run = runAllTasks(-1, false);
236         handler.prepareToDestroy();
237         if (confirmShutdown()) {
238             // Destroy the handler now and run all remaining tasks.
239             try {
240                 handler.destroy();
241                 for (;;) {
242                     int r = runAllTasks(-1, false);
243                     run += r;
244                     if (r == 0) {
245                         break;
246                     }
247                 }
248             } finally {
249                 state.set(ST_TERMINATED);
250                 terminationFuture.setSuccess(null);
251             }
252         }
253         return run;
254     }
255 
    /**
     * Executes all ready IO and tasks for this {@link IoEventLoop}.
     * This method will <strong>NOT</strong> block and wait for IO / tasks to be ready, it will just
     * return directly if there is nothing to do.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @param runAllTasksTimeoutNanos the maximum time in nanoseconds to run tasks.
     *                                If {@code = 0}, no timeout is applied; if {@code < 0} it only performs I/O
     *                                and runs no tasks.
     * @return the number of IO and tasks executed.
     * @throws IllegalStateException if the method is not called from the owning {@link Thread}.
     */
    public int runNow(long runAllTasksTimeoutNanos) {
        checkCurrentThread();
        return run(nonBlockingContext, runAllTasksTimeoutNanos);
    }
273 
274     /**
275      * Run all ready IO and tasks for this {@link IoEventLoop}.
276      * This methods will <strong>NOT</strong> block and wait for IO / tasks to be ready, it will just
277      * return directly if there is nothing to do.
278      * <p>
279      * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
280      *
281      * @return the number of IO and tasks executed.
282      */
283     public int runNow() {
284         checkCurrentThread();
285         return run(nonBlockingContext, 0);
286     }
287 
288     /**
289      * Run all ready IO and tasks for this {@link IoEventLoop}.
290      * This methods will block and wait for IO / tasks to be ready if there is nothing to process atm for the given
291      * {@code waitNanos}.
292      * <p>
293      * <strong>Must be called from the owning {@link Thread} that was passed as an parameter on construction.</strong>
294      *
295      * @param runAllTasksTimeoutNanos the maximum time in nanoseconds to run tasks.
296      *                                If {@code = 0}, no timeout is applied; if {@code < 0} it just perform I/O tasks.
297      * @param waitNanos the maximum amount of nanoseconds to wait before returning. IF {@code 0} it will block until
298      *                  there is some IO / tasks ready, if {@code -1} will not block at all and just return directly
299      *                  if there is nothing to run (like {@link #runNow()}).
300      * @return          the number of IO and tasks executed.
301      */
302     public int run(long waitNanos, long runAllTasksTimeoutNanos) {
303         checkCurrentThread();
304 
305         final IoHandlerContext context;
306         if (waitNanos < 0) {
307             context = nonBlockingContext;
308         } else {
309             context = blockingContext;
310             blockingContext.maxBlockingNanos = waitNanos == 0 ? Long.MAX_VALUE : waitNanos;
311         }
312         return run(context, runAllTasksTimeoutNanos);
313     }
314 
    /**
     * Run all ready IO and tasks for this {@link IoEventLoop}, without a time limit for running tasks.
     * This method will block and wait for IO / tasks to be ready if there is nothing to process at the moment, for
     * at most the given {@code waitNanos}.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @param waitNanos the maximum amount of nanoseconds to wait before returning. If {@code 0} it will block until
     *                  there is some IO / tasks ready, if negative it will not block at all and just return directly
     *                  if there is nothing to run (like {@link #runNow()}).
     * @return          the number of IO and tasks executed.
     * @throws IllegalStateException if the method is not called from the owning {@link Thread}.
     */
    public int run(long waitNanos) {
        return run(waitNanos, 0);
    }
330 
331     private void checkCurrentThread() {
332         if (!inEventLoop(Thread.currentThread())) {
333             throw new IllegalStateException();
334         }
335     }
336 
337     /**
338      * Force a wakeup and so the {@link #run(long)} method will unblock and return even if there was nothing to do.
339      */
340     public void wakeup() {
341         if (isShuttingDown()) {
342             return;
343         }
344         handler.wakeup();
345     }
346 
    /**
     * Always returns this instance.
     */
    @Override
    public ManualIoEventLoop next() {
        return this;
    }
351 
    /**
     * Returns the parent passed to the constructor, which may be {@code null}.
     */
    @Override
    public IoEventLoopGroup parent() {
        return parent;
    }
356 
357     @Deprecated
358     @Override
359     public ChannelFuture register(Channel channel) {
360         return register(new DefaultChannelPromise(channel, this));
361     }
362 
363     @Deprecated
364     @Override
365     public ChannelFuture register(final ChannelPromise promise) {
366         ObjectUtil.checkNotNull(promise, "promise");
367         promise.channel().unsafe().register(this, promise);
368         return promise;
369     }
370 
371     @Override
372     public Future<IoRegistration> register(final IoHandle handle) {
373         Promise<IoRegistration> promise = newPromise();
374         if (inEventLoop()) {
375             registerForIo0(handle, promise);
376         } else {
377             execute(() -> registerForIo0(handle, promise));
378         }
379 
380         return promise;
381     }
382 
383     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
384         assert inEventLoop();
385         final IoRegistration registration;
386         try {
387             registration = handler.register(handle);
388         } catch (Exception e) {
389             promise.setFailure(e);
390             return;
391         }
392         promise.setSuccess(registration);
393     }
394 
395     @Deprecated
396     @Override
397     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
398         ObjectUtil.checkNotNull(promise, "promise");
399         ObjectUtil.checkNotNull(channel, "channel");
400         channel.unsafe().register(this, promise);
401         return promise;
402     }
403 
    /**
     * Delegates to the underlying {@link IoHandler} to decide whether the given handle type is compatible.
     */
    @Override
    public boolean isCompatible(Class<? extends IoHandle> handleType) {
        return handler.isCompatible(handleType);
    }
408 
    /**
     * Returns {@code true} only if the runtime class of the underlying {@link IoHandler} is exactly the given
     * type (subclasses do not match).
     */
    @Override
    public boolean isIoType(Class<? extends IoHandler> handlerType) {
        return handler.getClass().equals(handlerType);
    }
413 
    /**
     * Returns {@code true} if the given thread is the owning thread of this event loop (identity comparison).
     */
    @Override
    public boolean inEventLoop(Thread thread) {
        return this.owningThread.get() == thread;
    }
418 
419     /**
420      * Set the owning thread that will call {@link #run}. May only be called once, and only if the owning thread was
421      * not set in the constructor already.
422      *
423      * @param owningThread The owning thread
424      */
425     public void setOwningThread(Thread owningThread) {
426         Objects.requireNonNull(owningThread, "owningThread");
427         if (!this.owningThread.compareAndSet(null, owningThread)) {
428             throw new IllegalStateException("Owning thread already set");
429         }
430     }
431 
    /**
     * Moves this event loop into the given shutdown state and records the graceful shutdown parameters.
     * Returns without doing anything if a shutdown is already in progress.
     *
     * @param quietPeriod   quiet period in nanoseconds, or {@code -1} to leave the stored value untouched.
     * @param timeout       timeout in nanoseconds, or {@code -1} to leave the stored value untouched.
     * @param shutdownState the target state, {@code ST_SHUTTING_DOWN} or {@code ST_SHUTDOWN}.
     */
    private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        // Retry until the state transition was applied with a successful CAS.
        for (;;) {
            if (isShuttingDown()) {
                return;
            }
            int newState;
            wakeup = true;
            oldState = state.get();
            if (inEventLoop) {
                newState = shutdownState;
            } else if (oldState == ST_STARTED) {
                newState = shutdownState;
            } else {
                // Another thread changed the state in the meantime: keep it and skip the wakeup.
                newState = oldState;
                wakeup = false;
            }

            if (state.compareAndSet(oldState, newState)) {
                break;
            }
        }
        // -1 is the "do not change" marker used by shutdown().
        if (quietPeriod != -1) {
            gracefulShutdownQuietPeriod = quietPeriod;
        }
        if (timeout != -1) {
            gracefulShutdownTimeout = timeout;
        }

        if (wakeup) {
            // Unblock the handler so the owning thread can notice the new state.
            handler.wakeup();
        }
    }
467 
468     @Override
469     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
470         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
471         if (timeout < quietPeriod) {
472             throw new IllegalArgumentException(
473                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
474         }
475         ObjectUtil.checkNotNull(unit, "unit");
476 
477         shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
478         return terminationFuture();
479     }
480 
    /**
     * Requests a transition to the {@code ST_SHUTDOWN} state without changing the stored quiet period or timeout
     * ({@code -1} is passed for both).
     */
    @Override
    @Deprecated
    public void shutdown() {
        shutdown0(-1, -1, ST_SHUTDOWN);
    }
486 
    /**
     * Returns the future that is completed once this event loop has terminated.
     */
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }
491 
    /**
     * Returns {@code true} once the state is {@code ST_SHUTTING_DOWN} or any later state.
     */
    @Override
    public boolean isShuttingDown() {
        return state.get() >= ST_SHUTTING_DOWN;
    }
496 
    /**
     * Returns {@code true} once the state is {@code ST_SHUTDOWN} or {@code ST_TERMINATED}.
     */
    @Override
    public boolean isShutdown() {
        return state.get() >= ST_SHUTDOWN;
    }
501 
    /**
     * Returns {@code true} only when the state is {@code ST_TERMINATED}.
     */
    @Override
    public boolean isTerminated() {
        return state.get() == ST_TERMINATED;
    }
506 
    /**
     * Waits on the termination future for at most the given time.
     * NOTE(review): termination is only reached while the owning thread keeps driving the loop, so calling this
     * from the owning thread itself presumably cannot succeed - confirm.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return terminationFuture.await(timeout, unit);
    }
511 
512     @Override
513     public void execute(Runnable command) {
514         Objects.requireNonNull(command, "command");
515         boolean inEventLoop = inEventLoop();
516         if (inEventLoop) {
517             if (isShutdown()) {
518                 throw new RejectedExecutionException("event executor terminated");
519             }
520         }
521         taskQueue.add(command);
522         if (!inEventLoop) {
523             if (isShutdown()) {
524                 boolean reject = false;
525                 try {
526                     if (taskQueue.remove(command)) {
527                         reject = true;
528                     }
529                 } catch (UnsupportedOperationException e) {
530                     // The task queue does not support removal so the best thing we can do is to just move on and
531                     // hope we will be able to pick-up the task before its completely terminated.
532                     // In worst case we will log on termination.
533                 }
534                 if (reject) {
535                     throw new RejectedExecutionException("event executor terminated");
536                 }
537             }
538             handler.wakeup();
539         }
540     }
541 
    /**
     * Returns {@code true} if the task queue currently holds at least one task (scheduled tasks are not counted).
     */
    private boolean hasTasks() {
        return !taskQueue.isEmpty();
    }
545 
    /**
     * Decides whether the shutdown sequence may complete now. Cancels scheduled tasks, runs whatever is queued and
     * then applies the quiet period / timeout rules.
     *
     * @return {@code true} if the event loop may terminate now, {@code false} if the caller has to call again
     *         later (also returned when no shutdown was requested at all).
     * @throws IllegalStateException if not called from the owning thread.
     */
    private boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }

        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }

        cancelScheduledTasks();

        // Record the start of the graceful shutdown on the first invocation only.
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ticker.nanoTime();
        }

        if (runAllTasks(-1, false) > 0) {
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }

            // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
            // terminate if the quiet period is 0.
            // See https://github.com/netty/netty/issues/4241
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            return false;
        }

        final long nanoTime = ticker.nanoTime();

        // Hard stop: either shutdown() was requested or the graceful timeout has elapsed.
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }

        // Still inside the quiet period since the last executed task: pause briefly and report "not yet".
        // Note that this sleeps on the calling (user-owned) thread.
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }

            return false;
        }

        // No tasks were added for last quiet period - hopefully safe to shut down.
        // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
        return true;
    }
596 
    /**
     * Same as the inherited {@code invokeAny(tasks)}, but refuses to run when called from the owning thread.
     *
     * @throws RejectedExecutionException if called from within the event loop.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks);
    }
603 
    /**
     * Same as the inherited {@code invokeAny(tasks, timeout, unit)}, but refuses to run when called from the
     * owning thread.
     *
     * @throws RejectedExecutionException if called from within the event loop.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks, timeout, unit);
    }
611 
    /**
     * Same as the inherited {@code invokeAll(tasks)}, but refuses to run when called from the owning thread.
     *
     * @throws RejectedExecutionException if called from within the event loop.
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks);
    }
619 
    /**
     * Same as the inherited {@code invokeAll(tasks, timeout, unit)}, but refuses to run when called from the
     * owning thread.
     *
     * @throws RejectedExecutionException if called from within the event loop.
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks, timeout, unit);
    }
627 
628     private void throwIfInEventLoop(String method) {
629         if (inEventLoop()) {
630             throw new RejectedExecutionException(
631                     "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
632         }
633     }
634 
635     private final class BlockingIoHandlerContext implements IoHandlerContext {
636         long maxBlockingNanos = Long.MAX_VALUE;
637 
638         @Override
639         public boolean canBlock() {
640             assert inEventLoop();
641             return !hasTasks() && !hasScheduledTasks();
642         }
643 
644         @Override
645         public long delayNanos(long currentTimeNanos) {
646             assert inEventLoop();
647             return Math.min(maxBlockingNanos, ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos));
648         }
649 
650         @Override
651         public long deadlineNanos() {
652             assert inEventLoop();
653             long next = nextScheduledTaskDeadlineNanos();
654             long maxDeadlineNanos = ticker.nanoTime() + maxBlockingNanos;
655             if (next == -1) {
656                 return maxDeadlineNanos;
657             }
658             return Math.min(next, maxDeadlineNanos);
659         }
660     };
661 }