View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelException;
20  import org.jboss.netty.channel.ChannelFuture;
21  import org.jboss.netty.logging.InternalLogger;
22  import org.jboss.netty.logging.InternalLoggerFactory;
23  import org.jboss.netty.util.ThreadNameDeterminer;
24  import org.jboss.netty.util.ThreadRenamingRunnable;
25  import org.jboss.netty.util.internal.DeadLockProofWorker;
26  
27  import java.io.IOException;
28  import java.nio.channels.CancelledKeyException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectableChannel;
31  import java.nio.channels.SelectionKey;
32  import java.nio.channels.Selector;
33  import java.nio.channels.SocketChannel;
34  import java.util.ConcurrentModificationException;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  abstract class AbstractNioSelector implements NioSelector {
44  
45      private static final AtomicInteger nextId = new AtomicInteger();
46  
47      private final int id = nextId.incrementAndGet();
48  
49      /**
50       * Internal Netty logger.
51       */
52      protected static final InternalLogger logger = InternalLoggerFactory
53              .getInstance(AbstractNioSelector.class);
54  
55      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
56  
57      /**
58       * Executor used to execute {@link Runnable}s such as channel registration
59       * task.
60       */
61      private final Executor executor;
62  
63      /**
64       * If this worker has been started thread will be a reference to the thread
65       * used when starting. i.e. the current thread when the run method is executed.
66       */
67      protected volatile Thread thread;
68  
69      /**
70       * The NIO {@link Selector}.
71       */
72      protected volatile Selector selector;
73  
74      /**
75       * Boolean that controls determines if a blocked Selector.select should
76       * break out of its selection process. In our case we use a timeone for
77       * the select method and the select method will block for that time unless
78       * waken up.
79       */
80      protected final AtomicBoolean wakenUp = new AtomicBoolean();
81  
82      private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
83  
84      private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
85  
86      private final CountDownLatch shutdownLatch = new CountDownLatch(1);
87      private volatile boolean shutdown;
88  
89      AbstractNioSelector(Executor executor) {
90          this(executor, null);
91      }
92  
93      AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
94          this.executor = executor;
95          openSelector(determiner);
96      }
97  
98      public void register(Channel channel, ChannelFuture future) {
99          Runnable task = createRegisterTask(channel, future);
100         registerTask(task);
101     }
102 
103     protected final void registerTask(Runnable task) {
104         taskQueue.add(task);
105 
106         Selector selector = this.selector;
107 
108         if (selector != null) {
109             if (wakenUp.compareAndSet(false, true)) {
110                 selector.wakeup();
111             }
112         } else {
113             if (taskQueue.remove(task)) {
114                 // the selector was null this means the Worker has already been shutdown.
115                 throw new RejectedExecutionException("Worker has already been shutdown");
116             }
117         }
118     }
119 
120     protected final boolean isIoThread() {
121         return Thread.currentThread() == thread;
122     }
123 
124     public void rebuildSelector() {
125         if (!isIoThread()) {
126             taskQueue.add(new Runnable() {
127                 public void run() {
128                     rebuildSelector();
129                 }
130             });
131             return;
132         }
133 
134         final Selector oldSelector = selector;
135         final Selector newSelector;
136 
137         if (oldSelector == null) {
138             return;
139         }
140 
141         try {
142             newSelector = Selector.open();
143         } catch (Exception e) {
144             logger.warn("Failed to create a new Selector.", e);
145             return;
146         }
147 
148         // Register all channels to the new Selector.
149         int nChannels = 0;
150         for (;;) {
151             try {
152                 for (SelectionKey key: oldSelector.keys()) {
153                     try {
154                         if (key.channel().keyFor(newSelector) != null) {
155                             continue;
156                         }
157 
158                         int interestOps = key.interestOps();
159                         key.cancel();
160                         key.channel().register(newSelector, interestOps, key.attachment());
161                         nChannels ++;
162                     } catch (Exception e) {
163                         logger.warn("Failed to re-register a Channel to the new Selector,", e);
164                         close(key);
165                     }
166                 }
167             } catch (ConcurrentModificationException e) {
168                 // Probably due to concurrent modification of the key set.
169                 continue;
170             }
171 
172             break;
173         }
174 
175         selector = newSelector;
176 
177         try {
178             // time to close the old selector as everything else is registered to the new one
179             oldSelector.close();
180         } catch (Throwable t) {
181             if (logger.isWarnEnabled()) {
182                 logger.warn("Failed to close the old Selector.", t);
183             }
184         }
185 
186         logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
187     }
188 
189     public void run() {
190         thread = Thread.currentThread();
191 
192         int selectReturnsImmediately = 0;
193         Selector selector = this.selector;
194 
195         if (selector == null) {
196             return;
197         }
198         // use 80% of the timeout for measure
199         final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
200         boolean wakenupFromLoop = false;
201         for (;;) {
202             wakenUp.set(false);
203 
204             try {
205                 long beforeSelect = System.nanoTime();
206                 int selected = select(selector);
207                 if (selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
208                     long timeBlocked = System.nanoTime() - beforeSelect;
209                     if (timeBlocked < minSelectTimeout) {
210                         boolean notConnected = false;
211                         // loop over all keys as the selector may was unblocked because of a closed channel
212                         for (SelectionKey key: selector.keys()) {
213                             SelectableChannel ch = key.channel();
214                             try {
215                                 if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
216                                         ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
217                                     notConnected = true;
218                                     // cancel the key just to be on the safe side
219                                     key.cancel();
220                                 }
221                             } catch (CancelledKeyException e) {
222                                 // ignore
223                             }
224                         }
225                         if (notConnected) {
226                             selectReturnsImmediately = 0;
227                         } else {
228                             if (Thread.interrupted() && !shutdown) {
229                                 // Thread was interrupted but NioSelector was not shutdown.
230                                 // As this is most likely a bug in the handler of the user or it's client
231                                 // library we will log it.
232                                 //
233                                 // See https://github.com/netty/netty/issues/2426
234                                 if (logger.isDebugEnabled()) {
235                                     logger.debug("Selector.select() returned prematurely because the I/O thread " +
236                                             "has been interrupted. Use shutdown() to shut the NioSelector down.");
237                                 }
238                                 selectReturnsImmediately = 0;
239                             } else {
240                                 // Returned before the minSelectTimeout elapsed with nothing selected.
241                                 // This may be because of a bug in JDK NIO Selector provider, so increment the counter
242                                 // which we will use later to see if it's really the bug in JDK.
243                                 selectReturnsImmediately ++;
244                             }
245                         }
246                     } else {
247                         selectReturnsImmediately = 0;
248                     }
249                 } else {
250                     selectReturnsImmediately = 0;
251                 }
252 
253                 if (SelectorUtil.EPOLL_BUG_WORKAROUND) {
254                     if (selectReturnsImmediately == 1024) {
255                         // The selector returned immediately for 10 times in a row,
256                         // so recreate one selector as it seems like we hit the
257                         // famous epoll(..) jdk bug.
258                         rebuildSelector();
259                         selector = this.selector;
260                         selectReturnsImmediately = 0;
261                         wakenupFromLoop = false;
262                         // try to select again
263                         continue;
264                     }
265                 } else {
266                     // reset counter
267                     selectReturnsImmediately = 0;
268                 }
269 
270                 // 'wakenUp.compareAndSet(false, true)' is always evaluated
271                 // before calling 'selector.wakeup()' to reduce the wake-up
272                 // overhead. (Selector.wakeup() is an expensive operation.)
273                 //
274                 // However, there is a race condition in this approach.
275                 // The race condition is triggered when 'wakenUp' is set to
276                 // true too early.
277                 //
278                 // 'wakenUp' is set to true too early if:
279                 // 1) Selector is waken up between 'wakenUp.set(false)' and
280                 //    'selector.select(...)'. (BAD)
281                 // 2) Selector is waken up between 'selector.select(...)' and
282                 //    'if (wakenUp.get()) { ... }'. (OK)
283                 //
284                 // In the first case, 'wakenUp' is set to true and the
285                 // following 'selector.select(...)' will wake up immediately.
286                 // Until 'wakenUp' is set to false again in the next round,
287                 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
288                 // any attempt to wake up the Selector will fail, too, causing
289                 // the following 'selector.select(...)' call to block
290                 // unnecessarily.
291                 //
292                 // To fix this problem, we wake up the selector again if wakenUp
293                 // is true immediately after selector.select(...).
294                 // It is inefficient in that it wakes up the selector for both
295                 // the first case (BAD - wake-up required) and the second case
296                 // (OK - no wake-up required).
297 
298                 if (wakenUp.get()) {
299                     wakenupFromLoop = true;
300                     selector.wakeup();
301                 } else {
302                     wakenupFromLoop = false;
303                 }
304 
305                 cancelledKeys = 0;
306                 processTaskQueue();
307                 selector = this.selector; // processTaskQueue() can call rebuildSelector()
308 
309                 if (shutdown) {
310                     this.selector = null;
311 
312                     // process one time again
313                     processTaskQueue();
314 
315                     for (SelectionKey k: selector.keys()) {
316                         close(k);
317                     }
318 
319                     try {
320                         selector.close();
321                     } catch (IOException e) {
322                         logger.warn(
323                                 "Failed to close a selector.", e);
324                     }
325                     shutdownLatch.countDown();
326                     break;
327                 } else {
328                     process(selector);
329                 }
330             } catch (Throwable t) {
331                 logger.warn(
332                         "Unexpected exception in the selector loop.", t);
333 
334                 // Prevent possible consecutive immediate failures that lead to
335                 // excessive CPU consumption.
336                 try {
337                     Thread.sleep(1000);
338                 } catch (InterruptedException e) {
339                     // Ignore.
340                 }
341             }
342         }
343     }
344 
345     /**
346      * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
347      * the {@link AbstractNioChannel}'s when they get registered
348      */
349     private void openSelector(ThreadNameDeterminer determiner) {
350         try {
351             selector = Selector.open();
352         } catch (Throwable t) {
353             throw new ChannelException("Failed to create a selector.", t);
354         }
355 
356         // Start the worker thread with the new Selector.
357         boolean success = false;
358         try {
359             DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
360             success = true;
361         } finally {
362             if (!success) {
363                 // Release the Selector if the execution fails.
364                 try {
365                     selector.close();
366                 } catch (Throwable t) {
367                     logger.warn("Failed to close a selector.", t);
368                 }
369                 selector = null;
370                 // The method will return to the caller at this point.
371             }
372         }
373         assert selector != null && selector.isOpen();
374     }
375 
376     private void processTaskQueue() {
377         for (;;) {
378             final Runnable task = taskQueue.poll();
379             if (task == null) {
380                 break;
381             }
382             task.run();
383             try {
384                 cleanUpCancelledKeys();
385             } catch (IOException e) {
386                 // Ignore
387             }
388         }
389     }
390 
391     protected final void increaseCancelledKeys() {
392         cancelledKeys ++;
393     }
394 
395     protected final boolean cleanUpCancelledKeys() throws IOException {
396         if (cancelledKeys >= CLEANUP_INTERVAL) {
397             cancelledKeys = 0;
398             selector.selectNow();
399             return true;
400         }
401         return false;
402     }
403 
404     public void shutdown() {
405         if (isIoThread()) {
406             throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
407         }
408 
409         Selector selector = this.selector;
410         shutdown = true;
411         if (selector != null) {
412             selector.wakeup();
413         }
414         try {
415             shutdownLatch.await();
416         } catch (InterruptedException e) {
417             logger.error("Interrupted while wait for resources to be released #" + id);
418             Thread.currentThread().interrupt();
419         }
420     }
421 
422     protected abstract void process(Selector selector) throws IOException;
423 
424     protected int select(Selector selector) throws IOException {
425         return SelectorUtil.select(selector);
426     }
427 
428     protected abstract void close(SelectionKey k);
429 
430     protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
431 
432     protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
433 }