View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.channel.socket.nio;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBufferFactory;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelException;
22  import org.jboss.netty.channel.ChannelFuture;
23  import org.jboss.netty.channel.ReceiveBufferSizePredictor;
24  import org.jboss.netty.util.ThreadNameDeterminer;
25  
26  import java.io.IOException;
27  import java.net.SocketAddress;
28  import java.nio.ByteBuffer;
29  import java.nio.channels.ClosedChannelException;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.SocketChannel;
32  import java.util.concurrent.Executor;
33  
34  import static org.jboss.netty.channel.Channels.*;
35  
36  public class NioWorker extends AbstractNioWorker {
37  
38      private final SocketReceiveBufferAllocator recvBufferPool = new SocketReceiveBufferAllocator();
39  
40      public NioWorker(Executor executor) {
41          super(executor);
42      }
43  
44      public NioWorker(Executor executor, ThreadNameDeterminer determiner) {
45          super(executor, determiner);
46      }
47  
48      @Override
49      protected boolean read(SelectionKey k) {
50          final SocketChannel ch = (SocketChannel) k.channel();
51          final NioSocketChannel channel = (NioSocketChannel) k.attachment();
52  
53          final ReceiveBufferSizePredictor predictor =
54              channel.getConfig().getReceiveBufferSizePredictor();
55          final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
56          final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
57  
58          int ret = 0;
59          int readBytes = 0;
60          boolean failure = true;
61  
62          ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
63          try {
64              while ((ret = ch.read(bb)) > 0) {
65                  readBytes += ret;
66                  if (!bb.hasRemaining()) {
67                      break;
68                  }
69              }
70              failure = false;
71          } catch (ClosedChannelException e) {
72              // Can happen, and does not need a user attention.
73          } catch (Throwable t) {
74              fireExceptionCaught(channel, t);
75          }
76  
77          if (readBytes > 0) {
78              bb.flip();
79  
80              final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
81              buffer.setBytes(0, bb);
82              buffer.writerIndex(readBytes);
83  
84              // Update the predictor.
85              predictor.previousReceiveBufferSize(readBytes);
86  
87              // Fire the event.
88              fireMessageReceived(channel, buffer);
89          }
90  
91          if (ret < 0 || failure) {
92              k.cancel(); // Some JDK implementations run into an infinite loop without this.
93              close(channel, succeededFuture(channel));
94              return false;
95          }
96  
97          return true;
98      }
99  
100     @Override
101     protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
102         final Thread currentThread = Thread.currentThread();
103         final Thread workerThread = thread;
104         if (currentThread != workerThread) {
105             if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
106                 registerTask(channel.writeTask);
107             }
108 
109             return true;
110         }
111 
112         return false;
113     }
114 
115     @Override
116     protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
117         boolean server = !(channel instanceof NioClientSocketChannel);
118         return new RegisterTask((NioSocketChannel) channel, future, server);
119     }
120 
121     private final class RegisterTask implements Runnable {
122         private final NioSocketChannel channel;
123         private final ChannelFuture future;
124         private final boolean server;
125 
126         RegisterTask(
127                 NioSocketChannel channel, ChannelFuture future, boolean server) {
128 
129             this.channel = channel;
130             this.future = future;
131             this.server = server;
132         }
133 
134         public void run() {
135             SocketAddress localAddress = channel.getLocalAddress();
136             SocketAddress remoteAddress = channel.getRemoteAddress();
137 
138             if (localAddress == null || remoteAddress == null) {
139                 if (future != null) {
140                     future.setFailure(new ClosedChannelException());
141                 }
142                 close(channel, succeededFuture(channel));
143                 return;
144             }
145 
146             try {
147                 if (server) {
148                     channel.channel.configureBlocking(false);
149                 }
150 
151                 channel.channel.register(
152                         selector, channel.getInternalInterestOps(), channel);
153 
154                 if (future != null) {
155                     channel.setConnected();
156                     future.setSuccess();
157                 }
158 
159                 if (server || !((NioClientSocketChannel) channel).boundManually) {
160                     fireChannelBound(channel, localAddress);
161                 }
162                 fireChannelConnected(channel, remoteAddress);
163             } catch (IOException e) {
164                 if (future != null) {
165                     future.setFailure(e);
166                 }
167                 close(channel, succeededFuture(channel));
168                 if (!(e instanceof ClosedChannelException)) {
169                     throw new ChannelException(
170                             "Failed to register a socket to the selector.", e);
171                 }
172             }
173         }
174     }
175 
176     @Override
177     public void run() {
178         super.run();
179         recvBufferPool.releaseExternalResources();
180     }
181 }