1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.AsynchronousCloseException;
24 import java.nio.channels.ClosedChannelException;
25 import java.nio.channels.DatagramChannel;
26 import java.nio.channels.SelectionKey;
27 import java.nio.channels.Selector;
28 import java.util.Queue;
29 import java.util.concurrent.Executor;
30
31 import org.jboss.netty.buffer.ChannelBuffer;
32 import org.jboss.netty.buffer.ChannelBufferFactory;
33 import org.jboss.netty.channel.ChannelException;
34 import org.jboss.netty.channel.ChannelFuture;
35 import org.jboss.netty.channel.MessageEvent;
36 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
37
38
39
40
41
42 public class NioDatagramWorker extends AbstractNioWorker {
43
/**
 * Source of the scratch {@link java.nio.ByteBuffer} that {@link #read(SelectionKey)}
 * receives each datagram into. It is released together with this worker in
 * {@link #releaseExternalResources()}.
 */
private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();

/**
 * Creates a new datagram worker.
 *
 * @param executor handed unchanged to the {@code AbstractNioWorker} constructor
 */
NioDatagramWorker(final Executor executor) {
    super(executor);
}
55
56 @Override
57 protected boolean read(final SelectionKey key) {
58 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
59 ReceiveBufferSizePredictor predictor =
60 channel.getConfig().getReceiveBufferSizePredictor();
61 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
62 final DatagramChannel nioChannel = (DatagramChannel) key.channel();
63 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
64
65 final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
66
67 boolean failure = true;
68 SocketAddress remoteAddress = null;
69 try {
70
71
72 remoteAddress = nioChannel.receive(byteBuffer);
73 failure = false;
74 } catch (ClosedChannelException e) {
75
76 } catch (Throwable t) {
77 fireExceptionCaught(channel, t);
78 }
79
80 if (remoteAddress != null) {
81
82 byteBuffer.flip();
83
84 int readBytes = byteBuffer.remaining();
85 if (readBytes > 0) {
86
87 predictor.previousReceiveBufferSize(readBytes);
88
89 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
90 buffer.setBytes(0, byteBuffer);
91 buffer.writerIndex(readBytes);
92
93
94 predictor.previousReceiveBufferSize(readBytes);
95
96
97 fireMessageReceived(
98 channel, buffer, remoteAddress);
99 }
100 }
101
102 if (failure) {
103 key.cancel();
104 close(channel, succeededFuture(channel));
105 return false;
106 }
107
108 return true;
109 }
110
111
112 @Override
113 public void releaseExternalResources() {
114 super.releaseExternalResources();
115 bufferAllocator.releaseExternalResources();
116 }
117
118 @Override
119 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
120 final Thread workerThread = thread;
121 if (workerThread == null || Thread.currentThread() != workerThread) {
122 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
123
124 boolean offered = writeTaskQueue.offer(channel.writeTask);
125 assert offered;
126 }
127
128 final Selector selector = this.selector;
129 if (selector != null) {
130 if (wakenUp.compareAndSet(false, true)) {
131 selector.wakeup();
132 }
133 }
134 return true;
135 }
136
137 return false;
138 }
139
140
141 static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
142 boolean connected = channel.isConnected();
143 boolean iothread = isIoThread(channel);
144 try {
145 channel.getDatagramChannel().disconnect();
146 future.setSuccess();
147 if (connected) {
148 if (iothread) {
149 fireChannelDisconnected(channel);
150 } else {
151 fireChannelDisconnectedLater(channel);
152 }
153 }
154 } catch (Throwable t) {
155 future.setFailure(t);
156 if (iothread) {
157 fireExceptionCaught(channel, t);
158 } else {
159 fireExceptionCaughtLater(channel, t);
160 }
161 }
162 }
163
164
165 @Override
166 protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
167 return new ChannelRegistionTask((NioDatagramChannel) channel, future);
168 }
169
170
171
172
173
174 private final class ChannelRegistionTask implements Runnable {
175 private final NioDatagramChannel channel;
176
177 private final ChannelFuture future;
178
179 ChannelRegistionTask(final NioDatagramChannel channel,
180 final ChannelFuture future) {
181 this.channel = channel;
182 this.future = future;
183 }
184
185
186
187
188
189 public void run() {
190 final SocketAddress localAddress = channel.getLocalAddress();
191 if (localAddress == null) {
192 if (future != null) {
193 future.setFailure(new ClosedChannelException());
194 }
195 close(channel, succeededFuture(channel));
196 return;
197 }
198
199 try {
200 synchronized (channel.interestOpsLock) {
201 channel.getDatagramChannel().register(
202 selector, channel.getRawInterestOps(), channel);
203 }
204 if (future != null) {
205 future.setSuccess();
206 }
207 } catch (final IOException e) {
208 if (future != null) {
209 future.setFailure(e);
210 }
211 close(channel, succeededFuture(channel));
212
213 if (!(e instanceof ClosedChannelException)) {
214 throw new ChannelException(
215 "Failed to register a socket to the selector.", e);
216 }
217 }
218 }
219 }
220
221 @Override
222 public void writeFromUserCode(final AbstractNioChannel<?> channel) {
223
224
225
226
227
228 if (!channel.isBound()) {
229 cleanUpWriteBuffer(channel);
230 return;
231 }
232
233 if (scheduleWriteIfNecessary(channel)) {
234 return;
235 }
236
237
238
239 if (channel.writeSuspended) {
240 return;
241 }
242
243 if (channel.inWriteNowLoop) {
244 return;
245 }
246
247 write0(channel);
248 }
249
250 @Override
251 protected void write0(final AbstractNioChannel<?> channel) {
252
253 boolean addOpWrite = false;
254 boolean removeOpWrite = false;
255
256 long writtenBytes = 0;
257
258 final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
259 final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
260 final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
261 final int writeSpinCount = channel.getConfig().getWriteSpinCount();
262 synchronized (channel.writeLock) {
263
264 channel.inWriteNowLoop = true;
265
266
267 for (;;) {
268 MessageEvent evt = channel.currentWriteEvent;
269 SocketSendBufferPool.SendBuffer buf;
270 if (evt == null) {
271 if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
272 removeOpWrite = true;
273 channel.writeSuspended = false;
274 break;
275 }
276
277 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
278 } else {
279 buf = channel.currentWriteBuffer;
280 }
281
282 try {
283 long localWrittenBytes = 0;
284 SocketAddress raddr = evt.getRemoteAddress();
285 if (raddr == null) {
286 for (int i = writeSpinCount; i > 0; i --) {
287 localWrittenBytes = buf.transferTo(ch);
288 if (localWrittenBytes != 0) {
289 writtenBytes += localWrittenBytes;
290 break;
291 }
292 if (buf.finished()) {
293 break;
294 }
295 }
296 } else {
297 for (int i = writeSpinCount; i > 0; i --) {
298 localWrittenBytes = buf.transferTo(ch, raddr);
299 if (localWrittenBytes != 0) {
300 writtenBytes += localWrittenBytes;
301 break;
302 }
303 if (buf.finished()) {
304 break;
305 }
306 }
307 }
308
309 if (localWrittenBytes > 0 || buf.finished()) {
310
311 buf.release();
312 ChannelFuture future = evt.getFuture();
313 channel.currentWriteEvent = null;
314 channel.currentWriteBuffer = null;
315 evt = null;
316 buf = null;
317 future.setSuccess();
318 } else {
319
320 addOpWrite = true;
321 channel.writeSuspended = true;
322 break;
323 }
324 } catch (final AsynchronousCloseException e) {
325
326 } catch (final Throwable t) {
327 buf.release();
328 ChannelFuture future = evt.getFuture();
329 channel.currentWriteEvent = null;
330 channel.currentWriteBuffer = null;
331 buf = null;
332 evt = null;
333 future.setFailure(t);
334 fireExceptionCaught(channel, t);
335 }
336 }
337 channel.inWriteNowLoop = false;
338
339
340
341
342
343
344
345 if (addOpWrite) {
346 setOpWrite(channel);
347 } else if (removeOpWrite) {
348 clearOpWrite(channel);
349 }
350 }
351
352 fireWriteComplete(channel, writtenBytes);
353 }
354
355 }