1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.jboss.netty.channel.socket.nio;
17
18 import static org.jboss.netty.channel.Channels.*;
19
20 import java.io.IOException;
21 import java.net.SocketAddress;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.ClosedChannelException;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.SocketChannel;
27 import java.util.concurrent.Executor;
28
29 import org.jboss.netty.buffer.ChannelBuffer;
30 import org.jboss.netty.buffer.ChannelBufferFactory;
31 import org.jboss.netty.channel.ChannelException;
32 import org.jboss.netty.channel.ChannelFuture;
33 import org.jboss.netty.channel.ReceiveBufferSizePredictor;
34
35 public class NioWorker extends AbstractNioWorker {
36
37 private final SocketReceiveBufferAllocator recvBufferPool = new SocketReceiveBufferAllocator();
38
39 public NioWorker(Executor executor) {
40 super(executor);
41 }
42
43 @Override
44 protected boolean read(SelectionKey k) {
45 final SocketChannel ch = (SocketChannel) k.channel();
46 final NioSocketChannel channel = (NioSocketChannel) k.attachment();
47
48 final ReceiveBufferSizePredictor predictor =
49 channel.getConfig().getReceiveBufferSizePredictor();
50 final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
51 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
52
53 int ret = 0;
54 int readBytes = 0;
55 boolean failure = true;
56
57 ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());
58 try {
59 while ((ret = ch.read(bb)) > 0) {
60 readBytes += ret;
61 if (!bb.hasRemaining()) {
62 break;
63 }
64 }
65 failure = false;
66 } catch (ClosedChannelException e) {
67
68 } catch (Throwable t) {
69 fireExceptionCaught(channel, t);
70 }
71
72 if (readBytes > 0) {
73 bb.flip();
74
75 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
76 buffer.setBytes(0, bb);
77 buffer.writerIndex(readBytes);
78
79
80
81 predictor.previousReceiveBufferSize(readBytes);
82
83
84 fireMessageReceived(channel, buffer);
85 }
86
87 if (ret < 0 || failure) {
88 k.cancel();
89 close(channel, succeededFuture(channel));
90 return false;
91 }
92
93 return true;
94 }
95
96
97 @Override
98 protected boolean scheduleWriteIfNecessary(final AbstractNioChannel<?> channel) {
99 final Thread currentThread = Thread.currentThread();
100 final Thread workerThread = thread;
101 if (currentThread != workerThread) {
102 if (channel.writeTaskInTaskQueue.compareAndSet(false, true)) {
103 boolean offered = writeTaskQueue.offer(channel.writeTask);
104 assert offered;
105 }
106
107 if (!(channel instanceof NioAcceptedSocketChannel) ||
108 ((NioAcceptedSocketChannel) channel).bossThread != currentThread) {
109 final Selector workerSelector = selector;
110 if (workerSelector != null) {
111 if (wakenUp.compareAndSet(false, true)) {
112 workerSelector.wakeup();
113 }
114 }
115 } else {
116
117
118
119
120
121
122
123
124
125 }
126
127 return true;
128 }
129
130 return false;
131 }
132
133 @Override
134 public void releaseExternalResources() {
135 super.releaseExternalResources();
136 recvBufferPool.releaseExternalResources();
137 }
138
139 @Override
140 protected Runnable createRegisterTask(AbstractNioChannel<?> channel, ChannelFuture future) {
141 boolean server = !(channel instanceof NioClientSocketChannel);
142 return new RegisterTask((NioSocketChannel) channel, future, server);
143 }
144
145 private final class RegisterTask implements Runnable {
146 private final NioSocketChannel channel;
147 private final ChannelFuture future;
148 private final boolean server;
149
150 RegisterTask(
151 NioSocketChannel channel, ChannelFuture future, boolean server) {
152
153 this.channel = channel;
154 this.future = future;
155 this.server = server;
156 }
157
158 public void run() {
159 SocketAddress localAddress = channel.getLocalAddress();
160 SocketAddress remoteAddress = channel.getRemoteAddress();
161
162 if (localAddress == null || remoteAddress == null) {
163 if (future != null) {
164 future.setFailure(new ClosedChannelException());
165 }
166 close(channel, succeededFuture(channel));
167 return;
168 }
169
170 try {
171 if (server) {
172 channel.channel.configureBlocking(false);
173 }
174
175 synchronized (channel.interestOpsLock) {
176 channel.channel.register(
177 selector, channel.getRawInterestOps(), channel);
178 }
179 if (future != null) {
180 channel.setConnected();
181 future.setSuccess();
182 }
183
184 if (server || !((NioClientSocketChannel) channel).boundManually) {
185 fireChannelBound(channel, localAddress);
186 }
187 fireChannelConnected(channel, remoteAddress);
188 } catch (IOException e) {
189 if (future != null) {
190 future.setFailure(e);
191 }
192 close(channel, succeededFuture(channel));
193 if (!(e instanceof ClosedChannelException)) {
194 throw new ChannelException(
195 "Failed to register a socket to the selector.", e);
196 }
197 }
198
199 }
200 }
201
202 }