View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelMetadata;
23  import io.netty.channel.ChannelOutboundBuffer;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.EventLoop;
26  import io.netty.channel.FileRegion;
27  import io.netty.channel.nio.AbstractNioByteChannel;
28  import io.netty.channel.socket.DefaultSocketChannelConfig;
29  import io.netty.channel.socket.ServerSocketChannel;
30  import io.netty.channel.socket.SocketChannelConfig;
31  import io.netty.util.concurrent.GlobalEventExecutor;
32  import io.netty.util.internal.OneTimeTask;
33  
34  import java.io.IOException;
35  import java.net.InetSocketAddress;
36  import java.net.Socket;
37  import java.net.SocketAddress;
38  import java.nio.ByteBuffer;
39  import java.nio.channels.SelectionKey;
40  import java.nio.channels.SocketChannel;
41  import java.nio.channels.spi.SelectorProvider;
42  import java.util.concurrent.Executor;
43  
44  /**
45   * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
46   */
47  public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
48  
49      private static final ChannelMetadata METADATA = new ChannelMetadata(false);
50      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
51  
52      private static SocketChannel newSocket(SelectorProvider provider) {
53          try {
54              /**
55               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
56               *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
57               *
58               *  See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
59               */
60              return provider.openSocketChannel();
61          } catch (IOException e) {
62              throw new ChannelException("Failed to open a socket.", e);
63          }
64      }
65  
66      private final SocketChannelConfig config;
67  
68      /**
69       * Create a new instance
70       */
71      public NioSocketChannel() {
72          this(DEFAULT_SELECTOR_PROVIDER);
73      }
74  
75      /**
76       * Create a new instance using the given {@link SelectorProvider}.
77       */
78      public NioSocketChannel(SelectorProvider provider) {
79          this(newSocket(provider));
80      }
81  
82      /**
83       * Create a new instance using the given {@link SocketChannel}.
84       */
85      public NioSocketChannel(SocketChannel socket) {
86          this(null, socket);
87      }
88      /**
89       * Create a new instance
90       *
91       * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
92       * @param socket    the {@link SocketChannel} which will be used
93       */
94      public NioSocketChannel(Channel parent, SocketChannel socket) {
95          super(parent, socket);
96          config = new NioSocketChannelConfig(this, socket.socket());
97      }
98  
99      @Override
100     public ServerSocketChannel parent() {
101         return (ServerSocketChannel) super.parent();
102     }
103 
104     @Override
105     public ChannelMetadata metadata() {
106         return METADATA;
107     }
108 
109     @Override
110     public SocketChannelConfig config() {
111         return config;
112     }
113 
114     @Override
115     protected SocketChannel javaChannel() {
116         return (SocketChannel) super.javaChannel();
117     }
118 
119     @Override
120     public boolean isActive() {
121         SocketChannel ch = javaChannel();
122         return ch.isOpen() && ch.isConnected();
123     }
124 
125     @Override
126     public boolean isInputShutdown() {
127         return super.isInputShutdown();
128     }
129 
130     @Override
131     public InetSocketAddress localAddress() {
132         return (InetSocketAddress) super.localAddress();
133     }
134 
135     @Override
136     public InetSocketAddress remoteAddress() {
137         return (InetSocketAddress) super.remoteAddress();
138     }
139 
140     @Override
141     public boolean isOutputShutdown() {
142         return javaChannel().socket().isOutputShutdown() || !isActive();
143     }
144 
145     @Override
146     public ChannelFuture shutdownOutput() {
147         return shutdownOutput(newPromise());
148     }
149 
150     @Override
151     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
152         Executor closeExecutor = ((NioSocketChannelUnsafe) unsafe()).closeExecutor();
153         if (closeExecutor != null) {
154             closeExecutor.execute(new OneTimeTask() {
155                 @Override
156                 public void run() {
157                     shutdownOutput0(promise);
158                 }
159             });
160         } else {
161             EventLoop loop = eventLoop();
162             if (loop.inEventLoop()) {
163                 shutdownOutput0(promise);
164             } else {
165                 loop.execute(new OneTimeTask() {
166                     @Override
167                     public void run() {
168                         shutdownOutput0(promise);
169                     }
170                 });
171             }
172         }
173         return promise;
174     }
175 
176     private void shutdownOutput0(final ChannelPromise promise) {
177         try {
178             javaChannel().socket().shutdownOutput();
179             promise.setSuccess();
180         } catch (Throwable t) {
181             promise.setFailure(t);
182         }
183     }
184 
185     @Override
186     protected SocketAddress localAddress0() {
187         return javaChannel().socket().getLocalSocketAddress();
188     }
189 
190     @Override
191     protected SocketAddress remoteAddress0() {
192         return javaChannel().socket().getRemoteSocketAddress();
193     }
194 
195     @Override
196     protected void doBind(SocketAddress localAddress) throws Exception {
197         javaChannel().socket().bind(localAddress);
198     }
199 
200     @Override
201     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
202         if (localAddress != null) {
203             javaChannel().socket().bind(localAddress);
204         }
205 
206         boolean success = false;
207         try {
208             boolean connected = javaChannel().connect(remoteAddress);
209             if (!connected) {
210                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
211             }
212             success = true;
213             return connected;
214         } finally {
215             if (!success) {
216                 doClose();
217             }
218         }
219     }
220 
221     @Override
222     protected void doFinishConnect() throws Exception {
223         if (!javaChannel().finishConnect()) {
224             throw new Error();
225         }
226     }
227 
228     @Override
229     protected void doDisconnect() throws Exception {
230         doClose();
231     }
232 
233     @Override
234     protected void doClose() throws Exception {
235         javaChannel().close();
236     }
237 
238     @Override
239     protected int doReadBytes(ByteBuf byteBuf) throws Exception {
240         return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
241     }
242 
243     @Override
244     protected int doWriteBytes(ByteBuf buf) throws Exception {
245         final int expectedWrittenBytes = buf.readableBytes();
246         return buf.readBytes(javaChannel(), expectedWrittenBytes);
247     }
248 
249     @Override
250     protected long doWriteFileRegion(FileRegion region) throws Exception {
251         final long position = region.transfered();
252         return region.transferTo(javaChannel(), position);
253     }
254 
255     @Override
256     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
257         for (;;) {
258             int size = in.size();
259             if (size == 0) {
260                 // All written so clear OP_WRITE
261                 clearOpWrite();
262                 break;
263             }
264             long writtenBytes = 0;
265             boolean done = false;
266             boolean setOpWrite = false;
267 
268             // Ensure the pending writes are made of ByteBufs only.
269             ByteBuffer[] nioBuffers = in.nioBuffers();
270             int nioBufferCnt = in.nioBufferCount();
271             long expectedWrittenBytes = in.nioBufferSize();
272             SocketChannel ch = javaChannel();
273 
274             // Always us nioBuffers() to workaround data-corruption.
275             // See https://github.com/netty/netty/issues/2761
276             switch (nioBufferCnt) {
277                 case 0:
278                     // We have something else beside ByteBuffers to write so fallback to normal writes.
279                     super.doWrite(in);
280                     return;
281                 case 1:
282                     // Only one ByteBuf so use non-gathering write
283                     ByteBuffer nioBuffer = nioBuffers[0];
284                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
285                         final int localWrittenBytes = ch.write(nioBuffer);
286                         if (localWrittenBytes == 0) {
287                             setOpWrite = true;
288                             break;
289                         }
290                         expectedWrittenBytes -= localWrittenBytes;
291                         writtenBytes += localWrittenBytes;
292                         if (expectedWrittenBytes == 0) {
293                             done = true;
294                             break;
295                         }
296                     }
297                     break;
298                 default:
299                     for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
300                         final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
301                         if (localWrittenBytes == 0) {
302                             setOpWrite = true;
303                             break;
304                         }
305                         expectedWrittenBytes -= localWrittenBytes;
306                         writtenBytes += localWrittenBytes;
307                         if (expectedWrittenBytes == 0) {
308                             done = true;
309                             break;
310                         }
311                     }
312                     break;
313             }
314 
315             // Release the fully written buffers, and update the indexes of the partially written buffer.
316             in.removeBytes(writtenBytes);
317 
318             if (!done) {
319                 // Did not write all buffers completely.
320                 incompleteWrite(setOpWrite);
321                 break;
322             }
323         }
324     }
325 
326     @Override
327     protected AbstractNioUnsafe newUnsafe() {
328         return new NioSocketChannelUnsafe();
329     }
330 
331     private final class NioSocketChannelUnsafe extends NioByteUnsafe {
332         @Override
333         protected Executor closeExecutor() {
334             if (config().getSoLinger() > 0) {
335                 return GlobalEventExecutor.INSTANCE;
336             }
337             return null;
338         }
339     }
340 
341     private final class NioSocketChannelConfig  extends DefaultSocketChannelConfig {
342         private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
343             super(channel, javaSocket);
344         }
345 
346         @Override
347         protected void autoReadCleared() {
348             setReadPending(false);
349         }
350     }
351 }