1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.SocketAddress;
21 import java.util.concurrent.Executor;
22
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelPipeline;
27 import org.jboss.netty.channel.ChannelState;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.MessageEvent;
30 import org.jboss.netty.util.ThreadRenamingRunnable;
31 import org.jboss.netty.util.internal.DeadLockProofWorker;
32
33 class OioDatagramPipelineSink extends AbstractOioChannelSink {
34
35 private final Executor workerExecutor;
36
37 OioDatagramPipelineSink(Executor workerExecutor) {
38 this.workerExecutor = workerExecutor;
39 }
40
41 public void eventSunk(
42 ChannelPipeline pipeline, ChannelEvent e) throws Exception {
43 OioDatagramChannel channel = (OioDatagramChannel) e.getChannel();
44 ChannelFuture future = e.getFuture();
45 if (e instanceof ChannelStateEvent) {
46 ChannelStateEvent stateEvent = (ChannelStateEvent) e;
47 ChannelState state = stateEvent.getState();
48 Object value = stateEvent.getValue();
49 switch (state) {
50 case OPEN:
51 if (Boolean.FALSE.equals(value)) {
52 AbstractOioWorker.close(channel, future);
53 }
54 break;
55 case BOUND:
56 if (value != null) {
57 bind(channel, future, (SocketAddress) value);
58 } else {
59 AbstractOioWorker.close(channel, future);
60 }
61 break;
62 case CONNECTED:
63 if (value != null) {
64 connect(channel, future, (SocketAddress) value);
65 } else {
66 OioDatagramWorker.disconnect(channel, future);
67 }
68 break;
69 case INTEREST_OPS:
70 AbstractOioWorker.setInterestOps(channel, future, ((Integer) value).intValue());
71 break;
72 }
73 } else if (e instanceof MessageEvent) {
74 MessageEvent evt = (MessageEvent) e;
75 OioDatagramWorker.write(
76 channel, future, evt.getMessage(), evt.getRemoteAddress());
77 }
78 }
79
80 private void bind(
81 OioDatagramChannel channel, ChannelFuture future,
82 SocketAddress localAddress) {
83 boolean bound = false;
84 boolean workerStarted = false;
85 try {
86 channel.socket.bind(localAddress);
87 bound = true;
88
89
90 future.setSuccess();
91 fireChannelBound(channel, channel.getLocalAddress());
92
93
94 DeadLockProofWorker.start(
95 workerExecutor,
96 new ThreadRenamingRunnable(
97 new OioDatagramWorker(channel),
98 "Old I/O datagram worker (" + channel + ')'));
99 workerStarted = true;
100 } catch (Throwable t) {
101 future.setFailure(t);
102 fireExceptionCaught(channel, t);
103 } finally {
104 if (bound && !workerStarted) {
105 AbstractOioWorker.close(channel, future);
106 }
107 }
108 }
109
110 private void connect(
111 OioDatagramChannel channel, ChannelFuture future,
112 SocketAddress remoteAddress) {
113
114 boolean bound = channel.isBound();
115 boolean connected = false;
116 boolean workerStarted = false;
117
118 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
119
120
121
122 channel.remoteAddress = null;
123
124 try {
125 channel.socket.connect(remoteAddress);
126 connected = true;
127
128
129 future.setSuccess();
130 if (!bound) {
131 fireChannelBound(channel, channel.getLocalAddress());
132 }
133 fireChannelConnected(channel, channel.getRemoteAddress());
134
135 String threadName = "Old I/O datagram worker (" + channel + ')';
136 if (!bound) {
137
138 DeadLockProofWorker.start(
139 workerExecutor,
140 new ThreadRenamingRunnable(
141 new OioDatagramWorker(channel), threadName));
142 } else {
143
144 Thread workerThread = channel.workerThread;
145 if (workerThread != null) {
146 try {
147 workerThread.setName(threadName);
148 } catch (SecurityException e) {
149
150 }
151 }
152 }
153
154 workerStarted = true;
155 } catch (Throwable t) {
156 future.setFailure(t);
157 fireExceptionCaught(channel, t);
158 } finally {
159 if (connected && !workerStarted) {
160 AbstractOioWorker.close(channel, future);
161 }
162 }
163 }
164 }