View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.Promise;
20  import io.netty.util.concurrent.RejectedExecutionHandler;
21  import io.netty.util.concurrent.SingleThreadEventExecutor;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.PlatformDependent;
24  import io.netty.util.internal.SystemPropertyUtil;
25  
26  import java.util.Queue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ThreadFactory;
29  import java.util.concurrent.TimeUnit;
30  import java.util.concurrent.atomic.AtomicInteger;
31  
32  /**
33   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
34   * {@link IoHandler}.
35   */
36  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
37  
38      // TODO: Is this a sensible default ?
39      private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
40              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
41  
42      private final long maxTaskProcessingQuantumNs;
43      private final IoHandlerContext context = new IoHandlerContext() {
44          @Override
45          public boolean canBlock() {
46              assert inEventLoop();
47              return !hasTasks() && !hasScheduledTasks();
48          }
49  
50          @Override
51          public long delayNanos(long currentTimeNanos) {
52              assert inEventLoop();
53              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
54          }
55  
56          @Override
57          public long deadlineNanos() {
58              assert inEventLoop();
59              return SingleThreadIoEventLoop.this.deadlineNanos();
60          }
61      };
62  
63      private final IoHandler ioHandler;
64  
65      private final AtomicInteger numRegistrations = new AtomicInteger();
66  
67      /**
68       *  Creates a new instance
69       *
70       * @param parent            the parent that holds this {@link IoEventLoop}.
71       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
72       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
73       *                          handle IO.
74       */
75      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
76                                     IoHandlerFactory ioHandlerFactory) {
77          super(parent, threadFactory, false, true);
78          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
79          this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
80      }
81  
82      /**
83       *  Creates a new instance
84       *
85       * @param parent            the parent that holds this {@link IoEventLoop}.
86       * @param executor          the {@link Executor} that is used for dispatching the work.
87       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
88       *                          handle IO.
89       */
90      public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
91          super(parent, executor, false, true);
92          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
93          this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
94      }
95  
96      /**
97       *  Creates a new instance
98       *
99       * @param parent                        the parent that holds this {@link IoEventLoop}.
100      * @param threadFactory                 the {@link ThreadFactory} that is used to create the underlying
101      *                                      {@link Thread}.
102      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
103      *                                      to handle IO.
104      * @param maxPendingTasks               the maximum pending tasks that are allowed before
105      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
106      *                                          SingleThreadEventExecutor)}
107      *                                      is called to handle it.
108      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
109      *                                      then allowed per {@code maxPendingTasks}.
110      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
111      *                                      trying to run IO again.
112      */
113     public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
114                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
115                                    RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
116         super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
117         this.maxTaskProcessingQuantumNs =
118                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
119                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
120                         TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
121         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
122     }
123 
124     /**
125      *  Creates a new instance
126      *
127      * @param parent                        the parent that holds this {@link IoEventLoop}.
128      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
129      *                                      to handle IO.
130      * @param maxPendingTasks               the maximum pending tasks that are allowed before
131      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
132      *                                          SingleThreadEventExecutor)}
133      *                                      is called to handle it.
134      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
135      *                                      then allowed per {@code maxPendingTasks}.
136      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
137      *                                      trying to run IO again.
138      */
139     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
140                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
141                                    RejectedExecutionHandler rejectedExecutionHandler,
142                                    long maxTaskProcessingQuantumMs) {
143         super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
144         this.maxTaskProcessingQuantumNs =
145                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
146                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
147                         TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
148         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
149     }
150 
151     /**
152      *
153      *  Creates a new instance
154      *
155      * @param parent                    the parent that holds this {@link IoEventLoop}.
156      * @param executor                  the {@link Executor} that is used for dispatching the work.
157      * @param ioHandlerFactory          the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
158      *                                  to handle IO.
159      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
160      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
161      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
162      *                                  then allowed.
163      */
164     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
165                                       IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
166                                       Queue<Runnable> tailTaskQueue,
167                                       RejectedExecutionHandler rejectedExecutionHandler) {
168         super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
169         this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
170         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
171     }
172 
173     @Override
174     protected void run() {
175         assert inEventLoop();
176         ioHandler.initialize();
177         do {
178             runIo();
179             if (isShuttingDown()) {
180                 ioHandler.prepareToDestroy();
181             }
182             // Now run all tasks for the maximum configured amount of time before trying to run IO again.
183             runAllTasks(maxTaskProcessingQuantumNs);
184 
185             // We should continue with our loop until we either confirmed a shutdown or we can suspend it.
186         } while (!confirmShutdown() && !canSuspend());
187     }
188 
189     protected final IoHandler ioHandler() {
190         return ioHandler;
191     }
192 
193     @Override
194     protected boolean canSuspend(int state) {
195         // We should only allow to suspend if there are no registrations on this loop atm.
196         return super.canSuspend(state) && numRegistrations.get() == 0;
197     }
198 
199     /**
200      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
201      * This method returns the number of {@link IoHandle}s for which IO was processed.
202      *
203      * This method must be called from the {@link EventLoop} thread.
204      */
205     protected int runIo() {
206         assert inEventLoop();
207         return ioHandler.run(context);
208     }
209 
210     @Override
211     public IoEventLoop next() {
212         return this;
213     }
214 
215     @Override
216     public final Future<IoRegistration> register(final IoHandle handle) {
217         Promise<IoRegistration> promise = newPromise();
218         if (inEventLoop()) {
219             registerForIo0(handle, promise);
220         } else {
221             execute(() -> registerForIo0(handle, promise));
222         }
223 
224         return promise;
225     }
226 
227     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
228         assert inEventLoop();
229         final IoRegistration registration;
230         try {
231             registration = ioHandler.register(handle);
232         } catch (Exception e) {
233             promise.setFailure(e);
234             return;
235         }
236         numRegistrations.incrementAndGet();
237         promise.setSuccess(new IoRegistrationWrapper(registration));
238     }
239 
240     @Override
241     protected final void wakeup(boolean inEventLoop) {
242         ioHandler.wakeup();
243     }
244 
245     @Override
246     protected final void cleanup() {
247         assert inEventLoop();
248         ioHandler.destroy();
249     }
250 
251     @Override
252     public boolean isCompatible(Class<? extends IoHandle> handleType) {
253         return ioHandler.isCompatible(handleType);
254     }
255 
256     @Override
257     public boolean isIoType(Class<? extends IoHandler> handlerType) {
258         return ioHandler.getClass().equals(handlerType);
259     }
260 
261     @Override
262     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
263         return newTaskQueue0(maxPendingTasks);
264     }
265 
266     protected static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
267         // This event loop never calls takeTask()
268         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
269                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
270     }
271 
272     private final class IoRegistrationWrapper implements IoRegistration {
273         private final IoRegistration registration;
274         IoRegistrationWrapper(IoRegistration registration) {
275             this.registration = registration;
276         }
277 
278         @Override
279         public <T> T attachment() {
280             return registration.attachment();
281         }
282 
283         @Override
284         public long submit(IoOps ops) {
285             return registration.submit(ops);
286         }
287 
288         @Override
289         public boolean isValid() {
290             return registration.isValid();
291         }
292 
293         @Override
294         public boolean cancel() {
295             if (registration.cancel()) {
296                 numRegistrations.decrementAndGet();
297                 return true;
298             }
299             return false;
300         }
301     }
302 }