View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.socket.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelException;
21  import io.netty.channel.ChannelFuture;
22  import io.netty.channel.ChannelFutureListener;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.ChannelOutboundBuffer;
25  import io.netty.channel.ChannelPromise;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.channel.nio.AbstractNioByteChannel;
30  import io.netty.channel.socket.DefaultSocketChannelConfig;
31  import io.netty.channel.socket.ServerSocketChannel;
32  import io.netty.channel.socket.SocketChannelConfig;
33  import io.netty.util.concurrent.GlobalEventExecutor;
34  import io.netty.util.internal.PlatformDependent;
35  import io.netty.util.internal.SocketUtils;
36  import io.netty.util.internal.UnstableApi;
37  import io.netty.util.internal.logging.InternalLogger;
38  import io.netty.util.internal.logging.InternalLoggerFactory;
39  
40  import java.io.IOException;
41  import java.net.InetSocketAddress;
42  import java.net.Socket;
43  import java.net.SocketAddress;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.SelectionKey;
46  import java.nio.channels.SocketChannel;
47  import java.nio.channels.spi.SelectorProvider;
48  import java.util.Map;
49  import java.util.concurrent.Executor;
50  
51  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
52  
53  /**
54   * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation.
55   */
56  public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
57      private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
58      private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
59  
60      private static SocketChannel newSocket(SelectorProvider provider) {
61          try {
62              /**
63               *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
64               *  {@link SelectorProvider#provider()} which is called by each SocketChannel.open() otherwise.
65               *
66               *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
67               */
68              return provider.openSocketChannel();
69          } catch (IOException e) {
70              throw new ChannelException("Failed to open a socket.", e);
71          }
72      }
73  
74      private final SocketChannelConfig config;
75  
76      /**
77       * Create a new instance
78       */
79      public NioSocketChannel() {
80          this(DEFAULT_SELECTOR_PROVIDER);
81      }
82  
83      /**
84       * Create a new instance using the given {@link SelectorProvider}.
85       */
86      public NioSocketChannel(SelectorProvider provider) {
87          this(newSocket(provider));
88      }
89  
90      /**
91       * Create a new instance using the given {@link SocketChannel}.
92       */
93      public NioSocketChannel(SocketChannel socket) {
94          this(null, socket);
95      }
96  
97      /**
98       * Create a new instance
99       *
100      * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user
101      * @param socket    the {@link SocketChannel} which will be used
102      */
103     public NioSocketChannel(Channel parent, SocketChannel socket) {
104         super(parent, socket);
105         config = new NioSocketChannelConfig(this, socket.socket());
106     }
107 
108     @Override
109     public ServerSocketChannel parent() {
110         return (ServerSocketChannel) super.parent();
111     }
112 
113     @Override
114     public SocketChannelConfig config() {
115         return config;
116     }
117 
118     @Override
119     protected SocketChannel javaChannel() {
120         return (SocketChannel) super.javaChannel();
121     }
122 
123     @Override
124     public boolean isActive() {
125         SocketChannel ch = javaChannel();
126         return ch.isOpen() && ch.isConnected();
127     }
128 
129     @Override
130     public boolean isOutputShutdown() {
131         return javaChannel().socket().isOutputShutdown() || !isActive();
132     }
133 
134     @Override
135     public boolean isInputShutdown() {
136         return javaChannel().socket().isInputShutdown() || !isActive();
137     }
138 
139     @Override
140     public boolean isShutdown() {
141         Socket socket = javaChannel().socket();
142         return socket.isInputShutdown() && socket.isOutputShutdown() || !isActive();
143     }
144 
145     @Override
146     public InetSocketAddress localAddress() {
147         return (InetSocketAddress) super.localAddress();
148     }
149 
150     @Override
151     public InetSocketAddress remoteAddress() {
152         return (InetSocketAddress) super.remoteAddress();
153     }
154 
155     @UnstableApi
156     @Override
157     protected final void doShutdownOutput() throws Exception {
158         if (PlatformDependent.javaVersion() >= 7) {
159             javaChannel().shutdownOutput();
160         } else {
161             javaChannel().socket().shutdownOutput();
162         }
163     }
164 
165     @Override
166     public ChannelFuture shutdownOutput() {
167         return shutdownOutput(newPromise());
168     }
169 
170     @Override
171     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
172         final EventLoop loop = eventLoop();
173         if (loop.inEventLoop()) {
174             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
175         } else {
176             loop.execute(new Runnable() {
177                 @Override
178                 public void run() {
179                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
180                 }
181             });
182         }
183         return promise;
184     }
185 
186     @Override
187     public ChannelFuture shutdownInput() {
188         return shutdownInput(newPromise());
189     }
190 
191     @Override
192     protected boolean isInputShutdown0() {
193         return isInputShutdown();
194     }
195 
196     @Override
197     public ChannelFuture shutdownInput(final ChannelPromise promise) {
198         EventLoop loop = eventLoop();
199         if (loop.inEventLoop()) {
200             shutdownInput0(promise);
201         } else {
202             loop.execute(new Runnable() {
203                 @Override
204                 public void run() {
205                     shutdownInput0(promise);
206                 }
207             });
208         }
209         return promise;
210     }
211 
212     @Override
213     public ChannelFuture shutdown() {
214         return shutdown(newPromise());
215     }
216 
217     @Override
218     public ChannelFuture shutdown(final ChannelPromise promise) {
219         ChannelFuture shutdownOutputFuture = shutdownOutput();
220         if (shutdownOutputFuture.isDone()) {
221             shutdownOutputDone(shutdownOutputFuture, promise);
222         } else {
223             shutdownOutputFuture.addListener(new ChannelFutureListener() {
224                 @Override
225                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
226                     shutdownOutputDone(shutdownOutputFuture, promise);
227                 }
228             });
229         }
230         return promise;
231     }
232 
233     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
234         ChannelFuture shutdownInputFuture = shutdownInput();
235         if (shutdownInputFuture.isDone()) {
236             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
237         } else {
238             shutdownInputFuture.addListener(new ChannelFutureListener() {
239                 @Override
240                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
241                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
242                 }
243             });
244         }
245     }
246 
247     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
248                                      ChannelFuture shutdownInputFuture,
249                                      ChannelPromise promise) {
250         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
251         Throwable shutdownInputCause = shutdownInputFuture.cause();
252         if (shutdownOutputCause != null) {
253             if (shutdownInputCause != null) {
254                 logger.debug("Exception suppressed because a previous exception occurred.",
255                         shutdownInputCause);
256             }
257             promise.setFailure(shutdownOutputCause);
258         } else if (shutdownInputCause != null) {
259             promise.setFailure(shutdownInputCause);
260         } else {
261             promise.setSuccess();
262         }
263     }
264     private void shutdownInput0(final ChannelPromise promise) {
265         try {
266             shutdownInput0();
267             promise.setSuccess();
268         } catch (Throwable t) {
269             promise.setFailure(t);
270         }
271     }
272 
273     private void shutdownInput0() throws Exception {
274         if (PlatformDependent.javaVersion() >= 7) {
275             javaChannel().shutdownInput();
276         } else {
277             javaChannel().socket().shutdownInput();
278         }
279     }
280 
281     @Override
282     protected SocketAddress localAddress0() {
283         return javaChannel().socket().getLocalSocketAddress();
284     }
285 
286     @Override
287     protected SocketAddress remoteAddress0() {
288         return javaChannel().socket().getRemoteSocketAddress();
289     }
290 
291     @Override
292     protected void doBind(SocketAddress localAddress) throws Exception {
293         doBind0(localAddress);
294     }
295 
296     private void doBind0(SocketAddress localAddress) throws Exception {
297         if (PlatformDependent.javaVersion() >= 7) {
298             SocketUtils.bind(javaChannel(), localAddress);
299         } else {
300             SocketUtils.bind(javaChannel().socket(), localAddress);
301         }
302     }
303 
304     @Override
305     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
306         if (localAddress != null) {
307             doBind0(localAddress);
308         }
309 
310         boolean success = false;
311         try {
312             boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
313             if (!connected) {
314                 selectionKey().interestOps(SelectionKey.OP_CONNECT);
315             }
316             success = true;
317             return connected;
318         } finally {
319             if (!success) {
320                 doClose();
321             }
322         }
323     }
324 
325     @Override
326     protected void doFinishConnect() throws Exception {
327         if (!javaChannel().finishConnect()) {
328             throw new Error();
329         }
330     }
331 
332     @Override
333     protected void doDisconnect() throws Exception {
334         doClose();
335     }
336 
337     @Override
338     protected void doClose() throws Exception {
339         super.doClose();
340         javaChannel().close();
341     }
342 
343     @Override
344     protected int doReadBytes(ByteBuf byteBuf) throws Exception {
345         final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
346         allocHandle.attemptedBytesRead(byteBuf.writableBytes());
347         return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
348     }
349 
350     @Override
351     protected int doWriteBytes(ByteBuf buf) throws Exception {
352         final int expectedWrittenBytes = buf.readableBytes();
353         return buf.readBytes(javaChannel(), expectedWrittenBytes);
354     }
355 
356     @Override
357     protected long doWriteFileRegion(FileRegion region) throws Exception {
358         final long position = region.transferred();
359         return region.transferTo(javaChannel(), position);
360     }
361 
362     private void adjustMaxBytesPerGatheringWrite(int attempted, int written, int oldMaxBytesPerGatheringWrite) {
363         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
364         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
365         // make a best effort to adjust as OS behavior changes.
366         if (attempted == written) {
367             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
368                 ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted << 1);
369             }
370         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
371             ((NioSocketChannelConfig) config).setMaxBytesPerGatheringWrite(attempted >>> 1);
372         }
373     }
374 
375     @Override
376     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
377         SocketChannel ch = javaChannel();
378         int writeSpinCount = config().getWriteSpinCount();
379         do {
380             if (in.isEmpty()) {
381                 // All written so clear OP_WRITE
382                 clearOpWrite();
383                 // Directly return here so incompleteWrite(...) is not called.
384                 return;
385             }
386 
387             // Ensure the pending writes are made of ByteBufs only.
388             int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
389             ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
390             int nioBufferCnt = in.nioBufferCount();
391 
392             // Always us nioBuffers() to workaround data-corruption.
393             // See https://github.com/netty/netty/issues/2761
394             switch (nioBufferCnt) {
395                 case 0:
396                     // We have something else beside ByteBuffers to write so fallback to normal writes.
397                     writeSpinCount -= doWrite0(in);
398                     break;
399                 case 1: {
400                     // Only one ByteBuf so use non-gathering write
401                     // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
402                     // to check if the total size of all the buffers is non-zero.
403                     ByteBuffer buffer = nioBuffers[0];
404                     int attemptedBytes = buffer.remaining();
405                     final int localWrittenBytes = ch.write(buffer);
406                     if (localWrittenBytes <= 0) {
407                         incompleteWrite(true);
408                         return;
409                     }
410                     adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
411                     in.removeBytes(localWrittenBytes);
412                     --writeSpinCount;
413                     break;
414                 }
415                 default: {
416                     // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
417                     // to check if the total size of all the buffers is non-zero.
418                     // We limit the max amount to int above so cast is safe
419                     long attemptedBytes = in.nioBufferSize();
420                     final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
421                     if (localWrittenBytes <= 0) {
422                         incompleteWrite(true);
423                         return;
424                     }
425                     // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
426                     adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
427                             maxBytesPerGatheringWrite);
428                     in.removeBytes(localWrittenBytes);
429                     --writeSpinCount;
430                     break;
431                 }
432             }
433         } while (writeSpinCount > 0);
434 
435         incompleteWrite(writeSpinCount < 0);
436     }
437 
438     @Override
439     protected AbstractNioUnsafe newUnsafe() {
440         return new NioSocketChannelUnsafe();
441     }
442 
443     private final class NioSocketChannelUnsafe extends NioByteUnsafe {
444         @Override
445         protected Executor prepareToClose() {
446             try {
447                 if (javaChannel().isOpen() && config().getSoLinger() > 0) {
448                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
449                     // because we try to read or write until the actual close happens which may be later due
450                     // SO_LINGER handling.
451                     // See https://github.com/netty/netty/issues/4449
452                     doDeregister();
453                     return GlobalEventExecutor.INSTANCE;
454                 }
455             } catch (Throwable ignore) {
456                 // Ignore the error as the underlying channel may be closed in the meantime and so
457                 // getSoLinger() may produce an exception. In this case we just return null.
458                 // See https://github.com/netty/netty/issues/4449
459             }
460             return null;
461         }
462     }
463 
464     private final class NioSocketChannelConfig extends DefaultSocketChannelConfig {
465         private volatile int maxBytesPerGatheringWrite = Integer.MAX_VALUE;
466         private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
467             super(channel, javaSocket);
468             calculateMaxBytesPerGatheringWrite();
469         }
470 
471         @Override
472         protected void autoReadCleared() {
473             clearReadPending();
474         }
475 
476         @Override
477         public NioSocketChannelConfig setSendBufferSize(int sendBufferSize) {
478             super.setSendBufferSize(sendBufferSize);
479             calculateMaxBytesPerGatheringWrite();
480             return this;
481         }
482 
483         @Override
484         public <T> boolean setOption(ChannelOption<T> option, T value) {
485             if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
486                 return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
487             }
488             return super.setOption(option, value);
489         }
490 
491         @Override
492         public <T> T getOption(ChannelOption<T> option) {
493             if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
494                 return NioChannelOption.getOption(jdkChannel(), (NioChannelOption<T>) option);
495             }
496             return super.getOption(option);
497         }
498 
499         @SuppressWarnings("unchecked")
500         @Override
501         public Map<ChannelOption<?>, Object> getOptions() {
502             if (PlatformDependent.javaVersion() >= 7) {
503                 return getOptions(super.getOptions(), NioChannelOption.getOptions(jdkChannel()));
504             }
505             return super.getOptions();
506         }
507 
508         void setMaxBytesPerGatheringWrite(int maxBytesPerGatheringWrite) {
509             this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
510         }
511 
512         int getMaxBytesPerGatheringWrite() {
513             return maxBytesPerGatheringWrite;
514         }
515 
516         private void calculateMaxBytesPerGatheringWrite() {
517             // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
518             int newSendBufferSize = getSendBufferSize() << 1;
519             if (newSendBufferSize > 0) {
520                 setMaxBytesPerGatheringWrite(getSendBufferSize() << 1);
521             }
522         }
523 
524         private SocketChannel jdkChannel() {
525             return ((NioSocketChannel) channel).javaChannel();
526         }
527     }
528 }