1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.Promise;
20 import io.netty.util.concurrent.RejectedExecutionHandler;
21 import io.netty.util.concurrent.SingleThreadEventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.SystemPropertyUtil;
24
25 import java.util.Queue;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31
32
33
34
35 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
36
37
38 private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
39 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
40
41 private final long maxTaskProcessingQuantumNs;
42 private final IoHandlerContext context = new IoHandlerContext() {
43 @Override
44 public boolean canBlock() {
45 assert inEventLoop();
46 return !hasTasks() && !hasScheduledTasks();
47 }
48
49 @Override
50 public long delayNanos(long currentTimeNanos) {
51 assert inEventLoop();
52 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
53 }
54
55 @Override
56 public long deadlineNanos() {
57 assert inEventLoop();
58 return SingleThreadIoEventLoop.this.deadlineNanos();
59 }
60 };
61
62 private final IoHandler ioHandler;
63
64 private final AtomicInteger numRegistrations = new AtomicInteger();
65
66
67
68
69
70
71
72
73
74 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
75 IoHandlerFactory ioHandlerFactory) {
76 super(parent, threadFactory, false, true);
77 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
78 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
79 }
80
81
82
83
84
85
86
87
88
89 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
90 super(parent, executor, false, true);
91 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
92 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
113 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
114 RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
115 super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
116 this.maxTaskProcessingQuantumNs =
117 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
118 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
119 TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
120 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
139 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
140 RejectedExecutionHandler rejectedExecutionHandler,
141 long maxTaskProcessingQuantumMs) {
142 super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
143 this.maxTaskProcessingQuantumNs =
144 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
145 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
146 TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
147 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
148 }
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
164 IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
165 Queue<Runnable> tailTaskQueue,
166 RejectedExecutionHandler rejectedExecutionHandler) {
167 super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
168 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
169 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
170 }
171
172 @Override
173 protected void run() {
174 assert inEventLoop();
175 ioHandler.initialize();
176 do {
177 runIo();
178 if (isShuttingDown()) {
179 ioHandler.prepareToDestroy();
180 }
181
182 runAllTasks(maxTaskProcessingQuantumNs);
183
184
185 } while (!confirmShutdown() && !canSuspend());
186 }
187
188 protected final IoHandler ioHandler() {
189 return ioHandler;
190 }
191
192 @Override
193 protected boolean canSuspend(int state) {
194
195 return super.canSuspend(state) && numRegistrations.get() == 0;
196 }
197
198
199
200
201
202
203
204 protected int runIo() {
205 assert inEventLoop();
206 return ioHandler.run(context);
207 }
208
209 @Override
210 public IoEventLoop next() {
211 return this;
212 }
213
214 @Override
215 public final Future<IoRegistration> register(final IoHandle handle) {
216 Promise<IoRegistration> promise = newPromise();
217 if (inEventLoop()) {
218 registerForIo0(handle, promise);
219 } else {
220 execute(() -> registerForIo0(handle, promise));
221 }
222
223 return promise;
224 }
225
226 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
227 assert inEventLoop();
228 final IoRegistration registration;
229 try {
230 registration = ioHandler.register(handle);
231 } catch (Exception e) {
232 promise.setFailure(e);
233 return;
234 }
235 numRegistrations.incrementAndGet();
236 promise.setSuccess(new IoRegistrationWrapper(registration));
237 }
238
239 @Override
240 protected final void wakeup(boolean inEventLoop) {
241 ioHandler.wakeup();
242 }
243
244 @Override
245 protected final void cleanup() {
246 assert inEventLoop();
247 ioHandler.destroy();
248 }
249
250 @Override
251 public boolean isCompatible(Class<? extends IoHandle> handleType) {
252 return ioHandler.isCompatible(handleType);
253 }
254
255 @Override
256 public boolean isIoType(Class<? extends IoHandler> handlerType) {
257 return ioHandler.getClass().equals(handlerType);
258 }
259
260 private final class IoRegistrationWrapper implements IoRegistration {
261 private final IoRegistration registration;
262 IoRegistrationWrapper(IoRegistration registration) {
263 this.registration = registration;
264 }
265
266 @Override
267 public <T> T attachment() {
268 return registration.attachment();
269 }
270
271 @Override
272 public long submit(IoOps ops) {
273 return registration.submit(ops);
274 }
275
276 @Override
277 public boolean isValid() {
278 return registration.isValid();
279 }
280
281 @Override
282 public boolean cancel() {
283 if (registration.cancel()) {
284 numRegistrations.decrementAndGet();
285 return true;
286 }
287 return false;
288 }
289 }
290 }