1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.FutureListener;
20 import io.netty.util.concurrent.Promise;
21 import io.netty.util.concurrent.RejectedExecutionHandler;
22 import io.netty.util.concurrent.SingleThreadEventExecutor;
23 import io.netty.util.internal.ObjectUtil;
24 import io.netty.util.internal.SystemPropertyUtil;
25
26 import java.util.Queue;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ThreadFactory;
29
30
31
32
33
34 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
35
36
37 protected static final int DEFAULT_MAX_TASKS_PER_RUN = Math.max(1,
38 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskPerRun", 1024 * 4));
39
40 private final int maxTasksPerRun = DEFAULT_MAX_TASKS_PER_RUN;
41 private final IoExecutionContext context = new IoExecutionContext() {
42 @Override
43 public boolean canBlock() {
44 assert inEventLoop();
45 return !hasTasks() && !hasScheduledTasks();
46 }
47
48 @Override
49 public long delayNanos(long currentTimeNanos) {
50 assert inEventLoop();
51 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
52 }
53
54 @Override
55 public long deadlineNanos() {
56 assert inEventLoop();
57 return SingleThreadIoEventLoop.this.deadlineNanos();
58 }
59 };
60
61 private final IoHandler ioHandler;
62
63 private int numRegistrations;
64 private final FutureListener<Object> decrementRegistrationListener = f -> {
65 assert inEventLoop();
66 numRegistrations--;
67 };
68
69
70
71
72
73
74
75
76 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
77 IoHandler ioHandler) {
78 super(parent, threadFactory, false, true);
79 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
80 }
81
82
83
84
85
86
87
88
89 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandler ioHandler) {
90 super(parent, executor, false, true);
91 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
92 }
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
107 IoHandler ioHandler, int maxPendingTasks,
108 RejectedExecutionHandler rejectedExecutionHandler) {
109 super(parent, threadFactory, false, true, maxPendingTasks, rejectedExecutionHandler);
110 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124
125 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
126 IoHandler ioHandler, int maxPendingTasks,
127 RejectedExecutionHandler rejectedExecutionHandler) {
128 super(parent, executor, false, true, maxPendingTasks, rejectedExecutionHandler);
129 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
130 }
131
132
133
134
135
136
137
138
139
140
141
142
143
144 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
145 IoHandler ioHandler, Queue<Runnable> taskQueue,
146 Queue<Runnable> tailTaskQueue,
147 RejectedExecutionHandler rejectedExecutionHandler) {
148 super(parent, executor, false, true, taskQueue, tailTaskQueue, rejectedExecutionHandler);
149 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
150 }
151
152 @Override
153 protected void run() {
154 assert inEventLoop();
155 do {
156 runIo();
157 if (isShuttingDown()) {
158 ioHandler.prepareToDestroy();
159 }
160 runAllTasks(maxTasksPerRun);
161
162
163 } while (!confirmShutdown() && !canSuspend());
164 }
165
166 protected final IoHandler ioHandler() {
167 return ioHandler;
168 }
169
170 @Override
171 protected boolean canSuspend(int state) {
172
173 return super.canSuspend(state) && numRegistrations == 0;
174 }
175
176
177
178
179
180
181
182 protected int runIo() {
183 assert inEventLoop();
184 return ioHandler.run(context);
185 }
186
187 @Override
188 public IoEventLoop next() {
189 return this;
190 }
191
192 @Override
193 public final Future<IoRegistration> register(final IoHandle handle) {
194 Promise<IoRegistration> promise = newPromise();
195 if (inEventLoop()) {
196 registerForIo0(handle, promise);
197 } else {
198 execute(() -> registerForIo0(handle, promise));
199 }
200
201 return promise;
202 }
203
204 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
205 assert inEventLoop();
206 final IoRegistration registration;
207 try {
208 registration = ioHandler.register(this, handle);
209 } catch (Exception e) {
210 promise.setFailure(e);
211 return;
212 }
213 registration.cancelFuture().addListener(decrementRegistrationListener);
214 numRegistrations++;
215 promise.setSuccess(registration);
216 }
217
218 @Override
219 protected final void wakeup(boolean inEventLoop) {
220 ioHandler.wakeup(this);
221 }
222
223 @Override
224 protected final void cleanup() {
225 assert inEventLoop();
226 ioHandler.destroy();
227 }
228
229 @Override
230 public boolean isCompatible(Class<? extends IoHandle> handleType) {
231 return ioHandler.isCompatible(handleType);
232 }
233
234 @Override
235 public boolean isIoType(Class<? extends IoHandler> handlerType) {
236 return ioHandler.getClass().equals(handlerType);
237 }
238 }