1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.Channel;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ConnectTimeoutException;
21 import org.jboss.netty.util.ThreadNameDeterminer;
22 import org.jboss.netty.util.ThreadRenamingRunnable;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.Timer;
25 import org.jboss.netty.util.TimerTask;
26
27 import java.io.IOException;
28 import java.net.ConnectException;
29 import java.nio.channels.ClosedChannelException;
30 import java.nio.channels.SelectionKey;
31 import java.nio.channels.Selector;
32 import java.util.Iterator;
33 import java.util.Set;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.TimeUnit;
36
37 import static org.jboss.netty.channel.Channels.*;
38
39
40
41
42 public final class NioClientBoss extends AbstractNioSelector implements Boss {
43
44 private final TimerTask wakeupTask = new TimerTask() {
45 public void run(Timeout timeout) throws Exception {
46
47
48
49
50 Selector selector = NioClientBoss.this.selector;
51
52 if (selector != null) {
53 if (wakenUp.compareAndSet(false, true)) {
54 selector.wakeup();
55 }
56 }
57 }
58 };
59
60 private final Timer timer;
61
62 NioClientBoss(Executor bossExecutor, Timer timer, ThreadNameDeterminer determiner) {
63 super(bossExecutor, determiner);
64 this.timer = timer;
65 }
66
67 @Override
68 protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {
69 return new ThreadRenamingRunnable(this, "New I/O boss #" + id, determiner);
70 }
71
72 @Override
73 protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
74 return new RegisterTask(this, (NioClientSocketChannel) channel);
75 }
76
77 @Override
78 protected void process(Selector selector) {
79 processSelectedKeys(selector.selectedKeys());
80
81
82 long currentTimeNanos = System.nanoTime();
83 processConnectTimeout(selector.keys(), currentTimeNanos);
84 }
85
86 private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
87
88
89
90
91 if (selectedKeys.isEmpty()) {
92 return;
93 }
94 for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
95 SelectionKey k = i.next();
96 i.remove();
97
98 if (!k.isValid()) {
99 close(k);
100 continue;
101 }
102
103 try {
104 if (k.isConnectable()) {
105 connect(k);
106 }
107 } catch (Throwable t) {
108 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
109 ch.connectFuture.setFailure(t);
110 fireExceptionCaught(ch, t);
111 k.cancel();
112 ch.worker.close(ch, succeededFuture(ch));
113 }
114 }
115 }
116
117 private static void processConnectTimeout(Set<SelectionKey> keys, long currentTimeNanos) {
118 ConnectException cause = null;
119 for (SelectionKey k: keys) {
120 if (!k.isValid()) {
121
122
123
124
125
126
127
128
129 continue;
130 }
131
132 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
133 if (ch.connectDeadlineNanos > 0 &&
134 currentTimeNanos >= ch.connectDeadlineNanos) {
135
136 if (cause == null) {
137 cause = new ConnectTimeoutException("connection timed out: " + ch.requestedRemoteAddress);
138 }
139
140 ch.connectFuture.setFailure(cause);
141 fireExceptionCaught(ch, cause);
142 ch.worker.close(ch, succeededFuture(ch));
143 }
144 }
145 }
146
147 private static void connect(SelectionKey k) throws IOException {
148 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
149 try {
150 if (ch.channel.finishConnect()) {
151 k.cancel();
152 if (ch.timoutTimer != null) {
153 ch.timoutTimer.cancel();
154 }
155 ch.worker.register(ch, ch.connectFuture);
156 }
157 } catch (ConnectException e) {
158 ConnectException newE = new ConnectException(e.getMessage() + ": " + ch.requestedRemoteAddress);
159 newE.setStackTrace(e.getStackTrace());
160 throw newE;
161 }
162 }
163
164 @Override
165 protected void close(SelectionKey k) {
166 NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
167 ch.worker.close(ch, succeededFuture(ch));
168 }
169
170 private final class RegisterTask implements Runnable {
171 private final NioClientBoss boss;
172 private final NioClientSocketChannel channel;
173
174 RegisterTask(NioClientBoss boss, NioClientSocketChannel channel) {
175 this.boss = boss;
176 this.channel = channel;
177 }
178
179 public void run() {
180 int timeout = channel.getConfig().getConnectTimeoutMillis();
181 if (timeout > 0) {
182 if (!channel.isConnected()) {
183 channel.timoutTimer = timer.newTimeout(wakeupTask,
184 timeout, TimeUnit.MILLISECONDS);
185 }
186 }
187 try {
188 channel.channel.register(
189 boss.selector, SelectionKey.OP_CONNECT, channel);
190 } catch (ClosedChannelException e) {
191 channel.worker.close(channel, succeededFuture(channel));
192 }
193
194 int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
195 if (connectTimeout > 0) {
196 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
197 }
198 }
199 }
200 }