View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.ConnectTimeoutException;
29  import io.netty.channel.EventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.concurrent.Future;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.channels.CancelledKeyException;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.ConnectionPendingException;
41  import java.nio.channels.SelectableChannel;
42  import java.nio.channels.SelectionKey;
43  import java.util.concurrent.TimeUnit;
44  
45  /**
46   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
47   */
48  public abstract class AbstractNioChannel extends AbstractChannel {
49  
50      private static final InternalLogger logger =
51              InternalLoggerFactory.getInstance(AbstractNioChannel.class);
52  
53      private final SelectableChannel ch;
54      protected final int readInterestOp;
55      volatile SelectionKey selectionKey;
56      boolean readPending;
57      private final Runnable clearReadPendingRunnable = new Runnable() {
58          @Override
59          public void run() {
60              clearReadPending0();
61          }
62      };
63  
64      /**
65       * The future of the current connection attempt.  If not null, subsequent
66       * connection attempts will fail.
67       */
68      private ChannelPromise connectPromise;
69      private Future<?> connectTimeoutFuture;
70      private SocketAddress requestedRemoteAddress;
71  
72      /**
73       * Create a new instance
74       *
75       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
76       * @param ch                the underlying {@link SelectableChannel} on which it operates
77       * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
78       */
79      protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
80          super(parent);
81          this.ch = ch;
82          this.readInterestOp = readInterestOp;
83          try {
84              ch.configureBlocking(false);
85          } catch (IOException e) {
86              try {
87                  ch.close();
88              } catch (IOException e2) {
89                  logger.warn(
90                              "Failed to close a partially initialized socket.", e2);
91              }
92  
93              throw new ChannelException("Failed to enter non-blocking mode.", e);
94          }
95      }
96  
97      @Override
98      public boolean isOpen() {
99          return ch.isOpen();
100     }
101 
102     @Override
103     public NioUnsafe unsafe() {
104         return (NioUnsafe) super.unsafe();
105     }
106 
107     protected SelectableChannel javaChannel() {
108         return ch;
109     }
110 
111     @Override
112     public NioEventLoop eventLoop() {
113         return (NioEventLoop) super.eventLoop();
114     }
115 
116     /**
117      * Return the current {@link SelectionKey}
118      */
119     protected SelectionKey selectionKey() {
120         assert selectionKey != null;
121         return selectionKey;
122     }
123 
124     /**
125      * @deprecated No longer supported.
126      * No longer supported.
127      */
128     @Deprecated
129     protected boolean isReadPending() {
130         return readPending;
131     }
132 
133     /**
134      * @deprecated Use {@link #clearReadPending()} if appropriate instead.
135      * No longer supported.
136      */
137     @Deprecated
138     protected void setReadPending(final boolean readPending) {
139         if (isRegistered()) {
140             EventLoop eventLoop = eventLoop();
141             if (eventLoop.inEventLoop()) {
142                 setReadPending0(readPending);
143             } else {
144                 eventLoop.execute(new Runnable() {
145                     @Override
146                     public void run() {
147                         setReadPending0(readPending);
148                     }
149                 });
150             }
151         } else {
152             // Best effort if we are not registered yet clear readPending.
153             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
154             // not set yet so it would produce an assertion failure.
155             this.readPending = readPending;
156         }
157     }
158 
159     /**
160      * Set read pending to {@code false}.
161      */
162     protected final void clearReadPending() {
163         if (isRegistered()) {
164             EventLoop eventLoop = eventLoop();
165             if (eventLoop.inEventLoop()) {
166                 clearReadPending0();
167             } else {
168                 eventLoop.execute(clearReadPendingRunnable);
169             }
170         } else {
171             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
172             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
173             // not set yet so it would produce an assertion failure.
174             readPending = false;
175         }
176     }
177 
178     private void setReadPending0(boolean readPending) {
179         this.readPending = readPending;
180         if (!readPending) {
181             ((AbstractNioUnsafe) unsafe()).removeReadOp();
182         }
183     }
184 
185     private void clearReadPending0() {
186         readPending = false;
187         ((AbstractNioUnsafe) unsafe()).removeReadOp();
188     }
189 
190     /**
191      * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
192      */
193     public interface NioUnsafe extends Unsafe {
194         /**
195          * Return underlying {@link SelectableChannel}
196          */
197         SelectableChannel ch();
198 
199         /**
200          * Finish connect
201          */
202         void finishConnect();
203 
204         /**
205          * Read from underlying {@link SelectableChannel}
206          */
207         void read();
208 
209         void forceFlush();
210     }
211 
212     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
213 
214         protected final void removeReadOp() {
215             SelectionKey key = selectionKey();
216             // Check first if the key is still valid as it may be canceled as part of the deregistration
217             // from the EventLoop
218             // See https://github.com/netty/netty/issues/2104
219             if (!key.isValid()) {
220                 return;
221             }
222             int interestOps = key.interestOps();
223             if ((interestOps & readInterestOp) != 0) {
224                 // only remove readInterestOp if needed
225                 key.interestOps(interestOps & ~readInterestOp);
226             }
227         }
228 
229         @Override
230         public final SelectableChannel ch() {
231             return javaChannel();
232         }
233 
234         @Override
235         public final void connect(
236                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
237             if (!promise.setUncancellable() || !ensureOpen(promise)) {
238                 return;
239             }
240 
241             try {
242                 if (connectPromise != null) {
243                     // Already a connect in process.
244                     throw new ConnectionPendingException();
245                 }
246 
247                 boolean wasActive = isActive();
248                 if (doConnect(remoteAddress, localAddress)) {
249                     fulfillConnectPromise(promise, wasActive);
250                 } else {
251                     connectPromise = promise;
252                     requestedRemoteAddress = remoteAddress;
253 
254                     // Schedule connect timeout.
255                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
256                     if (connectTimeoutMillis > 0) {
257                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
258                             @Override
259                             public void run() {
260                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
261                                 if (connectPromise != null && !connectPromise.isDone()
262                                         && connectPromise.tryFailure(new ConnectTimeoutException(
263                                                 "connection timed out: " + remoteAddress))) {
264                                     close(voidPromise());
265                                 }
266                             }
267                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
268                     }
269 
270                     promise.addListener(new ChannelFutureListener() {
271                         @Override
272                         public void operationComplete(ChannelFuture future) throws Exception {
273                             if (future.isCancelled()) {
274                                 if (connectTimeoutFuture != null) {
275                                     connectTimeoutFuture.cancel(false);
276                                 }
277                                 connectPromise = null;
278                                 close(voidPromise());
279                             }
280                         }
281                     });
282                 }
283             } catch (Throwable t) {
284                 promise.tryFailure(annotateConnectException(t, remoteAddress));
285                 closeIfClosed();
286             }
287         }
288 
289         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
290             if (promise == null) {
291                 // Closed via cancellation and the promise has been notified already.
292                 return;
293             }
294 
295             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
296             // We still need to ensure we call fireChannelActive() in this case.
297             boolean active = isActive();
298 
299             // trySuccess() will return false if a user cancelled the connection attempt.
300             boolean promiseSet = promise.trySuccess();
301 
302             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
303             // because what happened is what happened.
304             if (!wasActive && active) {
305                 pipeline().fireChannelActive();
306             }
307 
308             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
309             if (!promiseSet) {
310                 close(voidPromise());
311             }
312         }
313 
314         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
315             if (promise == null) {
316                 // Closed via cancellation and the promise has been notified already.
317                 return;
318             }
319 
320             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
321             promise.tryFailure(cause);
322             closeIfClosed();
323         }
324 
325         @Override
326         public final void finishConnect() {
327             // Note this method is invoked by the event loop only if the connection attempt was
328             // neither cancelled nor timed out.
329 
330             assert eventLoop().inEventLoop();
331 
332             try {
333                 boolean wasActive = isActive();
334                 doFinishConnect();
335                 fulfillConnectPromise(connectPromise, wasActive);
336             } catch (Throwable t) {
337                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
338             } finally {
339                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
340                 // See https://github.com/netty/netty/issues/1770
341                 if (connectTimeoutFuture != null) {
342                     connectTimeoutFuture.cancel(false);
343                 }
344                 connectPromise = null;
345             }
346         }
347 
348         @Override
349         protected final void flush0() {
350             // Flush immediately only when there's no pending flush.
351             // If there's a pending flush operation, event loop will call forceFlush() later,
352             // and thus there's no need to call it now.
353             if (!isFlushPending()) {
354                 super.flush0();
355             }
356         }
357 
358         @Override
359         public final void forceFlush() {
360             // directly call super.flush0() to force a flush now
361             super.flush0();
362         }
363 
364         private boolean isFlushPending() {
365             SelectionKey selectionKey = selectionKey();
366             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
367         }
368     }
369 
370     @Override
371     protected boolean isCompatible(EventLoop loop) {
372         return loop instanceof NioEventLoop;
373     }
374 
375     @Override
376     protected void doRegister() throws Exception {
377         boolean selected = false;
378         for (;;) {
379             try {
380                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
381                 return;
382             } catch (CancelledKeyException e) {
383                 if (!selected) {
384                     // Force the Selector to select now as the "canceled" SelectionKey may still be
385                     // cached and not removed because no Select.select(..) operation was called yet.
386                     eventLoop().selectNow();
387                     selected = true;
388                 } else {
389                     // We forced a select operation on the selector before but the SelectionKey is still cached
390                     // for whatever reason. JDK bug ?
391                     throw e;
392                 }
393             }
394         }
395     }
396 
397     @Override
398     protected void doDeregister() throws Exception {
399         eventLoop().cancel(selectionKey());
400     }
401 
402     @Override
403     protected void doBeginRead() throws Exception {
404         // Channel.read() or ChannelHandlerContext.read() was called
405         final SelectionKey selectionKey = this.selectionKey;
406         if (!selectionKey.isValid()) {
407             return;
408         }
409 
410         readPending = true;
411 
412         final int interestOps = selectionKey.interestOps();
413         if ((interestOps & readInterestOp) == 0) {
414             selectionKey.interestOps(interestOps | readInterestOp);
415         }
416     }
417 
418     /**
419      * Connect to the remote peer
420      */
421     protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
422 
423     /**
424      * Finish the connect
425      */
426     protected abstract void doFinishConnect() throws Exception;
427 
428     /**
429      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
430      * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
431      * but just returns the original {@link ByteBuf}..
432      */
433     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
434         final int readableBytes = buf.readableBytes();
435         if (readableBytes == 0) {
436             ReferenceCountUtil.safeRelease(buf);
437             return Unpooled.EMPTY_BUFFER;
438         }
439 
440         final ByteBufAllocator alloc = alloc();
441         if (alloc.isDirectBufferPooled()) {
442             ByteBuf directBuf = alloc.directBuffer(readableBytes);
443             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
444             ReferenceCountUtil.safeRelease(buf);
445             return directBuf;
446         }
447 
448         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
449         if (directBuf != null) {
450             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
451             ReferenceCountUtil.safeRelease(buf);
452             return directBuf;
453         }
454 
455         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
456         return buf;
457     }
458 
459     /**
460      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
461      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
462      * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
463      * too high, but just returns the original {@link ByteBuf}..
464      */
465     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
466         final int readableBytes = buf.readableBytes();
467         if (readableBytes == 0) {
468             ReferenceCountUtil.safeRelease(holder);
469             return Unpooled.EMPTY_BUFFER;
470         }
471 
472         final ByteBufAllocator alloc = alloc();
473         if (alloc.isDirectBufferPooled()) {
474             ByteBuf directBuf = alloc.directBuffer(readableBytes);
475             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
476             ReferenceCountUtil.safeRelease(holder);
477             return directBuf;
478         }
479 
480         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
481         if (directBuf != null) {
482             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
483             ReferenceCountUtil.safeRelease(holder);
484             return directBuf;
485         }
486 
487         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
488         if (holder != buf) {
489             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
490             buf.retain();
491             ReferenceCountUtil.safeRelease(holder);
492         }
493 
494         return buf;
495     }
496 
497     @Override
498     protected void doClose() throws Exception {
499         ChannelPromise promise = connectPromise;
500         if (promise != null) {
501             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
502             promise.tryFailure(new ClosedChannelException());
503             connectPromise = null;
504         }
505 
506         Future<?> future = connectTimeoutFuture;
507         if (future != null) {
508             future.cancel(false);
509             connectTimeoutFuture = null;
510         }
511     }
512 }