View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBufferFactory;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelException;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.MessageEvent;
24  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
25  import org.jboss.netty.channel.socket.nio.AbstractNioChannel.WriteRequestQueue;
26  
27  import java.io.IOException;
28  import java.net.SocketAddress;
29  import java.nio.ByteBuffer;
30  import java.nio.channels.AsynchronousCloseException;
31  import java.nio.channels.ClosedChannelException;
32  import java.nio.channels.DatagramChannel;
33  import java.nio.channels.SelectionKey;
34  import java.nio.channels.Selector;
35  import java.util.Queue;
36  import java.util.concurrent.Executor;
37  
38  import static org.jboss.netty.channel.Channels.*;
39  
40  /**
41   * A class responsible for registering channels with {@link Selector}.
42   * It also implements the {@link Selector} loop.
43   */
44  public class NioDatagramWorker extends AbstractNioWorker {
45  
46      private final SocketReceiveBufferAllocator bufferAllocator = new SocketReceiveBufferAllocator();
47  
48      /**
49       * Sole constructor.
50       *
51       * @param executor the {@link Executor} used to execute {@link Runnable}s
52       *                 such as {@link ChannelRegistionTask}
53       */
54      NioDatagramWorker(final Executor executor) {
55          super(executor);
56      }
57  
58      @Override
59      protected boolean read(final SelectionKey key) {
60          final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
61          ReceiveBufferSizePredictor predictor =
62              channel.getConfig().getReceiveBufferSizePredictor();
63          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
64          final DatagramChannel nioChannel = (DatagramChannel) key.channel();
65          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
66  
67          final ByteBuffer byteBuffer = bufferAllocator.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
68  
69          boolean failure = true;
70          SocketAddress remoteAddress = null;
71          try {
72              // Receive from the channel in a non blocking mode. We have already been notified that
73              // the channel is ready to receive.
74              remoteAddress = nioChannel.receive(byteBuffer);
75              failure = false;
76          } catch (ClosedChannelException e) {
77              // Can happen, and does not need a user attention.
78          } catch (Throwable t) {
79              fireExceptionCaught(channel, t);
80          }
81  
82          if (remoteAddress != null) {
83              // Flip the buffer so that we can wrap it.
84              byteBuffer.flip();
85  
86              int readBytes = byteBuffer.remaining();
87              if (readBytes > 0) {
88                  // Update the predictor.
89                  predictor.previousReceiveBufferSize(readBytes);
90  
91                  final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
92                  buffer.setBytes(0, byteBuffer);
93                  buffer.writerIndex(readBytes);
94  
95                  // Update the predictor.
96                  predictor.previousReceiveBufferSize(readBytes);
97  
98                  // Notify the interested parties about the newly arrived message.
99                  fireMessageReceived(
100                         channel, buffer, remoteAddress);
101             }
102         }
103 
104         if (failure) {
105             key.cancel(); // Some JDK implementations run into an infinite loop without this.
106             close(channel, succeededFuture(channel));
107             return false;
108         }
109 
110         return true;
111     }
112 
113     @Override
114     protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
115         final Thread workerThread = thread;
116         if (workerThread == null || Thread.currentThread() != workerThread) {
117             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
118                 // "add" the channels writeTask to the writeTaskQueue.
119                 registerTask(channel.writeTask);
120             }
121             return true;
122         }
123 
124         return false;
125     }
126 
127     static void disconnect(NioDatagramChannel channel, ChannelFuture future) {
128         boolean connected = channel.isConnected();
129         boolean iothread = isIoThread(channel);
130         try {
131             channel.getDatagramChannel().disconnect();
132             future.setSuccess();
133             if (connected) {
134                 if (iothread) {
135                     fireChannelDisconnected(channel);
136                 } else {
137                     fireChannelDisconnectedLater(channel);
138                 }
139             }
140         } catch (Throwable t) {
141             future.setFailure(t);
142             if (iothread) {
143                 fireExceptionCaught(channel, t);
144             } else {
145                 fireExceptionCaughtLater(channel, t);
146             }
147         }
148     }
149 
150     @Override
151     protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
152         return new ChannelRegistionTask((NioDatagramChannel) channel, future);
153     }
154 
155     /**
156      * RegisterTask is a task responsible for registering a channel with a
157      * selector.
158      */
159     private final class ChannelRegistionTask implements Runnable {
160         private final NioDatagramChannel channel;
161 
162         private final ChannelFuture future;
163 
164         ChannelRegistionTask(final NioDatagramChannel channel,
165                 final ChannelFuture future) {
166             this.channel = channel;
167             this.future = future;
168         }
169 
170         /**
171          * This runnable's task. Does the actual registering by calling the
172          * underlying DatagramChannels peer DatagramSocket register method.
173          */
174         public void run() {
175             final SocketAddress localAddress = channel.getLocalAddress();
176             if (localAddress == null) {
177                 if (future != null) {
178                     future.setFailure(new ClosedChannelException());
179                 }
180                 close(channel, succeededFuture(channel));
181                 return;
182             }
183 
184             try {
185                 channel.getDatagramChannel().register(
186                         selector, channel.getInternalInterestOps(), channel);
187 
188                 if (future != null) {
189                     future.setSuccess();
190                 }
191             } catch (final IOException e) {
192                 if (future != null) {
193                     future.setFailure(e);
194                 }
195                 close(channel, succeededFuture(channel));
196 
197                 if (!(e instanceof ClosedChannelException)) {
198                     throw new ChannelException(
199                             "Failed to register a socket to the selector.", e);
200                 }
201             }
202         }
203     }
204 
205     @Override
206     public void writeFromUserCode(final AbstractNioChannel<?> channel) {
207         /*
208          * Note that we are not checking if the channel is connected. Connected
209          * has a different meaning in UDP and means that the channels socket is
210          * configured to only send and receive from a given remote peer.
211          */
212         if (!channel.isBound()) {
213             cleanUpWriteBuffer(channel);
214             return;
215         }
216 
217         if (scheduleWriteIfNecessary(channel)) {
218             return;
219         }
220 
221         // From here, we are sure Thread.currentThread() == workerThread.
222 
223         if (channel.writeSuspended) {
224             return;
225         }
226 
227         if (channel.inWriteNowLoop) {
228             return;
229         }
230 
231         write0(channel);
232     }
233 
234     @Override
235     protected void write0(final AbstractNioChannel<?> channel) {
236 
237         boolean addOpWrite = false;
238         boolean removeOpWrite = false;
239 
240         long writtenBytes = 0;
241 
242         final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
243         final DatagramChannel ch = ((NioDatagramChannel) channel).getDatagramChannel();
244         final WriteRequestQueue writeBuffer = channel.writeBufferQueue;
245         final int writeSpinCount = channel.getConfig().getWriteSpinCount();
246         synchronized (channel.writeLock) {
247             // inform the channel that write is in-progress
248             channel.inWriteNowLoop = true;
249 
250             // loop forever...
251             for (;;) {
252                 MessageEvent evt = channel.currentWriteEvent;
253                 SocketSendBufferPool.SendBuffer buf;
254                 if (evt == null) {
255                     if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
256                         removeOpWrite = true;
257                         channel.writeSuspended = false;
258                         break;
259                     }
260 
261                     channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
262                 } else {
263                     buf = channel.currentWriteBuffer;
264                 }
265 
266                 try {
267                     long localWrittenBytes = 0;
268                     SocketAddress raddr = evt.getRemoteAddress();
269                     if (raddr == null) {
270                         for (int i = writeSpinCount; i > 0; i --) {
271                             localWrittenBytes = buf.transferTo(ch);
272                             if (localWrittenBytes != 0) {
273                                 writtenBytes += localWrittenBytes;
274                                 break;
275                             }
276                             if (buf.finished()) {
277                                 break;
278                             }
279                         }
280                     } else {
281                         for (int i = writeSpinCount; i > 0; i --) {
282                             localWrittenBytes = buf.transferTo(ch, raddr);
283                             if (localWrittenBytes != 0) {
284                                 writtenBytes += localWrittenBytes;
285                                 break;
286                             }
287                             if (buf.finished()) {
288                                 break;
289                             }
290                         }
291                     }
292 
293                     if (localWrittenBytes > 0 || buf.finished()) {
294                         // Successful write - proceed to the next message.
295                         buf.release();
296                         ChannelFuture future = evt.getFuture();
297                         channel.currentWriteEvent = null;
298                         channel.currentWriteBuffer = null;
299                         evt = null;
300                         buf = null;
301                         future.setSuccess();
302                     } else {
303                         // Not written at all - perhaps the kernel buffer is full.
304                         addOpWrite = true;
305                         channel.writeSuspended = true;
306                         break;
307                     }
308                 } catch (final AsynchronousCloseException e) {
309                     // Doesn't need a user attention - ignore.
310                 } catch (final Throwable t) {
311                     buf.release();
312                     ChannelFuture future = evt.getFuture();
313                     channel.currentWriteEvent = null;
314                     channel.currentWriteBuffer = null;
315                     // Mark the event object for garbage collection.
316                     //noinspection UnusedAssignment
317                     buf = null;
318                     //noinspection UnusedAssignment
319                     evt = null;
320                     future.setFailure(t);
321                     fireExceptionCaught(channel, t);
322                 }
323             }
324             channel.inWriteNowLoop = false;
325 
326             // Initially, the following block was executed after releasing
327             // the writeLock, but there was a race condition, and it has to be
328             // executed before releasing the writeLock:
329             //
330             // https://issues.jboss.org/browse/NETTY-410
331             //
332             if (addOpWrite) {
333                 setOpWrite(channel);
334             } else if (removeOpWrite) {
335                 clearOpWrite(channel);
336             }
337         }
338 
339         fireWriteComplete(channel, writtenBytes);
340     }
341 
342     @Override
343     public void run() {
344         super.run();
345         bufferAllocator.releaseExternalResources();
346     }
347 }