1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.Promise;
20 import io.netty.util.concurrent.RejectedExecutionHandler;
21 import io.netty.util.concurrent.SingleThreadEventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.SystemPropertyUtil;
25
26 import java.util.Queue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32
33
34
35
36 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
37
38
39 private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
40 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
41
42 private final long maxTaskProcessingQuantumNs;
43 private final IoHandlerContext context = new IoHandlerContext() {
44 @Override
45 public boolean canBlock() {
46 assert inEventLoop();
47 return !hasTasks() && !hasScheduledTasks();
48 }
49
50 @Override
51 public long delayNanos(long currentTimeNanos) {
52 assert inEventLoop();
53 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
54 }
55
56 @Override
57 public long deadlineNanos() {
58 assert inEventLoop();
59 return SingleThreadIoEventLoop.this.deadlineNanos();
60 }
61
62 @Override
63 public void reportActiveIoTime(long activeNanos) {
64 SingleThreadIoEventLoop.this.reportActiveIoTime(activeNanos);
65 }
66
67 @Override
68 public boolean shouldReportActiveIoTime() {
69 return isSuspensionSupported();
70 }
71 };
72
73 private final IoHandler ioHandler;
74
75 private final AtomicInteger numRegistrations = new AtomicInteger();
76
77
78
79
80
81
82
83
84
85 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
86 IoHandlerFactory ioHandlerFactory) {
87 super(parent, threadFactory, false,
88 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported());
89 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
90 this.ioHandler = ioHandlerFactory.newHandler(this);
91 }
92
93
94
95
96
97
98
99
100
101 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
102 super(parent, executor, false,
103 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported());
104 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
105 this.ioHandler = ioHandlerFactory.newHandler(this);
106 }
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
126 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
127 RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
128 super(parent, threadFactory, false,
129 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
130 maxPendingTasks, rejectedExecutionHandler);
131 this.maxTaskProcessingQuantumNs =
132 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
133 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
134 TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
135 this.ioHandler = ioHandlerFactory.newHandler(this);
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
154 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
155 RejectedExecutionHandler rejectedExecutionHandler,
156 long maxTaskProcessingQuantumMs) {
157 super(parent, executor, false,
158 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
159 maxPendingTasks, rejectedExecutionHandler);
160 this.maxTaskProcessingQuantumNs =
161 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
162 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
163 TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
164 this.ioHandler = ioHandlerFactory.newHandler(this);
165 }
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
181 IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
182 Queue<Runnable> tailTaskQueue,
183 RejectedExecutionHandler rejectedExecutionHandler) {
184 super(parent, executor, false,
185 ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").isChangingThreadSupported(),
186 taskQueue, tailTaskQueue, rejectedExecutionHandler);
187 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
188 this.ioHandler = ioHandlerFactory.newHandler(this);
189 }
190
191 @Override
192 protected void run() {
193 assert inEventLoop();
194 ioHandler.initialize();
195 do {
196 runIo();
197 if (isShuttingDown()) {
198 ioHandler.prepareToDestroy();
199 }
200
201 runAllTasks(maxTaskProcessingQuantumNs);
202
203
204 } while (!confirmShutdown() && !canSuspend());
205 }
206
207 protected final IoHandler ioHandler() {
208 return ioHandler;
209 }
210
211 @Override
212 protected boolean canSuspend(int state) {
213
214 return super.canSuspend(state) && numRegistrations.get() == 0;
215 }
216
217
218
219
220
221
222
223 protected int runIo() {
224 assert inEventLoop();
225 return ioHandler.run(context);
226 }
227
228 @Override
229 public IoEventLoop next() {
230 return this;
231 }
232
233 @Override
234 public final Future<IoRegistration> register(final IoHandle handle) {
235 Promise<IoRegistration> promise = newPromise();
236 if (inEventLoop()) {
237 registerForIo0(handle, promise);
238 } else {
239 execute(() -> registerForIo0(handle, promise));
240 }
241
242 return promise;
243 }
244
245 @Override
246 protected int getNumOfRegisteredChannels() {
247 return numRegistrations.get();
248 }
249
250 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
251 assert inEventLoop();
252 final IoRegistration registration;
253 try {
254 registration = ioHandler.register(handle);
255 } catch (Exception e) {
256 promise.setFailure(e);
257 return;
258 }
259 numRegistrations.incrementAndGet();
260 promise.setSuccess(new IoRegistrationWrapper(registration));
261 }
262
263 @Override
264 protected final void wakeup(boolean inEventLoop) {
265 ioHandler.wakeup();
266 }
267
268 @Override
269 protected final void cleanup() {
270 assert inEventLoop();
271 ioHandler.destroy();
272 }
273
274 @Override
275 public boolean isCompatible(Class<? extends IoHandle> handleType) {
276 return ioHandler.isCompatible(handleType);
277 }
278
279 @Override
280 public boolean isIoType(Class<? extends IoHandler> handlerType) {
281 return ioHandler.getClass().equals(handlerType);
282 }
283
284 @Override
285 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
286 return newTaskQueue0(maxPendingTasks);
287 }
288
289 protected static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
290
291 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
292 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
293 }
294
295 private final class IoRegistrationWrapper implements IoRegistration {
296 private final IoRegistration registration;
297 IoRegistrationWrapper(IoRegistration registration) {
298 this.registration = registration;
299 }
300
301 @Override
302 public <T> T attachment() {
303 return registration.attachment();
304 }
305
306 @Override
307 public long submit(IoOps ops) {
308 return registration.submit(ops);
309 }
310
311 @Override
312 public boolean isValid() {
313 return registration.isValid();
314 }
315
316 @Override
317 public boolean cancel() {
318 if (registration.cancel()) {
319 numRegistrations.decrementAndGet();
320 return true;
321 }
322 return false;
323 }
324 }
325 }