View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.AbstractScheduledEventExecutor;
19  import io.netty.util.concurrent.DefaultPromise;
20  import io.netty.util.concurrent.EventExecutor;
21  import io.netty.util.concurrent.Future;
22  import io.netty.util.concurrent.GlobalEventExecutor;
23  import io.netty.util.concurrent.Promise;
24  import io.netty.util.internal.ObjectUtil;
25  import io.netty.util.internal.PlatformDependent;
26  import io.netty.util.internal.ThreadExecutorMap;
27  
28  import java.util.Collection;
29  import java.util.List;
30  import java.util.Objects;
31  import java.util.Queue;
32  import java.util.concurrent.Callable;
33  import java.util.concurrent.ExecutionException;
34  import java.util.concurrent.RejectedExecutionException;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.TimeoutException;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
/**
 * {@link IoEventLoop} implementation that is owned by the user and so needs to be driven by the user manually with the
 * given {@link Thread}. This means that the user is responsible for calling either {@link #runNow()} or
 * {@link #run(long)} to execute IO and tasks that were submitted to this {@link IoEventLoop}.
 * <p>
 * This is for <strong>advanced use-cases only</strong>, where the user wants to own the {@link Thread} that drives the
 * {@link IoEventLoop} to also do other work. Care must be taken that the {@link #runNow()} or
 * {@link #run(long)} methods are called in a timely fashion.
 */
public final class ManualIoEventLoop extends AbstractScheduledEventExecutor implements IoEventLoop {
    // Lifecycle states. The numeric order matters: isShuttingDown() / isShutdown() compare with ">=", so every
    // later phase must have a larger value than the earlier ones.
    private static final int ST_STARTED = 0;
    private static final int ST_SHUTTING_DOWN = 1;
    private static final int ST_SHUTDOWN = 2;
    private static final int ST_TERMINATED = 3;

    // Current lifecycle state, one of the ST_* constants above.
    private final AtomicInteger state;
    // Completed by run(IoHandlerContext) once the handler was destroyed and the state became ST_TERMINATED.
    private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
    // Tasks submitted through execute(Runnable); drained by runAllTasks() on the owning thread.
    private final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();
    // Context handed to the IoHandler by runNow(): never allows blocking and reports no delay / no deadline.
    private final IoHandlerContext nonBlockingContext = new IoHandlerContext() {
        @Override
        public boolean canBlock() {
            assert inEventLoop();
            return false;
        }

        @Override
        public long delayNanos(long currentTimeNanos) {
            assert inEventLoop();
            return 0;
        }

        @Override
        public long deadlineNanos() {
            assert inEventLoop();
            return -1;
        }
    };
    // Context handed to the IoHandler by run(long); its maxBlockingNanos is updated on every run(long) call.
    private final BlockingIoHandlerContext blockingContext = new BlockingIoHandlerContext();
    // May be null when this loop is not part of a group.
    private final IoEventLoopGroup parent;
    // The only thread for which inEventLoop(Thread) returns true.
    private final Thread owningThread;
    // Created by the IoHandlerFactory passed to the constructor.
    private final IoHandler handler;

    // Both values are stored in nanoseconds by shutdown0(...) and read by confirmShutdown().
    private volatile long gracefulShutdownQuietPeriod;
    private volatile long gracefulShutdownTimeout;
    // Set the first time confirmShutdown() runs; 0 means "not recorded yet".
    private long gracefulShutdownStartTime;
    // Updated by runAllTasks() whenever it executed at least one task.
    private long lastExecutionTime;
    // Becomes true once handler.initialize() was triggered by the first run.
    private boolean initialized;
86  
    /**
     * Create a new {@link IoEventLoop} without a parent that is owned by the user and so needs to be driven by the
     * user with the given {@link Thread}. This means that the user is responsible for calling either
     * {@link #runNow()} or {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
     * <p>
     * Equivalent to calling {@link #ManualIoEventLoop(IoEventLoopGroup, Thread, IoHandlerFactory)} with a
     * {@code null} parent.
     *
     * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
     *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
     *                          make progress.
     * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
     *                          used by this {@link IoEventLoop}.
     * @throws NullPointerException if {@code owningThread} or {@code factory} is {@code null}.
     */
    public ManualIoEventLoop(Thread owningThread, IoHandlerFactory factory) {
        this(null, owningThread, factory);
    }
101 
102     /**
103      * Create a new {@link IoEventLoop} that is owned by the user and so needs to be driven by the user with the given
104      * {@link Thread}. This means that the user is responsible to call either {@link #runNow()} or
105      * {@link #run(long)} to execute IO or tasks that were submitted to this {@link IoEventLoop}.
106      *
107      * @param parent            the parent {@link IoEventLoopGroup} or {@code null} if no parent.
108      * @param owningThread      the {@link Thread} that executes the IO and tasks for this {@link IoEventLoop}. The
109      *                          user will use this {@link Thread} to call {@link #runNow()} or {@link #run(long)} to
110      *                          make progress.
111      * @param factory           the {@link IoHandlerFactory} that will be used to create the {@link IoHandler} that is
112      *                          used by this {@link IoEventLoop}.
113      */
114     public ManualIoEventLoop(IoEventLoopGroup parent, Thread owningThread, IoHandlerFactory factory) {
115         this.parent = parent;
116         this.owningThread = Objects.requireNonNull(owningThread, "owningThread");
117         this.handler = factory.newHandler(this);
118         state = new AtomicInteger(ST_STARTED);
119     }
120 
121     private int runAllTasks() {
122         assert inEventLoop();
123         int numRun = 0;
124         boolean fetchedAll;
125         do {
126             fetchedAll = fetchFromScheduledTaskQueue(taskQueue);
127             for (;;) {
128                 Runnable task = taskQueue.poll();
129                 if (task == null) {
130                     break;
131                 }
132                 safeExecute(task);
133                 numRun++;
134             }
135         } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
136         if (numRun > 0) {
137             lastExecutionTime = getCurrentTimeNanos();
138         }
139         return numRun;
140     }
141 
142     private int run(IoHandlerContext context) {
143         if (!initialized) {
144             initialized = true;
145             handler.initialize();
146         }
147         EventExecutor old = ThreadExecutorMap.setCurrentExecutor(this);
148         try {
149             if (isShuttingDown()) {
150                 if (terminationFuture.isDone()) {
151                     // Already completely terminated
152                     return 0;
153                 }
154                 // Run all tasks before prepare to destroy.
155                 int run = runAllTasks();
156                 handler.prepareToDestroy();
157                 if (confirmShutdown()) {
158                     // Destroy the handler now and run all remaining tasks.
159                     try {
160                         handler.destroy();
161                         for (;;) {
162                             int r = runAllTasks();
163                             run += r;
164                             if (r == 0) {
165                                 break;
166                             }
167                         }
168                     } finally {
169                         state.set(ST_TERMINATED);
170                         terminationFuture.setSuccess(null);
171                     }
172                 }
173                 return run;
174             }
175             int run = handler.run(context);
176             // Now run all tasks.
177             return run + runAllTasks();
178         } finally {
179             ThreadExecutorMap.setCurrentExecutor(old);
180         }
181     }
182 
    /**
     * Run all ready IO and tasks for this {@link IoEventLoop}.
     * This method will <strong>NOT</strong> block and wait for IO / tasks to be ready, it will just
     * return directly if there is nothing to do.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @return the number of IO and tasks executed.
     * @throws IllegalStateException if called from a {@link Thread} other than the owning one.
     */
    public int runNow() {
        checkCurrentThread();
        return run(nonBlockingContext);
    }
196 
    /**
     * Run all ready IO and tasks for this {@link IoEventLoop}.
     * This method will block and wait for IO / tasks to be ready if there is nothing to process at the moment, for
     * the given {@code waitNanos}.
     * <p>
     * <strong>Must be called from the owning {@link Thread} that was passed as a parameter on construction.</strong>
     *
     * @param waitNanos the maximum amount of nanoseconds to wait before returning.
     * @return          the number of IO and tasks executed.
     * @throws IllegalStateException if called from a {@link Thread} other than the owning one.
     */
    public int run(long waitNanos) {
        checkCurrentThread();
        // The blocking context is reused across calls; only its blocking budget changes.
        blockingContext.maxBlockingNanos = waitNanos;
        return run(blockingContext);
    }
212 
213     private void checkCurrentThread() {
214         if (!inEventLoop(Thread.currentThread())) {
215             throw new IllegalStateException();
216         }
217     }
218 
219     /**
220      * Force a wakeup and so the {@link #run(long)} method will unblock and return even if there was nothing to do.
221      */
222     public void wakeup() {
223         if (isShuttingDown()) {
224             return;
225         }
226         handler.wakeup();
227     }
228 
    /**
     * Always returns this event loop itself.
     */
    @Override
    public ManualIoEventLoop next() {
        return this;
    }
233 
    /**
     * Returns the parent {@link IoEventLoopGroup} given at construction time, which may be {@code null}.
     */
    @Override
    public IoEventLoopGroup parent() {
        return parent;
    }
238 
239     @Deprecated
240     @Override
241     public ChannelFuture register(Channel channel) {
242         return register(new DefaultChannelPromise(channel, this));
243     }
244 
245     @Deprecated
246     @Override
247     public ChannelFuture register(final ChannelPromise promise) {
248         ObjectUtil.checkNotNull(promise, "promise");
249         promise.channel().unsafe().register(this, promise);
250         return promise;
251     }
252 
253     @Override
254     public Future<IoRegistration> register(final IoHandle handle) {
255         Promise<IoRegistration> promise = newPromise();
256         if (inEventLoop()) {
257             registerForIo0(handle, promise);
258         } else {
259             execute(() -> registerForIo0(handle, promise));
260         }
261 
262         return promise;
263     }
264 
    /**
     * Registers {@code handle} with the handler on the owning thread and completes {@code promise} with the
     * outcome: the {@link IoRegistration} on success, or the thrown {@link Exception} on failure.
     */
    private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
        assert inEventLoop();
        final IoRegistration registration;
        try {
            registration = handler.register(handle);
        } catch (Exception e) {
            promise.setFailure(e);
            return;
        }
        // Completed outside of the try block so that an exception raised while completing the promise is not
        // mistaken for a registration failure.
        promise.setSuccess(registration);
    }
276 
277     @Deprecated
278     @Override
279     public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
280         ObjectUtil.checkNotNull(promise, "promise");
281         ObjectUtil.checkNotNull(channel, "channel");
282         channel.unsafe().register(this, promise);
283         return promise;
284     }
285 
    /**
     * Delegates to the underlying {@link IoHandler} to decide whether the given handle type is supported.
     */
    @Override
    public boolean isCompatible(Class<? extends IoHandle> handleType) {
        return handler.isCompatible(handleType);
    }
290 
    /**
     * Returns {@code true} only if the runtime class of the underlying {@link IoHandler} is exactly
     * {@code handlerType}; subclasses of {@code handlerType} do not match.
     */
    @Override
    public boolean isIoType(Class<? extends IoHandler> handlerType) {
        return handler.getClass().equals(handlerType);
    }
295 
296     @Override
297     public boolean inEventLoop(Thread thread) {
298         return this.owningThread == thread;
299     }
300 
    /**
     * Moves the lifecycle state to {@code shutdownState} and records the graceful shutdown settings.
     * <p>
     * The call is a no-op when a shutdown is already in progress, so a later call can neither escalate the state
     * nor overwrite the quiet period / timeout of an earlier one.
     *
     * @param quietPeriod   the quiet period in nanoseconds, or {@code -1} to leave the current value untouched.
     * @param timeout       the timeout in nanoseconds, or {@code -1} to leave the current value untouched.
     * @param shutdownState the state to transition to ({@code ST_SHUTTING_DOWN} or {@code ST_SHUTDOWN}).
     */
    private void shutdown0(long quietPeriod, long timeout, int shutdownState) {
        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        // Retry until the compare-and-set succeeds or another caller started the shutdown first.
        for (;;) {
            if (isShuttingDown()) {
                return;
            }
            int newState;
            wakeup = true;
            oldState = state.get();
            if (inEventLoop) {
                newState = shutdownState;
            } else if (oldState == ST_STARTED) {
                newState = shutdownState;
            } else {
                // Only reachable when the state changed between the isShuttingDown() check and state.get():
                // keep the state as it is and skip the wakeup.
                newState = oldState;
                wakeup = false;
            }

            if (state.compareAndSet(oldState, newState)) {
                break;
            }
        }
        if (quietPeriod != -1) {
            gracefulShutdownQuietPeriod = quietPeriod;
        }
        if (timeout != -1) {
            gracefulShutdownTimeout = timeout;
        }

        if (wakeup) {
            // Unblock a run(long) call that may currently be waiting inside the handler.
            handler.wakeup();
        }
    }
336 
337     @Override
338     public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
339         ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
340         if (timeout < quietPeriod) {
341             throw new IllegalArgumentException(
342                     "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
343         }
344         ObjectUtil.checkNotNull(unit, "unit");
345 
346         shutdown0(unit.toNanos(quietPeriod), unit.toNanos(timeout), ST_SHUTTING_DOWN);
347         return terminationFuture();
348     }
349 
    /**
     * Requests an immediate (non-graceful) shutdown by moving the state to {@code ST_SHUTDOWN} without touching
     * the quiet period or timeout. Has no effect when a shutdown was already requested.
     */
    @Override
    @Deprecated
    public void shutdown() {
        shutdown0(-1, -1, ST_SHUTDOWN);
    }
355 
    /**
     * Returns the {@link Future} that is completed once this event loop reached the terminated state.
     */
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }
360 
    /**
     * Returns {@code true} once a shutdown was requested, i.e. the state is {@code ST_SHUTTING_DOWN} or any later
     * state.
     */
    @Override
    public boolean isShuttingDown() {
        return state.get() >= ST_SHUTTING_DOWN;
    }
365 
    /**
     * Returns {@code true} once the state is {@code ST_SHUTDOWN} or {@code ST_TERMINATED}.
     */
    @Override
    public boolean isShutdown() {
        return state.get() >= ST_SHUTDOWN;
    }
370 
    /**
     * Returns {@code true} only when the state is {@code ST_TERMINATED}.
     */
    @Override
    public boolean isTerminated() {
        return state.get() == ST_TERMINATED;
    }
375 
    /**
     * Waits for the termination future to complete for at most the given time and returns the result of that
     * wait. Termination only makes progress while the owning thread keeps driving this event loop.
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        return terminationFuture.await(timeout, unit);
    }
380 
381     @Override
382     public void execute(Runnable command) {
383         Objects.requireNonNull(command, "command");
384         boolean inEventLoop = inEventLoop();
385         if (inEventLoop) {
386             if (isShutdown()) {
387                 throw new RejectedExecutionException("event executor terminated");
388             }
389         }
390         taskQueue.add(command);
391         if (!inEventLoop) {
392             if (isShutdown()) {
393                 boolean reject = false;
394                 try {
395                     if (taskQueue.remove(command)) {
396                         reject = true;
397                     }
398                 } catch (UnsupportedOperationException e) {
399                     // The task queue does not support removal so the best thing we can do is to just move on and
400                     // hope we will be able to pick-up the task before its completely terminated.
401                     // In worst case we will log on termination.
402                 }
403                 if (reject) {
404                     throw new RejectedExecutionException("event executor terminated");
405                 }
406             }
407             handler.wakeup();
408         }
409     }
410 
    /**
     * Returns {@code true} if at least one task is waiting in {@code taskQueue}. Scheduled tasks that were not
     * yet moved into the queue are not considered.
     */
    private boolean hasTasks() {
        return !taskQueue.isEmpty();
    }
414 
415     private boolean confirmShutdown() {
416         if (!isShuttingDown()) {
417             return false;
418         }
419 
420         if (!inEventLoop()) {
421             throw new IllegalStateException("must be invoked from an event loop");
422         }
423 
424         cancelScheduledTasks();
425 
426         if (gracefulShutdownStartTime == 0) {
427             gracefulShutdownStartTime = getCurrentTimeNanos();
428         }
429 
430         if (runAllTasks() > 0) {
431             if (isShutdown()) {
432                 // Executor shut down - no new tasks anymore.
433                 return true;
434             }
435 
436             // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
437             // terminate if the quiet period is 0.
438             // See https://github.com/netty/netty/issues/4241
439             if (gracefulShutdownQuietPeriod == 0) {
440                 return true;
441             }
442             return false;
443         }
444 
445         final long nanoTime = getCurrentTimeNanos();
446 
447         if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
448             return true;
449         }
450 
451         if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
452             try {
453                 Thread.sleep(100);
454             } catch (InterruptedException e) {
455                 // Ignore
456             }
457 
458             return false;
459         }
460 
461         // No tasks were added for last quiet period - hopefully safe to shut down.
462         // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
463         return true;
464     }
465 
    /**
     * Delegates to the super implementation after verifying that the caller is not the owning thread.
     *
     * @throws RejectedExecutionException if called from the owning thread.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks);
    }
472 
    /**
     * Timed variant: delegates to the super implementation after verifying that the caller is not the owning
     * thread.
     *
     * @throws RejectedExecutionException if called from the owning thread.
     */
    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAny");
        return super.invokeAny(tasks, timeout, unit);
    }
480 
    /**
     * Delegates to the super implementation after verifying that the caller is not the owning thread.
     *
     * @throws RejectedExecutionException if called from the owning thread.
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks);
    }
488 
    /**
     * Timed variant: delegates to the super implementation after verifying that the caller is not the owning
     * thread.
     *
     * @throws RejectedExecutionException if called from the owning thread.
     */
    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        // We need to check if the method was called from within the EventLoop as this would cause a deadlock.
        throwIfInEventLoop("invokeAll");
        return super.invokeAll(tasks, timeout, unit);
    }
496 
497     private void throwIfInEventLoop(String method) {
498         if (inEventLoop()) {
499             throw new RejectedExecutionException(
500                     "Calling " + method + " from within the EventLoop is not allowed as it would deadlock");
501         }
502     }
503 
504     private final class BlockingIoHandlerContext implements IoHandlerContext {
505         long maxBlockingNanos = Long.MAX_VALUE;
506 
507         @Override
508         public boolean canBlock() {
509             assert inEventLoop();
510             return !hasTasks() && !hasScheduledTasks();
511         }
512 
513         @Override
514         public long delayNanos(long currentTimeNanos) {
515             assert inEventLoop();
516             return ManualIoEventLoop.this.delayNanos(currentTimeNanos, maxBlockingNanos);
517         }
518 
519         @Override
520         public long deadlineNanos() {
521             assert inEventLoop();
522             long next = nextScheduledTaskDeadlineNanos();
523             if (next == -1) {
524                 return maxBlockingNanos;
525             }
526             return Math.min(next, maxBlockingNanos);
527         }
528     };
529 }