/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.nio.AbstractNioByteChannel;
28  import io.netty.channel.socket.DefaultSocketChannelConfig;
29  import io.netty.channel.socket.ServerSocketChannel;
30  import io.netty.channel.socket.SocketChannelConfig;
31  import io.netty.util.concurrent.GlobalEventExecutor;
32  import io.netty.util.internal.PlatformDependent;
33  import io.netty.util.internal.SocketUtils;
34  import io.netty.util.internal.UnstableApi;
35  
36  import java.io.IOException;
37  import java.net.InetSocketAddress;
38  import java.net.Socket;
39  import java.net.SocketAddress;
40  import java.nio.ByteBuffer;
41  import java.nio.channels.SelectionKey;
42  import java.nio.channels.SocketChannel;
43  import java.nio.channels.spi.SelectorProvider;
44  import java.util.concurrent.Executor;
45  
46  /**
47   * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
48   */
49  public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
50  
51      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
52      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
53  
54      private static SocketChannel newSocket(SelectorProvider provider) {
55          try {
56              /**
57               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
58               *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
59               *
60               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
61               */
62              return provider.openSocketChannel();
63          } catch (IOException e) {
64              throw new ChannelException("Failed to open a socket.", e);
65          }
66      }
67  
68      private final SocketChannelConfig config;
69  
70      /**
71       * Create a new instance
72       */
73      public NioSocketChannel() {
74          this(newSocket(DEFAULT_SELECTOR_PROVIDER));
75      }
76  
77      /**
78       * Create a new instance using the given {@link SelectorProvider}.
79       */
80      public NioSocketChannel(SelectorProvider provider) {
81          this(newSocket(provider));
82      }
83  
84      /**
85       * Create a new instance using the given {@link SocketChannel}.
86       */
87      public NioSocketChannel(SocketChannel socket) {
88          this(null, socket);
89      }
90  
91      /**
92       * Create a new instance
93       *
94       * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
95       * @param socket    the {@link SocketChannel} which will be used
96       */
97      public NioSocketChannel(Channel parent, SocketChannel socket) {
98          super(parent, socket);
99          config = new NioSocketChannelConfig(this, socket.socket());
100     }
101 
102     @Override
103     public ServerSocketChannel parent() {
104         return (ServerSocketChannel) super.parent();
105     }
106 
107     @Override
108     public ChannelMetadata metadata() {
109         return METADATA;
110     }
111 
112     @Override
113     public SocketChannelConfig config() {
114         return config;
115     }
116 
117     @Override
118     protected SocketChannel javaChannel() {
119         return (SocketChannel) super.javaChannel();
120     }
121 
122     @Override
123     public boolean isActive() {
124         SocketChannel ch = javaChannel();
125         return ch.isOpen() && ch.isConnected();
126     }
127 
128     @Override
129     public boolean isInputShutdown() {
130         return super.isInputShutdown();
131     }
132 
133     @Override
134     public InetSocketAddress localAddress() {
135         return (InetSocketAddress) super.localAddress();
136     }
137 
138     @Override
139     public InetSocketAddress remoteAddress() {
140         return (InetSocketAddress) super.remoteAddress();
141     }
142 
143     @UnstableApi
144     @Override
145     protected final void doShutdownOutput() throws Exception {
146         if (PlatformDependent.javaVersion() >= 7) {
147             javaChannel().shutdownOutput();
148         } else {
149             javaChannel().socket().shutdownOutput();
150         }
151     }
152 
153     @Override
154     public boolean isOutputShutdown() {
155         return javaChannel().socket().isOutputShutdown() || !isActive();
156     }
157 
158     @Override
159     public ChannelFuture shutdownOutput() {
160         return shutdownOutput(newPromise());
161     }
162 
163     @Override
164     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
165         final EventLoop loop = eventLoop();
166         if (loop.inEventLoop()) {
167             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
168         } else {
169             loop.execute(new Runnable() {
170                 @Override
171                 public void run() {
172                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
173                 }
174             });
175         }
176         return promise;
177     }
178 
179     @Override
180     protected SocketAddress localAddress0() {
181         return javaChannel().socket().getLocalSocketAddress();
182     }
183 
184     @Override
185     protected SocketAddress remoteAddress0() {
186         return javaChannel().socket().getRemoteSocketAddress();
187     }
188 
189     @Override
190     protected void doBind(SocketAddress localAddress) throws Exception {
191         doBind0(localAddress);
192     }
193 
194     private void doBind0(SocketAddress localAddress) throws Exception {
195         if (PlatformDependent.javaVersion() >= 7) {
196             SocketUtils.bind(javaChannel(), localAddress);
197         } else {
198             SocketUtils.bind(javaChannel().socket(), localAddress);
199         }
200     }
201 
202     @Override
203     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
204         if (localAddress != null) {
205             doBind0(localAddress);
206         }
207 
208         boolean success = false;
209         try {
210             boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
211             if (!connected) {
212                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
213             }
214             success = true;
215             return connected;
216         } finally {
217             if (!success) {
218                 doClose();
219             }
220         }
221     }
222 
223     @Override
224     protected void doFinishConnect() throws Exception {
225         if (!javaChannel().finishConnect()) {
226             throw new Error();
227         }
228     }
229 
230     @Override
231     protected void doDisconnect() throws Exception {
232         doClose();
233     }
234 
235     @Override
236     protected void doClose() throws Exception {
237         super.doClose();
238         javaChannel().close();
239     }
240 
241     @Override
242     protected int doReadBytes(ByteBuf byteBuf) throws Exception {
243         return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
244     }
245 
246     @Override
247     protected int doWriteBytes(ByteBuf buf) throws Exception {
248         final int expectedWrittenBytes = buf.readableBytes();
249         return buf.readBytes(javaChannel(), expectedWrittenBytes);
250     }
251 
252     @Override
253     protected long doWriteFileRegion(FileRegion region) throws Exception {
254         final long position = region.transfered();
255         return region.transferTo(javaChannel(), position);
256     }
257 
258     @Override
259     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
260         for (;;) {
261             int size = in.size();
262             if (size == 0) {
263                 // All written so clear OP_WRITE
264                 clearOpWrite();
265                 break;
266             }
267             long writtenBytes = 0;
268             boolean done = false;
269             boolean setOpWrite = false;
270 
271             // Ensure the pending writes are made of ByteBufs only.
272             ByteBuffer[] nioBuffers = in.nioBuffers();
273             int nioBufferCnt = in.nioBufferCount();
274             long expectedWrittenBytes = in.nioBufferSize();
275             SocketChannel ch = javaChannel();
276 
277             // Always us nioBuffers() to workaround data-corruption.
278             // See https://github.com/netty/netty/issues/2761
279             switch (nioBufferCnt) {
280                 case 0:
281                     // We have something else beside ByteBuffers to write so fallback to normal writes.
282                     super.doWrite(in);
283                     return;
284                 case 1:
285                     // Only one ByteBuf so use non-gathering write
286                     ByteBuffer nioBuffer = nioBuffers[0];
287                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
288                         final int localWrittenBytes = ch.write(nioBuffer);
289                         if (localWrittenBytes == 0) {
290                             setOpWrite = true;
291                             break;
292                         }
293                         expectedWrittenBytes -= localWrittenBytes;
294                         writtenBytes += localWrittenBytes;
295                         if (expectedWrittenBytes == 0) {
296                             done = true;
297                             break;
298                         }
299                     }
300                     break;
301                 default:
302                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
303                         final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
304                         if (localWrittenBytes == 0) {
305                             setOpWrite = true;
306                             break;
307                         }
308                         expectedWrittenBytes -= localWrittenBytes;
309                         writtenBytes += localWrittenBytes;
310                         if (expectedWrittenBytes == 0) {
311                             done = true;
312                             break;
313                         }
314                     }
315                     break;
316             }
317 
318             // Release the fully written buffers, and update the indexes of the partially written buffer.
319             in.removeBytes(writtenBytes);
320 
321             if (!done) {
322                 // Did not write all buffers completely.
323                 incompleteWrite(setOpWrite);
324                 break;
325             }
326         }
327     }
328 
329     @Override
330     protected AbstractNioUnsafe newUnsafe() {
331         return new NioSocketChannelUnsafe();
332     }
333 
334     private final class NioSocketChannelUnsafe extends NioByteUnsafe {
335         @Override
336         protected Executor prepareToClose() {
337             try {
338                 if (javaChannel().isOpen() && config().getSoLinger() > 0) {
339                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
340                     // because we try to read or write until the actual close happens which may be later due
341                     // SO_LINGER handling.
342                     // See https://github.com/netty/netty/issues/4449
343                     doDeregister();
344                     return GlobalEventExecutor.INSTANCE;
345                 }
346             } catch (Throwable ignore) {
347                 // Ignore the error as the underlying channel may be closed in the meantime and so
348                 // getSoLinger() may produce an exception. In this case we just return null.
349                 // See https://github.com/netty/netty/issues/4449
350             }
351             return null;
352         }
353     }
354 
355     private final class NioSocketChannelConfig  extends DefaultSocketChannelConfig {
356         private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
357             super(channel, javaSocket);
358         }
359 
360         @Override
361         protected void autoReadCleared() {
362             setReadPending(false);
363         }
364     }
365 }