View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.FutureListener;
20  import io.netty.util.concurrent.Promise;
21  import io.netty.util.concurrent.RejectedExecutionHandler;
22  import io.netty.util.concurrent.SingleThreadEventExecutor;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.SystemPropertyUtil;
25  
26  import java.util.Queue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ThreadFactory;
29  import java.util.concurrent.TimeUnit;
30  
31  /**
32   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
33   * {@link IoHandler}.
34   */
35  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
36  
37      // TODO: Is this a sensible default ?
38      private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
39              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
40  
41      private final long maxTaskProcessingQuantumNs;
42      private final IoExecutionContext context = new IoExecutionContext() {
43          @Override
44          public boolean canBlock() {
45              assert inEventLoop();
46              return !hasTasks() && !hasScheduledTasks();
47          }
48  
49          @Override
50          public long delayNanos(long currentTimeNanos) {
51              assert inEventLoop();
52              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
53          }
54  
55          @Override
56          public long deadlineNanos() {
57              assert inEventLoop();
58              return SingleThreadIoEventLoop.this.deadlineNanos();
59          }
60      };
61  
62      private final IoHandler ioHandler;
63  
64      private int numRegistrations;
65      private final FutureListener<Object> decrementRegistrationListener = f -> {
66          assert inEventLoop();
67          numRegistrations--;
68      };
69  
70      /**
71       *  Creates a new instance
72       *
73       * @param parent            the parent that holds this {@link IoEventLoop}.
74       * @param ioHandler         the {@link IoHandler} used to run all IO.
75       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
76       */
77      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
78                                     IoHandler ioHandler) {
79          super(parent, threadFactory, false, true);
80          this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
81          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
82      }
83  
84      /**
85       *  Creates a new instance
86       *
87       * @param parent            the parent that holds this {@link IoEventLoop}.
88       * @param executor          the {@link Executor} that is used for dispatching the work.
89       * @param ioHandler         the {@link IoHandler} used to run all IO.
90       */
91      public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandler ioHandler) {
92          super(parent, executor, false, true);
93          this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
94          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
95      }
96  
97      /**
98       *  Creates a new instance
99       *
100      * @param parent                        the parent that holds this {@link IoEventLoop}.
101      * @param threadFactory                 the {@link ThreadFactory} that is used to create the underlying
102      *                                      {@link Thread}.
103      * @param ioHandler                     the {@link IoHandler} used to run all IO.
104      * @param maxPendingTasks               the maximum pending tasks that are allowed before
105      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
106      *                                          SingleThreadEventExecutor)}
107      *                                      is called to handle it.
108      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
109      *                                      then allowed per {@code maxPendingTasks}.
110      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
111      *                                      trying to run IO again.
112      */
113     public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
114                                    IoHandler ioHandler, int maxPendingTasks,
115                                    RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
116         super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
117         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
118         this.maxTaskProcessingQuantumNs =
119                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
120                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
121     }
122 
123     /**
124      *  Creates a new instance
125      *
126      * @param parent                        the parent that holds this {@link IoEventLoop}.
127      * @param ioHandler                     the {@link IoHandler} used to run all IO.
128      * @param executor                      the {@link Executor} that is used for dispatching the work.
129      * @param maxPendingTasks               the maximum pending tasks that are allowed before
130      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
131      *                                          SingleThreadEventExecutor)}
132      *                                      is called to handle it.
133      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
134      *                                      then allowed per {@code maxPendingTasks}.
135      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
136      *                                      trying to run IO again.
137      */
138     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
139                                    IoHandler ioHandler, int maxPendingTasks,
140                                    RejectedExecutionHandler rejectedExecutionHandler,
141                                    long maxTaskProcessingQuantumMs) {
142         super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
143         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
144         this.maxTaskProcessingQuantumNs =
145                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
146                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
147     }
148 
149     /**
150      *
151      *  Creates a new instance
152      *
153      * @param parent                    the parent that holds this {@link IoEventLoop}.
154      * @param executor                  the {@link Executor} that is used for dispatching the work.
155      * @param ioHandler                 the {@link IoHandler} used to run all IO.
156      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
157      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
158      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
159      *                                  then allowed.
160      */
161     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
162                                       IoHandler ioHandler, Queue<Runnable> taskQueue,
163                                       Queue<Runnable> tailTaskQueue,
164                                       RejectedExecutionHandler rejectedExecutionHandler) {
165         super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
166         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
167         this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
168     }
169 
170     @Override
171     protected void run() {
172         assert inEventLoop();
173         do {
174             runIo();
175             if (isShuttingDown()) {
176                 ioHandler.prepareToDestroy();
177             }
178             // Now run all tasks for the maximum configured amount of time before trying to run IO again.
179             runAllTasks(maxTaskProcessingQuantumNs);
180 
181             // We should continue with our loop until we either confirmed a shutdown or we can suspend it.
182         } while (!confirmShutdown() && !canSuspend());
183     }
184 
185     protected final IoHandler ioHandler() {
186         return ioHandler;
187     }
188 
189     @Override
190     protected boolean canSuspend(int state) {
191         // We should only allow to suspend if there are no registrations on this loop atm.
192         return super.canSuspend(state) && numRegistrations == 0;
193     }
194 
195     /**
196      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
197      * This method returns the number of {@link IoHandle}s for which IO was processed.
198      *
199      * This method must be called from the {@link EventLoop} thread.
200      */
201     protected int runIo() {
202         assert inEventLoop();
203         return ioHandler.run(context);
204     }
205 
206     @Override
207     public IoEventLoop next() {
208         return this;
209     }
210 
211     @Override
212     public final Future<IoRegistration> register(final IoHandle handle) {
213         Promise<IoRegistration> promise = newPromise();
214         if (inEventLoop()) {
215             registerForIo0(handle, promise);
216         } else {
217             execute(() -> registerForIo0(handle, promise));
218         }
219 
220         return promise;
221     }
222 
223     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
224         assert inEventLoop();
225         final IoRegistration registration;
226         try {
227             registration = ioHandler.register(this, handle);
228         } catch (Exception e) {
229             promise.setFailure(e);
230             return;
231         }
232         registration.cancelFuture().addListener(decrementRegistrationListener);
233         numRegistrations++;
234         promise.setSuccess(registration);
235     }
236 
237     @Override
238     protected final void wakeup(boolean inEventLoop) {
239         ioHandler.wakeup(this);
240     }
241 
242     @Override
243     protected final void cleanup() {
244         assert inEventLoop();
245         ioHandler.destroy();
246     }
247 
248     @Override
249     public boolean isCompatible(Class<? extends IoHandle> handleType) {
250         return ioHandler.isCompatible(handleType);
251     }
252 
253     @Override
254     public boolean isIoType(Class<? extends IoHandler> handlerType) {
255         return ioHandler.getClass().equals(handlerType);
256     }
257 }