View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.Promise;
20  import io.netty.util.concurrent.RejectedExecutionHandler;
21  import io.netty.util.concurrent.SingleThreadEventExecutor;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.SystemPropertyUtil;
24  
25  import java.util.Queue;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  import java.util.concurrent.atomic.AtomicInteger;
30  
31  /**
32   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
33   * {@link IoHandler}.
34   */
35  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
36  
37      // TODO: Is this a sensible default ?
38      private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
39              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
40  
41      private final long maxTaskProcessingQuantumNs;
42      private final IoHandlerContext context = new IoHandlerContext() {
43          @Override
44          public boolean canBlock() {
45              assert inEventLoop();
46              return !hasTasks() && !hasScheduledTasks();
47          }
48  
49          @Override
50          public long delayNanos(long currentTimeNanos) {
51              assert inEventLoop();
52              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
53          }
54  
55          @Override
56          public long deadlineNanos() {
57              assert inEventLoop();
58              return SingleThreadIoEventLoop.this.deadlineNanos();
59          }
60      };
61  
62      private final IoHandler ioHandler;
63  
64      private final AtomicInteger numRegistrations = new AtomicInteger();
65  
66      /**
67       *  Creates a new instance
68       *
69       * @param parent            the parent that holds this {@link IoEventLoop}.
70       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
71       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
72       *                          handle IO.
73       */
74      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
75                                     IoHandlerFactory ioHandlerFactory) {
76          super(parent, threadFactory, false, true);
77          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
78          this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
79      }
80  
81      /**
82       *  Creates a new instance
83       *
84       * @param parent            the parent that holds this {@link IoEventLoop}.
85       * @param executor          the {@link Executor} that is used for dispatching the work.
86       * @param ioHandlerFactory  the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler} to
87       *                          handle IO.
88       */
89      public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
90          super(parent, executor, false, true);
91          this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
92          this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
93      }
94  
95      /**
96       *  Creates a new instance
97       *
98       * @param parent                        the parent that holds this {@link IoEventLoop}.
99       * @param threadFactory                 the {@link ThreadFactory} that is used to create the underlying
100      *                                      {@link Thread}.
101      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
102      *                                      to handle IO.
103      * @param maxPendingTasks               the maximum pending tasks that are allowed before
104      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
105      *                                          SingleThreadEventExecutor)}
106      *                                      is called to handle it.
107      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
108      *                                      then allowed per {@code maxPendingTasks}.
109      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
110      *                                      trying to run IO again.
111      */
112     public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
113                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
114                                    RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
115         super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
116         this.maxTaskProcessingQuantumNs =
117                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
118                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
119                         TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
120         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
121     }
122 
123     /**
124      *  Creates a new instance
125      *
126      * @param parent                        the parent that holds this {@link IoEventLoop}.
127      * @param ioHandlerFactory              the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
128      *                                      to handle IO.
129      * @param maxPendingTasks               the maximum pending tasks that are allowed before
130      *                                      {@link RejectedExecutionHandler#rejected(Runnable,
131      *                                          SingleThreadEventExecutor)}
132      *                                      is called to handle it.
133      * @param rejectedExecutionHandler      the {@link RejectedExecutionHandler} that handles when more tasks are added
134      *                                      then allowed per {@code maxPendingTasks}.
135      * @param maxTaskProcessingQuantumMs    the maximum number of milliseconds that will be spent to run tasks before
136      *                                      trying to run IO again.
137      */
138     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
139                                    IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
140                                    RejectedExecutionHandler rejectedExecutionHandler,
141                                    long maxTaskProcessingQuantumMs) {
142         super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
143         this.maxTaskProcessingQuantumNs =
144                 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
145                         DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
146                         TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
147         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
148     }
149 
150     /**
151      *
152      *  Creates a new instance
153      *
154      * @param parent                    the parent that holds this {@link IoEventLoop}.
155      * @param executor                  the {@link Executor} that is used for dispatching the work.
156      * @param ioHandlerFactory          the {@link IoHandlerFactory} that should be used to obtain {@link IoHandler}
157      *                                  to handle IO.
158      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
159      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
160      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
161      *                                  then allowed.
162      */
163     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
164                                       IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
165                                       Queue<Runnable> tailTaskQueue,
166                                       RejectedExecutionHandler rejectedExecutionHandler) {
167         super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
168         this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
169         this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
170     }
171 
172     @Override
173     protected void run() {
174         assert inEventLoop();
175         ioHandler.initialize();
176         do {
177             runIo();
178             if (isShuttingDown()) {
179                 ioHandler.prepareToDestroy();
180             }
181             // Now run all tasks for the maximum configured amount of time before trying to run IO again.
182             runAllTasks(maxTaskProcessingQuantumNs);
183 
184             // We should continue with our loop until we either confirmed a shutdown or we can suspend it.
185         } while (!confirmShutdown() && !canSuspend());
186     }
187 
188     protected final IoHandler ioHandler() {
189         return ioHandler;
190     }
191 
192     @Override
193     protected boolean canSuspend(int state) {
194         // We should only allow to suspend if there are no registrations on this loop atm.
195         return super.canSuspend(state) && numRegistrations.get() == 0;
196     }
197 
198     /**
199      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
200      * This method returns the number of {@link IoHandle}s for which IO was processed.
201      *
202      * This method must be called from the {@link EventLoop} thread.
203      */
204     protected int runIo() {
205         assert inEventLoop();
206         return ioHandler.run(context);
207     }
208 
209     @Override
210     public IoEventLoop next() {
211         return this;
212     }
213 
214     @Override
215     public final Future<IoRegistration> register(final IoHandle handle) {
216         Promise<IoRegistration> promise = newPromise();
217         if (inEventLoop()) {
218             registerForIo0(handle, promise);
219         } else {
220             execute(() -> registerForIo0(handle, promise));
221         }
222 
223         return promise;
224     }
225 
226     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
227         assert inEventLoop();
228         final IoRegistration registration;
229         try {
230             registration = ioHandler.register(handle);
231         } catch (Exception e) {
232             promise.setFailure(e);
233             return;
234         }
235         numRegistrations.incrementAndGet();
236         promise.setSuccess(new IoRegistrationWrapper(registration));
237     }
238 
239     @Override
240     protected final void wakeup(boolean inEventLoop) {
241         ioHandler.wakeup();
242     }
243 
244     @Override
245     protected final void cleanup() {
246         assert inEventLoop();
247         ioHandler.destroy();
248     }
249 
250     @Override
251     public boolean isCompatible(Class<? extends IoHandle> handleType) {
252         return ioHandler.isCompatible(handleType);
253     }
254 
255     @Override
256     public boolean isIoType(Class<? extends IoHandler> handlerType) {
257         return ioHandler.getClass().equals(handlerType);
258     }
259 
260     private final class IoRegistrationWrapper implements IoRegistration {
261         private final IoRegistration registration;
262         IoRegistrationWrapper(IoRegistration registration) {
263             this.registration = registration;
264         }
265 
266         @Override
267         public <T> T attachment() {
268             return registration.attachment();
269         }
270 
271         @Override
272         public long submit(IoOps ops) {
273             return registration.submit(ops);
274         }
275 
276         @Override
277         public boolean isValid() {
278             return registration.isValid();
279         }
280 
281         @Override
282         public boolean cancel() {
283             if (registration.cancel()) {
284                 numRegistrations.decrementAndGet();
285                 return true;
286             }
287             return false;
288         }
289     }
290 }