1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.Promise;
20 import io.netty.util.concurrent.RejectedExecutionHandler;
21 import io.netty.util.concurrent.SingleThreadEventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.SystemPropertyUtil;
24
25 import java.util.Queue;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicInteger;
30
31
32
33
34
35 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
36
37
38 private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
39 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
40
41 private final long maxTaskProcessingQuantumNs;
42 private final IoHandlerContext context = new IoHandlerContext() {
43 @Override
44 public boolean canBlock() {
45 assert inEventLoop();
46 return !hasTasks() && !hasScheduledTasks();
47 }
48
49 @Override
50 public long delayNanos(long currentTimeNanos) {
51 assert inEventLoop();
52 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
53 }
54
55 @Override
56 public long deadlineNanos() {
57 assert inEventLoop();
58 return SingleThreadIoEventLoop.this.deadlineNanos();
59 }
60 };
61
62 private final IoHandler ioHandler;
63
64 private final AtomicInteger numRegistrations = new AtomicInteger();
65
66
67
68
69
70
71
72
73
74 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
75 IoHandlerFactory ioHandlerFactory) {
76 super(parent, threadFactory, false, true);
77 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
78 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
79 }
80
81
82
83
84
85
86
87
88
89 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
90 super(parent, executor, false, true);
91 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
92 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
113 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
114 RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
115 super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
116 this.maxTaskProcessingQuantumNs =
117 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
118 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
119 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
120 }
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
138 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
139 RejectedExecutionHandler rejectedExecutionHandler,
140 long maxTaskProcessingQuantumMs) {
141 super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
142 this.maxTaskProcessingQuantumNs =
143 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
144 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
145 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
146 }
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
162 IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
163 Queue<Runnable> tailTaskQueue,
164 RejectedExecutionHandler rejectedExecutionHandler) {
165 super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
166 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
167 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
168 }
169
170 @Override
171 protected void run() {
172 assert inEventLoop();
173 ioHandler.initialize();
174 do {
175 runIo();
176 if (isShuttingDown()) {
177 ioHandler.prepareToDestroy();
178 }
179
180 runAllTasks(maxTaskProcessingQuantumNs);
181
182
183 } while (!confirmShutdown() && !canSuspend());
184 }
185
186 protected final IoHandler ioHandler() {
187 return ioHandler;
188 }
189
190 @Override
191 protected boolean canSuspend(int state) {
192
193 return super.canSuspend(state) && numRegistrations.get() == 0;
194 }
195
196
197
198
199
200
201
202 protected int runIo() {
203 assert inEventLoop();
204 return ioHandler.run(context);
205 }
206
207 @Override
208 public IoEventLoop next() {
209 return this;
210 }
211
212 @Override
213 public final Future<IoRegistration> register(final IoHandle handle) {
214 Promise<IoRegistration> promise = newPromise();
215 if (inEventLoop()) {
216 registerForIo0(handle, promise);
217 } else {
218 execute(() -> registerForIo0(handle, promise));
219 }
220
221 return promise;
222 }
223
224 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
225 assert inEventLoop();
226 final IoRegistration registration;
227 try {
228 registration = ioHandler.register(handle);
229 } catch (Exception e) {
230 promise.setFailure(e);
231 return;
232 }
233 numRegistrations.incrementAndGet();
234 promise.setSuccess(new IoRegistrationWrapper(registration));
235 }
236
237 @Override
238 protected final void wakeup(boolean inEventLoop) {
239 ioHandler.wakeup();
240 }
241
242 @Override
243 protected final void cleanup() {
244 assert inEventLoop();
245 ioHandler.destroy();
246 }
247
248 @Override
249 public boolean isCompatible(Class<? extends IoHandle> handleType) {
250 return ioHandler.isCompatible(handleType);
251 }
252
253 @Override
254 public boolean isIoType(Class<? extends IoHandler> handlerType) {
255 return ioHandler.getClass().equals(handlerType);
256 }
257
258 private final class IoRegistrationWrapper implements IoRegistration {
259 private final IoRegistration registration;
260 IoRegistrationWrapper(IoRegistration registration) {
261 this.registration = registration;
262 }
263
264 @Override
265 public <T> T attachment() {
266 return registration.attachment();
267 }
268
269 @Override
270 public long submit(IoOps ops) {
271 return registration.submit(ops);
272 }
273
274 @Override
275 public boolean isValid() {
276 return registration.isValid();
277 }
278
279 @Override
280 public boolean cancel() {
281 if (registration.cancel()) {
282 numRegistrations.decrementAndGet();
283 return true;
284 }
285 return false;
286 }
287 }
288 }