View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelException;
20  import org.jboss.netty.channel.ChannelFuture;
21  import org.jboss.netty.logging.InternalLogger;
22  import org.jboss.netty.logging.InternalLoggerFactory;
23  import org.jboss.netty.util.ThreadNameDeterminer;
24  import org.jboss.netty.util.ThreadRenamingRunnable;
25  import org.jboss.netty.util.internal.DeadLockProofWorker;
26  
27  import java.io.IOException;
28  import java.nio.channels.CancelledKeyException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectableChannel;
31  import java.nio.channels.SelectionKey;
32  import java.nio.channels.Selector;
33  import java.nio.channels.SocketChannel;
34  import java.util.ConcurrentModificationException;
35  import java.util.Queue;
36  import java.util.concurrent.ConcurrentLinkedQueue;
37  import java.util.concurrent.CountDownLatch;
38  import java.util.concurrent.Executor;
39  import java.util.concurrent.RejectedExecutionException;
40  import java.util.concurrent.atomic.AtomicBoolean;
41  import java.util.concurrent.atomic.AtomicInteger;
42  
43  abstract class AbstractNioSelector implements NioSelector {
44  
45      private static final AtomicInteger nextId = new AtomicInteger();
46  
47      private final int id = nextId.incrementAndGet();
48  
49      /**
50       * Internal Netty logger.
51       */
52      protected static final InternalLogger logger = InternalLoggerFactory
53              .getInstance(AbstractNioSelector.class);
54  
55      private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.
56  
57      /**
58       * Executor used to execute {@link Runnable}s such as channel registration
59       * task.
60       */
61      private final Executor executor;
62  
63      /**
64       * If this worker has been started thread will be a reference to the thread
65       * used when starting. i.e. the current thread when the run method is executed.
66       */
67      protected volatile Thread thread;
68  
69      /**
70       * Count down to 0 when the I/O thread starts and {@link #thread} is set to non-null.
71       */
72      final CountDownLatch startupLatch = new CountDownLatch(1);
73  
74      /**
75       * The NIO {@link Selector}.
76       */
77      protected volatile Selector selector;
78  
79      /**
80       * Boolean that determines whether a blocked Selector.select should
81       * break out of its selection process. In our case we use a timeout for
82       * the select method and the select method will block for that time unless
83       * woken up.
84       */
85      protected final AtomicBoolean wakenUp = new AtomicBoolean();
86  
87      private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
88  
89      private volatile int cancelledKeys; // should use AtomicInteger but we just need approximation
90  
91      private final CountDownLatch shutdownLatch = new CountDownLatch(1);
92      private volatile boolean shutdown;
93  
94      AbstractNioSelector(Executor executor) {
95          this(executor, null);
96      }
97  
98      AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {
99          this.executor = executor;
100         openSelector(determiner);
101     }
102 
103     public void register(Channel channel, ChannelFuture future) {
104         Runnable task = createRegisterTask(channel, future);
105         registerTask(task);
106     }
107 
108     protected final void registerTask(Runnable task) {
109         taskQueue.add(task);
110 
111         Selector selector = this.selector;
112 
113         if (selector != null) {
114             if (wakenUp.compareAndSet(false, true)) {
115                 selector.wakeup();
116             }
117         } else {
118             if (taskQueue.remove(task)) {
119                 // the selector was null this means the Worker has already been shutdown.
120                 throw new RejectedExecutionException("Worker has already been shutdown");
121             }
122         }
123     }
124 
125     protected final boolean isIoThread() {
126         return Thread.currentThread() == thread;
127     }
128 
129     public void rebuildSelector() {
130         if (!isIoThread()) {
131             taskQueue.add(new Runnable() {
132                 public void run() {
133                     rebuildSelector();
134                 }
135             });
136             return;
137         }
138 
139         final Selector oldSelector = selector;
140         final Selector newSelector;
141 
142         if (oldSelector == null) {
143             return;
144         }
145 
146         try {
147             newSelector = SelectorUtil.open();
148         } catch (Exception e) {
149             logger.warn("Failed to create a new Selector.", e);
150             return;
151         }
152 
153         // Register all channels to the new Selector.
154         int nChannels = 0;
155         for (;;) {
156             try {
157                 for (SelectionKey key: oldSelector.keys()) {
158                     try {
159                         if (key.channel().keyFor(newSelector) != null) {
160                             continue;
161                         }
162 
163                         int interestOps = key.interestOps();
164                         key.cancel();
165                         key.channel().register(newSelector, interestOps, key.attachment());
166                         nChannels ++;
167                     } catch (Exception e) {
168                         logger.warn("Failed to re-register a Channel to the new Selector,", e);
169                         close(key);
170                     }
171                 }
172             } catch (ConcurrentModificationException e) {
173                 // Probably due to concurrent modification of the key set.
174                 continue;
175             }
176 
177             break;
178         }
179 
180         selector = newSelector;
181 
182         try {
183             // time to close the old selector as everything else is registered to the new one
184             oldSelector.close();
185         } catch (Throwable t) {
186             if (logger.isWarnEnabled()) {
187                 logger.warn("Failed to close the old Selector.", t);
188             }
189         }
190 
191         logger.info("Migrated " + nChannels + " channel(s) to the new Selector,");
192     }
193 
194     public void run() {
195         thread = Thread.currentThread();
196         startupLatch.countDown();
197 
198         int selectReturnsImmediately = 0;
199         Selector selector = this.selector;
200 
201         if (selector == null) {
202             return;
203         }
204         // use 80% of the timeout for measure
205         final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
206         boolean wakenupFromLoop = false;
207         for (;;) {
208             wakenUp.set(false);
209 
210             try {
211                 long beforeSelect = System.nanoTime();
212                 int selected = select(selector);
213                 if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
214                     long timeBlocked = System.nanoTime() - beforeSelect;
215 
216                     if (timeBlocked < minSelectTimeout) {
217                         boolean notConnected = false;
218                         // loop over all keys as the selector may was unblocked because of a closed channel
219                         for (SelectionKey key: selector.keys()) {
220                             SelectableChannel ch = key.channel();
221                             try {
222                                 if (ch instanceof DatagramChannel && !ch.isOpen() ||
223                                         ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
224                                     notConnected = true;
225                                     // cancel the key just to be on the safe side
226                                     key.cancel();
227                                 }
228                             } catch (CancelledKeyException e) {
229                                 // ignore
230                             }
231                         }
232                         if (notConnected) {
233                             selectReturnsImmediately = 0;
234                         } else {
235                             // returned before the minSelectTimeout elapsed with nothing select.
236                             // this may be the cause of the jdk epoll(..) bug, so increment the counter
237                             // which we use later to see if its really the jdk bug.
238                             selectReturnsImmediately ++;
239                         }
240                     } else {
241                         selectReturnsImmediately = 0;
242                     }
243 
244                     if (selectReturnsImmediately == 1024) {
245                         // The selector returned immediately for 10 times in a row,
246                         // so recreate one selector as it seems like we hit the
247                         // famous epoll(..) jdk bug.
248                         rebuildSelector();
249                         selector = this.selector;
250                         selectReturnsImmediately = 0;
251                         wakenupFromLoop = false;
252                         // try to select again
253                         continue;
254                     }
255                 } else {
256                     // reset counter
257                     selectReturnsImmediately = 0;
258                 }
259 
260                 // 'wakenUp.compareAndSet(false, true)' is always evaluated
261                 // before calling 'selector.wakeup()' to reduce the wake-up
262                 // overhead. (Selector.wakeup() is an expensive operation.)
263                 //
264                 // However, there is a race condition in this approach.
265                 // The race condition is triggered when 'wakenUp' is set to
266                 // true too early.
267                 //
268                 // 'wakenUp' is set to true too early if:
269                 // 1) Selector is waken up between 'wakenUp.set(false)' and
270                 //    'selector.select(...)'. (BAD)
271                 // 2) Selector is waken up between 'selector.select(...)' and
272                 //    'if (wakenUp.get()) { ... }'. (OK)
273                 //
274                 // In the first case, 'wakenUp' is set to true and the
275                 // following 'selector.select(...)' will wake up immediately.
276                 // Until 'wakenUp' is set to false again in the next round,
277                 // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
278                 // any attempt to wake up the Selector will fail, too, causing
279                 // the following 'selector.select(...)' call to block
280                 // unnecessarily.
281                 //
282                 // To fix this problem, we wake up the selector again if wakenUp
283                 // is true immediately after selector.select(...).
284                 // It is inefficient in that it wakes up the selector for both
285                 // the first case (BAD - wake-up required) and the second case
286                 // (OK - no wake-up required).
287 
288                 if (wakenUp.get()) {
289                     wakenupFromLoop = true;
290                     selector.wakeup();
291                 } else {
292                     wakenupFromLoop = false;
293                 }
294 
295                 cancelledKeys = 0;
296                 processTaskQueue();
297                 selector = this.selector; // processTaskQueue() can call rebuildSelector()
298 
299                 if (shutdown) {
300                     this.selector = null;
301 
302                     // process one time again
303                     processTaskQueue();
304 
305                     for (SelectionKey k: selector.keys()) {
306                         close(k);
307                     }
308 
309                     try {
310                         selector.close();
311                     } catch (IOException e) {
312                         logger.warn(
313                                 "Failed to close a selector.", e);
314                     }
315                     shutdownLatch.countDown();
316                     break;
317                 } else {
318                     process(selector);
319                 }
320             } catch (Throwable t) {
321                 logger.warn(
322                         "Unexpected exception in the selector loop.", t);
323 
324                 // Prevent possible consecutive immediate failures that lead to
325                 // excessive CPU consumption.
326                 try {
327                     Thread.sleep(1000);
328                 } catch (InterruptedException e) {
329                     // Ignore.
330                 }
331             }
332         }
333     }
334 
335     /**
336      * Start the {@link AbstractNioWorker} and return the {@link Selector} that will be used for
337      * the {@link AbstractNioChannel}'s when they get registered
338      */
339     private void openSelector(ThreadNameDeterminer determiner) {
340         try {
341             selector = SelectorUtil.open();
342         } catch (Throwable t) {
343             throw new ChannelException("Failed to create a selector.", t);
344         }
345 
346         // Start the worker thread with the new Selector.
347         boolean success = false;
348         try {
349             DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));
350             success = true;
351         } finally {
352             if (!success) {
353                 // Release the Selector if the execution fails.
354                 try {
355                     selector.close();
356                 } catch (Throwable t) {
357                     logger.warn("Failed to close a selector.", t);
358                 }
359                 selector = null;
360                 // The method will return to the caller at this point.
361             }
362         }
363         assert selector != null && selector.isOpen();
364     }
365 
366     private void processTaskQueue() {
367         for (;;) {
368             final Runnable task = taskQueue.poll();
369             if (task == null) {
370                 break;
371             }
372             task.run();
373             try {
374                 cleanUpCancelledKeys();
375             } catch (IOException e) {
376                 // Ignore
377             }
378         }
379     }
380 
381     protected final void increaseCancelledKeys() {
382         cancelledKeys ++;
383     }
384 
385     protected final boolean cleanUpCancelledKeys() throws IOException {
386         if (cancelledKeys >= CLEANUP_INTERVAL) {
387             cancelledKeys = 0;
388             selector.selectNow();
389             return true;
390         }
391         return false;
392     }
393 
394     public void shutdown() {
395         if (isIoThread()) {
396             throw new IllegalStateException("Must not be called from a I/O-Thread to prevent deadlocks!");
397         }
398 
399         Selector selector = this.selector;
400         shutdown = true;
401         if (selector != null) {
402             selector.wakeup();
403         }
404         try {
405             shutdownLatch.await();
406         } catch (InterruptedException e) {
407             logger.error("Interrupted while wait for resources to be released #" + id);
408             Thread.currentThread().interrupt();
409         }
410     }
411 
412     protected abstract void process(Selector selector) throws IOException;
413 
414     protected int select(Selector selector) throws IOException {
415         return SelectorUtil.select(selector);
416     }
417 
418     protected abstract void close(SelectionKey k);
419 
420     protected abstract ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner);
421 
422     protected abstract Runnable createRegisterTask(Channel channel, ChannelFuture future);
423 }