1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.oio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.io.InterruptedIOException;
22 import java.net.DatagramPacket;
23 import java.net.SocketAddress;
24 import java.nio.ByteBuffer;
25
26 import org.jboss.netty.buffer.ChannelBuffer;
27 import org.jboss.netty.channel.ChannelFuture;
28 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
29
30 class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
31
32 OioDatagramWorker(OioDatagramChannel channel) {
33 super(channel);
34 }
35
36
37
38 @Override
39 boolean process() throws IOException {
40
41 ReceiveBufferSizePredictor predictor =
42 channel.getConfig().getReceiveBufferSizePredictor();
43
44 byte[] buf = new byte[predictor.nextReceiveBufferSize()];
45 DatagramPacket packet = new DatagramPacket(buf, buf.length);
46 try {
47 channel.socket.receive(packet);
48 } catch (InterruptedIOException e) {
49
50
51 return true;
52 }
53
54 fireMessageReceived(
55 channel,
56 channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
57 packet.getSocketAddress());
58 return true;
59 }
60
61
62
63 static void write(
64 OioDatagramChannel channel, ChannelFuture future,
65 Object message, SocketAddress remoteAddress) {
66 boolean iothread = isIoThread(channel);
67
68 try {
69 ChannelBuffer buf = (ChannelBuffer) message;
70 int offset = buf.readerIndex();
71 int length = buf.readableBytes();
72 ByteBuffer nioBuf = buf.toByteBuffer();
73 DatagramPacket packet;
74 if (nioBuf.hasArray()) {
75
76 packet = new DatagramPacket(
77 nioBuf.array(), nioBuf.arrayOffset() + offset, length);
78 } else {
79
80 byte[] arrayBuf = new byte[length];
81 buf.getBytes(0, arrayBuf);
82 packet = new DatagramPacket(arrayBuf, length);
83 }
84
85 if (remoteAddress != null) {
86 packet.setSocketAddress(remoteAddress);
87 }
88 channel.socket.send(packet);
89 if (iothread) {
90 fireWriteComplete(channel, length);
91 } else {
92 fireWriteCompleteLater(channel, length);
93 }
94 future.setSuccess();
95 } catch (Throwable t) {
96 future.setFailure(t);
97 if (iothread) {
98 fireExceptionCaught(channel, t);
99 } else {
100 fireExceptionCaughtLater(channel, t);
101 }
102 }
103 }
104
105
106 static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
107 boolean connected = channel.isConnected();
108 boolean iothread = isIoThread(channel);
109
110 try {
111 channel.socket.disconnect();
112 future.setSuccess();
113 if (connected) {
114
115 if (iothread) {
116 fireChannelDisconnected(channel);
117 } else {
118 fireChannelDisconnectedLater(channel);
119 }
120 }
121 } catch (Throwable t) {
122 future.setFailure(t);
123 if (iothread) {
124 fireExceptionCaught(channel, t);
125 } else {
126 fireExceptionCaughtLater(channel, t);
127 }
128 }
129 }
130
131 }