1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelException;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.logging.InternalLogger;
22 import org.jboss.netty.logging.InternalLoggerFactory;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25 import org.jboss.netty.util.internal.DeadLockProofWorker;
26
27 import java.io.IOException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.SocketChannel;
34 import java.util.ConcurrentModificationException;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 abstract class AbstractNioSelector implements NioSelector {
44
45 private static final AtomicInteger nextId = new AtomicInteger();
46
47 private final int id = nextId.incrementAndGet();
48
49
50
51
52 protected static final InternalLogger logger = InternalLoggerFactory
53 .getInstance(AbstractNioSelector.class);
54
55 private static final int CLEANUP_INTERVAL = 256;
56
57
58
59
60
61 private final Executor executor;
62
63
64
65
66
67 protected volatile Thread thread;
68
69
70
71
72 protected volatile Selector selector;
73
74
75
76
77
78
79
80 protected final AtomicBoolean wakenUp = new AtomicBoolean();
81
82 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
83
84 private volatile int cancelledKeys;
85
86 private final CountDownLatch shutdownLatch = new CountDownLatch(1);
87 private volatile boolean shutdown;
88
89 AbstractNioSelector(Executor executor) {
90 this(executor, null);
91 }
92
93 AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
94 this.executor = executor;
95 openSelector(determiner);
96 }
97
98 public void register(Channel channel, ChannelFuture future) {
99 Runnable task = createRegisterTask(channel, future);
100 registerTask(task);
101 }
102
103 protected final void registerTask(Runnable task) {
104 taskQueue.add(task);
105
106 Selector selector = this.selector;
107
108 if (selector != null) {
109 if (wakenUp.compareAndSet(false, true)) {
110 selector.wakeup();
111 }
112 } else {
113 if (taskQueue.remove(task)) {
114
115 throw new RejectedExecutionException("Worker has already been shutdown");
116 }
117 }
118 }
119
120 protected final boolean isIoThread() {
121 return Thread.currentThread() == thread;
122 }
123
124 public void rebuildSelector() {
125 if (!isIoThread()) {
126 taskQueue.add(new Runnable() {
127 public void run() {
128 rebuildSelector();
129 }
130 });
131 return;
132 }
133
134 final Selector oldSelector = selector;
135 final Selector newSelector;
136
137 if (oldSelector == null) {
138 return;
139 }
140
141 try {
142 newSelector = Selector.open();
143 } catch (Exception e) {
144 logger.warn("Failed to create a new Selector.", e);
145 return;
146 }
147
148
149 int nChannels = 0;
150 for (;;) {
151 try {
152 for (SelectionKey key: oldSelector.keys()) {
153 try {
154 if (key.channel().keyFor(newSelector) != null) {
155 continue;
156 }
157
158 int interestOps = key.interestOps();
159 key.cancel();
160 key.channel().register(newSelector, interestOps, key.attachment());
161 nChannels ++;
162 } catch (Exception e) {
163 logger.warn("Failed to re-register a Channel to the new Selector,", e);
164 close(key);
165 }
166 }
167 } catch (ConcurrentModificationException e) {
168
169 continue;
170 }
171
172 break;
173 }
174
175 selector = newSelector;
176
177 try {
178
179 oldSelector.close();
180 } catch (Throwable t) {
181 if (logger.isWarnEnabled()) {
182 logger.warn("Failed to close the old Selector.", t);
183 }
184 }
185
186 logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
187 }
188
189 public void run() {
190 thread = Thread.currentThread();
191
192 int selectReturnsImmediately = 0;
193 Selector selector = this.selector;
194
195 if (selector == null) {
196 return;
197 }
198
199 final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
200 boolean wakenupFromLoop = false;
201 for (;;) {
202 wakenUp.set(false);
203
204 try {
205 long beforeSelect = System.nanoTime();
206 int selected = select(selector);
207 if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
208 long timeBlocked = System.nanoTime() - beforeSelect;
209 if (timeBlocked < minSelectTimeout) {
210 boolean notConnected = false;
211
212 for (SelectionKey key: selector.keys()) {
213 SelectableChannel ch = key.channel();
214 try {
215 if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
216 ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
217 notConnected = true;
218
219 key.cancel();
220 }
221 } catch (CancelledKeyException e) {
222
223 }
224 }
225 if (notConnected) {
226 selectReturnsImmediately = 0;
227 } else {
228 if (Thread.interrupted() && !shutdown) {
229
230
231
232
233
234 if (logger.isDebugEnabled()) {
235 logger.debug("Selector.select() returned prematurely because the I/O thread " +
236 "has been interrupted. Use shutdown() to shut the NioSelector down.");
237 }
238 selectReturnsImmediately = 0;
239 } else {
240
241
242
243 selectReturnsImmediately ++;
244 }
245 }
246 } else {
247 selectReturnsImmediately = 0;
248 }
249 } else {
250 selectReturnsImmediately = 0;
251 }
252
253 if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
254 if (selectReturnsImmediately == 1024) {
255
256
257
258 rebuildSelector();
259 selector = this.selector;
260 selectReturnsImmediately = 0;
261 wakenupFromLoop = false;
262
263 continue;
264 }
265 } else {
266
267 selectReturnsImmediately = 0;
268 }
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298 if (wakenUp.get()) {
299 wakenupFromLoop = true;
300 selector.wakeup();
301 } else {
302 wakenupFromLoop = false;
303 }
304
305 cancelledKeys = 0;
306 processTaskQueue();
307 selector = this.selector;
308
309 if (shutdown) {
310 this.selector = null;
311
312
313 processTaskQueue();
314
315 for (SelectionKey k: selector.keys()) {
316 close(k);
317 }
318
319 try {
320 selector.close();
321 } catch (IOException e) {
322 logger.warn(
323 "Failed to close a selector.", e);
324 }
325 shutdownLatch.countDown();
326 break;
327 } else {
328 process(selector);
329 }
330 } catch (Throwable t) {
331 logger.warn(
332 "Unexpected exception in the selector loop.", t);
333
334
335
336 try {
337 Thread.sleep(1000);
338 } catch (InterruptedException e) {
339
340 }
341 }
342 }
343 }
344
345
346
347
348
349 private void openSelector(ThreadNameDeterminer determiner) {
350 try {
351 selector = Selector.open();
352 } catch (Throwable t) {
353 throw new ChannelException("Failed to create a selector.", t);
354 }
355
356
357 boolean success = false;
358 try {
359 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
360 success = true;
361 } finally {
362 if (!success) {
363
364 try {
365 selector.close();
366 } catch (Throwable t) {
367 logger.warn("Failed to close a selector.", t);
368 }
369 selector = null;
370
371 }
372 }
373 assert selector != null && selector.isOpen();
374 }
375
376 private void processTaskQueue() {
377 for (;;) {
378 final Runnable task = taskQueue.poll();
379 if (task == null) {
380 break;
381 }
382 task.run();
383 try {
384 cleanUpCancelledKeys();
385 } catch (IOException e) {
386
387 }
388 }
389 }
390
391 protected final void increaseCancelledKeys() {
392 cancelledKeys ++;
393 }
394
395 protected final boolean cleanUpCancelledKeys() throws IOException {
396 if (cancelledKeys >= CLEANUP_INTERVAL) {
397 cancelledKeys = 0;
398 selector.selectNow();
399 return true;
400 }
401 return false;
402 }
403
404 public void shutdown() {
405 if (isIoThread()) {
406 throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
407 }
408
409 Selector selector = this.selector;
410 shutdown = true;
411 if (selector != null) {
412 selector.wakeup();
413 }
414 try {
415 shutdownLatch.await();
416 } catch (InterruptedException e) {
417 logger.error("Interrupted while wait for resources to be released #" + id);
418 Thread.currentThread().interrupt();
419 }
420 }
421
422 protected abstract void process(Selector selector) throws IOException;
423
424 protected int select(Selector selector) throws IOException {
425 return SelectorUtil.select(selector);
426 }
427
428 protected abstract void close(SelectionKey k);
429
430 protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
431
432 protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
433 }