1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.Promise;
20 import io.netty.util.concurrent.RejectedExecutionHandler;
21 import io.netty.util.concurrent.SingleThreadEventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.SystemPropertyUtil;
25
26 import java.util.Queue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32
33
34
35
36 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
37
38
39 private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
40 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
41
42 private final long maxTaskProcessingQuantumNs;
43 private final IoHandlerContext context = new IoHandlerContext() {
44 @Override
45 public boolean canBlock() {
46 assert inEventLoop();
47 return !hasTasks() && !hasScheduledTasks();
48 }
49
50 @Override
51 public long delayNanos(long currentTimeNanos) {
52 assert inEventLoop();
53 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
54 }
55
56 @Override
57 public long deadlineNanos() {
58 assert inEventLoop();
59 return SingleThreadIoEventLoop.this.deadlineNanos();
60 }
61 };
62
63 private final IoHandler ioHandler;
64
65 private final AtomicInteger numRegistrations = new AtomicInteger();
66
67
68
69
70
71
72
73
74
75 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
76 IoHandlerFactory ioHandlerFactory) {
77 super(parent, threadFactory, false, true);
78 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
79 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
80 }
81
82
83
84
85
86
87
88
89
90 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandlerFactory ioHandlerFactory) {
91 super(parent, executor, false, true);
92 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
93 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
114 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
115 RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
116 super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
117 this.maxTaskProcessingQuantumNs =
118 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
119 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
120 TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
121 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
122 }
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
140 IoHandlerFactory ioHandlerFactory, int maxPendingTasks,
141 RejectedExecutionHandler rejectedExecutionHandler,
142 long maxTaskProcessingQuantumMs) {
143 super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
144 this.maxTaskProcessingQuantumNs =
145 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
146 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS :
147 TimeUnit.MILLISECONDS.toNanos(maxTaskProcessingQuantumMs);
148 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
165 IoHandlerFactory ioHandlerFactory, Queue<Runnable> taskQueue,
166 Queue<Runnable> tailTaskQueue,
167 RejectedExecutionHandler rejectedExecutionHandler) {
168 super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
169 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
170 this.ioHandler = ObjectUtil.checkNotNull(ioHandlerFactory, "ioHandlerFactory").newHandler(this);
171 }
172
173 @Override
174 protected void run() {
175 assert inEventLoop();
176 ioHandler.initialize();
177 do {
178 runIo();
179 if (isShuttingDown()) {
180 ioHandler.prepareToDestroy();
181 }
182
183 runAllTasks(maxTaskProcessingQuantumNs);
184
185
186 } while (!confirmShutdown() && !canSuspend());
187 }
188
189 protected final IoHandler ioHandler() {
190 return ioHandler;
191 }
192
193 @Override
194 protected boolean canSuspend(int state) {
195
196 return super.canSuspend(state) && numRegistrations.get() == 0;
197 }
198
199
200
201
202
203
204
205 protected int runIo() {
206 assert inEventLoop();
207 return ioHandler.run(context);
208 }
209
210 @Override
211 public IoEventLoop next() {
212 return this;
213 }
214
215 @Override
216 public final Future<IoRegistration> register(final IoHandle handle) {
217 Promise<IoRegistration> promise = newPromise();
218 if (inEventLoop()) {
219 registerForIo0(handle, promise);
220 } else {
221 execute(() -> registerForIo0(handle, promise));
222 }
223
224 return promise;
225 }
226
227 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
228 assert inEventLoop();
229 final IoRegistration registration;
230 try {
231 registration = ioHandler.register(handle);
232 } catch (Exception e) {
233 promise.setFailure(e);
234 return;
235 }
236 numRegistrations.incrementAndGet();
237 promise.setSuccess(new IoRegistrationWrapper(registration));
238 }
239
240 @Override
241 protected final void wakeup(boolean inEventLoop) {
242 ioHandler.wakeup();
243 }
244
245 @Override
246 protected final void cleanup() {
247 assert inEventLoop();
248 ioHandler.destroy();
249 }
250
251 @Override
252 public boolean isCompatible(Class<? extends IoHandle> handleType) {
253 return ioHandler.isCompatible(handleType);
254 }
255
256 @Override
257 public boolean isIoType(Class<? extends IoHandler> handlerType) {
258 return ioHandler.getClass().equals(handlerType);
259 }
260
261 @Override
262 protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
263 return newTaskQueue0(maxPendingTasks);
264 }
265
266 protected static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
267
268 return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
269 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
270 }
271
272 private final class IoRegistrationWrapper implements IoRegistration {
273 private final IoRegistration registration;
274 IoRegistrationWrapper(IoRegistration registration) {
275 this.registration = registration;
276 }
277
278 @Override
279 public <T> T attachment() {
280 return registration.attachment();
281 }
282
283 @Override
284 public long submit(IoOps ops) {
285 return registration.submit(ops);
286 }
287
288 @Override
289 public boolean isValid() {
290 return registration.isValid();
291 }
292
293 @Override
294 public boolean cancel() {
295 if (registration.cancel()) {
296 numRegistrations.decrementAndGet();
297 return true;
298 }
299 return false;
300 }
301 }
302 }