1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.SocketAddress;
21 import java.util.concurrent.Executor;
22
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelPipeline;
27 import org.jboss.netty.channel.ChannelState;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.MessageEvent;
30 import org.jboss.netty.util.ThreadNameDeterminer;
31 import org.jboss.netty.util.ThreadRenamingRunnable;
32 import org.jboss.netty.util.internal.DeadLockProofWorker;
33
34 class OioDatagramPipelineSink extends AbstractOioChannelSink {
35
36 private final Executor workerExecutor;
37 private final ThreadNameDeterminer determiner;
38
39 OioDatagramPipelineSink(Executor workerExecutor, ThreadNameDeterminer determiner) {
40 this.workerExecutor = workerExecutor;
41 this.determiner = determiner;
42 }
43
44 public void eventSunk(
45 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
46 OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
47 ChannelFuture future = e.getFuture();
48 if (e instanceof ChannelStateEvent) {
49 ChannelStateEvent stateEvent = (ChannelStateEvent) e;
50 ChannelState state = stateEvent.getState();
51 Object value = stateEvent.getValue();
52 switch (state) {
53 case OPEN:
54 if (Boolean.FALSE.equals(value)) {
55 AbstractOioWorker.close(channel, future);
56 }
57 break;
58 case BOUND:
59 if (value != null) {
60 bind(channel, future, (SocketAddress) value);
61 } else {
62 AbstractOioWorker.close(channel, future);
63 }
64 break;
65 case CONNECTED:
66 if (value != null) {
67 connect(channel, future, (SocketAddress) value);
68 } else {
69 OioDatagramWorker.disconnect(channel, future);
70 }
71 break;
72 case INTEREST_OPS:
73 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
74 break;
75 }
76 } else if (e instanceof MessageEvent) {
77 MessageEvent evt = (MessageEvent) e;
78 OioDatagramWorker.write(
79 channel, future, evt.getMessage(), evt.getRemoteAddress());
80 }
81 }
82
83 private void bind(
84 OioDatagramChannel channel, ChannelFuture future,
85 SocketAddress localAddress) {
86 boolean bound = false;
87 boolean workerStarted = false;
88 try {
89 channel.socket.bind(localAddress);
90 bound = true;
91
92
93 future.setSuccess();
94 fireChannelBound(channel, channel.getLocalAddress());
95
96
97 DeadLockProofWorker.start(
98 workerExecutor,
99 new ThreadRenamingRunnable(
100 new OioDatagramWorker(channel),
101 "Old I/O datagram worker (" + channel + ')',
102 determiner));
103 workerStarted = true;
104 } catch (Throwable t) {
105 future.setFailure(t);
106 fireExceptionCaught(channel, t);
107 } finally {
108 if (bound && !workerStarted) {
109 AbstractOioWorker.close(channel, future);
110 }
111 }
112 }
113
114 private void connect(
115 OioDatagramChannel channel, ChannelFuture future,
116 SocketAddress remoteAddress) {
117
118 boolean bound = channel.isBound();
119 boolean connected = false;
120 boolean workerStarted = false;
121
122 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
123
124
125
126 channel.remoteAddress = null;
127
128 try {
129 channel.socket.connect(remoteAddress);
130 connected = true;
131
132
133 future.setSuccess();
134 if (!bound) {
135 fireChannelBound(channel, channel.getLocalAddress());
136 }
137 fireChannelConnected(channel, channel.getRemoteAddress());
138
139 String threadName = "Old I/O datagram worker (" + channel + ')';
140 if (!bound) {
141
142 DeadLockProofWorker.start(
143 workerExecutor,
144 new ThreadRenamingRunnable(
145 new OioDatagramWorker(channel), threadName, determiner));
146 } else {
147
148 Thread workerThread = channel.workerThread;
149 if (workerThread != null) {
150 try {
151 workerThread.setName(threadName);
152 } catch (SecurityException e) {
153
154 }
155 }
156 }
157
158 workerStarted = true;
159 } catch (Throwable t) {
160 future.setFailure(t);
161 fireExceptionCaught(channel, t);
162 } finally {
163 if (connected && !workerStarted) {
164 AbstractOioWorker.close(channel, future);
165 }
166 }
167 }
168 }