1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.buffer.ChannelBufferFactory;
20 import org.jboss.netty.channel.Channel;
21 import org.jboss.netty.channel.ChannelException;
22 import org.jboss.netty.channel.ChannelFuture;
23 import org.jboss.netty.channel.MessageEvent;
24 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
25 import org.jboss.netty.channel.socket.nio.AbstractNioChannel.WriteRequestQueue;
26
27 import java.io.IOException;
28 import java.net.SocketAddress;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.AsynchronousCloseException;
31 import java.nio.channels.ClosedChannelException;
32 import java.nio.channels.DatagramChannel;
33 import java.nio.channels.SelectionKey;
34 import java.nio.channels.Selector;
35 import java.util.Queue;
36 import java.util.concurrent.Executor;
37
38 import static org.jboss.netty.channel.Channels.*;
39
40
41
42
43
44 public class NioDatagramWorker extends AbstractNioWorker {
45
46 private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
47
48
49
50
51
52
53
54 NioDatagramWorker(final Executor executor) {
55 super(executor);
56 }
57
58 @Override
59 protected boolean read(final SelectionKey key) {
60 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
61 ReceiveBufferSizePredictor predictor =
62 channel.getConfig().getReceiveBufferSizePredictor();
63 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
64 final DatagramChannel nioChannel = (DatagramChannel) key.channel();
65 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
66
67 final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
68
69 boolean failure = true;
70 SocketAddress remoteAddress = null;
71 try {
72
73
74 remoteAddress = nioChannel.receive(byteBuffer);
75 failure = false;
76 } catch (ClosedChannelException e) {
77
78 } catch (Throwable t) {
79 fireExceptionCaught(channel, t);
80 }
81
82 if (remoteAddress != null) {
83
84 byteBuffer.flip();
85
86 int readBytes = byteBuffer.remaining();
87 if (readBytes > 0) {
88
89 predictor.previousReceiveBufferSize(readBytes);
90
91 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
92 buffer.setBytes(0, byteBuffer);
93 buffer.writerIndex(readBytes);
94
95
96 predictor.previousReceiveBufferSize(readBytes);
97
98
99 fireMessageReceived(
100 channel, buffer, remoteAddress);
101 }
102 }
103
104 if (failure) {
105 key.cancel();
106 close(channel, succeededFuture(channel));
107 return false;
108 }
109
110 return true;
111 }
112
113 @Override
114 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
115 final Thread workerThread = thread;
116 if (workerThread == null || Thread.currentThread() != workerThread) {
117 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
118
119 registerTask(channel.writeTask);
120 }
121 return true;
122 }
123
124 return false;
125 }
126
127 static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
128 boolean connected = channel.isConnected();
129 boolean iothread = isIoThread(channel);
130 try {
131 channel.getDatagramChannel().disconnect();
132 future.setSuccess();
133 if (connected) {
134 if (iothread) {
135 fireChannelDisconnected(channel);
136 } else {
137 fireChannelDisconnectedLater(channel);
138 }
139 }
140 } catch (Throwable t) {
141 future.setFailure(t);
142 if (iothread) {
143 fireExceptionCaught(channel, t);
144 } else {
145 fireExceptionCaughtLater(channel, t);
146 }
147 }
148 }
149
150 @Override
151 protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
152 return new ChannelRegistionTask((NioDatagramChannel) channel, future);
153 }
154
155
156
157
158
159 private final class ChannelRegistionTask implements Runnable {
160 private final NioDatagramChannel channel;
161
162 private final ChannelFuture future;
163
164 ChannelRegistionTask(final NioDatagramChannel channel,
165 final ChannelFuture future) {
166 this.channel = channel;
167 this.future = future;
168 }
169
170
171
172
173
174 public void run() {
175 final SocketAddress localAddress = channel.getLocalAddress();
176 if (localAddress == null) {
177 if (future != null) {
178 future.setFailure(new ClosedChannelException());
179 }
180 close(channel, succeededFuture(channel));
181 return;
182 }
183
184 try {
185 channel.getDatagramChannel().register(
186 selector, channel.getInternalInterestOps(), channel);
187
188 if (future != null) {
189 future.setSuccess();
190 }
191 } catch (final IOException e) {
192 if (future != null) {
193 future.setFailure(e);
194 }
195 close(channel, succeededFuture(channel));
196
197 if (!(e instanceof ClosedChannelException)) {
198 throw new ChannelException(
199 "Failed to register a socket to the selector.", e);
200 }
201 }
202 }
203 }
204
205 @Override
206 public void writeFromUserCode(final AbstractNioChannel<?> channel) {
207
208
209
210
211
212 if (!channel.isBound()) {
213 cleanUpWriteBuffer(channel);
214 return;
215 }
216
217 if (scheduleWriteIfNecessary(channel)) {
218 return;
219 }
220
221
222
223 if (channel.writeSuspended) {
224 return;
225 }
226
227 if (channel.inWriteNowLoop) {
228 return;
229 }
230
231 write0(channel);
232 }
233
234 @Override
235 protected void write0(final AbstractNioChannel<?> channel) {
236
237 boolean addOpWrite = false;
238 boolean removeOpWrite = false;
239
240 long writtenBytes = 0;
241
242 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
243 final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
244 final WriteRequestQueue writeBuffer = channel.writeBufferQueue;
245 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
246 synchronized (channel.writeLock) {
247
248 channel.inWriteNowLoop = true;
249
250
251 for (;;) {
252 MessageEvent evt = channel.currentWriteEvent;
253 SocketSendBufferPool.SendBuffer buf;
254 if (evt == null) {
255 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
256 removeOpWrite = true;
257 channel.writeSuspended = false;
258 break;
259 }
260
261 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
262 } else {
263 buf = channel.currentWriteBuffer;
264 }
265
266 try {
267 long localWrittenBytes = 0;
268 SocketAddress raddr = evt.getRemoteAddress();
269 if (raddr == null) {
270 for (int i = writeSpinCount; i > 0; i --) {
271 localWrittenBytes = buf.transferTo(ch);
272 if (localWrittenBytes != 0) {
273 writtenBytes += localWrittenBytes;
274 break;
275 }
276 if (buf.finished()) {
277 break;
278 }
279 }
280 } else {
281 for (int i = writeSpinCount; i > 0; i --) {
282 localWrittenBytes = buf.transferTo(ch, raddr);
283 if (localWrittenBytes != 0) {
284 writtenBytes += localWrittenBytes;
285 break;
286 }
287 if (buf.finished()) {
288 break;
289 }
290 }
291 }
292
293 if (localWrittenBytes > 0 || buf.finished()) {
294
295 buf.release();
296 ChannelFuture future = evt.getFuture();
297 channel.currentWriteEvent = null;
298 channel.currentWriteBuffer = null;
299 evt = null;
300 buf = null;
301 future.setSuccess();
302 } else {
303
304 addOpWrite = true;
305 channel.writeSuspended = true;
306 break;
307 }
308 } catch (final AsynchronousCloseException e) {
309
310 } catch (final Throwable t) {
311 buf.release();
312 ChannelFuture future = evt.getFuture();
313 channel.currentWriteEvent = null;
314 channel.currentWriteBuffer = null;
315
316
317 buf = null;
318
319 evt = null;
320 future.setFailure(t);
321 fireExceptionCaught(channel, t);
322 }
323 }
324 channel.inWriteNowLoop = false;
325
326
327
328
329
330
331
332 if (addOpWrite) {
333 setOpWrite(channel);
334 } else if (removeOpWrite) {
335 clearOpWrite(channel);
336 }
337 }
338
339 fireWriteComplete(channel, writtenBytes);
340 }
341
342 @Override
343 public void run() {
344 super.run();
345 bufferAllocator.releaseExternalResources();
346 }
347 }