1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.channel.ChannelEvent;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelFutureListener;
21 import org.jboss.netty.channel.ChannelPipeline;
22 import org.jboss.netty.channel.ChannelState;
23 import org.jboss.netty.channel.ChannelStateEvent;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.logging.InternalLogger;
26 import org.jboss.netty.logging.InternalLoggerFactory;
27
28 import java.net.ConnectException;
29 import java.net.SocketAddress;
30 import java.nio.channels.ClosedChannelException;
31
32 import static org.jboss.netty.channel.Channels.*;
33
34 class NioClientSocketPipelineSink extends AbstractNioChannelSink {
35
36 static final InternalLogger logger =
37 InternalLoggerFactory.getInstance(NioClientSocketPipelineSink.class);
38
39 private final BossPool<NioClientBoss> bossPool;
40
41 NioClientSocketPipelineSink(BossPool<NioClientBoss> bossPool) {
42 this.bossPool = bossPool;
43 }
44
45 public void eventSunk(
46 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
47 if (e instanceof ChannelStateEvent) {
48 ChannelStateEvent event = (ChannelStateEvent) e;
49 NioClientSocketChannel channel =
50 (NioClientSocketChannel) event.getChannel();
51 ChannelFuture future = event.getFuture();
52 ChannelState state = event.getState();
53 Object value = event.getValue();
54
55 switch (state) {
56 case OPEN:
57 if (Boolean.FALSE.equals(value)) {
58 channel.worker.close(channel, future);
59 }
60 break;
61 case BOUND:
62 if (value != null) {
63 bind(channel, future, (SocketAddress) value);
64 } else {
65 channel.worker.close(channel, future);
66 }
67 break;
68 case CONNECTED:
69 if (value != null) {
70 connect(channel, future, (SocketAddress) value);
71 } else {
72 channel.worker.close(channel, future);
73 }
74 break;
75 case INTEREST_OPS:
76 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
77 break;
78 }
79 } else if (e instanceof MessageEvent) {
80 MessageEvent event = (MessageEvent) e;
81 NioSocketChannel channel = (NioSocketChannel) event.getChannel();
82 boolean offered = channel.writeBufferQueue.offer(event);
83 assert offered;
84 channel.worker.writeFromUserCode(channel);
85 }
86 }
87
88 private static void bind(
89 NioClientSocketChannel channel, ChannelFuture future,
90 SocketAddress localAddress) {
91 try {
92 channel.channel.socket().bind(localAddress);
93 channel.boundManually = true;
94 channel.setBound();
95 future.setSuccess();
96 fireChannelBound(channel, channel.getLocalAddress());
97 } catch (Throwable t) {
98 future.setFailure(t);
99 fireExceptionCaught(channel, t);
100 }
101 }
102
103 private void connect(
104 final NioClientSocketChannel channel, final ChannelFuture cf,
105 SocketAddress remoteAddress) {
106 channel.requestedRemoteAddress = remoteAddress;
107 try {
108 if (channel.channel.connect(remoteAddress)) {
109 channel.worker.register(channel, cf);
110 } else {
111 channel.getCloseFuture().addListener(new ChannelFutureListener() {
112 public void operationComplete(ChannelFuture f)
113 throws Exception {
114 if (!cf.isDone()) {
115 cf.setFailure(new ClosedChannelException());
116 }
117 }
118 });
119 cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
120 channel.connectFuture = cf;
121 nextBoss().register(channel, cf);
122 }
123
124 } catch (Throwable t) {
125 if (t instanceof ConnectException) {
126 Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
127 newT.setStackTrace(t.getStackTrace());
128 t = newT;
129 }
130 cf.setFailure(t);
131 fireExceptionCaught(channel, t);
132 channel.worker.close(channel, succeededFuture(channel));
133 }
134 }
135
136 private NioClientBoss nextBoss() {
137 return bossPool.nextBoss();
138 }
139
140 }