View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel;
17  
18  import io.netty5.util.concurrent.Future;
19  import io.netty5.util.concurrent.Promise;
20  import io.netty5.util.concurrent.RejectedExecutionHandler;
21  import io.netty5.util.concurrent.RejectedExecutionHandlers;
22  import io.netty5.util.concurrent.SingleThreadEventExecutor;
23  import io.netty5.util.internal.PlatformDependent;
24  import io.netty5.util.internal.SystemPropertyUtil;
25  
26  import java.util.Queue;
27  import java.util.concurrent.Executor;
28  import java.util.concurrent.ThreadFactory;
29  
30  import static io.netty5.util.internal.ObjectUtil.checkPositive;
31  import static java.util.Objects.requireNonNull;
32  
33  /**
34   * {@link EventLoop} that execute all its submitted tasks in a single thread and uses an {@link IoHandler} for
35   * IO processing.
36   */
37  public class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
38  
39      protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
40              SystemPropertyUtil.getInt("io.netty5.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
41  
42      // TODO: Is this a sensible default ?
43      protected static final int DEFAULT_MAX_TASKS_PER_RUN = Math.max(1,
44              SystemPropertyUtil.getInt("io.netty5.eventLoop.maxTaskPerRun", 1024 * 4));
45  
46      private final IoExecutionContext context = new IoExecutionContext() {
47          @Override
48          public boolean canBlock() {
49              assert inEventLoop();
50              return !hasTasks() && !hasScheduledTasks();
51          }
52  
53          @Override
54          public long delayNanos(long currentTimeNanos) {
55              assert inEventLoop();
56              return SingleThreadEventLoop.this.delayNanos(currentTimeNanos);
57          }
58  
59          @Override
60          public long deadlineNanos() {
61              assert inEventLoop();
62              return SingleThreadEventLoop.this.deadlineNanos();
63          }
64      };
65  
66      private final IoHandler ioHandler;
67      private final int maxTasksPerRun;
68  
69      /**
70       * Create a new instance
71       *
72       * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
73       * @param ioHandler         the {@link IoHandler} to use.
74       */
75      public SingleThreadEventLoop(ThreadFactory threadFactory, IoHandler ioHandler) {
76          this(threadFactory, ioHandler, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
77      }
78  
79      /**
80       * Create a new instance
81       *
82       * @param executor          the {@link Executor} which will be used to run this {@link EventLoop}.
83       * @param ioHandler         the {@link IoHandler} to use.
84       */
85      public SingleThreadEventLoop(Executor executor, IoHandler ioHandler) {
86          this(executor, ioHandler, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
87      }
88  
89      /**
90       * Create a new instance
91       *
92       * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
93       * @param ioHandler         the {@link IoHandler} to use.
94       * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
95       * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
96       */
97      public SingleThreadEventLoop(ThreadFactory threadFactory,
98                                   IoHandler ioHandler, int maxPendingTasks,
99                                   RejectedExecutionHandler rejectedHandler) {
100         this(threadFactory, ioHandler, maxPendingTasks, rejectedHandler, DEFAULT_MAX_TASKS_PER_RUN);
101     }
102 
103     /**
104      * Create a new instance
105      *
106      * @param executor          the {@link Executor} which will be used to run this {@link EventLoop}.
107      * @param ioHandler         the {@link IoHandler} to use.
108      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
109      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
110      */
111     public SingleThreadEventLoop(Executor executor,
112                                  IoHandler ioHandler, int maxPendingTasks,
113                                  RejectedExecutionHandler rejectedHandler) {
114         this(executor, ioHandler, maxPendingTasks, rejectedHandler, DEFAULT_MAX_TASKS_PER_RUN);
115     }
116 
117     /**
118      * Create a new instance
119      *
120      * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread}
121      * @param ioHandler         the {@link IoHandler} to use.
122      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
123      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
124      * @param maxTasksPerRun    the maximum number of tasks per {@link EventLoop} run that will be processed
125      *                          before trying to handle IO again.
126      */
127     public SingleThreadEventLoop(ThreadFactory threadFactory,
128                                  IoHandler ioHandler, int maxPendingTasks,
129                                  RejectedExecutionHandler rejectedHandler, int maxTasksPerRun) {
130         super(threadFactory, maxPendingTasks, rejectedHandler);
131         this.ioHandler = requireNonNull(ioHandler, "ioHandler");
132         this.maxTasksPerRun = checkPositive(maxTasksPerRun, "maxTasksPerRun");
133     }
134 
135     /**
136      * Create a new instance
137      *
138      * @param executor          the {@link Executor} which will be used to run this {@link EventLoop}.
139      * @param ioHandler         the {@link IoHandler} to use.
140      * @param maxPendingTasks   the maximum number of pending tasks before new tasks will be rejected.
141      * @param rejectedHandler   the {@link RejectedExecutionHandler} to use.
142      * @param maxTasksPerRun    the maximum number of tasks per {@link EventLoop} run that will be processed
143      *                          before trying to handle IO again.
144      */
145     public SingleThreadEventLoop(Executor executor,
146                                  IoHandler ioHandler, int maxPendingTasks,
147                                  RejectedExecutionHandler rejectedHandler, int maxTasksPerRun) {
148         super(executor, maxPendingTasks, rejectedHandler);
149         this.ioHandler = requireNonNull(ioHandler, "ioHandler");
150         this.maxTasksPerRun = checkPositive(maxTasksPerRun, "maxTasksPerRun");
151     }
152 
153     @Override
154     protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
155         // This event loop never calls takeTask()
156         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue()
157                 : PlatformDependent.newMpscQueue(maxPendingTasks);
158     }
159 
160     @Override
161     protected final boolean wakesUpForTask(Runnable task) {
162         return !(task instanceof NonWakeupRunnable);
163     }
164 
165     /**
166      * Marker interface for {@link Runnable} that will not trigger an {@link #wakeup(boolean)} in all cases.
167      */
168     interface NonWakeupRunnable extends Runnable { }
169 
170     // Methods that a user can override to easily add instrumentation and other things.
171 
172     @Override
173     protected void run() {
174         assert inEventLoop();
175         do {
176             runIO();
177             if (isShuttingDown()) {
178                 ioHandler.prepareToDestroy();
179             }
180             runAllTasks(maxTasksPerRun);
181         } while (!confirmShutdown());
182     }
183 
184     /**
185      * Called when IO will be processed for all the {@link Channel}s on this {@link SingleThreadEventLoop}.
186      * This method returns the number of {@link Channel}s for which IO was processed.
187      *
188      * This method must be called from the {@link EventLoop} thread.
189      */
190     protected int runIO() {
191         assert inEventLoop();
192         return ioHandler.run(context);
193     }
194 
195     @Override
196     public final Future<Void> registerForIo(IoHandle handle) {
197         Promise<Void> promise = newPromise();
198         if (inEventLoop()) {
199             registerForIO0(handle, promise);
200         } else {
201             execute(() -> registerForIO0(handle, promise));
202         }
203         return promise.asFuture();
204     }
205 
206     private void registerForIO0(IoHandle handle, Promise<Void> promise) {
207         try {
208             if (handle.isRegistered()) {
209                 throw new IllegalStateException("IoHandle already registered");
210             }
211 
212             checkInEventLoopIfPossible(handle);
213 
214             ioHandler.register(handle);
215         } catch (Throwable cause) {
216             promise.setFailure(cause);
217             return;
218         }
219         promise.setSuccess(null);
220     }
221 
222     @Override
223     public final Future<Void> deregisterForIo(IoHandle handle) {
224        Promise<Void> promise = newPromise();
225        if (inEventLoop()) {
226            deregisterForIO(handle, promise);
227        } else {
228            execute(() -> deregisterForIO(handle, promise));
229        }
230        return promise.asFuture();
231     }
232 
233     private void deregisterForIO(IoHandle handle, Promise<Void> promise) {
234         try {
235             if (!handle.isRegistered()) {
236                 throw new IllegalStateException("Channel not registered");
237             }
238             checkInEventLoopIfPossible(handle);
239 
240             ioHandler.deregister(handle);
241         } catch (Throwable cause) {
242             promise.setFailure(cause);
243             return;
244         }
245         promise.setSuccess(null);
246     }
247 
248     private static void checkInEventLoopIfPossible(IoHandle handle) {
249         if (handle instanceof Channel && !((Channel) handle).executor().inEventLoop()) {
250             throw new IllegalStateException("Channel.executor() is not using the same Thread as this EventLoop");
251         }
252     }
253 
254     @Override
255     protected final void wakeup(boolean inEventLoop) {
256         ioHandler.wakeup(inEventLoop);
257     }
258 
259     @Override
260     protected final void cleanup() {
261         assert inEventLoop();
262         ioHandler.destroy();
263     }
264 
265     @Override
266     public boolean isCompatible(Class<? extends IoHandle> handleType) {
267         return ioHandler.isCompatible(handleType);
268     }
269 }