View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.Promise;
20  import io.netty.util.concurrent.RejectedExecutionHandler;
21  import io.netty.util.concurrent.SingleThreadEventExecutor;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.SystemPropertyUtil;
24  
25  import java.util.Queue;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  /**
32   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
33   * {@link IoHandler}.
34   */
35  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
36  
37      // TODO: Is this a sensible default ?
38      private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
39              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
40  
41      private final long maxTaskProcessingQuantumNs;
42      private final IoHandlerContext context = new IoHandlerContext() {
43          @Override
44          public boolean canBlock() {
45              assert inEventLoop();
46              return !hasTasks() && !hasScheduledTasks();
47          }
48  
49          @Override
50          public long delayNanos(long currentTimeNanos) {
51              assert inEventLoop();
52              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
53          }
54  
55          @Override
56          public long deadlineNanos() {
57              assert inEventLoop();
58              return SingleThreadIoEventLoop.this.deadlineNanos();
59          }
60      };
61  
62      private final IoHandler ioHandler;
63  
64      private final AtomicInteger numRegistrations = new AtomicInteger();
65  
66      /**
67       *  Creates a new instance
68       *
69       * @param parent            the parent that holds this {@link IoEventLoop}.
70       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
71       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
72       *                          handle IO.
73       */
74      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
75                                     IoHandlerFactory ioHandlerFactory) {
76          super(parent, threadFactory, false, true);
77          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
78          this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
79      }
80  
81      /**
82       *  Creates a new instance
83       *
84       * @param parent            the parent that holds this {@link IoEventLoop}.
85       * @param executor          the {@link Executor} that is used for dispatching the work.
86       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
87       *                          handle IO.
88       */
89      public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
90          super(parent, executor, false, true);
91          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
92          this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
93      }
94  
95      /**
96       *  Creates a new instance
97       *
98       * @param parent                        the parent that holds this {@link IoEventLoop}.
99       * @param threadFactory                 the {@link ThreadFactory} that is used to create the underlying
100      *                                      {@link Thread}.
101      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
102      *                                      to handle IO.
103      * @param maxPendingTasks               the maximum pending tasks that are allowed before
104      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
105      *                                          SingleThreadEventExecutor)}
106      *                                      is called to handle it.
107      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
108      *                                      then allowed per {@code maxPendingTasks}.
109      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
110      *                                      trying to run IO again.
111      */
112     public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
113                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
114                                    RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
115         super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
116         this.maxTaskProcessingQuantumNs =
117                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
118                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
119         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
120     }
121 
122     /**
123      *  Creates a new instance
124      *
125      * @param parent                        the parent that holds this {@link IoEventLoop}.
126      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
127      *                                      to handle IO.
128      * @param maxPendingTasks               the maximum pending tasks that are allowed before
129      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
130      *                                          SingleThreadEventExecutor)}
131      *                                      is called to handle it.
132      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
133      *                                      then allowed per {@code maxPendingTasks}.
134      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
135      *                                      trying to run IO again.
136      */
137     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
138                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
139                                    RejectedExecutionHandler rejectedExecutionHandler,
140                                    long maxTaskProcessingQuantumMs) {
141         super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
142         this.maxTaskProcessingQuantumNs =
143                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
144                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
145         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
146     }
147 
148     /**
149      *
150      *  Creates a new instance
151      *
152      * @param parent                    the parent that holds this {@link IoEventLoop}.
153      * @param executor                  the {@link Executor} that is used for dispatching the work.
154      * @param ioHandlerFactory          the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
155      *                                  to handle IO.
156      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
157      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
158      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
159      *                                  then allowed.
160      */
161     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
162                                       IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
163                                       Queue<Runnable> tailTaskQueue,
164                                       RejectedExecutionHandler rejectedExecutionHandler) {
165         super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
166         this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
167         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
168     }
169 
170     @Override
171     protected void run() {
172         assert inEventLoop();
173         ioHandler.initialize();
174         do {
175             runIo();
176             if (isShuttingDown()) {
177                 ioHandler.prepareToDestroy();
178             }
179             // Now run all tasks for the maximum configured amount of time before trying to run IO again.
180             runAllTasks(maxTaskProcessingQuantumNs);
181 
182             // We should continue with our loop until we either confirmed a shutdown or we can suspend it.
183         } while (!confirmShutdown() && !canSuspend());
184     }
185 
186     protected final IoHandler ioHandler() {
187         return ioHandler;
188     }
189 
190     @Override
191     protected boolean canSuspend(int state) {
192         // We should only allow to suspend if there are no registrations on this loop atm.
193         return super.canSuspend(state) && numRegistrations.get() == 0;
194     }
195 
196     /**
197      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
198      * This method returns the number of {@link IoHandle}s for which IO was processed.
199      *
200      * This method must be called from the {@link EventLoop} thread.
201      */
202     protected int runIo() {
203         assert inEventLoop();
204         return ioHandler.run(context);
205     }
206 
207     @Override
208     public IoEventLoop next() {
209         return this;
210     }
211 
212     @Override
213     public final Future<IoRegistration> register(final IoHandle handle) {
214         Promise<IoRegistration> promise = newPromise();
215         if (inEventLoop()) {
216             registerForIo0(handle, promise);
217         } else {
218             execute(() -> registerForIo0(handle, promise));
219         }
220 
221         return promise;
222     }
223 
224     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
225         assert inEventLoop();
226         final IoRegistration registration;
227         try {
228             registration = ioHandler.register(handle);
229         } catch (Exception e) {
230             promise.setFailure(e);
231             return;
232         }
233         numRegistrations.incrementAndGet();
234         promise.setSuccess(new IoRegistrationWrapper(registration));
235     }
236 
237     @Override
238     protected final void wakeup(boolean inEventLoop) {
239         ioHandler.wakeup();
240     }
241 
242     @Override
243     protected final void cleanup() {
244         assert inEventLoop();
245         ioHandler.destroy();
246     }
247 
248     @Override
249     public boolean isCompatible(Class<? extends IoHandle> handleType) {
250         return ioHandler.isCompatible(handleType);
251     }
252 
253     @Override
254     public boolean isIoType(Class<? extends IoHandler> handlerType) {
255         return ioHandler.getClass().equals(handlerType);
256     }
257 
258     private final class IoRegistrationWrapper implements IoRegistration {
259         private final IoRegistration registration;
260         IoRegistrationWrapper(IoRegistration registration) {
261             this.registration = registration;
262         }
263 
264         @Override
265         public <T> T attachment() {
266             return registration.attachment();
267         }
268 
269         @Override
270         public long submit(IoOps ops) {
271             return registration.submit(ops);
272         }
273 
274         @Override
275         public boolean isValid() {
276             return registration.isValid();
277         }
278 
279         @Override
280         public boolean cancel() {
281             if (registration.cancel()) {
282                 numRegistrations.decrementAndGet();
283                 return true;
284             }
285             return false;
286         }
287     }
288 }