1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.net.InetSocketAddress;
21 import java.util.concurrent.Executor;
22
23 import org.jboss.netty.channel.ChannelEvent;
24 import org.jboss.netty.channel.ChannelFuture;
25 import org.jboss.netty.channel.ChannelFutureListener;
26 import org.jboss.netty.channel.ChannelPipeline;
27 import org.jboss.netty.channel.ChannelState;
28 import org.jboss.netty.channel.ChannelStateEvent;
29 import org.jboss.netty.channel.MessageEvent;
30
31
32
33
34
35 class NioDatagramPipelineSink extends AbstractNioChannelSink {
36
37 private final WorkerPool<NioDatagramWorker> workerPool;
38
39
40
41
42
43
44
45
46
47
48
49
50 NioDatagramPipelineSink(final WorkerPool<NioDatagramWorker> workerPool) {
51 this.workerPool = workerPool;
52 }
53
54
55
56
57
58
59
60
61 public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e)
62 throws Exception {
63 final NioDatagramChannel channel = (NioDatagramChannel) e.getChannel();
64 final ChannelFuture future = e.getFuture();
65 if (e instanceof ChannelStateEvent) {
66 final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
67 final ChannelState state = stateEvent.getState();
68 final Object value = stateEvent.getValue();
69 switch (state) {
70 case OPEN:
71 if (Boolean.FALSE.equals(value)) {
72 channel.worker.close(channel, future);
73 }
74 break;
75 case BOUND:
76 if (value != null) {
77 bind(channel, future, (InetSocketAddress) value);
78 } else {
79 channel.worker.close(channel, future);
80 }
81 break;
82 case CONNECTED:
83 if (value != null) {
84 connect(channel, future, (InetSocketAddress) value);
85 } else {
86 NioDatagramWorker.disconnect(channel, future);
87 }
88 break;
89 case INTEREST_OPS:
90 channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
91 break;
92 }
93 } else if (e instanceof MessageEvent) {
94 final MessageEvent event = (MessageEvent) e;
95 final boolean offered = channel.writeBufferQueue.offer(event);
96 assert offered;
97 channel.worker.writeFromUserCode(channel);
98 }
99 }
100
101 private static void close(NioDatagramChannel channel, ChannelFuture future) {
102 try {
103 channel.getDatagramChannel().socket().close();
104 if (channel.setClosed()) {
105 future.setSuccess();
106 if (channel.isBound()) {
107 fireChannelUnbound(channel);
108 }
109 fireChannelClosed(channel);
110 } else {
111 future.setSuccess();
112 }
113 } catch (final Throwable t) {
114 future.setFailure(t);
115 fireExceptionCaught(channel, t);
116 }
117 }
118
119
120
121
122
123 private static void bind(final NioDatagramChannel channel,
124 final ChannelFuture future, final InetSocketAddress address) {
125 boolean bound = false;
126 boolean started = false;
127 try {
128
129 channel.getDatagramChannel().socket().bind(address);
130 bound = true;
131
132 future.setSuccess();
133 fireChannelBound(channel, address);
134
135 channel.worker.register(channel, null);
136 started = true;
137 } catch (final Throwable t) {
138 future.setFailure(t);
139 fireExceptionCaught(channel, t);
140 } finally {
141 if (!started && bound) {
142 close(channel, future);
143 }
144 }
145 }
146
147 private static void connect(
148 NioDatagramChannel channel, ChannelFuture future,
149 InetSocketAddress remoteAddress) {
150
151 boolean bound = channel.isBound();
152 boolean connected = false;
153 boolean workerStarted = false;
154
155 future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
156
157
158
159 channel.remoteAddress = null;
160
161 try {
162 channel.getDatagramChannel().connect(remoteAddress);
163 connected = true;
164
165
166 future.setSuccess();
167 if (!bound) {
168 fireChannelBound(channel, channel.getLocalAddress());
169 }
170 fireChannelConnected(channel, channel.getRemoteAddress());
171
172 if (!bound) {
173 channel.worker.register(channel, future);
174 }
175
176 workerStarted = true;
177 } catch (Throwable t) {
178 future.setFailure(t);
179 fireExceptionCaught(channel, t);
180 } finally {
181 if (connected && !workerStarted) {
182 channel.worker.close(channel, future);
183 }
184 }
185 }
186
187 NioDatagramWorker nextWorker() {
188 return workerPool.nextWorker();
189 }
190
191 }