1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelException;
20 import org.jboss.netty.channel.ChannelFuture;
21 import org.jboss.netty.logging.InternalLogger;
22 import org.jboss.netty.logging.InternalLoggerFactory;
23 import org.jboss.netty.util.ThreadNameDeterminer;
24 import org.jboss.netty.util.ThreadRenamingRunnable;
25 import org.jboss.netty.util.internal.DeadLockProofWorker;
26
27 import java.io.IOException;
28 import java.nio.channels.CancelledKeyException;
29 import java.nio.channels.DatagramChannel;
30 import java.nio.channels.SelectableChannel;
31 import java.nio.channels.SelectionKey;
32 import java.nio.channels.Selector;
33 import java.nio.channels.SocketChannel;
34 import java.util.ConcurrentModificationException;
35 import java.util.Queue;
36 import java.util.concurrent.ConcurrentLinkedQueue;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.Executor;
39 import java.util.concurrent.RejectedExecutionException;
40 import java.util.concurrent.atomic.AtomicBoolean;
41 import java.util.concurrent.atomic.AtomicInteger;
42
43 abstract class AbstractNioSelector implements NioSelector {
44
45 private static final AtomicInteger nextId = new AtomicInteger();
46
47 private final int id = nextId.incrementAndGet();
48
49 /**
50 * Internal Netty logger.
51 */
52 protected static final InternalLogger logger = InternalLoggerFactory
53 .getInstance(AbstractNioSelector.class);
54
55 private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
56
57 /**
58 * Executor used to execute {@link Runnable}s such as channel registration
59 * task.
60 */
61 private final Executor executor;
62
63 /**
64 * If this worker has been started thread will be a reference to the thread
65 * used when starting. i.e. the current thread when the run method is executed.
66 */
67 protected volatile Thread thread;
68
69 /**
70 * Count down to 0 when the I/O thread starts and {@link #thread} is set to non-null.
71 */
72 final CountDownLatch startupLatch = new CountDownLatch(1);
73
74 /**
75 * The NIO {@link Selector}.
76 */
77 protected volatile Selector selector;
78
79 /**
80 * Boolean that controls determines if a blocked Selector.select should
81 * break out of its selection process. In our case we use a timeone for
82 * the select method and the select method will block for that time unless
83 * waken up.
84 */
85 protected final AtomicBoolean wakenUp = new AtomicBoolean();
86
87 private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
88
89 private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
90
91 private final CountDownLatch shutdownLatch = new CountDownLatch(1);
92 private volatile boolean shutdown;
93
94 AbstractNioSelector(Executor executor) {
95 this(executor, null);
96 }
97
98 AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
99 this.executor = executor;
100 openSelector(determiner);
101 }
102
103 public void register(Channel channel, ChannelFuture future) {
104 Runnable task = createRegisterTask(channel, future);
105 registerTask(task);
106 }
107
108 protected final void registerTask(Runnable task) {
109 taskQueue.add(task);
110
111 Selector selector = this.selector;
112
113 if (selector != null) {
114 if (wakenUp.compareAndSet(false, true)) {
115 selector.wakeup();
116 }
117 } else {
118 if (taskQueue.remove(task)) {
119 // the selector was null this means the Worker has already been shutdown.
120 throw new RejectedExecutionException("Worker has already been shutdown");
121 }
122 }
123 }
124
125 protected final boolean isIoThread() {
126 return Thread.currentThread() == thread;
127 }
128
129 public void rebuildSelector() {
130 if (!isIoThread()) {
131 taskQueue.add(new Runnable() {
132 public void run() {
133 rebuildSelector();
134 }
135 });
136 return;
137 }
138
139 final Selector oldSelector = selector;
140 final Selector newSelector;
141
142 if (oldSelector == null) {
143 return;
144 }
145
146 try {
147 newSelector = SelectorUtil.open();
148 } catch (Exception e) {
149 logger.warn("Failed to create a new Selector.", e);
150 return;
151 }
152
153 // Register all channels to the new Selector.
154 int nChannels = 0;
155 for (;;) {
156 try {
157 for (SelectionKey key: oldSelector.keys()) {
158 try {
159 if (key.channel().keyFor(newSelector) != null) {
160 continue;
161 }
162
163 int interestOps = key.interestOps();
164 key.cancel();
165 key.channel().register(newSelector, interestOps, key.attachment());
166 nChannels ++;
167 } catch (Exception e) {
168 logger.warn("Failed to re-register a Channel to the new Selector,", e);
169 close(key);
170 }
171 }
172 } catch (ConcurrentModificationException e) {
173 // Probably due to concurrent modification of the key set.
174 continue;
175 }
176
177 break;
178 }
179
180 selector = newSelector;
181
182 try {
183 // time to close the old selector as everything else is registered to the new one
184 oldSelector.close();
185 } catch (Throwable t) {
186 if (logger.isWarnEnabled()) {
187 logger.warn("Failed to close the old Selector.", t);
188 }
189 }
190
191 logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
192 }
193
194 public void run() {
195 thread = Thread.currentThread();
196 startupLatch.countDown();
197
198 int selectReturnsImmediately = 0;
199 Selector selector = this.selector;
200
201 if (selector == null) {
202 return;
203 }
204 // use 80% of the timeout for measure
205 final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
206 boolean wakenupFromLoop = false;
207 for (;;) {
208 wakenUp.set(false);
209
210 try {
211 long beforeSelect = System.nanoTime();
212 int selected = select(selector);
213 if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
214 long timeBlocked = System.nanoTime() - beforeSelect;
215 if (timeBlocked < minSelectTimeout) {
216 boolean notConnected = false;
217 // loop over all keys as the selector may was unblocked because of a closed channel
218 for (SelectionKey key: selector.keys()) {
219 SelectableChannel ch = key.channel();
220 try {
221 if (ch instanceof DatagramChannel && !ch.isOpen() ||
222 ch instanceof SocketChannel && !((SocketChannel) ch).isConnected() &&
223 // Only cancel if the connection is not pending
224 // See https://github.com/netty/netty/issues/2931
225 !((SocketChannel) ch).isConnectionPending()) {
226 notConnected = true;
227 // cancel the key just to be on the safe side
228 key.cancel();
229 }
230 } catch (CancelledKeyException e) {
231 // ignore
232 }
233 }
234 if (notConnected) {
235 selectReturnsImmediately = 0;
236 } else {
237 if (Thread.interrupted() && !shutdown) {
238 // Thread was interrupted but NioSelector was not shutdown.
239 // As this is most likely a bug in the handler of the user or it's client
240 // library we will log it.
241 //
242 // See https://github.com/netty/netty/issues/2426
243 if (logger.isDebugEnabled()) {
244 logger.debug("Selector.select() returned prematurely because the I/O thread " +
245 "has been interrupted. Use shutdown() to shut the NioSelector down.");
246 }
247 selectReturnsImmediately = 0;
248 } else {
249 // Returned before the minSelectTimeout elapsed with nothing selected.
250 // This may be because of a bug in JDK NIO Selector provider, so increment the counter
251 // which we will use later to see if it's really the bug in JDK.
252 selectReturnsImmediately ++;
253 }
254 }
255 } else {
256 selectReturnsImmediately = 0;
257 }
258 } else {
259 selectReturnsImmediately = 0;
260 }
261
262 if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
263 if (selectReturnsImmediately == 1024) {
264 // The selector returned immediately for 10 times in a row,
265 // so recreate one selector as it seems like we hit the
266 // famous epoll(..) jdk bug.
267 rebuildSelector();
268 selector = this.selector;
269 selectReturnsImmediately = 0;
270 wakenupFromLoop = false;
271 // try to select again
272 continue;
273 }
274 } else {
275 // reset counter
276 selectReturnsImmediately = 0;
277 }
278
279 // 'wakenUp.compareAndSet(false, true)' is always evaluated
280 // before calling 'selector.wakeup()' to reduce the wake-up
281 // overhead. (Selector.wakeup() is an expensive operation.)
282 //
283 // However, there is a race condition in this approach.
284 // The race condition is triggered when 'wakenUp' is set to
285 // true too early.
286 //
287 // 'wakenUp' is set to true too early if:
288 // 1) Selector is waken up between 'wakenUp.set(false)' and
289 // 'selector.select(...)'. (BAD)
290 // 2) Selector is waken up between 'selector.select(...)' and
291 // 'if (wakenUp.get()) { ... }'. (OK)
292 //
293 // In the first case, 'wakenUp' is set to true and the
294 // following 'selector.select(...)' will wake up immediately.
295 // Until 'wakenUp' is set to false again in the next round,
296 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
297 // any attempt to wake up the Selector will fail, too, causing
298 // the following 'selector.select(...)' call to block
299 // unnecessarily.
300 //
301 // To fix this problem, we wake up the selector again if wakenUp
302 // is true immediately after selector.select(...).
303 // It is inefficient in that it wakes up the selector for both
304 // the first case (BAD - wake-up required) and the second case
305 // (OK - no wake-up required).
306
307 if (wakenUp.get()) {
308 wakenupFromLoop = true;
309 selector.wakeup();
310 } else {
311 wakenupFromLoop = false;
312 }
313
314 cancelledKeys = 0;
315 processTaskQueue();
316 selector = this.selector; // processTaskQueue() can call rebuildSelector()
317
318 if (shutdown) {
319 this.selector = null;
320
321 // process one time again
322 processTaskQueue();
323
324 for (SelectionKey k: selector.keys()) {
325 close(k);
326 }
327
328 try {
329 selector.close();
330 } catch (IOException e) {
331 logger.warn(
332 "Failed to close a selector.", e);
333 }
334 shutdownLatch.countDown();
335 break;
336 } else {
337 process(selector);
338 }
339 } catch (Throwable t) {
340 logger.warn(
341 "Unexpected exception in the selector loop.", t);
342
343 // Prevent possible consecutive immediate failures that lead to
344 // excessive CPU consumption.
345 try {
346 Thread.sleep(1000);
347 } catch (InterruptedException e) {
348 // Ignore.
349 }
350 }
351 }
352 }
353
354 /**
355 * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
356 * the {@link AbstractNioChannel}'s when they get registered
357 */
358 private void openSelector(ThreadNameDeterminer determiner) {
359 try {
360 selector = SelectorUtil.open();
361 } catch (Throwable t) {
362 throw new ChannelException("Failed to create a selector.", t);
363 }
364
365 // Start the worker thread with the new Selector.
366 boolean success = false;
367 try {
368 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
369 success = true;
370 } finally {
371 if (!success) {
372 // Release the Selector if the execution fails.
373 try {
374 selector.close();
375 } catch (Throwable t) {
376 logger.warn("Failed to close a selector.", t);
377 }
378 selector = null;
379 // The method will return to the caller at this point.
380 }
381 }
382 assert selector != null && selector.isOpen();
383 }
384
385 private void processTaskQueue() {
386 for (;;) {
387 final Runnable task = taskQueue.poll();
388 if (task == null) {
389 break;
390 }
391 task.run();
392 try {
393 cleanUpCancelledKeys();
394 } catch (IOException e) {
395 // Ignore
396 }
397 }
398 }
399
400 protected final void increaseCancelledKeys() {
401 cancelledKeys ++;
402 }
403
404 protected final boolean cleanUpCancelledKeys() throws IOException {
405 if (cancelledKeys >= CLEANUP_INTERVAL) {
406 cancelledKeys = 0;
407 selector.selectNow();
408 return true;
409 }
410 return false;
411 }
412
413 public void shutdown() {
414 if (isIoThread()) {
415 throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
416 }
417
418 Selector selector = this.selector;
419 shutdown = true;
420 if (selector != null) {
421 selector.wakeup();
422 }
423 try {
424 shutdownLatch.await();
425 } catch (InterruptedException e) {
426 logger.error("Interrupted while wait for resources to be released #" + id);
427 Thread.currentThread().interrupt();
428 }
429 }
430
431 protected abstract void process(Selector selector) throws IOException;
432
433 protected int select(Selector selector) throws IOException {
434 return SelectorUtil.select(selector);
435 }
436
437 protected abstract void close(SelectionKey k);
438
439 protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
440
441 protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
442 }