1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.FutureListener;
20 import io.netty.util.concurrent.Promise;
21 import io.netty.util.concurrent.RejectedExecutionHandler;
22 import io.netty.util.concurrent.SingleThreadEventExecutor;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.SystemPropertyUtil;
25
26 import java.util.Queue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ThreadFactory;
29 import java.util.concurrent.TimeUnit;
30
31
32
33
34
35 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
36
37
38 private static final long DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS = TimeUnit.MILLISECONDS.toNanos(Math.max(100,
39 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskProcessingQuantumMs", 1000)));
40
41 private final long maxTaskProcessingQuantumNs;
42 private final IoExecutionContext context = new IoExecutionContext() {
43 @Override
44 public boolean canBlock() {
45 assert inEventLoop();
46 return !hasTasks() && !hasScheduledTasks();
47 }
48
49 @Override
50 public long delayNanos(long currentTimeNanos) {
51 assert inEventLoop();
52 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
53 }
54
55 @Override
56 public long deadlineNanos() {
57 assert inEventLoop();
58 return SingleThreadIoEventLoop.this.deadlineNanos();
59 }
60 };
61
62 private final IoHandler ioHandler;
63
64 private int numRegistrations;
65 private final FutureListener<Object> decrementRegistrationListener = f -> {
66 assert inEventLoop();
67 numRegistrations--;
68 };
69
70
71
72
73
74
75
76
77 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
78 IoHandler ioHandler) {
79 super(parent, threadFactory, false, true);
80 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
81 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
82 }
83
84
85
86
87
88
89
90
91 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandler ioHandler) {
92 super(parent, executor, false, true);
93 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
94 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
114 IoHandler ioHandler, int maxPendingTasks,
115 RejectedExecutionHandler rejectedExecutionHandler, long maxTaskProcessingQuantumMs) {
116 super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
117 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
118 this.maxTaskProcessingQuantumNs =
119 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
120 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
121 }
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
139 IoHandler ioHandler, int maxPendingTasks,
140 RejectedExecutionHandler rejectedExecutionHandler,
141 long maxTaskProcessingQuantumMs) {
142 super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
143 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
144 this.maxTaskProcessingQuantumNs =
145 ObjectUtil.checkPositiveOrZero(maxTaskProcessingQuantumMs, "maxTaskProcessingQuantumMs") == 0 ?
146 DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS : maxTaskProcessingQuantumMs;
147 }
148
149
150
151
152
153
154
155
156
157
158
159
160
161 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
162 IoHandler ioHandler, Queue<Runnable> taskQueue,
163 Queue<Runnable> tailTaskQueue,
164 RejectedExecutionHandler rejectedExecutionHandler) {
165 super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
166 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
167 this.maxTaskProcessingQuantumNs = DEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;
168 }
169
170 @Override
171 protected void run() {
172 assert inEventLoop();
173 do {
174 runIo();
175 if (isShuttingDown()) {
176 ioHandler.prepareToDestroy();
177 }
178
179 runAllTasks(maxTaskProcessingQuantumNs);
180
181
182 } while (!confirmShutdown() && !canSuspend());
183 }
184
185 protected final IoHandler ioHandler() {
186 return ioHandler;
187 }
188
189 @Override
190 protected boolean canSuspend(int state) {
191
192 return super.canSuspend(state) && numRegistrations == 0;
193 }
194
195
196
197
198
199
200
201 protected int runIo() {
202 assert inEventLoop();
203 return ioHandler.run(context);
204 }
205
206 @Override
207 public IoEventLoop next() {
208 return this;
209 }
210
211 @Override
212 public final Future<IoRegistration> register(final IoHandle handle) {
213 Promise<IoRegistration> promise = newPromise();
214 if (inEventLoop()) {
215 registerForIo0(handle, promise);
216 } else {
217 execute(() -> registerForIo0(handle, promise));
218 }
219
220 return promise;
221 }
222
223 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
224 assert inEventLoop();
225 final IoRegistration registration;
226 try {
227 registration = ioHandler.register(this, handle);
228 } catch (Exception e) {
229 promise.setFailure(e);
230 return;
231 }
232 registration.cancelFuture().addListener(decrementRegistrationListener);
233 numRegistrations++;
234 promise.setSuccess(registration);
235 }
236
237 @Override
238 protected final void wakeup(boolean inEventLoop) {
239 ioHandler.wakeup(this);
240 }
241
242 @Override
243 protected final void cleanup() {
244 assert inEventLoop();
245 ioHandler.destroy();
246 }
247
248 @Override
249 public boolean isCompatible(Class<? extends IoHandle> handleType) {
250 return ioHandler.isCompatible(handleType);
251 }
252
253 @Override
254 public boolean isIoType(Class<? extends IoHandler> handlerType) {
255 return ioHandler.getClass().equals(handlerType);
256 }
257 }