View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.ConnectException;
22  import java.net.SocketAddress;
23  import java.nio.channels.CancelledKeyException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.SocketChannel;
26  import java.nio.channels.SelectableChannel;
27  import java.nio.channels.SelectionKey;
28  import java.nio.channels.Selector;
29  import java.util.Iterator;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentLinkedQueue;
33  import java.util.concurrent.Executor;
34  import java.util.concurrent.ExecutorService;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicBoolean;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import org.jboss.netty.channel.ChannelEvent;
40  import org.jboss.netty.channel.ChannelException;
41  import org.jboss.netty.channel.ChannelFuture;
42  import org.jboss.netty.channel.ChannelFutureListener;
43  import org.jboss.netty.channel.ChannelPipeline;
44  import org.jboss.netty.channel.ChannelState;
45  import org.jboss.netty.channel.ChannelStateEvent;
46  import org.jboss.netty.channel.MessageEvent;
47  import org.jboss.netty.logging.InternalLogger;
48  import org.jboss.netty.logging.InternalLoggerFactory;
49  import org.jboss.netty.util.Timeout;
50  import org.jboss.netty.util.Timer;
51  import org.jboss.netty.util.TimerTask;
52  import org.jboss.netty.util.ThreadRenamingRunnable;
53  import org.jboss.netty.util.internal.DeadLockProofWorker;
54  
55  class NioClientSocketPipelineSink extends AbstractNioChannelSink {
56  
57      private static final AtomicInteger nextId = new AtomicInteger();
58  
59      static final InternalLogger logger =
60          InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
61  
62      final Executor bossExecutor;
63  
64      final int id = nextId.incrementAndGet();
65      private final Boss[] bosses;
66  
67      private final AtomicInteger bossIndex = new AtomicInteger();
68  
69      private final WorkerPool<NioWorker> workerPool;
70  
71      private final Timer timer;
72  
73      NioClientSocketPipelineSink(
74              Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool, Timer timer) {
75  
76          this.bossExecutor = bossExecutor;
77          this.timer = timer;
78          bosses = new Boss[bossCount];
79          for (int i = 0; i < bosses.length; i ++) {
80              bosses[i] = new Boss(i);
81          }
82  
83          this.workerPool = workerPool;
84      }
85  
86      public void eventSunk(
87              ChannelPipeline pipeline, ChannelEvent e) throws Exception {
88          if (e instanceof ChannelStateEvent) {
89              ChannelStateEvent event = (ChannelStateEvent) e;
90              NioClientSocketChannel channel =
91                  (NioClientSocketChannel) event.getChannel();
92              ChannelFuture future = event.getFuture();
93              ChannelState state = event.getState();
94              Object value = event.getValue();
95  
96              switch (state) {
97              case OPEN:
98                  if (Boolean.FALSE.equals(value)) {
99                      channel.worker.close(channel, future);
100                 }
101                 break;
102             case BOUND:
103                 if (value != null) {
104                     bind(channel, future, (SocketAddress) value);
105                 } else {
106                     channel.worker.close(channel, future);
107                 }
108                 break;
109             case CONNECTED:
110                 if (value != null) {
111                     connect(channel, future, (SocketAddress) value);
112                 } else {
113                     channel.worker.close(channel, future);
114                 }
115                 break;
116             case INTEREST_OPS:
117                 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
118                 break;
119             }
120         } else if (e instanceof MessageEvent) {
121             MessageEvent event = (MessageEvent) e;
122             NioSocketChannel channel = (NioSocketChannel) event.getChannel();
123             boolean offered = channel.writeBufferQueue.offer(event);
124             assert offered;
125             channel.worker.writeFromUserCode(channel);
126         }
127     }
128 
129     private static void bind(
130             NioClientSocketChannel channel, ChannelFuture future,
131             SocketAddress localAddress) {
132         try {
133             channel.channel.socket().bind(localAddress);
134             channel.boundManually = true;
135             channel.setBound();
136             future.setSuccess();
137             fireChannelBound(channel, channel.getLocalAddress());
138         } catch (Throwable t) {
139             future.setFailure(t);
140             fireExceptionCaught(channel, t);
141         }
142     }
143 
144     private void connect(
145             final NioClientSocketChannel channel, final ChannelFuture cf,
146             SocketAddress remoteAddress) {
147         try {
148             if (channel.channel.connect(remoteAddress)) {
149                 channel.worker.register(channel, cf);
150             } else {
151                 channel.getCloseFuture().addListener(new ChannelFutureListener() {
152                     public void operationComplete(ChannelFuture f)
153                             throws Exception {
154                         if (!cf.isDone()) {
155                             cf.setFailure(new ClosedChannelException());
156                         }
157                     }
158                 });
159                 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
160                 channel.connectFuture = cf;
161                 nextBoss().register(channel);
162             }
163 
164         } catch (Throwable t) {
165             cf.setFailure(t);
166             fireExceptionCaught(channel, t);
167             channel.worker.close(channel, succeededFuture(channel));
168         }
169     }
170 
171     Boss nextBoss() {
172         return bosses[Math.abs(
173                 bossIndex.getAndIncrement() % bosses.length)];
174     }
175 
176     NioWorker nextWorker() {
177         return workerPool.nextWorker();
178     }
179 
180     private final class Boss implements Runnable {
181 
182         volatile Selector selector;
183         private boolean started;
184         private final AtomicBoolean wakenUp = new AtomicBoolean();
185         private final Object startStopLock = new Object();
186         private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
187         private final int subId;
188         private final TimerTask wakeupTask = new TimerTask() {
189             public void run(Timeout timeout) throws Exception {
190                 // This is needed to prevent a possible race that can lead to a NPE
191                 // when the selector is closed before this is run
192                 //
193                 // See https://github.com/netty/netty/issues/685
194                 Selector selector = NioClientSocketPipelineSink.Boss.this.selector;
195 
196                 if (selector != null) {
197                     if (wakenUp.compareAndSet(false, true)) {
198                         selector.wakeup();
199                     }
200                 }
201             }
202         };
203 
204         Boss(int subId) {
205             this.subId = subId;
206         }
207 
208         void register(NioClientSocketChannel channel) {
209             Runnable registerTask = new RegisterTask(this, channel);
210             Selector selector;
211 
212             synchronized (startStopLock) {
213                 if (!started) {
214                     // Open a selector if this worker didn't start yet.
215                     try {
216                         this.selector = selector =  Selector.open();
217                     } catch (Throwable t) {
218                         throw new ChannelException(
219                                 "Failed to create a selector.", t);
220                     }
221 
222                     // Start the worker thread with the new Selector.
223                     boolean success = false;
224                     try {
225                         DeadLockProofWorker.start(bossExecutor,
226                                 new ThreadRenamingRunnable(this,
227                                         "New I/O client boss #" + id + '-' + subId));
228 
229                         success = true;
230                     } finally {
231                         if (!success) {
232                             // Release the Selector if the execution fails.
233                             try {
234                                 selector.close();
235                             } catch (Throwable t) {
236                                 if (logger.isWarnEnabled()) {
237                                     logger.warn("Failed to close a selector.", t);
238                                 }
239                             }
240                             this.selector = selector = null;
241                             // The method will return to the caller at this point.
242                         }
243                     }
244                 } else {
245                     // Use the existing selector if this worker has been started.
246                     selector = this.selector;
247                 }
248 
249                 assert selector != null && selector.isOpen();
250 
251                 started = true;
252                 boolean offered = registerTaskQueue.offer(registerTask);
253                 assert offered;
254             }
255             int timeout = channel.getConfig().getConnectTimeoutMillis();
256             if (timeout > 0) {
257                 if (!channel.isConnected()) {
258                     channel.timoutTimer = timer.newTimeout(wakeupTask,
259                             timeout, TimeUnit.MILLISECONDS);
260                 }
261             }
262             if (wakenUp.compareAndSet(false, true)) {
263                 selector.wakeup();
264             }
265 
266         }
267 
268         public void run() {
269             boolean shutdown = false;
270             int selectReturnsImmediately = 0;
271 
272             Selector selector = this.selector;
273 
274             // use 80% of the timeout for measure
275             final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
276             boolean wakenupFromLoop = false;
277             for (;;) {
278                 wakenUp.set(false);
279 
280                 try {
281                     long beforeSelect = System.nanoTime();
282                     int selected = SelectorUtil.select(selector);
283                     if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
284                         long timeBlocked = System.nanoTime() - beforeSelect;
285 
286                         if (timeBlocked < minSelectTimeout) {
287                             boolean notConnected = false;
288                             // loop over all keys as the selector may was unblocked because of a closed channel
289                             for (SelectionKey key: selector.keys()) {
290                                 SelectableChannel ch = key.channel();
291                                 try {
292                                     if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
293                                         notConnected = true;
294                                         // cancel the key just to be on the safe side
295                                         key.cancel();
296                                     }
297                                 } catch (CancelledKeyException e) {
298                                     // ignore
299                                 }
300                             }
301                             if (notConnected) {
302                                 selectReturnsImmediately = 0;
303                             } else {
304                                 // returned before the minSelectTimeout elapsed with nothing select.
305                                 // this may be the cause of the jdk epoll(..) bug, so increment the counter
306                                 // which we use later to see if its really the jdk bug.
307                                 selectReturnsImmediately ++;
308 
309                             }
310 
311                         } else {
312                             selectReturnsImmediately = 0;
313                         }
314 
315                         if (selectReturnsImmediately == 1024) {
316                             // The selector returned immediately for 10 times in a row,
317                             // so recreate one selector as it seems like we hit the
318                             // famous epoll(..) jdk bug.
319                             selector = recreateSelector();
320                             selectReturnsImmediately = 0;
321                             wakenupFromLoop = false;
322                             // try to select again
323                             continue;
324                         }
325                     } else {
326                         // reset counter
327                         selectReturnsImmediately = 0;
328                     }
329 
330                     // 'wakenUp.compareAndSet(false, true)' is always evaluated
331                     // before calling 'selector.wakeup()' to reduce the wake-up
332                     // overhead. (Selector.wakeup() is an expensive operation.)
333                     //
334                     // However, there is a race condition in this approach.
335                     // The race condition is triggered when 'wakenUp' is set to
336                     // true too early.
337                     //
338                     // 'wakenUp' is set to true too early if:
339                     // 1) Selector is waken up between 'wakenUp.set(false)' and
340                     //    'selector.select(...)'. (BAD)
341                     // 2) Selector is waken up between 'selector.select(...)' and
342                     //    'if (wakenUp.get()) { ... }'. (OK)
343                     //
344                     // In the first case, 'wakenUp' is set to true and the
345                     // following 'selector.select(...)' will wake up immediately.
346                     // Until 'wakenUp' is set to false again in the next round,
347                     // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
348                     // any attempt to wake up the Selector will fail, too, causing
349                     // the following 'selector.select(...)' call to block
350                     // unnecessarily.
351                     //
352                     // To fix this problem, we wake up the selector again if wakenUp
353                     // is true immediately after selector.select(...).
354                     // It is inefficient in that it wakes up the selector for both
355                     // the first case (BAD - wake-up required) and the second case
356                     // (OK - no wake-up required).
357 
358                     if (wakenUp.get()) {
359                         wakenupFromLoop = true;
360                         selector.wakeup();
361                     } else {
362                         wakenupFromLoop = false;
363                     }
364                     processRegisterTaskQueue();
365                     processSelectedKeys(selector.selectedKeys());
366 
367                     // Handle connection timeout every 10 milliseconds approximately.
368                     long currentTimeNanos = System.nanoTime();
369                     processConnectTimeout(selector.keys(), currentTimeNanos);
370 
371                     // Exit the loop when there's nothing to handle.
372                     // The shutdown flag is used to delay the shutdown of this
373                     // loop to avoid excessive Selector creation when
374                     // connection attempts are made in a one-by-one manner
375                     // instead of concurrent manner.
376                     if (selector.keys().isEmpty()) {
377                         if (shutdown ||
378                             bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
379 
380                             synchronized (startStopLock) {
381                                 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
382                                     started = false;
383                                     try {
384                                         selector.close();
385                                     } catch (IOException e) {
386                                         if (logger.isWarnEnabled()) {
387                                             logger.warn(
388                                                     "Failed to close a selector.", e);
389                                         }
390 
391                                     } finally {
392                                         this.selector = null;
393                                     }
394                                     break;
395                                 } else {
396                                     shutdown = false;
397                                 }
398                             }
399                         } else {
400                             // Give one more second.
401                             shutdown = true;
402                         }
403                     } else {
404                         shutdown = false;
405                     }
406                 } catch (Throwable t) {
407                     if (logger.isWarnEnabled()) {
408                         logger.warn(
409                                 "Unexpected exception in the selector loop.", t);
410                     }
411 
412 
413                     // Prevent possible consecutive immediate failures.
414                     try {
415                         Thread.sleep(1000);
416                     } catch (InterruptedException e) {
417                         // Ignore.
418                     }
419                 }
420             }
421         }
422 
423         private void processRegisterTaskQueue() {
424             for (;;) {
425                 final Runnable task = registerTaskQueue.poll();
426                 if (task == null) {
427                     break;
428                 }
429 
430                 task.run();
431             }
432         }
433 
434         private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
435             // check if the set is empty and if so just return to not create garbage by
436             // creating a new Iterator every time even if there is nothing to process.
437             // See https://github.com/netty/netty/issues/597
438             if (selectedKeys.isEmpty()) {
439                 return;
440             }
441             for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
442                 SelectionKey k = i.next();
443                 i.remove();
444 
445                 if (!k.isValid()) {
446                     close(k);
447                     continue;
448                 }
449 
450                 try {
451                     if (k.isConnectable()) {
452                         connect(k);
453                     }
454                 } catch (Throwable t) {
455                     NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
456                     ch.connectFuture.setFailure(t);
457                     fireExceptionCaught(ch, t);
458                     k.cancel(); // Some JDK implementations run into an infinite loop without this.
459                     ch.worker.close(ch, succeededFuture(ch));
460                 }
461             }
462         }
463 
464         private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
465             ConnectException cause = null;
466             for (SelectionKey k: keys) {
467                 if (!k.isValid()) {
468                     // Comment the close call again as it gave us major problems
469                     // with ClosedChannelExceptions.
470                     //
471                     // See:
472                     // * https://github.com/netty/netty/issues/142
473                     // * https://github.com/netty/netty/issues/138
474                     //
475                     // close(k);
476                     continue;
477                 }
478 
479                 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
480                 if (ch.connectDeadlineNanos > 0 &&
481                         currentTimeNanos >= ch.connectDeadlineNanos) {
482 
483                     if (cause == null) {
484                         cause = new ConnectException("connection timed out");
485                     }
486 
487                     ch.connectFuture.setFailure(cause);
488                     fireExceptionCaught(ch, cause);
489                     ch.worker.close(ch, succeededFuture(ch));
490                 }
491             }
492         }
493 
494         private void connect(SelectionKey k) throws IOException {
495             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
496             if (ch.channel.finishConnect()) {
497                 k.cancel();
498                 if (ch.timoutTimer != null) {
499                     ch.timoutTimer.cancel();
500                 }
501                 ch.worker.register(ch, ch.connectFuture);
502             }
503         }
504 
505         private void close(SelectionKey k) {
506             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
507             ch.worker.close(ch, succeededFuture(ch));
508         }
509 
510         // Create a new selector and "transfer" all channels from the old
511         // selector to the new one
512         private Selector recreateSelector() throws IOException {
513             Selector newSelector = Selector.open();
514             Selector selector = this.selector;
515             this.selector = newSelector;
516 
517             // loop over all the keys that are registered with the old Selector
518             // and register them with the new one
519             for (SelectionKey key: selector.keys()) {
520                 SelectableChannel ch = key.channel();
521                 int ops = key.interestOps();
522                 Object att = key.attachment();
523                 // cancel the old key
524                 key.cancel();
525 
526                 try {
527                     // register the channel with the new selector now
528                     ch.register(newSelector, ops, att);
529                 } catch (ClosedChannelException e) {
530                     // close the Channel if we can't register it
531                     close(key);
532                 }
533             }
534 
535             try {
536                 // time to close the old selector as everything else is registered to the new one
537                 selector.close();
538             } catch (Throwable t) {
539                 if (logger.isWarnEnabled()) {
540                     logger.warn("Failed to close a selector.", t);
541                 }
542             }
543             if (logger.isWarnEnabled()) {
544                 logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
545             }
546             return newSelector;
547         }
548 
549     }
550 
551     private static final class RegisterTask implements Runnable {
552         private final Boss boss;
553         private final NioClientSocketChannel channel;
554 
555         RegisterTask(Boss boss, NioClientSocketChannel channel) {
556             this.boss = boss;
557             this.channel = channel;
558         }
559 
560         public void run() {
561             try {
562                 channel.channel.register(
563                         boss.selector, SelectionKey.OP_CONNECT, channel);
564             } catch (ClosedChannelException e) {
565                 channel.worker.close(channel, succeededFuture(channel));
566             }
567 
568             int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
569             if (connectTimeout > 0) {
570                 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
571             }
572         }
573     }
574 }