1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.ConnectException;
22 import java.net.SocketAddress;
23 import java.nio.channels.CancelledKeyException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.SocketChannel;
26 import java.nio.channels.SelectableChannel;
27 import java.nio.channels.SelectionKey;
28 import java.nio.channels.Selector;
29 import java.util.Iterator;
30 import java.util.Queue;
31 import java.util.Set;
32 import java.util.concurrent.ConcurrentLinkedQueue;
33 import java.util.concurrent.Executor;
34 import java.util.concurrent.ExecutorService;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.util.concurrent.atomic.AtomicInteger;
38
39 import org.jboss.netty.channel.ChannelEvent;
40 import org.jboss.netty.channel.ChannelException;
41 import org.jboss.netty.channel.ChannelFuture;
42 import org.jboss.netty.channel.ChannelFutureListener;
43 import org.jboss.netty.channel.ChannelPipeline;
44 import org.jboss.netty.channel.ChannelState;
45 import org.jboss.netty.channel.ChannelStateEvent;
46 import org.jboss.netty.channel.MessageEvent;
47 import org.jboss.netty.logging.InternalLogger;
48 import org.jboss.netty.logging.InternalLoggerFactory;
49 import org.jboss.netty.util.Timeout;
50 import org.jboss.netty.util.Timer;
51 import org.jboss.netty.util.TimerTask;
52 import org.jboss.netty.util.ThreadRenamingRunnable;
53 import org.jboss.netty.util.internal.DeadLockProofWorker;
54
55 class NioClientSocketPipelineSink extends AbstractNioChannelSink {
56
57 private static final AtomicInteger nextId = new AtomicInteger();
58
59 static final InternalLogger logger =
60 InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
61
62 final Executor bossExecutor;
63
64 final int id = nextId.incrementAndGet();
65 private final Boss[] bosses;
66
67 private final AtomicInteger bossIndex = new AtomicInteger();
68
69 private final WorkerPool<NioWorker> workerPool;
70
71 private final Timer timer;
72
73 NioClientSocketPipelineSink(
74 Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool, Timer timer) {
75
76 this.bossExecutor = bossExecutor;
77 this.timer = timer;
78 bosses = new Boss[bossCount];
79 for (int i = 0; i < bosses.length; i ++) {
80 bosses[i] = new Boss(i);
81 }
82
83 this.workerPool = workerPool;
84 }
85
86 public void eventSunk(
87 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
88 if (e instanceof ChannelStateEvent) {
89 ChannelStateEvent event = (ChannelStateEvent) e;
90 NioClientSocketChannel channel =
91 (NioClientSocketChannel) event.getChannel();
92 ChannelFuture future = event.getFuture();
93 ChannelState state = event.getState();
94 Object value = event.getValue();
95
96 switch (state) {
97 case OPEN:
98 if (Boolean.FALSE.equals(value)) {
99 channel.worker.close(channel, future);
100 }
101 break;
102 case BOUND:
103 if (value != null) {
104 bind(channel, future, (SocketAddress) value);
105 } else {
106 channel.worker.close(channel, future);
107 }
108 break;
109 case CONNECTED:
110 if (value != null) {
111 connect(channel, future, (SocketAddress) value);
112 } else {
113 channel.worker.close(channel, future);
114 }
115 break;
116 case INTEREST_OPS:
117 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
118 break;
119 }
120 } else if (e instanceof MessageEvent) {
121 MessageEvent event = (MessageEvent) e;
122 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
123 boolean offered = channel.writeBufferQueue.offer(event);
124 assert offered;
125 channel.worker.writeFromUserCode(channel);
126 }
127 }
128
129 private static void bind(
130 NioClientSocketChannel channel, ChannelFuture future,
131 SocketAddress localAddress) {
132 try {
133 channel.channel.socket().bind(localAddress);
134 channel.boundManually = true;
135 channel.setBound();
136 future.setSuccess();
137 fireChannelBound(channel, channel.getLocalAddress());
138 } catch (Throwable t) {
139 future.setFailure(t);
140 fireExceptionCaught(channel, t);
141 }
142 }
143
144 private void connect(
145 final NioClientSocketChannel channel, final ChannelFuture cf,
146 SocketAddress remoteAddress) {
147 try {
148 if (channel.channel.connect(remoteAddress)) {
149 channel.worker.register(channel, cf);
150 } else {
151 channel.getCloseFuture().addListener(new ChannelFutureListener() {
152 public void operationComplete(ChannelFuture f)
153 throws Exception {
154 if (!cf.isDone()) {
155 cf.setFailure(new ClosedChannelException());
156 }
157 }
158 });
159 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
160 channel.connectFuture = cf;
161 nextBoss().register(channel);
162 }
163
164 } catch (Throwable t) {
165 cf.setFailure(t);
166 fireExceptionCaught(channel, t);
167 channel.worker.close(channel, succeededFuture(channel));
168 }
169 }
170
171 Boss nextBoss() {
172 return bosses[Math.abs(
173 bossIndex.getAndIncrement() % bosses.length)];
174 }
175
176 NioWorker nextWorker() {
177 return workerPool.nextWorker();
178 }
179
180 private final class Boss implements Runnable {
181
182 volatile Selector selector;
183 private boolean started;
184 private final AtomicBoolean wakenUp = new AtomicBoolean();
185 private final Object startStopLock = new Object();
186 private final Queue<Runnable> registerTaskQueue = new ConcurrentLinkedQueue<Runnable>();
187 private final int subId;
188 private final TimerTask wakeupTask = new TimerTask() {
189 public void run(Timeout timeout) throws Exception {
190
191
192
193
194 Selector selector = NioClientSocketPipelineSink.Boss.this.selector;
195
196 if (selector != null) {
197 if (wakenUp.compareAndSet(false, true)) {
198 selector.wakeup();
199 }
200 }
201 }
202 };
203
204 Boss(int subId) {
205 this.subId = subId;
206 }
207
208 void register(NioClientSocketChannel channel) {
209 Runnable registerTask = new RegisterTask(this, channel);
210 Selector selector;
211
212 synchronized (startStopLock) {
213 if (!started) {
214
215 try {
216 this.selector = selector = Selector.open();
217 } catch (Throwable t) {
218 throw new ChannelException(
219 "Failed to create a selector.", t);
220 }
221
222
223 boolean success = false;
224 try {
225 DeadLockProofWorker.start(bossExecutor,
226 new ThreadRenamingRunnable(this,
227 "New I/O client boss #" + id + '-' + subId));
228
229 success = true;
230 } finally {
231 if (!success) {
232
233 try {
234 selector.close();
235 } catch (Throwable t) {
236 if (logger.isWarnEnabled()) {
237 logger.warn("Failed to close a selector.", t);
238 }
239 }
240 this.selector = selector = null;
241
242 }
243 }
244 } else {
245
246 selector = this.selector;
247 }
248
249 assert selector != null && selector.isOpen();
250
251 started = true;
252 boolean offered = registerTaskQueue.offer(registerTask);
253 assert offered;
254 }
255 int timeout = channel.getConfig().getConnectTimeoutMillis();
256 if (timeout > 0) {
257 if (!channel.isConnected()) {
258 channel.timoutTimer = timer.newTimeout(wakeupTask,
259 timeout, TimeUnit.MILLISECONDS);
260 }
261 }
262 if (wakenUp.compareAndSet(false, true)) {
263 selector.wakeup();
264 }
265
266 }
267
268 public void run() {
269 boolean shutdown = false;
270 int selectReturnsImmediately = 0;
271
272 Selector selector = this.selector;
273
274
275 final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
276 boolean wakenupFromLoop = false;
277 for (;;) {
278 wakenUp.set(false);
279
280 try {
281 long beforeSelect = System.nanoTime();
282 int selected = SelectorUtil.select(selector);
283 if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {
284 long timeBlocked = System.nanoTime() - beforeSelect;
285
286 if (timeBlocked < minSelectTimeout) {
287 boolean notConnected = false;
288
289 for (SelectionKey key: selector.keys()) {
290 SelectableChannel ch = key.channel();
291 try {
292 if (ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
293 notConnected = true;
294
295 key.cancel();
296 }
297 } catch (CancelledKeyException e) {
298
299 }
300 }
301 if (notConnected) {
302 selectReturnsImmediately = 0;
303 } else {
304
305
306
307 selectReturnsImmediately ++;
308
309 }
310
311 } else {
312 selectReturnsImmediately = 0;
313 }
314
315 if (selectReturnsImmediately == 1024) {
316
317
318
319 selector = recreateSelector();
320 selectReturnsImmediately = 0;
321 wakenupFromLoop = false;
322
323 continue;
324 }
325 } else {
326
327 selectReturnsImmediately = 0;
328 }
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 if (wakenUp.get()) {
359 wakenupFromLoop = true;
360 selector.wakeup();
361 } else {
362 wakenupFromLoop = false;
363 }
364 processRegisterTaskQueue();
365 processSelectedKeys(selector.selectedKeys());
366
367
368 long currentTimeNanos = System.nanoTime();
369 processConnectTimeout(selector.keys(), currentTimeNanos);
370
371
372
373
374
375
376 if (selector.keys().isEmpty()) {
377 if (shutdown ||
378 bossExecutor instanceof ExecutorService && ((ExecutorService) bossExecutor).isShutdown()) {
379
380 synchronized (startStopLock) {
381 if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
382 started = false;
383 try {
384 selector.close();
385 } catch (IOException e) {
386 if (logger.isWarnEnabled()) {
387 logger.warn(
388 "Failed to close a selector.", e);
389 }
390
391 } finally {
392 this.selector = null;
393 }
394 break;
395 } else {
396 shutdown = false;
397 }
398 }
399 } else {
400
401 shutdown = true;
402 }
403 } else {
404 shutdown = false;
405 }
406 } catch (Throwable t) {
407 if (logger.isWarnEnabled()) {
408 logger.warn(
409 "Unexpected exception in the selector loop.", t);
410 }
411
412
413
414 try {
415 Thread.sleep(1000);
416 } catch (InterruptedException e) {
417
418 }
419 }
420 }
421 }
422
423 private void processRegisterTaskQueue() {
424 for (;;) {
425 final Runnable task = registerTaskQueue.poll();
426 if (task == null) {
427 break;
428 }
429
430 task.run();
431 }
432 }
433
434 private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
435
436
437
438 if (selectedKeys.isEmpty()) {
439 return;
440 }
441 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
442 SelectionKey k = i.next();
443 i.remove();
444
445 if (!k.isValid()) {
446 close(k);
447 continue;
448 }
449
450 try {
451 if (k.isConnectable()) {
452 connect(k);
453 }
454 } catch (Throwable t) {
455 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
456 ch.connectFuture.setFailure(t);
457 fireExceptionCaught(ch, t);
458 k.cancel();
459 ch.worker.close(ch, succeededFuture(ch));
460 }
461 }
462 }
463
464 private void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
465 ConnectException cause = null;
466 for (SelectionKey k: keys) {
467 if (!k.isValid()) {
468
469
470
471
472
473
474
475
476 continue;
477 }
478
479 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
480 if (ch.connectDeadlineNanos > 0 &&
481 currentTimeNanos >= ch.connectDeadlineNanos) {
482
483 if (cause == null) {
484 cause = new ConnectException("connection timed out");
485 }
486
487 ch.connectFuture.setFailure(cause);
488 fireExceptionCaught(ch, cause);
489 ch.worker.close(ch, succeededFuture(ch));
490 }
491 }
492 }
493
494 private void connect(SelectionKey k) throws IOException {
495 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
496 if (ch.channel.finishConnect()) {
497 k.cancel();
498 if (ch.timoutTimer != null) {
499 ch.timoutTimer.cancel();
500 }
501 ch.worker.register(ch, ch.connectFuture);
502 }
503 }
504
505 private void close(SelectionKey k) {
506 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
507 ch.worker.close(ch, succeededFuture(ch));
508 }
509
510
511
512 private Selector recreateSelector() throws IOException {
513 Selector newSelector = Selector.open();
514 Selector selector = this.selector;
515 this.selector = newSelector;
516
517
518
519 for (SelectionKey key: selector.keys()) {
520 SelectableChannel ch = key.channel();
521 int ops = key.interestOps();
522 Object att = key.attachment();
523
524 key.cancel();
525
526 try {
527
528 ch.register(newSelector, ops, att);
529 } catch (ClosedChannelException e) {
530
531 close(key);
532 }
533 }
534
535 try {
536
537 selector.close();
538 } catch (Throwable t) {
539 if (logger.isWarnEnabled()) {
540 logger.warn("Failed to close a selector.", t);
541 }
542 }
543 if (logger.isWarnEnabled()) {
544 logger.warn("Recreated Selector because of possible jdk epoll(..) bug");
545 }
546 return newSelector;
547 }
548
549 }
550
551 private static final class RegisterTask implements Runnable {
552 private final Boss boss;
553 private final NioClientSocketChannel channel;
554
555 RegisterTask(Boss boss, NioClientSocketChannel channel) {
556 this.boss = boss;
557 this.channel = channel;
558 }
559
560 public void run() {
561 try {
562 channel.channel.register(
563 boss.selector, SelectionKey.OP_CONNECT, channel);
564 } catch (ClosedChannelException e) {
565 channel.worker.close(channel, succeededFuture(channel));
566 }
567
568 int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
569 if (connectTimeout > 0) {
570 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
571 }
572 }
573 }
574 }