View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.ConnectTimeoutException;
29  import io.netty.channel.EventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.concurrent.Future;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.channels.CancelledKeyException;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.ConnectionPendingException;
41  import java.nio.channels.SelectableChannel;
42  import java.nio.channels.SelectionKey;
43  import java.util.concurrent.TimeUnit;
44  
45  /**
46   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
47   */
48  public abstract class AbstractNioChannel extends AbstractChannel {
49  
    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    /** The underlying {@link SelectableChannel}; the constructor switches it to non-blocking mode. */
    private final SelectableChannel ch;
    /** The op(s) added to the {@link SelectionKey} interest set by {@link #doBeginRead()} to receive data. */
    protected final int readInterestOp;
    /** The key returned when {@link #doRegister()} registers {@link #ch} with the event loop's selector. */
    volatile SelectionKey selectionKey;
    /** Set to {@code true} by {@link #doBeginRead()} and reset to {@code false} by {@link #clearReadPending()}. */
    boolean readPending;
    /** Shared task that {@link #clearReadPending()} submits when it is called from outside the event loop. */
    private final Runnable clearReadPendingRunnable = new Runnable() {
        @Override
        public void run() {
            clearReadPending0();
        }
    };

    /**
     * The future of the current connection attempt.  If not null, subsequent
     * connection attempts will fail.
     */
    private ChannelPromise connectPromise;
    /** The scheduled connect-timeout task; only assigned when the configured connect timeout is positive. */
    private Future<?> connectTimeoutFuture;
    /** The remote address of the pending connection attempt; used to annotate a failure in finishConnect(). */
    private SocketAddress requestedRemoteAddress;
71  
72      /**
73       * Create a new instance
74       *
75       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
76       * @param ch                the underlying {@link SelectableChannel} on which it operates
77       * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
78       */
79      protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
80          super(parent);
81          this.ch = ch;
82          this.readInterestOp = readInterestOp;
83          try {
84              ch.configureBlocking(false);
85          } catch (IOException e) {
86              try {
87                  ch.close();
88              } catch (IOException e2) {
89                  logger.warn(
90                              "Failed to close a partially initialized socket.", e2);
91              }
92  
93              throw new ChannelException("Failed to enter non-blocking mode.", e);
94          }
95      }
96  
    /**
     * Reports whether the underlying {@link SelectableChannel} is open.
     */
    @Override
    public boolean isOpen() {
        return ch.isOpen();
    }
101 
    /**
     * Returns the {@link Unsafe} of this channel, narrowed to {@link NioUnsafe}.
     */
    @Override
    public NioUnsafe unsafe() {
        return (NioUnsafe) super.unsafe();
    }
106 
    /**
     * Returns the underlying {@link SelectableChannel} that was passed to the constructor.
     */
    protected SelectableChannel javaChannel() {
        return ch;
    }
110 
    /**
     * Returns the {@link EventLoop} of this channel, narrowed to {@link NioEventLoop}.
     */
    @Override
    public NioEventLoop eventLoop() {
        return (NioEventLoop) super.eventLoop();
    }
115 
    /**
     * Return the current {@link SelectionKey}.
     * <p>
     * The key is assigned by {@link #doRegister()}. With assertions enabled, calling this method while the
     * key is still {@code null} fails the assertion; with assertions disabled it returns {@code null}.
     */
    protected SelectionKey selectionKey() {
        assert selectionKey != null;
        return selectionKey;
    }
123 
    /**
     * Returns the current value of the read-pending flag.
     *
     * @deprecated No longer supported.
     */
    @Deprecated
    protected boolean isReadPending() {
        return readPending;
    }
132 
133     /**
134      * @deprecated Use {@link #clearReadPending()} if appropriate instead.
135      * No longer supported.
136      */
137     @Deprecated
138     protected void setReadPending(final boolean readPending) {
139         if (isRegistered()) {
140             EventLoop eventLoop = eventLoop();
141             if (eventLoop.inEventLoop()) {
142                 setReadPending0(readPending);
143             } else {
144                 eventLoop.execute(new Runnable() {
145                     @Override
146                     public void run() {
147                         setReadPending0(readPending);
148                     }
149                 });
150             }
151         } else {
152             // Best effort if we are not registered yet clear readPending.
153             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
154             // not set yet so it would produce an assertion failure.
155             this.readPending = readPending;
156         }
157     }
158 
159     /**
160      * Set read pending to {@code false}.
161      */
162     protected final void clearReadPending() {
163         if (isRegistered()) {
164             EventLoop eventLoop = eventLoop();
165             if (eventLoop.inEventLoop()) {
166                 clearReadPending0();
167             } else {
168                 eventLoop.execute(clearReadPendingRunnable);
169             }
170         } else {
171             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
172             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
173             // not set yet so it would produce an assertion failure.
174             readPending = false;
175         }
176     }
177 
178     private void setReadPending0(boolean readPending) {
179         this.readPending = readPending;
180         if (!readPending) {
181             ((AbstractNioUnsafe) unsafe()).removeReadOp();
182         }
183     }
184 
185     private void clearReadPending0() {
186         readPending = false;
187         ((AbstractNioUnsafe) unsafe()).removeReadOp();
188     }
189 
    /**
     * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
     */
    public interface NioUnsafe extends Unsafe {
        /**
         * Return underlying {@link SelectableChannel}
         */
        SelectableChannel ch();

        /**
         * Finish connect
         */
        void finishConnect();

        /**
         * Read from underlying {@link SelectableChannel}
         */
        void read();

        /**
         * Force a flush now. The implementation in {@link AbstractNioUnsafe} calls {@code super.flush0()}
         * directly, without the pending-flush check that its {@code flush0()} performs.
         */
        void forceFlush();
    }
211 
212     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
213 
214         protected final void removeReadOp() {
215             SelectionKey key = selectionKey();
216             // Check first if the key is still valid as it may be canceled as part of the deregistration
217             // from the EventLoop
218             // See https://github.com/netty/netty/issues/2104
219             if (!key.isValid()) {
220                 return;
221             }
222             int interestOps = key.interestOps();
223             if ((interestOps & readInterestOp) != 0) {
224                 // only remove readInterestOp if needed
225                 key.interestOps(interestOps & ~readInterestOp);
226             }
227         }
228 
        /**
         * Returns the underlying {@link SelectableChannel}, as reported by {@code javaChannel()}.
         */
        @Override
        public final SelectableChannel ch() {
            return javaChannel();
        }
233 
        /**
         * Starts a connection attempt to {@code remoteAddress}.
         * <p>
         * If {@code doConnect(..)} returns {@code true} the promise is fulfilled right away. Otherwise the
         * attempt is recorded in {@code connectPromise}, a timeout task is scheduled when the configured
         * connect timeout is positive, and a listener is attached that closes the channel if the promise is
         * cancelled. Only one attempt may be pending at a time; a second one fails the given promise with a
         * {@link ConnectionPendingException}.
         *
         * @param remoteAddress the address to connect to
         * @param localAddress  passed through unchanged to {@code doConnect(..)}
         * @param promise       the promise to notify about the outcome of the attempt
         */
        @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            // Don't mark the connect promise as uncancellable as in fact we can cancel it as it is using
            // non-blocking io.
            if (promise.isDone() || !ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException();
                }

                // Capture the state before connecting so fulfillConnectPromise(..) can detect the
                // inactive -> active transition.
                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    // Not connected yet: remember the attempt so finishConnect() can complete it later.
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    final int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                // Close the channel only if this task is the one that failed the promise.
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                if (connectPromise != null && !connectPromise.isDone()
                                        && connectPromise.tryFailure(new ConnectTimeoutException(
                                                "connection timed out after " + connectTimeoutMillis + " ms: " +
                                                        remoteAddress))) {
                                    close(voidPromise());
                                }
                            }
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) {
                            // If the connect future is cancelled we also cancel the timeout and close the
                            // underlying socket.
                            if (future.isCancelled()) {
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                // Fail the promise with the (annotated) cause; tryFailure() is used because the promise
                // may already have been completed or cancelled.
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }
293 
294         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
295             if (promise == null) {
296                 // Closed via cancellation and the promise has been notified already.
297                 return;
298             }
299 
300             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
301             // We still need to ensure we call fireChannelActive() in this case.
302             boolean active = isActive();
303 
304             // trySuccess() will return false if a user cancelled the connection attempt.
305             boolean promiseSet = promise.trySuccess();
306 
307             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
308             // because what happened is what happened.
309             if (!wasActive && active) {
310                 pipeline().fireChannelActive();
311             }
312 
313             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
314             if (!promiseSet) {
315                 close(voidPromise());
316             }
317         }
318 
319         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
320             if (promise == null) {
321                 // Closed via cancellation and the promise has been notified already.
322                 return;
323             }
324 
325             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
326             promise.tryFailure(cause);
327             closeIfClosed();
328         }
329 
        /**
         * Completes the pending connection attempt: calls {@code doFinishConnect()} and fulfills
         * {@code connectPromise} with success or with the annotated failure. In either case the timeout task
         * (if any) is cancelled and {@code connectPromise} is cleared afterwards.
         */
        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();

            try {
                boolean wasActive = isActive();
                doFinishConnect();
                // connectPromise is read only after doFinishConnect() has returned.
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                // The attempt is over either way; allow a new connect(..) call.
                connectPromise = null;
            }
        }
352 
353         @Override
354         protected final void flush0() {
355             // Flush immediately only when there's no pending flush.
356             // If there's a pending flush operation, event loop will call forceFlush() later,
357             // and thus there's no need to call it now.
358             if (!isFlushPending()) {
359                 super.flush0();
360             }
361         }
362 
        /**
         * Flushes now, bypassing the pending-flush check performed by {@link #flush0()}.
         */
        @Override
        public final void forceFlush() {
            // directly call super.flush0() to force a flush now
            super.flush0();
        }
368 
369         private boolean isFlushPending() {
370             SelectionKey selectionKey = selectionKey();
371             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
372         }
373     }
374 
    /**
     * Returns {@code true} only if the given {@link EventLoop} is a {@link NioEventLoop}.
     */
    @Override
    protected boolean isCompatible(EventLoop loop) {
        return loop instanceof NioEventLoop;
    }
379 
380     @Override
381     protected void doRegister() throws Exception {
382         boolean selected = false;
383         for (;;) {
384             try {
385                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
386                 return;
387             } catch (CancelledKeyException e) {
388                 if (!selected) {
389                     // Force the Selector to select now as the "canceled" SelectionKey may still be
390                     // cached and not removed because no Select.select(..) operation was called yet.
391                     eventLoop().selectNow();
392                     selected = true;
393                 } else {
394                     // We forced a select operation on the selector before but the SelectionKey is still cached
395                     // for whatever reason. JDK bug ?
396                     throw e;
397                 }
398             }
399         }
400     }
401 
    /**
     * Asks the event loop to cancel the current {@link SelectionKey}.
     */
    @Override
    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }
406 
407     @Override
408     protected void doBeginRead() throws Exception {
409         // Channel.read() or ChannelHandlerContext.read() was called
410         final SelectionKey selectionKey = this.selectionKey;
411         if (!selectionKey.isValid()) {
412             return;
413         }
414 
415         readPending = true;
416 
417         final int interestOps = selectionKey.interestOps();
418         if ((interestOps & readInterestOp) == 0) {
419             selectionKey.interestOps(interestOps | readInterestOp);
420         }
421     }
422 
    /**
     * Connect to the remote peer
     *
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address handed to {@code connect(..)}; passed through unchanged
     * @return {@code true} if the connection attempt is already complete, in which case {@code connect(..)}
     *         fulfills the promise immediately; {@code false} if it is still pending, in which case the
     *         promise is stored until {@code finishConnect()} is called
     */
    protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
427 
    /**
     * Finish the connect. Called from {@code finishConnect()}; an exception thrown here fails the pending
     * connect promise.
     */
    protected abstract void doFinishConnect() throws Exception;
432 
433     /**
434      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
435      * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
436      * but just returns the original {@link ByteBuf}..
437      */
438     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
439         final int readableBytes = buf.readableBytes();
440         if (readableBytes == 0) {
441             ReferenceCountUtil.safeRelease(buf);
442             return Unpooled.EMPTY_BUFFER;
443         }
444 
445         final ByteBufAllocator alloc = alloc();
446         if (alloc.isDirectBufferPooled()) {
447             ByteBuf directBuf = alloc.directBuffer(readableBytes);
448             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
449             ReferenceCountUtil.safeRelease(buf);
450             return directBuf;
451         }
452 
453         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
454         if (directBuf != null) {
455             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
456             ReferenceCountUtil.safeRelease(buf);
457             return directBuf;
458         }
459 
460         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
461         return buf;
462     }
463 
464     /**
465      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
466      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
467      * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
468      * too high, but just returns the original {@link ByteBuf}..
469      */
470     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
471         final int readableBytes = buf.readableBytes();
472         if (readableBytes == 0) {
473             ReferenceCountUtil.safeRelease(holder);
474             return Unpooled.EMPTY_BUFFER;
475         }
476 
477         final ByteBufAllocator alloc = alloc();
478         if (alloc.isDirectBufferPooled()) {
479             ByteBuf directBuf = alloc.directBuffer(readableBytes);
480             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
481             ReferenceCountUtil.safeRelease(holder);
482             return directBuf;
483         }
484 
485         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
486         if (directBuf != null) {
487             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
488             ReferenceCountUtil.safeRelease(holder);
489             return directBuf;
490         }
491 
492         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
493         if (holder != buf) {
494             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
495             buf.retain();
496             ReferenceCountUtil.safeRelease(holder);
497         }
498 
499         return buf;
500     }
501 
502     @Override
503     protected void doClose() throws Exception {
504         ChannelPromise promise = connectPromise;
505         if (promise != null) {
506             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
507             promise.tryFailure(new ClosedChannelException());
508             connectPromise = null;
509         }
510 
511         Future<?> future = connectTimeoutFuture;
512         if (future != null) {
513             future.cancel(false);
514             connectTimeoutFuture = null;
515         }
516     }
517 }