1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.util.concurrent.Future;
19 import io.netty.util.concurrent.Promise;
20 import io.netty.util.concurrent.RejectedExecutionHandler;
21 import io.netty.util.concurrent.SingleThreadEventExecutor;
22 import io.netty.util.internal.ObjectUtil;
23 import io.netty.util.internal.SystemPropertyUtil;
24
25 import java.util.Queue;
26 import java.util.concurrent.Executor;
27 import java.util.concurrent.ThreadFactory;
28
29
30
31
32
33 public class SingleThreadIoEventLoop extends SingleThreadEventLoop implements IoEventLoop {
34
35
36 protected static final int DEFAULT_MAX_TASKS_PER_RUN = Math.max(1,
37 SystemPropertyUtil.getInt("io.netty.eventLoop.maxTaskPerRun", 1024 * 4));
38
39 private final int maxTasksPerRun = DEFAULT_MAX_TASKS_PER_RUN;
40 private final IoExecutionContext context = new IoExecutionContext() {
41 @Override
42 public boolean canBlock() {
43 assert inEventLoop();
44 return !hasTasks() && !hasScheduledTasks();
45 }
46
47 @Override
48 public long delayNanos(long currentTimeNanos) {
49 assert inEventLoop();
50 return SingleThreadIoEventLoop.this.delayNanos(currentTimeNanos);
51 }
52
53 @Override
54 public long deadlineNanos() {
55 assert inEventLoop();
56 return SingleThreadIoEventLoop.this.deadlineNanos();
57 }
58 };
59
60 private final IoHandler ioHandler;
61
62
63
64
65
66
67
68
69 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
70 IoHandler ioHandler) {
71 super(parent, threadFactory, false);
72 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
73 }
74
75
76
77
78
79
80
81
82 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor, IoHandler ioHandler) {
83 super(parent, executor, false);
84 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
85 }
86
87
88
89
90
91
92
93
94
95
96
97
98
99 public SingleThreadIoEventLoop(IoEventLoopGroup parent, ThreadFactory threadFactory,
100 IoHandler ioHandler, int maxPendingTasks,
101 RejectedExecutionHandler rejectedExecutionHandler) {
102 super(parent, threadFactory, false, maxPendingTasks, rejectedExecutionHandler);
103 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118 public SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
119 IoHandler ioHandler, int maxPendingTasks,
120 RejectedExecutionHandler rejectedExecutionHandler) {
121 super(parent, executor, false, maxPendingTasks, rejectedExecutionHandler);
122 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
123 }
124
125
126
127
128
129
130
131
132
133
134
135
136
137 protected SingleThreadIoEventLoop(IoEventLoopGroup parent, Executor executor,
138 IoHandler ioHandler, Queue<Runnable> taskQueue,
139 Queue<Runnable> tailTaskQueue,
140
141 RejectedExecutionHandler rejectedExecutionHandler) {
142 super(parent, executor, false, taskQueue, tailTaskQueue, rejectedExecutionHandler);
143 this.ioHandler = ObjectUtil.checkNotNull(ioHandler, "ioHandler");
144 }
145
146 @Override
147 protected void run() {
148 assert inEventLoop();
149 do {
150 runIo();
151 if (isShuttingDown()) {
152 ioHandler.prepareToDestroy();
153 }
154 runAllTasks(maxTasksPerRun);
155 } while (!confirmShutdown());
156 }
157
158 protected final IoHandler ioHandler() {
159 return ioHandler;
160 }
161
162
163
164
165
166
167
168 protected int runIo() {
169 assert inEventLoop();
170 return ioHandler.run(context);
171 }
172
173 @Override
174 public IoEventLoop next() {
175 return this;
176 }
177
178 @Override
179 public final Future<IoRegistration> register(final IoHandle handle) {
180 Promise<IoRegistration> promise = newPromise();
181 if (inEventLoop()) {
182 registerForIo0(handle, promise);
183 } else {
184 execute(() -> registerForIo0(handle, promise));
185 }
186
187 return promise;
188 }
189
190 private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
191 assert inEventLoop();
192 final IoRegistration registration;
193 try {
194 registration = ioHandler.register(this, handle);
195 } catch (Exception e) {
196 promise.setFailure(e);
197 return;
198 }
199 promise.setSuccess(registration);
200 }
201
202 @Override
203 protected final void wakeup(boolean inEventLoop) {
204 ioHandler.wakeup(this);
205 }
206
207 @Override
208 protected final void cleanup() {
209 assert inEventLoop();
210 ioHandler.destroy();
211 }
212
213 @Override
214 public boolean isCompatible(Class<? extends IoHandle> handleType) {
215 return ioHandler.isCompatible(handleType);
216 }
217
218 @Override
219 public boolean isIoType(Class<? extends IoHandler> handlerType) {
220 return ioHandler.getClass().equals(handlerType);
221 }
222 }