1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.socket.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelException;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelMetadata;
23 import io.netty.channel.ChannelOutboundBuffer;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.EventLoop;
26 import io.netty.channel.FileRegion;
27 import io.netty.channel.nio.AbstractNioByteChannel;
28 import io.netty.channel.socket.DefaultSocketChannelConfig;
29 import io.netty.channel.socket.ServerSocketChannel;
30 import io.netty.channel.socket.SocketChannelConfig;
31 import io.netty.util.concurrent.GlobalEventExecutor;
32 import io.netty.util.internal.PlatformDependent;
33 import io.netty.util.internal.SocketUtils;
34 import io.netty.util.internal.UnstableApi;
35
36 import java.io.IOException;
37 import java.net.InetSocketAddress;
38 import java.net.Socket;
39 import java.net.SocketAddress;
40 import java.nio.ByteBuffer;
41 import java.nio.channels.SelectionKey;
42 import java.nio.channels.SocketChannel;
43 import java.nio.channels.spi.SelectorProvider;
44 import java.util.concurrent.Executor;
45
46
47
48
49 public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
50
51 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
52 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
53
54 private static SocketChannel newSocket(SelectorProvider provider) {
55 try {
56
57
58
59
60
61
62 return provider.openSocketChannel();
63 } catch (IOException e) {
64 throw new ChannelException("Failed to open a socket.", e);
65 }
66 }
67
68 private final SocketChannelConfig config;
69
70
71
72
73 public NioSocketChannel() {
74 this(newSocket(DEFAULT_SELECTOR_PROVIDER));
75 }
76
77
78
79
80 public NioSocketChannel(SelectorProvider provider) {
81 this(newSocket(provider));
82 }
83
84
85
86
87 public NioSocketChannel(SocketChannel socket) {
88 this(null, socket);
89 }
90
91
92
93
94
95
96
97 public NioSocketChannel(Channel parent, SocketChannel socket) {
98 super(parent, socket);
99 config = new NioSocketChannelConfig(this, socket.socket());
100 }
101
102 @Override
103 public ServerSocketChannel parent() {
104 return (ServerSocketChannel) super.parent();
105 }
106
107 @Override
108 public ChannelMetadata metadata() {
109 return METADATA;
110 }
111
112 @Override
113 public SocketChannelConfig config() {
114 return config;
115 }
116
117 @Override
118 protected SocketChannel javaChannel() {
119 return (SocketChannel) super.javaChannel();
120 }
121
122 @Override
123 public boolean isActive() {
124 SocketChannel ch = javaChannel();
125 return ch.isOpen() && ch.isConnected();
126 }
127
128 @Override
129 public boolean isInputShutdown() {
130 return super.isInputShutdown();
131 }
132
133 @Override
134 public InetSocketAddress localAddress() {
135 return (InetSocketAddress) super.localAddress();
136 }
137
138 @Override
139 public InetSocketAddress remoteAddress() {
140 return (InetSocketAddress) super.remoteAddress();
141 }
142
143 @UnstableApi
144 @Override
145 protected final void doShutdownOutput() throws Exception {
146 if (PlatformDependent.javaVersion() >= 7) {
147 javaChannel().shutdownOutput();
148 } else {
149 javaChannel().socket().shutdownOutput();
150 }
151 }
152
153 @Override
154 public boolean isOutputShutdown() {
155 return javaChannel().socket().isOutputShutdown() || !isActive();
156 }
157
158 @Override
159 public ChannelFuture shutdownOutput() {
160 return shutdownOutput(newPromise());
161 }
162
163 @Override
164 public ChannelFuture shutdownOutput(final ChannelPromise promise) {
165 final EventLoop loop = eventLoop();
166 if (loop.inEventLoop()) {
167 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
168 } else {
169 loop.execute(new Runnable() {
170 @Override
171 public void run() {
172 ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
173 }
174 });
175 }
176 return promise;
177 }
178
179 @Override
180 protected SocketAddress localAddress0() {
181 return javaChannel().socket().getLocalSocketAddress();
182 }
183
184 @Override
185 protected SocketAddress remoteAddress0() {
186 return javaChannel().socket().getRemoteSocketAddress();
187 }
188
189 @Override
190 protected void doBind(SocketAddress localAddress) throws Exception {
191 doBind0(localAddress);
192 }
193
194 private void doBind0(SocketAddress localAddress) throws Exception {
195 if (PlatformDependent.javaVersion() >= 7) {
196 SocketUtils.bind(javaChannel(), localAddress);
197 } else {
198 SocketUtils.bind(javaChannel().socket(), localAddress);
199 }
200 }
201
202 @Override
203 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
204 if (localAddress != null) {
205 doBind0(localAddress);
206 }
207
208 boolean success = false;
209 try {
210 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
211 if (!connected) {
212 selectionKey().interestOps(SelectionKey.OP_CONNECT);
213 }
214 success = true;
215 return connected;
216 } finally {
217 if (!success) {
218 doClose();
219 }
220 }
221 }
222
223 @Override
224 protected void doFinishConnect() throws Exception {
225 if (!javaChannel().finishConnect()) {
226 throw new Error();
227 }
228 }
229
230 @Override
231 protected void doDisconnect() throws Exception {
232 doClose();
233 }
234
235 @Override
236 protected void doClose() throws Exception {
237 super.doClose();
238 javaChannel().close();
239 }
240
241 @Override
242 protected int doReadBytes(ByteBuf byteBuf) throws Exception {
243 return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
244 }
245
246 @Override
247 protected int doWriteBytes(ByteBuf buf) throws Exception {
248 final int expectedWrittenBytes = buf.readableBytes();
249 return buf.readBytes(javaChannel(), expectedWrittenBytes);
250 }
251
252 @Override
253 protected long doWriteFileRegion(FileRegion region) throws Exception {
254 final long position = region.transfered();
255 return region.transferTo(javaChannel(), position);
256 }
257
258 @Override
259 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
260 for (;;) {
261 int size = in.size();
262 if (size == 0) {
263
264 clearOpWrite();
265 break;
266 }
267 long writtenBytes = 0;
268 boolean done = false;
269 boolean setOpWrite = false;
270
271
272 ByteBuffer[] nioBuffers = in.nioBuffers();
273 int nioBufferCnt = in.nioBufferCount();
274 long expectedWrittenBytes = in.nioBufferSize();
275 SocketChannel ch = javaChannel();
276
277
278
279 switch (nioBufferCnt) {
280 case 0:
281
282 super.doWrite(in);
283 return;
284 case 1:
285
286 ByteBuffer nioBuffer = nioBuffers[0];
287 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
288 final int localWrittenBytes = ch.write(nioBuffer);
289 if (localWrittenBytes == 0) {
290 setOpWrite = true;
291 break;
292 }
293 expectedWrittenBytes -= localWrittenBytes;
294 writtenBytes += localWrittenBytes;
295 if (expectedWrittenBytes == 0) {
296 done = true;
297 break;
298 }
299 }
300 break;
301 default:
302 for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
303 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
304 if (localWrittenBytes == 0) {
305 setOpWrite = true;
306 break;
307 }
308 expectedWrittenBytes -= localWrittenBytes;
309 writtenBytes += localWrittenBytes;
310 if (expectedWrittenBytes == 0) {
311 done = true;
312 break;
313 }
314 }
315 break;
316 }
317
318
319 in.removeBytes(writtenBytes);
320
321 if (!done) {
322
323 incompleteWrite(setOpWrite);
324 break;
325 }
326 }
327 }
328
329 @Override
330 protected AbstractNioUnsafe newUnsafe() {
331 return new NioSocketChannelUnsafe();
332 }
333
334 private final class NioSocketChannelUnsafe extends NioByteUnsafe {
335 @Override
336 protected Executor prepareToClose() {
337 try {
338 if (javaChannel().isOpen() && config().getSoLinger() > 0) {
339
340
341
342
343 doDeregister();
344 return GlobalEventExecutor.INSTANCE;
345 }
346 } catch (Throwable ignore) {
347
348
349
350 }
351 return null;
352 }
353 }
354
355 private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
356 private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
357 super(channel, javaSocket);
358 }
359
360 @Override
361 protected void autoReadCleared() {
362 setReadPending(false);
363 }
364 }
365 }