1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel;
17
18 import io.netty5.util.concurrent.Future;
19 import io.netty5.util.concurrent.Promise;
20 import io.netty5.util.concurrent.RejectedExecutionHandler;
21 import io.netty5.util.concurrent.RejectedExecutionHandlers;
22 import io.netty5.util.concurrent.SingleThreadEventExecutor;
23 import io.netty5.util.internal.PlatformDependent;
24 import io.netty5.util.internal.SystemPropertyUtil;
25
26 import java.util.Queue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ThreadFactory;
29
30 import static io.netty5.util.internal.ObjectUtil.checkPositive;
31 import static java.util.Objects.requireNonNull;
32
33
34
35
36
37 public class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
38
39 protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
40 SystemPropertyUtil.getInt("io.netty5.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
41
42
43 protected static final int DEFAULT_MAX_TASKS_PER_RUN = Math.max(1,
44 SystemPropertyUtil.getInt("io.netty5.eventLoop.maxTaskPerRun", 1024 * 4));
45
46 private final IoExecutionContext context = new IoExecutionContext() {
47 @Override
48 public boolean canBlock() {
49 assert inEventLoop();
50 return !hasTasks() && !hasScheduledTasks();
51 }
52
53 @Override
54 public long delayNanos(long currentTimeNanos) {
55 assert inEventLoop();
56 return SingleThreadEventLoop.this.delayNanos(currentTimeNanos);
57 }
58
59 @Override
60 public long deadlineNanos() {
61 assert inEventLoop();
62 return SingleThreadEventLoop.this.deadlineNanos();
63 }
64 };
65
66 private final IoHandler ioHandler;
67 private final int maxTasksPerRun;
68
69
70
71
72
73
74
75 public SingleThreadEventLoop(ThreadFactory threadFactory, IoHandler ioHandler) {
76 this(threadFactory, ioHandler, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
77 }
78
79
80
81
82
83
84
85 public SingleThreadEventLoop(Executor executor, IoHandler ioHandler) {
86 this(executor, ioHandler, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
87 }
88
89
90
91
92
93
94
95
96
97 public SingleThreadEventLoop(ThreadFactory threadFactory,
98 IoHandler ioHandler, int maxPendingTasks,
99 RejectedExecutionHandler rejectedHandler) {
100 this(threadFactory, ioHandler, maxPendingTasks, rejectedHandler, DEFAULT_MAX_TASKS_PER_RUN);
101 }
102
103
104
105
106
107
108
109
110
111 public SingleThreadEventLoop(Executor executor,
112 IoHandler ioHandler, int maxPendingTasks,
113 RejectedExecutionHandler rejectedHandler) {
114 this(executor, ioHandler, maxPendingTasks, rejectedHandler, DEFAULT_MAX_TASKS_PER_RUN);
115 }
116
117
118
119
120
121
122
123
124
125
126
127 public SingleThreadEventLoop(ThreadFactory threadFactory,
128 IoHandler ioHandler, int maxPendingTasks,
129 RejectedExecutionHandler rejectedHandler, int maxTasksPerRun) {
130 super(threadFactory, maxPendingTasks, rejectedHandler);
131 this.ioHandler = requireNonNull(ioHandler, "ioHandler");
132 this.maxTasksPerRun = checkPositive(maxTasksPerRun, "maxTasksPerRun");
133 }
134
135
136
137
138
139
140
141
142
143
144
145 public SingleThreadEventLoop(Executor executor,
146 IoHandler ioHandler, int maxPendingTasks,
147 RejectedExecutionHandler rejectedHandler, int maxTasksPerRun) {
148 super(executor, maxPendingTasks, rejectedHandler);
149 this.ioHandler = requireNonNull(ioHandler, "ioHandler");
150 this.maxTasksPerRun = checkPositive(maxTasksPerRun, "maxTasksPerRun");
151 }
152
153 @Override
154 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
155
156 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue()
157 : PlatformDependent.newMpscQueue(maxPendingTasks);
158 }
159
160 @Override
161 protected final boolean wakesUpForTask(Runnable task) {
162 return !(task instanceof NonWakeupRunnable);
163 }
164
165
166
167
168 interface NonWakeupRunnable extends Runnable { }
169
170
171
172 @Override
173 protected void run() {
174 assert inEventLoop();
175 do {
176 runIO();
177 if (isShuttingDown()) {
178 ioHandler.prepareToDestroy();
179 }
180 runAllTasks(maxTasksPerRun);
181 } while (!confirmShutdown());
182 }
183
184
185
186
187
188
189
190 protected int runIO() {
191 assert inEventLoop();
192 return ioHandler.run(context);
193 }
194
195 @Override
196 public final Future<Void> registerForIo(IoHandle handle) {
197 Promise<Void> promise = newPromise();
198 if (inEventLoop()) {
199 registerForIO0(handle, promise);
200 } else {
201 execute(() -> registerForIO0(handle, promise));
202 }
203 return promise.asFuture();
204 }
205
206 private void registerForIO0(IoHandle handle, Promise<Void> promise) {
207 try {
208 if (handle.isRegistered()) {
209 throw new IllegalStateException("IoHandle already registered");
210 }
211
212 checkInEventLoopIfPossible(handle);
213
214 ioHandler.register(handle);
215 } catch (Throwable cause) {
216 promise.setFailure(cause);
217 return;
218 }
219 promise.setSuccess(null);
220 }
221
222 @Override
223 public final Future<Void> deregisterForIo(IoHandle handle) {
224 Promise<Void> promise = newPromise();
225 if (inEventLoop()) {
226 deregisterForIO(handle, promise);
227 } else {
228 execute(() -> deregisterForIO(handle, promise));
229 }
230 return promise.asFuture();
231 }
232
233 private void deregisterForIO(IoHandle handle, Promise<Void> promise) {
234 try {
235 if (!handle.isRegistered()) {
236 throw new IllegalStateException("Channel not registered");
237 }
238 checkInEventLoopIfPossible(handle);
239
240 ioHandler.deregister(handle);
241 } catch (Throwable cause) {
242 promise.setFailure(cause);
243 return;
244 }
245 promise.setSuccess(null);
246 }
247
248 private static void checkInEventLoopIfPossible(IoHandle handle) {
249 if (handle instanceof Channel && !((Channel) handle).executor().inEventLoop()) {
250 throw new IllegalStateException("Channel.executor() is not using the same Thread as this EventLoop");
251 }
252 }
253
254 @Override
255 protected final void wakeup(boolean inEventLoop) {
256 ioHandler.wakeup(inEventLoop);
257 }
258
259 @Override
260 protected final void cleanup() {
261 assert inEventLoop();
262 ioHandler.destroy();
263 }
264
265 @Override
266 public boolean isCompatible(Class<? extends IoHandle> handleType) {
267 return ioHandler.isCompatible(handleType);
268 }
269 }