View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.AsynchronousCloseException;
24  import java.nio.channels.ClosedChannelException;
25  import java.nio.channels.DatagramChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.util.Queue;
29  import java.util.concurrent.Executor;
30  
31  import org.jboss.netty.buffer.ChannelBuffer;
32  import org.jboss.netty.buffer.ChannelBufferFactory;
33  import org.jboss.netty.channel.ChannelException;
34  import org.jboss.netty.channel.ChannelFuture;
35  import org.jboss.netty.channel.MessageEvent;
36  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
37  
38  /**
39   * A class responsible for registering channels with {@link Selector}.
40   * It also implements the {@link Selector} loop.
41   */
42  public class NioDatagramWorker extends AbstractNioWorker {
43  
44      private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
45  
46      /**
47       * Sole constructor.
48       *
49       * @param executor the {@link Executor} used to execute {@link Runnable}s
50       *                 such as {@link ChannelRegistionTask}
51       */
52      NioDatagramWorker(final Executor executor) {
53          super(executor);
54      }
55  
56      @Override
57      protected boolean read(final SelectionKey key) {
58          final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
59          ReceiveBufferSizePredictor predictor =
60              channel.getConfig().getReceiveBufferSizePredictor();
61          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
62          final DatagramChannel nioChannel = (DatagramChannel) key.channel();
63          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
64  
65          final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
66  
67          boolean failure = true;
68          SocketAddress remoteAddress = null;
69          try {
70              // Receive from the channel in a non blocking mode. We have already been notified that
71              // the channel is ready to receive.
72              remoteAddress = nioChannel.receive(byteBuffer);
73              failure = false;
74          } catch (ClosedChannelException e) {
75              // Can happen, and does not need a user attention.
76          } catch (Throwable t) {
77              fireExceptionCaught(channel, t);
78          }
79  
80          if (remoteAddress != null) {
81              // Flip the buffer so that we can wrap it.
82              byteBuffer.flip();
83  
84              int readBytes = byteBuffer.remaining();
85              if (readBytes > 0) {
86                  // Update the predictor.
87                  predictor.previousReceiveBufferSize(readBytes);
88  
89                  final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
90                  buffer.setBytes(0, byteBuffer);
91                  buffer.writerIndex(readBytes);
92  
93                  // Update the predictor.
94                  predictor.previousReceiveBufferSize(readBytes);
95  
96                  // Notify the interested parties about the newly arrived message.
97                  fireMessageReceived(
98                          channel, buffer, remoteAddress);
99              }
100         }
101 
102         if (failure) {
103             key.cancel(); // Some JDK implementations run into an infinite loop without this.
104             close(channel, succeededFuture(channel));
105             return false;
106         }
107 
108         return true;
109     }
110 
111 
112     @Override
113     public void releaseExternalResources() {
114         super.releaseExternalResources();
115         bufferAllocator.releaseExternalResources();
116     }
117 
118     @Override
119     protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
120         final Thread workerThread = thread;
121         if (workerThread == null || Thread.currentThread() != workerThread) {
122             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
123                 // "add" the channels writeTask to the writeTaskQueue.
124                 boolean offered = writeTaskQueue.offer(channel.writeTask);
125                 assert offered;
126             }
127 
128             final Selector selector = this.selector;
129             if (selector != null) {
130                 if (wakenUp.compareAndSet(false, true)) {
131                     selector.wakeup();
132                 }
133             }
134             return true;
135         }
136 
137         return false;
138     }
139 
140 
141     static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
142         boolean connected = channel.isConnected();
143         boolean iothread = isIoThread(channel);
144         try {
145             channel.getDatagramChannel().disconnect();
146             future.setSuccess();
147             if (connected) {
148                 if (iothread) {
149                     fireChannelDisconnected(channel);
150                 } else {
151                     fireChannelDisconnectedLater(channel);
152                 }
153             }
154         } catch (Throwable t) {
155             future.setFailure(t);
156             if (iothread) {
157                 fireExceptionCaught(channel, t);
158             } else {
159                 fireExceptionCaughtLater(channel, t);
160             }
161         }
162     }
163 
164 
165     @Override
166     protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
167         return new ChannelRegistionTask((NioDatagramChannel) channel, future);
168     }
169 
170     /**
171      * RegisterTask is a task responsible for registering a channel with a
172      * selector.
173      */
174     private final class ChannelRegistionTask implements Runnable {
175         private final NioDatagramChannel channel;
176 
177         private final ChannelFuture future;
178 
179         ChannelRegistionTask(final NioDatagramChannel channel,
180                 final ChannelFuture future) {
181             this.channel = channel;
182             this.future = future;
183         }
184 
185         /**
186          * This runnable's task. Does the actual registering by calling the
187          * underlying DatagramChannels peer DatagramSocket register method.
188          */
189         public void run() {
190             final SocketAddress localAddress = channel.getLocalAddress();
191             if (localAddress == null) {
192                 if (future != null) {
193                     future.setFailure(new ClosedChannelException());
194                 }
195                 close(channel, succeededFuture(channel));
196                 return;
197             }
198 
199             try {
200                 synchronized (channel.interestOpsLock) {
201                     channel.getDatagramChannel().register(
202                             selector, channel.getRawInterestOps(), channel);
203                 }
204                 if (future != null) {
205                     future.setSuccess();
206                 }
207             } catch (final IOException e) {
208                 if (future != null) {
209                     future.setFailure(e);
210                 }
211                 close(channel, succeededFuture(channel));
212 
213                 if (!(e instanceof ClosedChannelException)) {
214                     throw new ChannelException(
215                             "Failed to register a socket to the selector.", e);
216                 }
217             }
218         }
219     }
220 
221     @Override
222     public void writeFromUserCode(final AbstractNioChannel<?> channel) {
223         /*
224          * Note that we are not checking if the channel is connected. Connected
225          * has a different meaning in UDP and means that the channels socket is
226          * configured to only send and receive from a given remote peer.
227          */
228         if (!channel.isBound()) {
229             cleanUpWriteBuffer(channel);
230             return;
231         }
232 
233         if (scheduleWriteIfNecessary(channel)) {
234             return;
235         }
236 
237         // From here, we are sure Thread.currentThread() == workerThread.
238 
239         if (channel.writeSuspended) {
240             return;
241         }
242 
243         if (channel.inWriteNowLoop) {
244             return;
245         }
246 
247         write0(channel);
248     }
249 
250     @Override
251     protected void write0(final AbstractNioChannel<?> channel) {
252 
253         boolean addOpWrite = false;
254         boolean removeOpWrite = false;
255 
256         long writtenBytes = 0;
257 
258         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
259         final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
260         final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
261         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
262         synchronized (channel.writeLock) {
263             // inform the channel that write is in-progress
264             channel.inWriteNowLoop = true;
265 
266             // loop forever...
267             for (;;) {
268                 MessageEvent evt = channel.currentWriteEvent;
269                 SocketSendBufferPool.SendBuffer buf;
270                 if (evt == null) {
271                     if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
272                         removeOpWrite = true;
273                         channel.writeSuspended = false;
274                         break;
275                     }
276 
277                     channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
278                 } else {
279                     buf = channel.currentWriteBuffer;
280                 }
281 
282                 try {
283                     long localWrittenBytes = 0;
284                     SocketAddress raddr = evt.getRemoteAddress();
285                     if (raddr == null) {
286                         for (int i = writeSpinCount; i > 0; i --) {
287                             localWrittenBytes = buf.transferTo(ch);
288                             if (localWrittenBytes != 0) {
289                                 writtenBytes += localWrittenBytes;
290                                 break;
291                             }
292                             if (buf.finished()) {
293                                 break;
294                             }
295                         }
296                     } else {
297                         for (int i = writeSpinCount; i > 0; i --) {
298                             localWrittenBytes = buf.transferTo(ch, raddr);
299                             if (localWrittenBytes != 0) {
300                                 writtenBytes += localWrittenBytes;
301                                 break;
302                             }
303                             if (buf.finished()) {
304                                 break;
305                             }
306                         }
307                     }
308 
309                     if (localWrittenBytes > 0 || buf.finished()) {
310                         // Successful write - proceed to the next message.
311                         buf.release();
312                         ChannelFuture future = evt.getFuture();
313                         channel.currentWriteEvent = null;
314                         channel.currentWriteBuffer = null;
315                         evt = null;
316                         buf = null;
317                         future.setSuccess();
318                     } else {
319                         // Not written at all - perhaps the kernel buffer is full.
320                         addOpWrite = true;
321                         channel.writeSuspended = true;
322                         break;
323                     }
324                 } catch (final AsynchronousCloseException e) {
325                     // Doesn't need a user attention - ignore.
326                 } catch (final Throwable t) {
327                     buf.release();
328                     ChannelFuture future = evt.getFuture();
329                     channel.currentWriteEvent = null;
330                     channel.currentWriteBuffer = null;
331                     buf = null;
332                     evt = null;
333                     future.setFailure(t);
334                     fireExceptionCaught(channel, t);
335                 }
336             }
337             channel.inWriteNowLoop = false;
338 
339             // Initially, the following block was executed after releasing
340             // the writeLock, but there was a race condition, and it has to be
341             // executed before releasing the writeLock:
342             //
343             // https://issues.jboss.org/browse/NETTY-410
344             //
345             if (addOpWrite) {
346                 setOpWrite(channel);
347             } else if (removeOpWrite) {
348                 clearOpWrite(channel);
349             }
350         }
351 
352         fireWriteComplete(channel, writtenBytes);
353     }
354 
355 }