1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import org.jboss.netty.buffer.ChannelBuffer;
19 import org.jboss.netty.channel.ChannelFuture;
20 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
21
22 import java.io.IOException;
23 import java.io.InterruptedIOException;
24 import java.net.DatagramPacket;
25 import java.net.SocketAddress;
26 import java.nio.ByteBuffer;
27
28 import static org.jboss.netty.channel.Channels.*;
29
30 class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
31
32 OioDatagramWorker(OioDatagramChannel channel) {
33 super(channel);
34 }
35
36 @Override
37 boolean process() throws IOException {
38
39 ReceiveBufferSizePredictor predictor =
40 channel.getConfig().getReceiveBufferSizePredictor();
41
42 byte[] buf = new byte[predictor.nextReceiveBufferSize()];
43 DatagramPacket packet = new DatagramPacket(buf, buf.length);
44 try {
45 channel.socket.receive(packet);
46 } catch (InterruptedIOException e) {
47
48
49 return true;
50 }
51
52 fireMessageReceived(
53 channel,
54 channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
55 packet.getSocketAddress());
56 return true;
57 }
58
59 static void write(
60 OioDatagramChannel channel, ChannelFuture future,
61 Object message, SocketAddress remoteAddress) {
62 boolean iothread = isIoThread(channel);
63
64 try {
65 ChannelBuffer buf = (ChannelBuffer) message;
66 int offset = buf.readerIndex();
67 int length = buf.readableBytes();
68 ByteBuffer nioBuf = buf.toByteBuffer();
69 DatagramPacket packet;
70 if (nioBuf.hasArray()) {
71
72 packet = new DatagramPacket(
73 nioBuf.array(), nioBuf.arrayOffset() + offset, length);
74 } else {
75
76 byte[] arrayBuf = new byte[length];
77 buf.getBytes(0, arrayBuf);
78 packet = new DatagramPacket(arrayBuf, length);
79 }
80
81 if (remoteAddress != null) {
82 packet.setSocketAddress(remoteAddress);
83 }
84 channel.socket.send(packet);
85 if (iothread) {
86 fireWriteComplete(channel, length);
87 } else {
88 fireWriteCompleteLater(channel, length);
89 }
90 future.setSuccess();
91 } catch (Throwable t) {
92 future.setFailure(t);
93 if (iothread) {
94 fireExceptionCaught(channel, t);
95 } else {
96 fireExceptionCaughtLater(channel, t);
97 }
98 }
99 }
100
101 static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
102 boolean connected = channel.isConnected();
103 boolean iothread = isIoThread(channel);
104
105 try {
106 channel.socket.disconnect();
107 future.setSuccess();
108 if (connected) {
109
110 if (iothread) {
111 fireChannelDisconnected(channel);
112 } else {
113 fireChannelDisconnectedLater(channel);
114 }
115 }
116 } catch (Throwable t) {
117 future.setFailure(t);
118 if (iothread) {
119 fireExceptionCaught(channel, t);
120 } else {
121 fireExceptionCaughtLater(channel, t);
122 }
123 }
124 }
125 }