/*
 * Copyright 2025 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19  import io.netty.util.concurrent.DefaultPromise;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.Future;
22  import io.netty.util.concurrent.GlobalEventExecutor;
23  import io.netty.util.concurrent.Promise;
24  import io.netty.util.concurrent.Ticker;
25  import io.netty.util.internal.ObjectUtil;
26  import io.netty.util.internal.PlatformDependent;
27  import io.netty.util.internal.ThreadExecutorMap;
28  
29  import java.util.Collection;
30  import java.util.List;
31  import java.util.Objects;
32  import java.util.Queue;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.ExecutionException;
35  import java.util.concurrent.RejectedExecutionException;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.TimeoutException;
38  import java.util.concurrent.atomic.AtomicInteger;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  /**
42   * {@link IoEventLoop} implementation that is owned by the user and so needs to be driven by the user manually with the
43   * given {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or {@link #run(long)}
44   * to execute IO and tasks that were submitted to this {@link IoEventLoop}.
45   * <p>
46   * This is for <strong>advanced use-cases only</strong>, where the user wants to own the {@link Thread} that drives the
47   * {@link IoEventLoop} to also do other work. Care must be taken that the {@link #runNow() or
48   * {@link #waitAndRun()}} methods are called in a timely fashion.
49   */
50  public final class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
51      private static final int ST_STARTED = 0;
52      private static final int ST_SHUTTING_DOWN = 1;
53      private static final int ST_SHUTDOWN = 2;
54      private static final int ST_TERMINATED = 3;
55  
56      private final AtomicInteger state;
57      private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
58      private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
59      private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
60          @Override
61          public boolean canBlock() {
62              assert inEventLoop();
63              return false;
64          }
65  
66          @Override
67          public long delayNanos(long currentTimeNanos) {
68              assert inEventLoop();
69              return 0;
70          }
71  
72          @Override
73          public long deadlineNanos() {
74              assert inEventLoop();
75              return -1;
76          }
77      };
78      private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
79      private final IoEventLoopGroup parent;
80      private final AtomicReference<Thread> owningThread;
81      private final IoHandler handler;
82      private final Ticker ticker;
83  
84      private volatile long gracefulShutdownQuietPeriod;
85      private volatile long gracefulShutdownTimeout;
86      private long gracefulShutdownStartTime;
87      private long lastExecutionTime;
88      private boolean initialized;
89  
90      /**
91       * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
92       * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
93       * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
94       *
95       * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
96       *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
97       *                          make progress.
98       * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
99       *                          used by this {@link IoEventLoop}.
100      */
101     public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
102         this(null, owningThread, factory);
103     }
104 
105     /**
106      * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
107      * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
108      * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
109      *
110      * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
111      * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
112      *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
113      *                          make progress. If {@code null}, must be set later using
114      *                          {@link #setOwningThread(Thread)}.
115      * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
116      *                          used by this {@link IoEventLoop}.
117      */
118     public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
119         this(parent, owningThread, factory, Ticker.systemTicker());
120     }
121 
122     /**
123      * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
124      * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
125      * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
126      *
127      * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
128      * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
129      *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
130      *                          make progress. If {@code null}, must be set later using
131      *                          {@link #setOwningThread(Thread)}.
132      * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
133      *                          used by this {@link IoEventLoop}.
134      * @param ticker            The {@link #ticker()} to use for this event loop. Note that the {@link IoHandler} does
135      *                          not use the ticker, so if the ticker advances faster than system time, you may have to
136      *                          {@link #wakeup()} this event loop manually.
137      */
138     public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory, Ticker ticker) {
139         this.parent = parent;
140         this.owningThread = new AtomicReference<>(owningThread);
141         this.handler = factory.newHandler(this);
142         this.ticker = Objects.requireNonNull(ticker, "ticker");
143         state = new AtomicInteger(ST_STARTED);
144     }
145 
146     @Override
147     public Ticker ticker() {
148         return ticker;
149     }
150 
151     private int runAllTasks() {
152         assert inEventLoop();
153         int numRun = 0;
154         boolean fetchedAll;
155         do {
156             fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
157             for (;;) {
158                 Runnable task = taskQueue.poll();
159                 if (task == null) {
160                     break;
161                 }
162                 safeExecute(task);
163                 numRun++;
164             }
165         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
166         if (numRun > 0) {
167             lastExecutionTime = ticker.nanoTime();
168         }
169         return numRun;
170     }
171 
172     private int run(IoHandlerContext context) {
173         if (!initialized) {
174             if (owningThread.get() == null) {
175                 throw new IllegalStateException("Owning thread not set");
176             }
177             initialized = true;
178             handler.initialize();
179         }
180         EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
181         try {
182             if (isShuttingDown()) {
183                 if (terminationFuture.isDone()) {
184                     // Already completely terminated
185                     return 0;
186                 }
187                 // Run all tasks before prepare to destroy.
188                 int run = runAllTasks();
189                 handler.prepareToDestroy();
190                 if (confirmShutdown()) {
191                     // Destroy the handler now and run all remaining tasks.
192                     try {
193                         handler.destroy();
194                         for (;;) {
195                             int r = runAllTasks();
196                             run += r;
197                             if (r == 0) {
198                                 break;
199                             }
200                         }
201                     } finally {
202                         state.set(ST_TERMINATED);
203                         terminationFuture.setSuccess(null);
204                     }
205                 }
206                 return run;
207             }
208             int run = handler.run(context);
209             // Now run all tasks.
210             return run + runAllTasks();
211         } finally {
212             ThreadExecutorMap.setCurrentExecutor(old);
213         }
214     }
215 
216     /**
217      * Run all ready IO and tasks for this {@link IoEventLoop}.
218      * This methods will <strong>NOT</strong> block and wait for IO / tasks to be ready, it will just
219      * return directly if there is nothing to do.
220      * <p>
221      * <strong>Must be called from the owning {@link Thread} that was passed as an parameter on construction.</strong>
222      *
223      * @return the number of IO and tasks executed.
224      */
225     public int runNow() {
226         checkCurrentThread();
227         return run(nonBlockingContext);
228     }
229 
230     /**
231      * Run all ready IO and tasks for this {@link IoEventLoop}.
232      * This methods will block and wait for IO / tasks to be ready if there is nothing to process atm for the given
233      * {@code waitNanos}.
234      * <p>
235      * <strong>Must be called from the owning {@link Thread} that was passed as an parameter on construction.</strong>
236      *
237      * @param waitNanos the maximum amount of nanoseconds to wait before returning. IF {@code 0} it will block until
238      *                  there is some IO / tasks ready, if {@code -1} will not block at all and just return directly
239      *                  if there is nothing to run (like {@link #runNow()}).
240      * @return          the number of IO and tasks executed.
241      */
242     public int run(long waitNanos) {
243         checkCurrentThread();
244 
245         final IoHandlerContext context;
246         if (waitNanos < 0) {
247             context = nonBlockingContext;
248         } else {
249             context = blockingContext;
250             blockingContext.maxBlockingNanos = waitNanos == 0 ? Long.MAX_VALUE : waitNanos;
251         }
252         return run(context);
253     }
254 
255     private void checkCurrentThread() {
256         if (!inEventLoop(Thread.currentThread())) {
257             throw new IllegalStateException();
258         }
259     }
260 
261     /**
262      * Force a wakeup and so the {@link #run(long)} method will unblock and return even if there was nothing to do.
263      */
264     public void wakeup() {
265         if (isShuttingDown()) {
266             return;
267         }
268         handler.wakeup();
269     }
270 
271     @Override
272     public ManualIoEventLoop next() {
273         return this;
274     }
275 
276     @Override
277     public IoEventLoopGroup parent() {
278         return parent;
279     }
280 
281     @Deprecated
282     @Override
283     public ChannelFuture register(Channel channel) {
284         return register(new DefaultChannelPromise(channel, this));
285     }
286 
287     @Deprecated
288     @Override
289     public ChannelFuture register(final ChannelPromise promise) {
290         ObjectUtil.checkNotNull(promise, "promise");
291         promise.channel().unsafe().register(this, promise);
292         return promise;
293     }
294 
295     @Override
296     public Future<IoRegistration> register(final IoHandle handle) {
297         Promise<IoRegistration> promise = newPromise();
298         if (inEventLoop()) {
299             registerForIo0(handle, promise);
300         } else {
301             execute(() -> registerForIo0(handle, promise));
302         }
303 
304         return promise;
305     }
306 
307     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
308         assert inEventLoop();
309         final IoRegistration registration;
310         try {
311             registration = handler.register(handle);
312         } catch (Exception e) {
313             promise.setFailure(e);
314             return;
315         }
316         promise.setSuccess(registration);
317     }
318 
319     @Deprecated
320     @Override
321     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
322         ObjectUtil.checkNotNull(promise, "promise");
323         ObjectUtil.checkNotNull(channel, "channel");
324         channel.unsafe().register(this, promise);
325         return promise;
326     }
327 
328     @Override
329     public boolean isCompatible(Class<? extends IoHandle> handleType) {
330         return handler.isCompatible(handleType);
331     }
332 
333     @Override
334     public boolean isIoType(Class<? extends IoHandler> handlerType) {
335         return handler.getClass().equals(handlerType);
336     }
337 
338     @Override
339     public boolean inEventLoop(Thread thread) {
340         return this.owningThread.get() == thread;
341     }
342 
343     /**
344      * Set the owning thread that will call {@link #run}. May only be called once, and only if the owning thread was
345      * not set in the constructor already.
346      *
347      * @param owningThread The owning thread
348      */
349     public void setOwningThread(Thread owningThread) {
350         Objects.requireNonNull(owningThread, "owningThread");
351         if (!this.owningThread.compareAndSet(null, owningThread)) {
352             throw new IllegalStateException("Owning thread already set");
353         }
354     }
355 
356     private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
357         boolean inEventLoop = inEventLoop();
358         boolean wakeup;
359         int oldState;
360         for (;;) {
361             if (isShuttingDown()) {
362                 return;
363             }
364             int newState;
365             wakeup = true;
366             oldState = state.get();
367             if (inEventLoop) {
368                 newState = shutdownState;
369             } else if (oldState == ST_STARTED) {
370                 newState = shutdownState;
371             } else {
372                 newState = oldState;
373                 wakeup = false;
374             }
375 
376             if (state.compareAndSet(oldState, newState)) {
377                 break;
378             }
379         }
380         if (quietPeriod != -1) {
381             gracefulShutdownQuietPeriod = quietPeriod;
382         }
383         if (timeout != -1) {
384             gracefulShutdownTimeout = timeout;
385         }
386 
387         if (wakeup) {
388             handler.wakeup();
389         }
390     }
391 
392     @Override
393     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
394         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
395         if (timeout < quietPeriod) {
396             throw new IllegalArgumentException(
397                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
398         }
399         ObjectUtil.checkNotNull(unit, "unit");
400 
401         shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
402         return terminationFuture();
403     }
404 
405     @Override
406     @Deprecated
407     public void shutdown() {
408         shutdown0(-1, -1, ST_SHUTDOWN);
409     }
410 
411     @Override
412     public Future<?> terminationFuture() {
413         return terminationFuture;
414     }
415 
416     @Override
417     public boolean isShuttingDown() {
418         return state.get() >= ST_SHUTTING_DOWN;
419     }
420 
421     @Override
422     public boolean isShutdown() {
423         return state.get() >= ST_SHUTDOWN;
424     }
425 
426     @Override
427     public boolean isTerminated() {
428         return state.get() == ST_TERMINATED;
429     }
430 
431     @Override
432     public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
433         return terminationFuture.await(timeout, unit);
434     }
435 
436     @Override
437     public void execute(Runnable command) {
438         Objects.requireNonNull(command, "command");
439         boolean inEventLoop = inEventLoop();
440         if (inEventLoop) {
441             if (isShutdown()) {
442                 throw new RejectedExecutionException("event executor terminated");
443             }
444         }
445         taskQueue.add(command);
446         if (!inEventLoop) {
447             if (isShutdown()) {
448                 boolean reject = false;
449                 try {
450                     if (taskQueue.remove(command)) {
451                         reject = true;
452                     }
453                 } catch (UnsupportedOperationException e) {
454                     // The task queue does not support removal so the best thing we can do is to just move on and
455                     // hope we will be able to pick-up the task before its completely terminated.
456                     // In worst case we will log on termination.
457                 }
458                 if (reject) {
459                     throw new RejectedExecutionException("event executor terminated");
460                 }
461             }
462             handler.wakeup();
463         }
464     }
465 
466     private boolean hasTasks() {
467         return !taskQueue.isEmpty();
468     }
469 
470     private boolean confirmShutdown() {
471         if (!isShuttingDown()) {
472             return false;
473         }
474 
475         if (!inEventLoop()) {
476             throw new IllegalStateException("must be invoked from an event loop");
477         }
478 
479         cancelScheduledTasks();
480 
481         if (gracefulShutdownStartTime == 0) {
482             gracefulShutdownStartTime = ticker.nanoTime();
483         }
484 
485         if (runAllTasks() > 0) {
486             if (isShutdown()) {
487                 // Executor shut down - no new tasks anymore.
488                 return true;
489             }
490 
491             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
492             // terminate if the quiet period is 0.
493             // See https://github.com/netty/netty/issues/4241
494             if (gracefulShutdownQuietPeriod == 0) {
495                 return true;
496             }
497             return false;
498         }
499 
500         final long nanoTime = ticker.nanoTime();
501 
502         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
503             return true;
504         }
505 
506         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
507             try {
508                 Thread.sleep(100);
509             } catch (InterruptedException e) {
510                 // Ignore
511             }
512 
513             return false;
514         }
515 
516         // No tasks were added for last quiet period - hopefully safe to shut down.
517         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
518         return true;
519     }
520 
521     @Override
522     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
523         // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
524         throwIfInEventLoop("invokeAny");
525         return super.invokeAny(tasks);
526     }
527 
528     @Override
529     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
530             throws InterruptedException, ExecutionException, TimeoutException {
531         // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
532         throwIfInEventLoop("invokeAny");
533         return super.invokeAny(tasks, timeout, unit);
534     }
535 
536     @Override
537     public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
538             throws InterruptedException {
539         // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
540         throwIfInEventLoop("invokeAll");
541         return super.invokeAll(tasks);
542     }
543 
544     @Override
545     public <T> List<java.util.concurrent.Future<T>> invokeAll(
546             Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
547         // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
548         throwIfInEventLoop("invokeAll");
549         return super.invokeAll(tasks, timeout, unit);
550     }
551 
552     private void throwIfInEventLoop(String method) {
553         if (inEventLoop()) {
554             throw new RejectedExecutionException(
555                     "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
556         }
557     }
558 
559     private final class BlockingIoHandlerContext implements IoHandlerContext {
560         long maxBlockingNanos = Long.MAX_VALUE;
561 
562         @Override
563         public boolean canBlock() {
564             assert inEventLoop();
565             return !hasTasks() && !hasScheduledTasks();
566         }
567 
568         @Override
569         public long delayNanos(long currentTimeNanos) {
570             assert inEventLoop();
571             return Math.min(maxBlockingNanos, ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos));
572         }
573 
574         @Override
575         public long deadlineNanos() {
576             assert inEventLoop();
577             long next = nextScheduledTaskDeadlineNanos();
578             long maxDeadlineNanos = ticker.nanoTime() + maxBlockingNanos;
579             if (next == -1) {
580                 return maxDeadlineNanos;
581             }
582             return Math.min(next, maxDeadlineNanos);
583         }
584     };
585 }