View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.oio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.io.InterruptedIOException;
22  import java.net.DatagramPacket;
23  import java.net.SocketAddress;
24  import java.nio.ByteBuffer;
25  
26  import org.jboss.netty.buffer.ChannelBuffer;
27  import org.jboss.netty.channel.ChannelFuture;
28  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
29  
30  class OioDatagramWorker extends AbstractOioWorker<OioDatagramChannel> {
31  
32      OioDatagramWorker(OioDatagramChannel channel) {
33          super(channel);
34      }
35  
36  
37  
38      @Override
39      boolean process() throws IOException {
40  
41          ReceiveBufferSizePredictor predictor =
42              channel.getConfig().getReceiveBufferSizePredictor();
43  
44          byte[] buf = new byte[predictor.nextReceiveBufferSize()];
45          DatagramPacket packet = new DatagramPacket(buf, buf.length);
46          try {
47              channel.socket.receive(packet);
48          } catch (InterruptedIOException e) {
49              // Can happen on interruption.
50              // Keep receiving unless the channel is closed.
51              return true;
52          }
53  
54          fireMessageReceived(
55                  channel,
56                  channel.getConfig().getBufferFactory().getBuffer(buf, 0, packet.getLength()),
57                  packet.getSocketAddress());
58          return true;
59      }
60  
61  
62  
63      static void write(
64              OioDatagramChannel channel, ChannelFuture future,
65              Object message, SocketAddress remoteAddress) {
66          boolean iothread = isIoThread(channel);
67  
68          try {
69              ChannelBuffer buf = (ChannelBuffer) message;
70              int offset = buf.readerIndex();
71              int length = buf.readableBytes();
72              ByteBuffer nioBuf = buf.toByteBuffer();
73              DatagramPacket packet;
74              if (nioBuf.hasArray()) {
75                  // Avoid copy if the buffer is backed by an array.
76                  packet = new DatagramPacket(
77                          nioBuf.array(), nioBuf.arrayOffset() + offset, length);
78              } else {
79                  // Otherwise it will be expensive.
80                  byte[] arrayBuf = new byte[length];
81                  buf.getBytes(0, arrayBuf);
82                  packet = new DatagramPacket(arrayBuf, length);
83              }
84  
85              if (remoteAddress != null) {
86                  packet.setSocketAddress(remoteAddress);
87              }
88              channel.socket.send(packet);
89              if (iothread) {
90                  fireWriteComplete(channel, length);
91              } else {
92                  fireWriteCompleteLater(channel, length);
93              }
94              future.setSuccess();
95          } catch (Throwable t) {
96              future.setFailure(t);
97              if (iothread) {
98                  fireExceptionCaught(channel, t);
99              } else {
100                 fireExceptionCaughtLater(channel, t);
101             }
102         }
103     }
104 
105 
106     static void disconnect(OioDatagramChannel channel, ChannelFuture future) {
107         boolean connected = channel.isConnected();
108         boolean iothread = isIoThread(channel);
109 
110         try {
111             channel.socket.disconnect();
112             future.setSuccess();
113             if (connected) {
114                 // Notify.
115                 if (iothread) {
116                     fireChannelDisconnected(channel);
117                 } else {
118                     fireChannelDisconnectedLater(channel);
119                 }
120             }
121         } catch (Throwable t) {
122             future.setFailure(t);
123             if (iothread) {
124                 fireExceptionCaught(channel, t);
125             } else {
126                 fireExceptionCaughtLater(channel, t);
127             }
128         }
129     }
130 
131 }