/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import static org.jboss.netty.channel.Channels.*;
19  
20  import java.io.IOException;
21  import java.net.SocketAddress;
22  import java.nio.ByteBuffer;
23  import java.nio.channels.ClosedChannelException;
24  import java.nio.channels.SelectionKey;
25  import java.nio.channels.Selector;
26  import java.nio.channels.SocketChannel;
27  import java.util.concurrent.Executor;
28  
29  import org.jboss.netty.buffer.ChannelBuffer;
30  import org.jboss.netty.buffer.ChannelBufferFactory;
31  import org.jboss.netty.channel.ChannelException;
32  import org.jboss.netty.channel.ChannelFuture;
33  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
34  
35  public class NioWorker extends AbstractNioWorker {
36  
37      private final SocketReceiveBufferAllocator recvBufferPool = new SocketReceiveBufferAllocator();
38  
39      public NioWorker(Executor executor) {
40          super(executor);
41      }
42  
43      @Override
44      protected boolean read(SelectionKey k) {
45          final SocketChannel ch = (SocketChannel) k.channel();
46          final NioSocketChannel channel = (NioSocketChannel) k.attachment();
47  
48          final ReceiveBufferSizePredictor predictor =
49              channel.getConfig().getReceiveBufferSizePredictor();
50          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
51          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
52  
53          int ret = 0;
54          int readBytes = 0;
55          boolean failure = true;
56  
57          ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
58          try {
59              while ((ret = ch.read(bb)) > 0) {
60                  readBytes += ret;
61                  if (!bb.hasRemaining()) {
62                      break;
63                  }
64              }
65              failure = false;
66          } catch (ClosedChannelException e) {
67              // Can happen, and does not need a user attention.
68          } catch (Throwable t) {
69              fireExceptionCaught(channel, t);
70          }
71  
72          if (readBytes > 0) {
73              bb.flip();
74  
75              final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
76              buffer.setBytes(0, bb);
77              buffer.writerIndex(readBytes);
78  
79  
80              // Update the predictor.
81              predictor.previousReceiveBufferSize(readBytes);
82  
83              // Fire the event.
84              fireMessageReceived(channel, buffer);
85          }
86  
87          if (ret < 0 || failure) {
88              k.cancel(); // Some JDK implementations run into an infinite loop without this.
89              close(channel, succeededFuture(channel));
90              return false;
91          }
92  
93          return true;
94      }
95  
96  
97      @Override
98      protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
99          final Thread currentThread = Thread.currentThread();
100         final Thread workerThread = thread;
101         if (currentThread != workerThread) {
102             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
103                 boolean offered = writeTaskQueue.offer(channel.writeTask);
104                 assert offered;
105             }
106 
107             if (!(channel instanceof NioAcceptedSocketChannel) ||
108                 ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
109                 final Selector workerSelector = selector;
110                 if (workerSelector != null) {
111                     if (wakenUp.compareAndSet(false, true)) {
112                         workerSelector.wakeup();
113                     }
114                 }
115             } else {
116                 // A write request can be made from an acceptor thread (boss)
117                 // when a user attempted to write something in:
118                 //
119                 //   * channelOpen()
120                 //   * channelBound()
121                 //   * channelConnected().
122                 //
123                 // In this case, there's no need to wake up the selector because
124                 // the channel is not even registered yet at this moment.
125             }
126 
127             return true;
128         }
129 
130         return false;
131     }
132 
133     @Override
134     public void releaseExternalResources() {
135         super.releaseExternalResources();
136         recvBufferPool.releaseExternalResources();
137     }
138 
139     @Override
140     protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
141         boolean server = !(channel instanceof NioClientSocketChannel);
142         return new RegisterTask((NioSocketChannel) channel, future, server);
143     }
144 
145     private final class RegisterTask implements Runnable {
146         private final NioSocketChannel channel;
147         private final ChannelFuture future;
148         private final boolean server;
149 
150         RegisterTask(
151                 NioSocketChannel channel, ChannelFuture future, boolean server) {
152 
153             this.channel = channel;
154             this.future = future;
155             this.server = server;
156         }
157 
158         public void run() {
159             SocketAddress localAddress = channel.getLocalAddress();
160             SocketAddress remoteAddress = channel.getRemoteAddress();
161 
162             if (localAddress == null || remoteAddress == null) {
163                 if (future != null) {
164                     future.setFailure(new ClosedChannelException());
165                 }
166                 close(channel, succeededFuture(channel));
167                 return;
168             }
169 
170             try {
171                 if (server) {
172                     channel.channel.configureBlocking(false);
173                 }
174 
175                 synchronized (channel.interestOpsLock) {
176                     channel.channel.register(
177                             selector, channel.getRawInterestOps(), channel);
178                 }
179                 if (future != null) {
180                     channel.setConnected();
181                     future.setSuccess();
182                 }
183 
184                 if (server || !((NioClientSocketChannel) channel).boundManually) {
185                     fireChannelBound(channel, localAddress);
186                 }
187                 fireChannelConnected(channel, remoteAddress);
188             } catch (IOException e) {
189                 if (future != null) {
190                     future.setFailure(e);
191                 }
192                 close(channel, succeededFuture(channel));
193                 if (!(e instanceof ClosedChannelException)) {
194                     throw new ChannelException(
195                             "Failed to register a socket to the selector.", e);
196                 }
197             }
198 
199         }
200     }
201 
202 }