View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelException;
20  import org.jboss.netty.channel.ChannelFuture;
21  import org.jboss.netty.logging.InternalLogger;
22  import org.jboss.netty.logging.InternalLoggerFactory;
23  import org.jboss.netty.util.ThreadNameDeterminer;
24  import org.jboss.netty.util.ThreadRenamingRunnable;
25  import org.jboss.netty.util.internal.DeadLockProofWorker;
26  
27  import java.io.IOException;
28  import java.nio.channels.CancelledKeyException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectableChannel;
31  import java.nio.channels.SelectionKey;
32  import java.nio.channels.Selector;
33  import java.nio.channels.SocketChannel;
34  import java.util.ConcurrentModificationException;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  abstract class AbstractNioSelector implements NioSelector {
44  
45      private static final AtomicInteger nextId = new AtomicInteger();
46  
47      private final int id = nextId.incrementAndGet();
48  
49      /**
50       * Internal Netty logger.
51       */
52      protected static final InternalLogger logger = InternalLoggerFactory
53              .getInstance(AbstractNioSelector.class);
54  
55      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
56  
57      /**
58       * Executor used to execute {@link Runnable}s such as channel registration
59       * task.
60       */
61      private final Executor executor;
62  
63      /**
64       * If this worker has been started thread will be a reference to the thread
65       * used when starting. i.e. the current thread when the run method is executed.
66       */
67      protected volatile Thread thread;
68  
69      /**
70       * Count down to 0 when the I/O thread starts and {@link #thread} is set to non-null.
71       */
72      final CountDownLatch startupLatch = new CountDownLatch(1);
73  
74      /**
75       * The NIO {@link Selector}.
76       */
77      protected volatile Selector selector;
78  
79      /**
80       * Boolean that determines if a blocked Selector.select should
81       * break out of its selection process. In our case we use a timeout for
82       * the select method and the select method will block for that time unless
83       * woken up.
84       */
85      protected final AtomicBoolean wakenUp = new AtomicBoolean();
86  
87      private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
88  
89      private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
90  
91      private final CountDownLatch shutdownLatch = new CountDownLatch(1);
92      private volatile boolean shutdown;
93  
/**
 * Creates a selector without a {@link ThreadNameDeterminer}; delegates to the
 * two-argument constructor with a {@code null} determiner.
 *
 * @param executor the {@link Executor} handed to the two-argument constructor
 */
AbstractNioSelector(Executor executor) {
    this(executor, null);
}
97  
/**
 * Stores the executor and immediately opens the selector, which also starts
 * the worker on that executor (see {@code openSelector}).
 *
 * @param executor   the {@link Executor} on which the selector loop is started
 * @param determiner passed through to {@code newThreadRenamingRunnable}; may be {@code null}
 */
AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
    this.executor = executor;
    openSelector(determiner);
}
102 
103     public void register(Channel channel, ChannelFuture future) {
104         Runnable task = createRegisterTask(channel, future);
105         registerTask(task);
106     }
107 
108     protected final void registerTask(Runnable task) {
109         taskQueue.add(task);
110 
111         Selector selector = this.selector;
112 
113         if (selector != null) {
114             if (wakenUp.compareAndSet(false, true)) {
115                 selector.wakeup();
116             }
117         } else {
118             if (taskQueue.remove(task)) {
119                 // the selector was null this means the Worker has already been shutdown.
120                 throw new RejectedExecutionException("Worker has already been shutdown");
121             }
122         }
123     }
124 
125     protected final boolean isIoThread() {
126         return Thread.currentThread() == thread;
127     }
128 
129     public void rebuildSelector() {
130         if (!isIoThread()) {
131             taskQueue.add(new Runnable() {
132                 public void run() {
133                     rebuildSelector();
134                 }
135             });
136             return;
137         }
138 
139         final Selector oldSelector = selector;
140         final Selector newSelector;
141 
142         if (oldSelector == null) {
143             return;
144         }
145 
146         try {
147             newSelector = SelectorUtil.open();
148         } catch (Exception e) {
149             logger.warn("Failed to create a new Selector.", e);
150             return;
151         }
152 
153         // Register all channels to the new Selector.
154         int nChannels = 0;
155         for (;;) {
156             try {
157                 for (SelectionKey key: oldSelector.keys()) {
158                     try {
159                         if (key.channel().keyFor(newSelector) != null) {
160                             continue;
161                         }
162 
163                         int interestOps = key.interestOps();
164                         key.cancel();
165                         key.channel().register(newSelector, interestOps, key.attachment());
166                         nChannels ++;
167                     } catch (Exception e) {
168                         logger.warn("Failed to re-register a Channel to the new Selector,", e);
169                         close(key);
170                     }
171                 }
172             } catch (ConcurrentModificationException e) {
173                 // Probably due to concurrent modification of the key set.
174                 continue;
175             }
176 
177             break;
178         }
179 
180         selector = newSelector;
181 
182         try {
183             // time to close the old selector as everything else is registered to the new one
184             oldSelector.close();
185         } catch (Throwable t) {
186             if (logger.isWarnEnabled()) {
187                 logger.warn("Failed to close the old Selector.", t);
188             }
189         }
190 
191         logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
192     }
193 
194     public void run() {
195         thread = Thread.currentThread();
196         startupLatch.countDown();
197 
198         int selectReturnsImmediately = 0;
199         Selector selector = this.selector;
200 
201         if (selector == null) {
202             return;
203         }
204         // use 80% of the timeout for measure
205         final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
206         boolean wakenupFromLoop = false;
207         for (;;) {
208             wakenUp.set(false);
209 
210             try {
211                 long beforeSelect = System.nanoTime();
212                 int selected = select(selector);
213                 if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
214                     long timeBlocked = System.nanoTime() - beforeSelect;
215                     if (timeBlocked < minSelectTimeout) {
216                         boolean notConnected = false;
217                         // loop over all keys as the selector may was unblocked because of a closed channel
218                         for (SelectionKey key: selector.keys()) {
219                             SelectableChannel ch = key.channel();
220                             try {
221                                 if (ch instanceof DatagramChannel && !ch.isOpen() ||
222                                         ch instanceof SocketChannel && !((SocketChannel) ch).isConnected() &&
223                                                 // Only cancel if the connection is not pending
224                                                 // See https://github.com/netty/netty/issues/2931
225                                                 !((SocketChannel) ch).isConnectionPending()) {
226                                     notConnected = true;
227                                     // cancel the key just to be on the safe side
228                                     key.cancel();
229                                 }
230                             } catch (CancelledKeyException e) {
231                                 // ignore
232                             }
233                         }
234                         if (notConnected) {
235                             selectReturnsImmediately = 0;
236                         } else {
237                             if (Thread.interrupted() && !shutdown) {
238                                 // Thread was interrupted but NioSelector was not shutdown.
239                                 // As this is most likely a bug in the handler of the user or it's client
240                                 // library we will log it.
241                                 //
242                                 // See https://github.com/netty/netty/issues/2426
243                                 if (logger.isDebugEnabled()) {
244                                     logger.debug("Selector.select() returned prematurely because the I/O thread " +
245                                             "has been interrupted. Use shutdown() to shut the NioSelector down.");
246                                 }
247                                 selectReturnsImmediately = 0;
248                             } else {
249                                 // Returned before the minSelectTimeout elapsed with nothing selected.
250                                 // This may be because of a bug in JDK NIO Selector provider, so increment the counter
251                                 // which we will use later to see if it's really the bug in JDK.
252                                 selectReturnsImmediately ++;
253                             }
254                         }
255                     } else {
256                         selectReturnsImmediately = 0;
257                     }
258                 } else {
259                     selectReturnsImmediately = 0;
260                 }
261 
262                 if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
263                     if (selectReturnsImmediately == 1024) {
264                         // The selector returned immediately for 10 times in a row,
265                         // so recreate one selector as it seems like we hit the
266                         // famous epoll(..) jdk bug.
267                         rebuildSelector();
268                         selector = this.selector;
269                         selectReturnsImmediately = 0;
270                         wakenupFromLoop = false;
271                         // try to select again
272                         continue;
273                     }
274                 } else {
275                     // reset counter
276                     selectReturnsImmediately = 0;
277                 }
278 
279                 // 'wakenUp.compareAndSet(false, true)' is always evaluated
280                 // before calling 'selector.wakeup()' to reduce the wake-up
281                 // overhead. (Selector.wakeup() is an expensive operation.)
282                 //
283                 // However, there is a race condition in this approach.
284                 // The race condition is triggered when 'wakenUp' is set to
285                 // true too early.
286                 //
287                 // 'wakenUp' is set to true too early if:
288                 // 1) Selector is waken up between 'wakenUp.set(false)' and
289                 //    'selector.select(...)'. (BAD)
290                 // 2) Selector is waken up between 'selector.select(...)' and
291                 //    'if (wakenUp.get()) { ... }'. (OK)
292                 //
293                 // In the first case, 'wakenUp' is set to true and the
294                 // following 'selector.select(...)' will wake up immediately.
295                 // Until 'wakenUp' is set to false again in the next round,
296                 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
297                 // any attempt to wake up the Selector will fail, too, causing
298                 // the following 'selector.select(...)' call to block
299                 // unnecessarily.
300                 //
301                 // To fix this problem, we wake up the selector again if wakenUp
302                 // is true immediately after selector.select(...).
303                 // It is inefficient in that it wakes up the selector for both
304                 // the first case (BAD - wake-up required) and the second case
305                 // (OK - no wake-up required).
306 
307                 if (wakenUp.get()) {
308                     wakenupFromLoop = true;
309                     selector.wakeup();
310                 } else {
311                     wakenupFromLoop = false;
312                 }
313 
314                 cancelledKeys = 0;
315                 processTaskQueue();
316                 selector = this.selector; // processTaskQueue() can call rebuildSelector()
317 
318                 if (shutdown) {
319                     this.selector = null;
320 
321                     // process one time again
322                     processTaskQueue();
323 
324                     for (SelectionKey k: selector.keys()) {
325                         close(k);
326                     }
327 
328                     try {
329                         selector.close();
330                     } catch (IOException e) {
331                         logger.warn(
332                                 "Failed to close a selector.", e);
333                     }
334                     shutdownLatch.countDown();
335                     break;
336                 } else {
337                     process(selector);
338                 }
339             } catch (Throwable t) {
340                 logger.warn(
341                         "Unexpected exception in the selector loop.", t);
342 
343                 // Prevent possible consecutive immediate failures that lead to
344                 // excessive CPU consumption.
345                 try {
346                     Thread.sleep(1000);
347                 } catch (InterruptedException e) {
348                     // Ignore.
349                 }
350             }
351         }
352     }
353 
354     /**
355      * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
356      * the {@link AbstractNioChannel}'s when they get registered
357      */
358     private void openSelector(ThreadNameDeterminer determiner) {
359         try {
360             selector = SelectorUtil.open();
361         } catch (Throwable t) {
362             throw new ChannelException("Failed to create a selector.", t);
363         }
364 
365         // Start the worker thread with the new Selector.
366         boolean success = false;
367         try {
368             DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
369             success = true;
370         } finally {
371             if (!success) {
372                 // Release the Selector if the execution fails.
373                 try {
374                     selector.close();
375                 } catch (Throwable t) {
376                     logger.warn("Failed to close a selector.", t);
377                 }
378                 selector = null;
379                 // The method will return to the caller at this point.
380             }
381         }
382         assert selector != null && selector.isOpen();
383     }
384 
385     private void processTaskQueue() {
386         for (;;) {
387             final Runnable task = taskQueue.poll();
388             if (task == null) {
389                 break;
390             }
391             task.run();
392             try {
393                 cleanUpCancelledKeys();
394             } catch (IOException e) {
395                 // Ignore
396             }
397         }
398     }
399 
400     protected final void increaseCancelledKeys() {
401         cancelledKeys ++;
402     }
403 
404     protected final boolean cleanUpCancelledKeys() throws IOException {
405         if (cancelledKeys >= CLEANUP_INTERVAL) {
406             cancelledKeys = 0;
407             selector.selectNow();
408             return true;
409         }
410         return false;
411     }
412 
413     public void shutdown() {
414         if (isIoThread()) {
415             throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
416         }
417 
418         Selector selector = this.selector;
419         shutdown = true;
420         if (selector != null) {
421             selector.wakeup();
422         }
423         try {
424             shutdownLatch.await();
425         } catch (InterruptedException e) {
426             logger.error("Interrupted while wait for resources to be released #" + id);
427             Thread.currentThread().interrupt();
428         }
429     }
430 
431     protected abstract void process(Selector selector) throws IOException;
432 
433     protected int select(Selector selector) throws IOException {
434         return SelectorUtil.select(selector);
435     }
436 
437     protected abstract void close(SelectionKey k);
438 
439     protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
440 
441     protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
442 }