View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.FutureListener;
20  import io.netty.util.concurrent.Promise;
21  import io.netty.util.concurrent.RejectedExecutionHandler;
22  import io.netty.util.concurrent.SingleThreadEventExecutor;
23  import io.netty.util.internal.ObjectUtil;
24  import io.netty.util.internal.SystemPropertyUtil;
25  
26  import java.util.Queue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ThreadFactory;
29  
30  /**
31   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
32   * {@link IoHandler}.
33   */
34  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
35  
36      // TODO: Is this a sensible default ?
37      protected static final int DEFAULT_MAX_TASKS_PER_RUN = Math.max(1,
38              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskPerRun", 1024 * 4));
39  
40      private final int maxTasksPerRun = DEFAULT_MAX_TASKS_PER_RUN;
41      private final IoExecutionContext context = new IoExecutionContext() {
42          @Override
43          public boolean canBlock() {
44              assert inEventLoop();
45              return !hasTasks() && !hasScheduledTasks();
46          }
47  
48          @Override
49          public long delayNanos(long currentTimeNanos) {
50              assert inEventLoop();
51              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
52          }
53  
54          @Override
55          public long deadlineNanos() {
56              assert inEventLoop();
57              return SingleThreadIoEventLoop.this.deadlineNanos();
58          }
59      };
60  
61      private final IoHandler ioHandler;
62  
63      private int numRegistrations;
64      private final FutureListener<Object> decrementRegistrationListener = f -> {
65          assert inEventLoop();
66          numRegistrations--;
67      };
68  
69      /**
70       *  Creates a new instance
71       *
72       * @param parent            the parent that holds this {@link IoEventLoop}.
73       * @param ioHandler         the {@link IoHandler} used to run all IO.
74       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
75       */
76      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
77                                     IoHandler ioHandler) {
78          super(parent, threadFactory, false, true);
79          this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
80      }
81  
82      /**
83       *  Creates a new instance
84       *
85       * @param parent            the parent that holds this {@link IoEventLoop}.
86       * @param executor          the {@link Executor} that is used for dispatching the work.
87       * @param ioHandler         the {@link IoHandler} used to run all IO.
88       */
89      public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandler ioHandler) {
90          super(parent, executor, false, true);
91          this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
92      }
93  
94      /**
95       *  Creates a new instance
96       *
97       * @param parent                    the parent that holds this {@link IoEventLoop}.
98       * @param threadFactory             the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
99       * @param ioHandler                 the {@link IoHandler} used to run all IO.
100      * @param maxPendingTasks           the maximum pending tasks that are allowed before
101      *                                  {@link RejectedExecutionHandler#rejected(Runnable, SingleThreadEventExecutor)}
102      *                                  is called to handle it.
103      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
104      *                                  then allowed per {@code maxPendingTasks}.
105      */
106     public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
107                                    IoHandler ioHandler, int maxPendingTasks,
108                                    RejectedExecutionHandler rejectedExecutionHandler) {
109         super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
110         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
111     }
112 
113     /**
114      *  Creates a new instance
115      *
116      * @param parent                    the parent that holds this {@link IoEventLoop}.
117      * @param ioHandler                 the {@link IoHandler} used to run all IO.
118      * @param executor                  the {@link Executor} that is used for dispatching the work.
119      * @param maxPendingTasks           the maximum pending tasks that are allowed before
120      *                                  {@link RejectedExecutionHandler#rejected(Runnable, SingleThreadEventExecutor)}
121      *                                  is called to handle it.
122      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
123      *                                  then allowed per {@code maxPendingTasks}.
124      */
125     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
126                                    IoHandler ioHandler, int maxPendingTasks,
127                                    RejectedExecutionHandler rejectedExecutionHandler) {
128         super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
129         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
130     }
131 
132     /**
133      *
134      *  Creates a new instance
135      *
136      * @param parent                    the parent that holds this {@link IoEventLoop}.
137      * @param executor                  the {@link Executor} that is used for dispatching the work.
138      * @param ioHandler                 the {@link IoHandler} used to run all IO.
139      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
140      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
141      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
142      *                                  then allowed.
143      */
144     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
145                                       IoHandler ioHandler, Queue<Runnable> taskQueue,
146                                       Queue<Runnable> tailTaskQueue,
147                                       RejectedExecutionHandler rejectedExecutionHandler) {
148         super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
149         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
150     }
151 
152     @Override
153     protected void run() {
154         assert inEventLoop();
155         do {
156             runIo();
157             if (isShuttingDown()) {
158                 ioHandler.prepareToDestroy();
159             }
160             runAllTasks(maxTasksPerRun);
161 
162             // We should continue with our loop until we either confirmed a shutdown or we can suspend it.
163         } while (!confirmShutdown() && !canSuspend());
164     }
165 
166     protected final IoHandler ioHandler() {
167         return ioHandler;
168     }
169 
170     @Override
171     protected boolean canSuspend(int state) {
172         // We should only allow to suspend if there are no registrations on this loop atm.
173         return super.canSuspend(state) && numRegistrations == 0;
174     }
175 
176     /**
177      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
178      * This method returns the number of {@link IoHandle}s for which IO was processed.
179      *
180      * This method must be called from the {@link EventLoop} thread.
181      */
182     protected int runIo() {
183         assert inEventLoop();
184         return ioHandler.run(context);
185     }
186 
187     @Override
188     public IoEventLoop next() {
189         return this;
190     }
191 
192     @Override
193     public final Future<IoRegistration> register(final IoHandle handle) {
194         Promise<IoRegistration> promise = newPromise();
195         if (inEventLoop()) {
196             registerForIo0(handle, promise);
197         } else {
198             execute(() -> registerForIo0(handle, promise));
199         }
200 
201         return promise;
202     }
203 
204     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
205         assert inEventLoop();
206         final IoRegistration registration;
207         try {
208             registration = ioHandler.register(this, handle);
209         } catch (Exception e) {
210             promise.setFailure(e);
211             return;
212         }
213         registration.cancelFuture().addListener(decrementRegistrationListener);
214         numRegistrations++;
215         promise.setSuccess(registration);
216     }
217 
218     @Override
219     protected final void wakeup(boolean inEventLoop) {
220         ioHandler.wakeup(this);
221     }
222 
223     @Override
224     protected final void cleanup() {
225         assert inEventLoop();
226         ioHandler.destroy();
227     }
228 
229     @Override
230     public boolean isCompatible(Class<? extends IoHandle> handleType) {
231         return ioHandler.isCompatible(handleType);
232     }
233 
234     @Override
235     public boolean isIoType(Class<? extends IoHandler> handlerType) {
236         return ioHandler.getClass().equals(handlerType);
237     }
238 }