View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel;
17  
18  import io.netty.util.concurrent.Future;
19  import io.netty.util.concurrent.Promise;
20  import io.netty.util.concurrent.RejectedExecutionHandler;
21  import io.netty.util.concurrent.SingleThreadEventExecutor;
22  import io.netty.util.internal.ObjectUtil;
23  import io.netty.util.internal.SystemPropertyUtil;
24  
25  import java.util.Queue;
26  import java.util.concurrent.Executor;
27  import java.util.concurrent.ThreadFactory;
28  
29  /**
30   * {@link IoEventLoop} implementation that execute all its submitted tasks in a single thread using the provided
31   * {@link IoHandler}.
32   */
33  public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
34  
35      // TODO: Is this a sensible default ?
36      protected static final int DEFAULT_MAX_TASKS_PER_RUN = Math.max(1,
37              SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskPerRun", 1024 * 4));
38  
39      private final int maxTasksPerRun = DEFAULT_MAX_TASKS_PER_RUN;
40      private final IoExecutionContext context = new IoExecutionContext() {
41          @Override
42          public boolean canBlock() {
43              assert inEventLoop();
44              return !hasTasks() && !hasScheduledTasks();
45          }
46  
47          @Override
48          public long delayNanos(long currentTimeNanos) {
49              assert inEventLoop();
50              return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
51          }
52  
53          @Override
54          public long deadlineNanos() {
55              assert inEventLoop();
56              return SingleThreadIoEventLoop.this.deadlineNanos();
57          }
58      };
59  
60      private final IoHandler ioHandler;
61  
62      /**
63       *  Creates a new instance
64       *
65       * @param parent            the parent that holds this {@link IoEventLoop}.
66       * @param ioHandler         the {@link IoHandler} used to run all IO.
67       * @param threadFactory     the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
68       */
69      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
70                                     IoHandler ioHandler) {
71          super(parent, threadFactory, false);
72          this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
73      }
74  
75      /**
76       *  Creates a new instance
77       *
78       * @param parent            the parent that holds this {@link IoEventLoop}.
79       * @param executor          the {@link Executor} that is used for dispatching the work.
80       * @param ioHandler         the {@link IoHandler} used to run all IO.
81       */
82      public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandler ioHandler) {
83          super(parent, executor, false);
84          this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
85      }
86  
87      /**
88       *  Creates a new instance
89       *
90       * @param parent                    the parent that holds this {@link IoEventLoop}.
91       * @param threadFactory             the {@link ThreadFactory} that is used to create the underlying {@link Thread}.
92       * @param ioHandler                 the {@link IoHandler} used to run all IO.
93       * @param maxPendingTasks           the maximum pending tasks that are allowed before
94       *                                  {@link RejectedExecutionHandler#rejected(Runnable, SingleThreadEventExecutor)}
95       *                                  is called to handle it.
96       * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
97       *                                  then allowed per {@code maxPendingTasks}.
98       */
99      public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
100                                    IoHandler ioHandler, int maxPendingTasks,
101                                    RejectedExecutionHandler rejectedExecutionHandler) {
102         super(parent, threadFactory, false, maxPendingTasks, rejectedExecutionHandler);
103         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
104     }
105 
106     /**
107      *  Creates a new instance
108      *
109      * @param parent                    the parent that holds this {@link IoEventLoop}.
110      * @param ioHandler                 the {@link IoHandler} used to run all IO.
111      * @param executor                  the {@link Executor} that is used for dispatching the work.
112      * @param maxPendingTasks           the maximum pending tasks that are allowed before
113      *                                  {@link RejectedExecutionHandler#rejected(Runnable, SingleThreadEventExecutor)}
114      *                                  is called to handle it.
115      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
116      *                                  then allowed per {@code maxPendingTasks}.
117      */
118     public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
119                                    IoHandler ioHandler, int maxPendingTasks,
120                                    RejectedExecutionHandler rejectedExecutionHandler) {
121         super(parent, executor, false, maxPendingTasks, rejectedExecutionHandler);
122         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
123     }
124 
125     /**
126      *
127      *  Creates a new instance
128      *
129      * @param parent                    the parent that holds this {@link IoEventLoop}.
130      * @param executor                  the {@link Executor} that is used for dispatching the work.
131      * @param ioHandler                 the {@link IoHandler} used to run all IO.
132      * @param taskQueue                 the {@link Queue} used for storing pending tasks.
133      * @param tailTaskQueue             the {@link Queue} used for storing tail pending tasks.
134      * @param rejectedExecutionHandler  the {@link RejectedExecutionHandler} that handles when more tasks are added
135      *                                  then allowed.
136      */
137     protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
138                                       IoHandler ioHandler, Queue<Runnable> taskQueue,
139                                       Queue<Runnable> tailTaskQueue,
140 
141                                       RejectedExecutionHandler rejectedExecutionHandler) {
142         super(parent, executor, false, taskQueue, tailTaskQueue, rejectedExecutionHandler);
143         this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
144     }
145 
146     @Override
147     protected void run() {
148         assert inEventLoop();
149         do {
150             runIo();
151             if (isShuttingDown()) {
152                 ioHandler.prepareToDestroy();
153             }
154             runAllTasks(maxTasksPerRun);
155         } while (!confirmShutdown());
156     }
157 
158     protected final IoHandler ioHandler() {
159         return ioHandler;
160     }
161 
162     /**
163      * Called when IO will be processed for all the {@link IoHandle}s on this {@link SingleThreadIoEventLoop}.
164      * This method returns the number of {@link IoHandle}s for which IO was processed.
165      *
166      * This method must be called from the {@link EventLoop} thread.
167      */
168     protected int runIo() {
169         assert inEventLoop();
170         return ioHandler.run(context);
171     }
172 
173     @Override
174     public IoEventLoop next() {
175         return this;
176     }
177 
178     @Override
179     public final Future<IoRegistration> register(final IoHandle handle) {
180         Promise<IoRegistration> promise = newPromise();
181         if (inEventLoop()) {
182             registerForIo0(handle, promise);
183         } else {
184             execute(() -> registerForIo0(handle, promise));
185         }
186 
187         return promise;
188     }
189 
190     private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
191         assert inEventLoop();
192         final IoRegistration registration;
193         try {
194             registration = ioHandler.register(this, handle);
195         } catch (Exception e) {
196             promise.setFailure(e);
197             return;
198         }
199         promise.setSuccess(registration);
200     }
201 
202     @Override
203     protected final void wakeup(boolean inEventLoop) {
204         ioHandler.wakeup(this);
205     }
206 
207     @Override
208     protected final void cleanup() {
209         assert inEventLoop();
210         ioHandler.destroy();
211     }
212 
213     @Override
214     public boolean isCompatible(Class<? extends IoHandle> handleType) {
215         return ioHandler.isCompatible(handleType);
216     }
217 
218     @Override
219     public boolean isIoType(Class<? extends IoHandler> handlerType) {
220         return ioHandler.getClass().equals(handlerType);
221     }
222 }