1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.PushbackInputStream;
21 import java.net.SocketAddress;
22 import java.util.concurrent.Executor;
23
24 import org.jboss.netty.channel.ChannelEvent;
25 import org.jboss.netty.channel.ChannelFuture;
26 import org.jboss.netty.channel.ChannelFutureListener;
27 import org.jboss.netty.channel.ChannelPipeline;
28 import org.jboss.netty.channel.ChannelState;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.MessageEvent;
31 import org.jboss.netty.util.ThreadRenamingRunnable;
32 import org.jboss.netty.util.internal.DeadLockProofWorker;
33
34 class OioClientSocketPipelineSink extends AbstractOioChannelSink {
35
36 private final Executor workerExecutor;
37
38 OioClientSocketPipelineSink(Executor workerExecutor) {
39 this.workerExecutor = workerExecutor;
40 }
41
42 public void eventSunk(
43 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
44 OioClientSocketChannel channel = (OioClientSocketChannel) e.getChannel();
45 ChannelFuture future = e.getFuture();
46 if (e instanceof ChannelStateEvent) {
47 ChannelStateEvent stateEvent = (ChannelStateEvent) e;
48 ChannelState state = stateEvent.getState();
49 Object value = stateEvent.getValue();
50 switch (state) {
51 case OPEN:
52 if (Boolean.FALSE.equals(value)) {
53 AbstractOioWorker.close(channel, future);
54 }
55 break;
56 case BOUND:
57 if (value != null) {
58 bind(channel, future, (SocketAddress) value);
59 } else {
60 AbstractOioWorker.close(channel, future);
61 }
62 break;
63 case CONNECTED:
64 if (value != null) {
65 connect(channel, future, (SocketAddress) value);
66 } else {
67 AbstractOioWorker.close(channel, future);
68 }
69 break;
70 case INTEREST_OPS:
71 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
72 break;
73 }
74 } else if (e instanceof MessageEvent) {
75 OioWorker.write(
76 channel, future,
77 ((MessageEvent) e).getMessage());
78 }
79 }
80
81 private static void bind(
82 OioClientSocketChannel channel, ChannelFuture future,
83 SocketAddress localAddress) {
84 try {
85 channel.socket.bind(localAddress);
86 future.setSuccess();
87 fireChannelBound(channel, channel.getLocalAddress());
88 } catch (Throwable t) {
89 future.setFailure(t);
90 fireExceptionCaught(channel, t);
91 }
92 }
93
94 private void connect(
95 OioClientSocketChannel channel, ChannelFuture future,
96 SocketAddress remoteAddress) {
97
98 boolean bound = channel.isBound();
99 boolean connected = false;
100 boolean workerStarted = false;
101
102 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
103
104 try {
105 channel.socket.connect(
106 remoteAddress, channel.getConfig().getConnectTimeoutMillis());
107 connected = true;
108
109
110 channel.in = new PushbackInputStream(channel.socket.getInputStream(), 1);
111 channel.out = channel.socket.getOutputStream();
112
113
114 future.setSuccess();
115 if (!bound) {
116 fireChannelBound(channel, channel.getLocalAddress());
117 }
118 fireChannelConnected(channel, channel.getRemoteAddress());
119
120
121 DeadLockProofWorker.start(
122 workerExecutor,
123 new ThreadRenamingRunnable(
124 new OioWorker(channel),
125 "Old I/O client worker (" + channel + ')'));
126 workerStarted = true;
127 } catch (Throwable t) {
128 future.setFailure(t);
129 fireExceptionCaught(channel, t);
130 } finally {
131 if (connected && !workerStarted) {
132 AbstractOioWorker.close(channel, future);
133 }
134 }
135 }
136 }