1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import org.jboss.netty.channel.ChannelEvent;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ChannelFutureListener;
21 import org.jboss.netty.channel.ChannelPipeline;
22 import org.jboss.netty.channel.ChannelState;
23 import org.jboss.netty.channel.ChannelStateEvent;
24 import org.jboss.netty.channel.MessageEvent;
25 import org.jboss.netty.util.ThreadNameDeterminer;
26 import org.jboss.netty.util.ThreadRenamingRunnable;
27 import org.jboss.netty.util.internal.DeadLockProofWorker;
28
29 import java.io.PushbackInputStream;
30 import java.net.ConnectException;
31 import java.net.SocketAddress;
32 import java.util.concurrent.Executor;
33
34 import static org.jboss.netty.channel.Channels.*;
35
36 class OioClientSocketPipelineSink extends AbstractOioChannelSink {
37
38 private final Executor workerExecutor;
39 private final ThreadNameDeterminer determiner;
40
41 OioClientSocketPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
42 this.workerExecutor = workerExecutor;
43 this.determiner = determiner;
44 }
45
46 public void eventSunk(
47 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
48 OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
49 ChannelFuture future = e.getFuture();
50 if (e instanceof ChannelStateEvent) {
51 ChannelStateEvent stateEvent = (ChannelStateEvent) e;
52 ChannelState state = stateEvent.getState();
53 Object value = stateEvent.getValue();
54 switch (state) {
55 case OPEN:
56 if (Boolean.FALSE.equals(value)) {
57 AbstractOioWorker.close(channel, future);
58 }
59 break;
60 case BOUND:
61 if (value != null) {
62 bind(channel, future, (SocketAddress) value);
63 } else {
64 AbstractOioWorker.close(channel, future);
65 }
66 break;
67 case CONNECTED:
68 if (value != null) {
69 connect(channel, future, (SocketAddress) value);
70 } else {
71 AbstractOioWorker.close(channel, future);
72 }
73 break;
74 case INTEREST_OPS:
75 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
76 break;
77 }
78 } else if (e instanceof MessageEvent) {
79 OioWorker.write(
80 channel, future,
81 ((MessageEvent) e).getMessage());
82 }
83 }
84
85 private static void bind(
86 OioClientSocketChannel channel, ChannelFuture future,
87 SocketAddress localAddress) {
88 try {
89 channel.socket.bind(localAddress);
90 future.setSuccess();
91 fireChannelBound(channel, channel.getLocalAddress());
92 } catch (Throwable t) {
93 future.setFailure(t);
94 fireExceptionCaught(channel, t);
95 }
96 }
97
98 private void connect(
99 OioClientSocketChannel channel, ChannelFuture future,
100 SocketAddress remoteAddress) {
101
102 boolean bound = channel.isBound();
103 boolean connected = false;
104 boolean workerStarted = false;
105
106 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
107
108 try {
109 channel.socket.connect(
110 remoteAddress, channel.getConfig().getConnectTimeoutMillis());
111 connected = true;
112
113
114 channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
115 channel.out = channel.socket.getOutputStream();
116
117
118 future.setSuccess();
119 if (!bound) {
120 fireChannelBound(channel, channel.getLocalAddress());
121 }
122 fireChannelConnected(channel, channel.getRemoteAddress());
123
124
125 DeadLockProofWorker.start(
126 workerExecutor,
127 new ThreadRenamingRunnable(
128 new OioWorker(channel),
129 "Old I/O client worker (" + channel + ')',
130 determiner));
131 workerStarted = true;
132 } catch (Throwable t) {
133 if (t instanceof ConnectException) {
134 if (t instanceof ConnectException) {
135 Throwable newT = new ConnectException(t.getMessage() + ": " + remoteAddress);
136 newT.setStackTrace(t.getStackTrace());
137 t = newT;
138 }
139 }
140 future.setFailure(t);
141 fireExceptionCaught(channel, t);
142 } finally {
143 if (connected && !workerStarted) {
144 AbstractOioWorker.close(channel, future);
145 }
146 }
147 }
148 }