1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelException;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.logging.InternalLogger;
22 import org.jboss.netty.logging.InternalLoggerFactory;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25 import org.jboss.netty.util.internal.DeadLockProofWorker;
26
27 import java.io.IOException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.SocketChannel;
34 import java.util.ConcurrentModificationException;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 abstract class AbstractNioSelector implements NioSelector {
44
45 private static final AtomicInteger nextId = new AtomicInteger();
46
47 private final int id = nextId.incrementAndGet();
48
49
50
51
52 protected static final InternalLogger logger = InternalLoggerFactory
53 .getInstance(AbstractNioSelector.class);
54
55 private static final int CLEANUP_INTERVAL = 256;
56
57
58
59
60
61 private final Executor executor;
62
63
64
65
66
67 protected volatile Thread thread;
68
69
70
71
72 final CountDownLatch startupLatch = new CountDownLatch(1);
73
74
75
76
77 protected volatile Selector selector;
78
79
80
81
82
83
84
85 protected final AtomicBoolean wakenUp = new AtomicBoolean();
86
87 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
88
89 private volatile int cancelledKeys;
90
91 private final CountDownLatch shutdownLatch = new CountDownLatch(1);
92 private volatile boolean shutdown;
93
94 AbstractNioSelector(Executor executor) {
95 this(executor, null);
96 }
97
98 AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
99 this.executor = executor;
100 openSelector(determiner);
101 }
102
103 public void register(Channel channel, ChannelFuture future) {
104 Runnable task = createRegisterTask(channel, future);
105 registerTask(task);
106 }
107
108 protected final void registerTask(Runnable task) {
109 taskQueue.add(task);
110
111 Selector selector = this.selector;
112
113 if (selector != null) {
114 if (wakenUp.compareAndSet(false, true)) {
115 selector.wakeup();
116 }
117 } else {
118 if (taskQueue.remove(task)) {
119
120 throw new RejectedExecutionException("Worker has already been shutdown");
121 }
122 }
123 }
124
125 protected final boolean isIoThread() {
126 return Thread.currentThread() == thread;
127 }
128
129 public void rebuildSelector() {
130 if (!isIoThread()) {
131 taskQueue.add(new Runnable() {
132 public void run() {
133 rebuildSelector();
134 }
135 });
136 return;
137 }
138
139 final Selector oldSelector = selector;
140 final Selector newSelector;
141
142 if (oldSelector == null) {
143 return;
144 }
145
146 try {
147 newSelector = SelectorUtil.open();
148 } catch (Exception e) {
149 logger.warn("Failed to create a new Selector.", e);
150 return;
151 }
152
153
154 int nChannels = 0;
155 for (;;) {
156 try {
157 for (SelectionKey key: oldSelector.keys()) {
158 try {
159 if (key.channel().keyFor(newSelector) != null) {
160 continue;
161 }
162
163 int interestOps = key.interestOps();
164 key.cancel();
165 key.channel().register(newSelector, interestOps, key.attachment());
166 nChannels ++;
167 } catch (Exception e) {
168 logger.warn("Failed to re-register a Channel to the new Selector,", e);
169 close(key);
170 }
171 }
172 } catch (ConcurrentModificationException e) {
173
174 continue;
175 }
176
177 break;
178 }
179
180 selector = newSelector;
181
182 try {
183
184 oldSelector.close();
185 } catch (Throwable t) {
186 if (logger.isWarnEnabled()) {
187 logger.warn("Failed to close the old Selector.", t);
188 }
189 }
190
191 logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
192 }
193
194 public void run() {
195 thread = Thread.currentThread();
196 startupLatch.countDown();
197
198 int selectReturnsImmediately = 0;
199 Selector selector = this.selector;
200
201 if (selector == null) {
202 return;
203 }
204
205 final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
206 boolean wakenupFromLoop = false;
207 for (;;) {
208 wakenUp.set(false);
209
210 try {
211 long beforeSelect = System.nanoTime();
212 int selected = select(selector);
213 if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
214 long timeBlocked = System.nanoTime() - beforeSelect;
215
216 if (timeBlocked < minSelectTimeout) {
217 boolean notConnected = false;
218
219 for (SelectionKey key: selector.keys()) {
220 SelectableChannel ch = key.channel();
221 try {
222 if (ch instanceof DatagramChannel && !ch.isOpen() ||
223 ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
224 notConnected = true;
225
226 key.cancel();
227 }
228 } catch (CancelledKeyException e) {
229
230 }
231 }
232 if (notConnected) {
233 selectReturnsImmediately = 0;
234 } else {
235
236
237
238 selectReturnsImmediately ++;
239 }
240 } else {
241 selectReturnsImmediately = 0;
242 }
243
244 if (selectReturnsImmediately == 1024) {
245
246
247
248 rebuildSelector();
249 selector = this.selector;
250 selectReturnsImmediately = 0;
251 wakenupFromLoop = false;
252
253 continue;
254 }
255 } else {
256
257 selectReturnsImmediately = 0;
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288 if (wakenUp.get()) {
289 wakenupFromLoop = true;
290 selector.wakeup();
291 } else {
292 wakenupFromLoop = false;
293 }
294
295 cancelledKeys = 0;
296 processTaskQueue();
297 selector = this.selector;
298
299 if (shutdown) {
300 this.selector = null;
301
302
303 processTaskQueue();
304
305 for (SelectionKey k: selector.keys()) {
306 close(k);
307 }
308
309 try {
310 selector.close();
311 } catch (IOException e) {
312 logger.warn(
313 "Failed to close a selector.", e);
314 }
315 shutdownLatch.countDown();
316 break;
317 } else {
318 process(selector);
319 }
320 } catch (Throwable t) {
321 logger.warn(
322 "Unexpected exception in the selector loop.", t);
323
324
325
326 try {
327 Thread.sleep(1000);
328 } catch (InterruptedException e) {
329
330 }
331 }
332 }
333 }
334
335
336
337
338
339 private void openSelector(ThreadNameDeterminer determiner) {
340 try {
341 selector = SelectorUtil.open();
342 } catch (Throwable t) {
343 throw new ChannelException("Failed to create a selector.", t);
344 }
345
346
347 boolean success = false;
348 try {
349 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
350 success = true;
351 } finally {
352 if (!success) {
353
354 try {
355 selector.close();
356 } catch (Throwable t) {
357 logger.warn("Failed to close a selector.", t);
358 }
359 selector = null;
360
361 }
362 }
363 assert selector != null && selector.isOpen();
364 }
365
366 private void processTaskQueue() {
367 for (;;) {
368 final Runnable task = taskQueue.poll();
369 if (task == null) {
370 break;
371 }
372 task.run();
373 try {
374 cleanUpCancelledKeys();
375 } catch (IOException e) {
376
377 }
378 }
379 }
380
381 protected final void increaseCancelledKeys() {
382 cancelledKeys ++;
383 }
384
385 protected final boolean cleanUpCancelledKeys() throws IOException {
386 if (cancelledKeys >= CLEANUP_INTERVAL) {
387 cancelledKeys = 0;
388 selector.selectNow();
389 return true;
390 }
391 return false;
392 }
393
394 public void shutdown() {
395 if (isIoThread()) {
396 throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
397 }
398
399 Selector selector = this.selector;
400 shutdown = true;
401 if (selector != null) {
402 selector.wakeup();
403 }
404 try {
405 shutdownLatch.await();
406 } catch (InterruptedException e) {
407 logger.error("Interrupted while wait for resources to be released #" + id);
408 Thread.currentThread().interrupt();
409 }
410 }
411
412 protected abstract void process(Selector selector) throws IOException;
413
414 protected int select(Selector selector) throws IOException {
415 return SelectorUtil.select(selector);
416 }
417
418 protected abstract void close(SelectionKey k);
419
420 protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
421
422 protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
423 }