View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.Promise;
20  import io.netty.util.concurrent.RejectedExecutionHandler;
21  import io.netty.util.concurrent.SingleThreadEventExecutor;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.SystemPropertyUtil;
25  
26  import java.util.Queue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ThreadFactory;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  /**
33   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
34   * {@link IoHandler}.
35   */
36  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
37  
38      // TODO: Is this a sensible default ?
39      private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
40              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
41  
42      private final long maxTaskProcessingQuantumNs;
43      private final IoHandlerContext context = new IoHandlerContext() {
44          @Override
45          public boolean canBlock() {
46              assert inEventLoop();
47              return !hasTasks() && !hasScheduledTasks();
48          }
49  
50          @Override
51          public long delayNanos(long currentTimeNanos) {
52              assert inEventLoop();
53              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
54          }
55  
56          @Override
57          public long deadlineNanos() {
58              assert inEventLoop();
59              return SingleThreadIoEventLoop.this.deadlineNanos();
60          }
61  
62          @Override
63          public void reportActiveIoTime(long activeNanos) {
64              SingleThreadIoEventLoop.this.reportActiveIoTime(activeNanos);
65          }
66  
67          @Override
68          public boolean shouldReportActiveIoTime() {
69              return isSuspensionSupported();
70          }
71      };
72  
73      private final IoHandler ioHandler;
74  
75      private final AtomicInteger numRegistrations = new AtomicInteger();
76  
77      /**
78       *  Creates a new instance
79       *
80       * @param parent            the parent that holds this {@link IoEventLoop}.
81       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
82       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
83       *                          handle IO.
84       */
85      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
86                                     IoHandlerFactory ioHandlerFactory) {
87          super(parent, threadFactory, false,
88                  ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported());
89          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
90          this.ioHandler = ioHandlerFactory.newHandler(this);
91      }
92  
93      /**
94       *  Creates a new instance
95       *
96       * @param parent            the parent that holds this {@link IoEventLoop}.
97       * @param executor          the {@link Executor} that is used for dispatching the work.
98       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
99       *                          handle IO.
100      */
101     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
102         super(parent, executor, false,
103                 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported());
104         this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
105         this.ioHandler = ioHandlerFactory.newHandler(this);
106     }
107 
108     /**
109      *  Creates a new instance
110      *
111      * @param parent                        the parent that holds this {@link IoEventLoop}.
112      * @param threadFactory                 the {@link ThreadFactory} that is used to create the underlying
113      *                                      {@link Thread}.
114      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
115      *                                      to handle IO.
116      * @param maxPendingTasks               the maximum pending tasks that are allowed before
117      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
118      *                                          SingleThreadEventExecutor)}
119      *                                      is called to handle it.
120      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
121      *                                      then allowed per {@code maxPendingTasks}.
122      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
123      *                                      trying to run IO again.
124      */
125     public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
126                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
127                                    RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
128         super(parent, threadFactory, false,
129                 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
130                 maxPendingTasks, rejectedExecutionHandler);
131         this.maxTaskProcessingQuantumNs =
132                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
133                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
134                         TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
135         this.ioHandler = ioHandlerFactory.newHandler(this);
136     }
137 
138     /**
139      *  Creates a new instance
140      *
141      * @param parent                        the parent that holds this {@link IoEventLoop}.
142      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
143      *                                      to handle IO.
144      * @param maxPendingTasks               the maximum pending tasks that are allowed before
145      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
146      *                                          SingleThreadEventExecutor)}
147      *                                      is called to handle it.
148      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
149      *                                      then allowed per {@code maxPendingTasks}.
150      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
151      *                                      trying to run IO again.
152      */
153     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
154                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
155                                    RejectedExecutionHandler rejectedExecutionHandler,
156                                    long maxTaskProcessingQuantumMs) {
157         super(parent, executor, false,
158                 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
159                 maxPendingTasks, rejectedExecutionHandler);
160         this.maxTaskProcessingQuantumNs =
161                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
162                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
163                         TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
164         this.ioHandler = ioHandlerFactory.newHandler(this);
165     }
166 
167     /**
168      *
169      *  Creates a new instance
170      *
171      * @param parent                    the parent that holds this {@link IoEventLoop}.
172      * @param executor                  the {@link Executor} that is used for dispatching the work.
173      * @param ioHandlerFactory          the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
174      *                                  to handle IO.
175      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
176      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
177      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
178      *                                  then allowed.
179      */
180     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
181                                       IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
182                                       Queue<Runnable> tailTaskQueue,
183                                       RejectedExecutionHandler rejectedExecutionHandler) {
184         super(parent, executor, false,
185                 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
186                 taskQueue, tailTaskQueue, rejectedExecutionHandler);
187         this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
188         this.ioHandler = ioHandlerFactory.newHandler(this);
189     }
190 
191     @Override
192     protected void run() {
193         assert inEventLoop();
194         ioHandler.initialize();
195         do {
196             runIo();
197             if (isShuttingDown()) {
198                 ioHandler.prepareToDestroy();
199             }
200             // Now run all tasks for the maximum configured amount of time before trying to run IO again.
201             runAllTasks(maxTaskProcessingQuantumNs);
202 
203             // We should continue with our loop until we either confirmed a shutdown or we can suspend it.
204         } while (!confirmShutdown() && !canSuspend());
205     }
206 
207     protected final IoHandler ioHandler() {
208         return ioHandler;
209     }
210 
211     @Override
212     protected boolean canSuspend(int state) {
213         // We should only allow to suspend if there are no registrations on this loop atm.
214         return super.canSuspend(state) && numRegistrations.get() == 0;
215     }
216 
217     /**
218      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
219      * This method returns the number of {@link IoHandle}s for which IO was processed.
220      *
221      * This method must be called from the {@link EventLoop} thread.
222      */
223     protected int runIo() {
224         assert inEventLoop();
225         return ioHandler.run(context);
226     }
227 
228     @Override
229     public IoEventLoop next() {
230         return this;
231     }
232 
233     @Override
234     public final Future<IoRegistration> register(final IoHandle handle) {
235         Promise<IoRegistration> promise = newPromise();
236         if (inEventLoop()) {
237             registerForIo0(handle, promise);
238         } else {
239             execute(() -> registerForIo0(handle, promise));
240         }
241 
242         return promise;
243     }
244 
245     @Override
246     protected int getNumOfRegisteredChannels() {
247         return numRegistrations.get();
248     }
249 
250     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
251         assert inEventLoop();
252         final IoRegistration registration;
253         try {
254             registration = ioHandler.register(handle);
255         } catch (Exception e) {
256             promise.setFailure(e);
257             return;
258         }
259         numRegistrations.incrementAndGet();
260         promise.setSuccess(new IoRegistrationWrapper(registration));
261     }
262 
263     @Override
264     protected final void wakeup(boolean inEventLoop) {
265         ioHandler.wakeup();
266     }
267 
268     @Override
269     protected final void cleanup() {
270         assert inEventLoop();
271         ioHandler.destroy();
272     }
273 
274     @Override
275     public boolean isCompatible(Class<? extends IoHandle> handleType) {
276         return ioHandler.isCompatible(handleType);
277     }
278 
279     @Override
280     public boolean isIoType(Class<? extends IoHandler> handlerType) {
281         return ioHandler.getClass().equals(handlerType);
282     }
283 
284     @Override
285     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
286         return newTaskQueue0(maxPendingTasks);
287     }
288 
289     protected static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
290         // This event loop never calls takeTask()
291         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
292                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
293     }
294 
295     private final class IoRegistrationWrapper implements IoRegistration {
296         private final IoRegistration registration;
297         IoRegistrationWrapper(IoRegistration registration) {
298             this.registration = registration;
299         }
300 
301         @Override
302         public <T> T attachment() {
303             return registration.attachment();
304         }
305 
306         @Override
307         public long submit(IoOps ops) {
308             return registration.submit(ops);
309         }
310 
311         @Override
312         public boolean isValid() {
313             return registration.isValid();
314         }
315 
316         @Override
317         public boolean cancel() {
318             if (registration.cancel()) {
319                 numRegistrations.decrementAndGet();
320                 return true;
321             }
322             return false;
323         }
324     }
325 }