View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.channel.ChannelFuture;
20  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
21  
22  import java.io.IOException;
23  import java.io.InterruptedIOException;
24  import java.net.DatagramPacket;
25  import java.net.SocketAddress;
26  import java.nio.ByteBuffer;
27  
28  import static org.jboss.netty.channel.Channels.*;
29  
30  class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
31  
32      OioDatagramWorker(OioDatagramChannel channel) {
33          super(channel);
34      }
35  
36      @Override
37      boolean process() throws IOException {
38  
39          ReceiveBufferSizePredictor predictor =
40              channel.getConfig().getReceiveBufferSizePredictor();
41  
42          byte[] buf = new byte[predictor.nextReceiveBufferSize()];
43          DatagramPacket packet = new DatagramPacket(buf, buf.length);
44          try {
45              channel.socket.receive(packet);
46          } catch (InterruptedIOException e) {
47              // Can happen on interruption.
48              // Keep receiving unless the channel is closed.
49              return true;
50          }
51  
52          fireMessageReceived(
53                  channel,
54                  channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
55                  packet.getSocketAddress());
56          return true;
57      }
58  
59      static void write(
60              OioDatagramChannel channel, ChannelFuture future,
61              Object message, SocketAddress remoteAddress) {
62          boolean iothread = isIoThread(channel);
63  
64          try {
65              ChannelBuffer buf = (ChannelBuffer) message;
66              int offset = buf.readerIndex();
67              int length = buf.readableBytes();
68              ByteBuffer nioBuf = buf.toByteBuffer();
69              DatagramPacket packet;
70              if (nioBuf.hasArray()) {
71                  // Avoid copy if the buffer is backed by an array.
72                  packet = new DatagramPacket(
73                          nioBuf.array(), nioBuf.arrayOffset() + offset, length);
74              } else {
75                  // Otherwise it will be expensive.
76                  byte[] arrayBuf = new byte[length];
77                  buf.getBytes(0, arrayBuf);
78                  packet = new DatagramPacket(arrayBuf, length);
79              }
80  
81              if (remoteAddress != null) {
82                  packet.setSocketAddress(remoteAddress);
83              }
84              channel.socket.send(packet);
85              if (iothread) {
86                  fireWriteComplete(channel, length);
87              } else {
88                  fireWriteCompleteLater(channel, length);
89              }
90              future.setSuccess();
91          } catch (Throwable t) {
92              future.setFailure(t);
93              if (iothread) {
94                  fireExceptionCaught(channel, t);
95              } else {
96                  fireExceptionCaughtLater(channel, t);
97              }
98          }
99      }
100 
101     static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
102         boolean connected = channel.isConnected();
103         boolean iothread = isIoThread(channel);
104 
105         try {
106             channel.socket.disconnect();
107             future.setSuccess();
108             if (connected) {
109                 // Notify.
110                 if (iothread) {
111                     fireChannelDisconnected(channel);
112                 } else {
113                     fireChannelDisconnectedLater(channel);
114                 }
115             }
116         } catch (Throwable t) {
117             future.setFailure(t);
118             if (iothread) {
119                 fireExceptionCaught(channel, t);
120             } else {
121                 fireExceptionCaughtLater(channel, t);
122             }
123         }
124     }
125 }