/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.ConnectTimeoutException;
29  import io.netty.channel.EventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.internal.OneTimeTask;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.channels.CancelledKeyException;
39  import java.nio.channels.SelectableChannel;
40  import java.nio.channels.SelectionKey;
41  import java.util.concurrent.ScheduledFuture;
42  import java.util.concurrent.TimeUnit;
43  
44  /**
45   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
46   */
47  public abstract class AbstractNioChannel extends AbstractChannel {
48  
49      private static final InternalLogger logger =
50              InternalLoggerFactory.getInstance(AbstractNioChannel.class);
51  
52      private final SelectableChannel ch;
53      protected final int readInterestOp;
54      volatile SelectionKey selectionKey;
55      private volatile boolean inputShutdown;
56      private volatile boolean readPending;
57  
58      /**
59       * The future of the current connection attempt.  If not null, subsequent
60       * connection attempts will fail.
61       */
62      private ChannelPromise connectPromise;
63      private ScheduledFuture<?> connectTimeoutFuture;
64      private SocketAddress requestedRemoteAddress;
65  
66      /**
67       * Create a new instance
68       *
69       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
70       * @param ch                the underlying {@link SelectableChannel} on which it operates
71       * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
72       */
73      protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
74          super(parent);
75          this.ch = ch;
76          this.readInterestOp = readInterestOp;
77          try {
78              ch.configureBlocking(false);
79          } catch (IOException e) {
80              try {
81                  ch.close();
82              } catch (IOException e2) {
83                  if (logger.isWarnEnabled()) {
84                      logger.warn(
85                              "Failed to close a partially initialized socket.", e2);
86                  }
87              }
88  
89              throw new ChannelException("Failed to enter non-blocking mode.", e);
90          }
91      }
92  
93      @Override
94      public boolean isOpen() {
95          return ch.isOpen();
96      }
97  
98      @Override
99      public NioUnsafe unsafe() {
100         return (NioUnsafe) super.unsafe();
101     }
102 
103     protected SelectableChannel javaChannel() {
104         return ch;
105     }
106 
107     /**
108      * Return the current {@link SelectionKey}
109      */
110     protected SelectionKey selectionKey() {
111         assert selectionKey != null;
112         return selectionKey;
113     }
114 
115     protected boolean isReadPending() {
116         return readPending;
117     }
118 
119     protected void setReadPending(boolean readPending) {
120         this.readPending = readPending;
121     }
122 
123     /**
124      * Return {@code true} if the input of this {@link Channel} is shutdown
125      */
126     protected boolean isInputShutdown() {
127         return inputShutdown;
128     }
129 
130     /**
131      * Shutdown the input of this {@link Channel}.
132      */
133     void setInputShutdown() {
134         inputShutdown = true;
135     }
136 
137     /**
138      * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
139      */
140     public interface NioUnsafe extends Unsafe {
141         /**
142          * Return underlying {@link SelectableChannel}
143          */
144         SelectableChannel ch();
145 
146         /**
147          * Finish connect
148          */
149         void finishConnect();
150 
151         /**
152          * Read from underlying {@link SelectableChannel}
153          */
154         void read();
155 
156         void forceFlush();
157     }
158 
159     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
160 
161         protected final void removeReadOp() {
162             SelectionKey key = selectionKey();
163             // Check first if the key is still valid as it may be canceled as part of the deregistration
164             // from the EventLoop
165             // See https://github.com/netty/netty/issues/2104
166             if (!key.isValid()) {
167                 return;
168             }
169             int interestOps = key.interestOps();
170             if ((interestOps & readInterestOp) != 0) {
171                 // only remove readInterestOp if needed
172                 key.interestOps(interestOps & ~readInterestOp);
173             }
174         }
175 
176         @Override
177         public final SelectableChannel ch() {
178             return javaChannel();
179         }
180 
181         @Override
182         public final void connect(
183                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
184             if (!promise.setUncancellable() || !ensureOpen(promise)) {
185                 return;
186             }
187 
188             try {
189                 if (connectPromise != null) {
190                     throw new IllegalStateException("connection attempt already made");
191                 }
192 
193                 boolean wasActive = isActive();
194                 if (doConnect(remoteAddress, localAddress)) {
195                     fulfillConnectPromise(promise, wasActive);
196                 } else {
197                     connectPromise = promise;
198                     requestedRemoteAddress = remoteAddress;
199 
200                     // Schedule connect timeout.
201                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
202                     if (connectTimeoutMillis > 0) {
203                         connectTimeoutFuture = eventLoop().schedule(new OneTimeTask() {
204                             @Override
205                             public void run() {
206                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
207                                 ConnectTimeoutException cause =
208                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
209                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
210                                     close(voidPromise());
211                                 }
212                             }
213                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
214                     }
215 
216                     promise.addListener(new ChannelFutureListener() {
217                         @Override
218                         public void operationComplete(ChannelFuture future) throws Exception {
219                             if (future.isCancelled()) {
220                                 if (connectTimeoutFuture != null) {
221                                     connectTimeoutFuture.cancel(false);
222                                 }
223                                 connectPromise = null;
224                                 close(voidPromise());
225                             }
226                         }
227                     });
228                 }
229             } catch (Throwable t) {
230                 promise.tryFailure(annotateConnectException(t, remoteAddress));
231                 closeIfClosed();
232             }
233         }
234 
235         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
236             if (promise == null) {
237                 // Closed via cancellation and the promise has been notified already.
238                 return;
239             }
240 
241             // trySuccess() will return false if a user cancelled the connection attempt.
242             boolean promiseSet = promise.trySuccess();
243 
244             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
245             // because what happened is what happened.
246             if (!wasActive && isActive()) {
247                 pipeline().fireChannelActive();
248             }
249 
250             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
251             if (!promiseSet) {
252                 close(voidPromise());
253             }
254         }
255 
256         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
257             if (promise == null) {
258                 // Closed via cancellation and the promise has been notified already.
259                 return;
260             }
261 
262             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
263             promise.tryFailure(cause);
264             closeIfClosed();
265         }
266 
267         @Override
268         public final void finishConnect() {
269             // Note this method is invoked by the event loop only if the connection attempt was
270             // neither cancelled nor timed out.
271 
272             assert eventLoop().inEventLoop();
273 
274             try {
275                 boolean wasActive = isActive();
276                 doFinishConnect();
277                 fulfillConnectPromise(connectPromise, wasActive);
278             } catch (Throwable t) {
279                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
280             } finally {
281                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
282                 // See https://github.com/netty/netty/issues/1770
283                 if (connectTimeoutFuture != null) {
284                     connectTimeoutFuture.cancel(false);
285                 }
286                 connectPromise = null;
287             }
288         }
289 
290         @Override
291         protected final void flush0() {
292             // Flush immediately only when there's no pending flush.
293             // If there's a pending flush operation, event loop will call forceFlush() later,
294             // and thus there's no need to call it now.
295             if (isFlushPending()) {
296                 return;
297             }
298             super.flush0();
299         }
300 
301         @Override
302         public final void forceFlush() {
303             // directly call super.flush0() to force a flush now
304             super.flush0();
305         }
306 
307         private boolean isFlushPending() {
308             SelectionKey selectionKey = selectionKey();
309             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
310         }
311     }
312 
313     @Override
314     protected boolean isCompatible(EventLoop loop) {
315         return loop instanceof NioEventLoop;
316     }
317 
318     @Override
319     protected void doRegister() throws Exception {
320         boolean selected = false;
321         for (;;) {
322             try {
323                 selectionKey = javaChannel().register(((NioEventLoop) eventLoop().unwrap()).selector, 0, this);
324                 return;
325             } catch (CancelledKeyException e) {
326                 if (!selected) {
327                     // Force the Selector to select now as the "canceled" SelectionKey may still be
328                     // cached and not removed because no Select.select(..) operation was called yet.
329                     ((NioEventLoop) eventLoop().unwrap()).selectNow();
330                     selected = true;
331                 } else {
332                     // We forced a select operation on the selector before but the SelectionKey is still cached
333                     // for whatever reason. JDK bug ?
334                     throw e;
335                 }
336             }
337         }
338     }
339 
340     @Override
341     protected void doDeregister() throws Exception {
342         ((NioEventLoop) eventLoop().unwrap()).cancel(selectionKey());
343     }
344 
345     @Override
346     protected void doBeginRead() throws Exception {
347         // Channel.read() or ChannelHandlerContext.read() was called
348         if (inputShutdown) {
349             return;
350         }
351 
352         final SelectionKey selectionKey = this.selectionKey;
353         if (!selectionKey.isValid()) {
354             return;
355         }
356 
357         readPending = true;
358 
359         final int interestOps = selectionKey.interestOps();
360         if ((interestOps & readInterestOp) == 0) {
361             selectionKey.interestOps(interestOps | readInterestOp);
362         }
363     }
364 
365     /**
366      * Connect to the remote peer
367      */
368     protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
369 
370     /**
371      * Finish the connect
372      */
373     protected abstract void doFinishConnect() throws Exception;
374 
375     /**
376      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
377      * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
378      * but just returns the original {@link ByteBuf}..
379      */
380     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
381         final int readableBytes = buf.readableBytes();
382         if (readableBytes == 0) {
383             ReferenceCountUtil.safeRelease(buf);
384             return Unpooled.EMPTY_BUFFER;
385         }
386 
387         final ByteBufAllocator alloc = alloc();
388         if (alloc.isDirectBufferPooled()) {
389             ByteBuf directBuf = alloc.directBuffer(readableBytes);
390             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
391             ReferenceCountUtil.safeRelease(buf);
392             return directBuf;
393         }
394 
395         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
396         if (directBuf != null) {
397             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
398             ReferenceCountUtil.safeRelease(buf);
399             return directBuf;
400         }
401 
402         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
403         return buf;
404     }
405 
406     /**
407      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
408      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
409      * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
410      * too high, but just returns the original {@link ByteBuf}..
411      */
412     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
413         final int readableBytes = buf.readableBytes();
414         if (readableBytes == 0) {
415             ReferenceCountUtil.safeRelease(holder);
416             return Unpooled.EMPTY_BUFFER;
417         }
418 
419         final ByteBufAllocator alloc = alloc();
420         if (alloc.isDirectBufferPooled()) {
421             ByteBuf directBuf = alloc.directBuffer(readableBytes);
422             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
423             ReferenceCountUtil.safeRelease(holder);
424             return directBuf;
425         }
426 
427         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
428         if (directBuf != null) {
429             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
430             ReferenceCountUtil.safeRelease(holder);
431             return directBuf;
432         }
433 
434         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
435         if (holder != buf) {
436             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
437             buf.retain();
438             ReferenceCountUtil.safeRelease(holder);
439         }
440 
441         return buf;
442     }
443 }