View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.ConnectTimeoutException;
29  import io.netty.channel.EventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.internal.ThrowableUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.channels.CancelledKeyException;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.ConnectionPendingException;
41  import java.nio.channels.SelectableChannel;
42  import java.nio.channels.SelectionKey;
43  import java.util.concurrent.ScheduledFuture;
44  import java.util.concurrent.TimeUnit;
45  
46  /**
47   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
48   */
49  public abstract class AbstractNioChannel extends AbstractChannel {
50  
51      private static final InternalLogger logger =
52              InternalLoggerFactory.getInstance(AbstractNioChannel.class);
53  
54      private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
55              new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
56  
57      private final SelectableChannel ch;
58      protected final int readInterestOp;
59      volatile SelectionKey selectionKey;
60      boolean readPending;
61      private final Runnable clearReadPendingRunnable = new Runnable() {
62          @Override
63          public void run() {
64              clearReadPending0();
65          }
66      };
67  
68      /**
69       * The future of the current connection attempt.  If not null, subsequent
70       * connection attempts will fail.
71       */
72      private ChannelPromise connectPromise;
73      private ScheduledFuture<?> connectTimeoutFuture;
74      private SocketAddress requestedRemoteAddress;
75  
76      /**
77       * Create a new instance
78       *
79       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
80       * @param ch                the underlying {@link SelectableChannel} on which it operates
81       * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
82       */
83      protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
84          super(parent);
85          this.ch = ch;
86          this.readInterestOp = readInterestOp;
87          try {
88              ch.configureBlocking(false);
89          } catch (IOException e) {
90              try {
91                  ch.close();
92              } catch (IOException e2) {
93                  if (logger.isWarnEnabled()) {
94                      logger.warn(
95                              "Failed to close a partially initialized socket.", e2);
96                  }
97              }
98  
99              throw new ChannelException("Failed to enter non-blocking mode.", e);
100         }
101     }
102 
103     @Override
104     public boolean isOpen() {
105         return ch.isOpen();
106     }
107 
108     @Override
109     public NioUnsafe unsafe() {
110         return (NioUnsafe) super.unsafe();
111     }
112 
113     protected SelectableChannel javaChannel() {
114         return ch;
115     }
116 
117     @Override
118     public NioEventLoop eventLoop() {
119         return (NioEventLoop) super.eventLoop();
120     }
121 
122     /**
123      * Return the current {@link SelectionKey}
124      */
125     protected SelectionKey selectionKey() {
126         assert selectionKey != null;
127         return selectionKey;
128     }
129 
130     /**
131      * @deprecated No longer supported.
132      * No longer supported.
133      */
134     @Deprecated
135     protected boolean isReadPending() {
136         return readPending;
137     }
138 
139     /**
140      * @deprecated Use {@link #clearReadPending()} if appropriate instead.
141      * No longer supported.
142      */
143     @Deprecated
144     protected void setReadPending(final boolean readPending) {
145         if (isRegistered()) {
146             EventLoop eventLoop = eventLoop();
147             if (eventLoop.inEventLoop()) {
148                 setReadPending0(readPending);
149             } else {
150                 eventLoop.execute(new Runnable() {
151                     @Override
152                     public void run() {
153                         setReadPending0(readPending);
154                     }
155                 });
156             }
157         } else {
158             // Best effort if we are not registered yet clear readPending.
159             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
160             // not set yet so it would produce an assertion failure.
161             this.readPending = readPending;
162         }
163     }
164 
165     /**
166      * Set read pending to {@code false}.
167      */
168     protected final void clearReadPending() {
169         if (isRegistered()) {
170             EventLoop eventLoop = eventLoop();
171             if (eventLoop.inEventLoop()) {
172                 clearReadPending0();
173             } else {
174                 eventLoop.execute(clearReadPendingRunnable);
175             }
176         } else {
177             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
178             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
179             // not set yet so it would produce an assertion failure.
180             readPending = false;
181         }
182     }
183 
184     private void setReadPending0(boolean readPending) {
185         this.readPending = readPending;
186         if (!readPending) {
187             ((AbstractNioUnsafe) unsafe()).removeReadOp();
188         }
189     }
190 
191     private void clearReadPending0() {
192         readPending = false;
193         ((AbstractNioUnsafe) unsafe()).removeReadOp();
194     }
195 
196     /**
197      * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
198      */
199     public interface NioUnsafe extends Unsafe {
200         /**
201          * Return underlying {@link SelectableChannel}
202          */
203         SelectableChannel ch();
204 
205         /**
206          * Finish connect
207          */
208         void finishConnect();
209 
210         /**
211          * Read from underlying {@link SelectableChannel}
212          */
213         void read();
214 
215         void forceFlush();
216     }
217 
218     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
219 
220         protected final void removeReadOp() {
221             SelectionKey key = selectionKey();
222             // Check first if the key is still valid as it may be canceled as part of the deregistration
223             // from the EventLoop
224             // See https://github.com/netty/netty/issues/2104
225             if (!key.isValid()) {
226                 return;
227             }
228             int interestOps = key.interestOps();
229             if ((interestOps & readInterestOp) != 0) {
230                 // only remove readInterestOp if needed
231                 key.interestOps(interestOps & ~readInterestOp);
232             }
233         }
234 
235         @Override
236         public final SelectableChannel ch() {
237             return javaChannel();
238         }
239 
240         @Override
241         public final void connect(
242                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
243             if (!promise.setUncancellable() || !ensureOpen(promise)) {
244                 return;
245             }
246 
247             try {
248                 if (connectPromise != null) {
249                     // Already a connect in process.
250                     throw new ConnectionPendingException();
251                 }
252 
253                 boolean wasActive = isActive();
254                 if (doConnect(remoteAddress, localAddress)) {
255                     fulfillConnectPromise(promise, wasActive);
256                 } else {
257                     connectPromise = promise;
258                     requestedRemoteAddress = remoteAddress;
259 
260                     // Schedule connect timeout.
261                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
262                     if (connectTimeoutMillis > 0) {
263                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
264                             @Override
265                             public void run() {
266                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
267                                 ConnectTimeoutException cause =
268                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
269                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
270                                     close(voidPromise());
271                                 }
272                             }
273                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
274                     }
275 
276                     promise.addListener(new ChannelFutureListener() {
277                         @Override
278                         public void operationComplete(ChannelFuture future) throws Exception {
279                             if (future.isCancelled()) {
280                                 if (connectTimeoutFuture != null) {
281                                     connectTimeoutFuture.cancel(false);
282                                 }
283                                 connectPromise = null;
284                                 close(voidPromise());
285                             }
286                         }
287                     });
288                 }
289             } catch (Throwable t) {
290                 promise.tryFailure(annotateConnectException(t, remoteAddress));
291                 closeIfClosed();
292             }
293         }
294 
295         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
296             if (promise == null) {
297                 // Closed via cancellation and the promise has been notified already.
298                 return;
299             }
300 
301             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
302             // We still need to ensure we call fireChannelActive() in this case.
303             boolean active = isActive();
304 
305             // trySuccess() will return false if a user cancelled the connection attempt.
306             boolean promiseSet = promise.trySuccess();
307 
308             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
309             // because what happened is what happened.
310             if (!wasActive && active) {
311                 pipeline().fireChannelActive();
312             }
313 
314             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
315             if (!promiseSet) {
316                 close(voidPromise());
317             }
318         }
319 
320         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
321             if (promise == null) {
322                 // Closed via cancellation and the promise has been notified already.
323                 return;
324             }
325 
326             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
327             promise.tryFailure(cause);
328             closeIfClosed();
329         }
330 
331         @Override
332         public final void finishConnect() {
333             // Note this method is invoked by the event loop only if the connection attempt was
334             // neither cancelled nor timed out.
335 
336             assert eventLoop().inEventLoop();
337 
338             try {
339                 boolean wasActive = isActive();
340                 doFinishConnect();
341                 fulfillConnectPromise(connectPromise, wasActive);
342             } catch (Throwable t) {
343                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
344             } finally {
345                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
346                 // See https://github.com/netty/netty/issues/1770
347                 if (connectTimeoutFuture != null) {
348                     connectTimeoutFuture.cancel(false);
349                 }
350                 connectPromise = null;
351             }
352         }
353 
354         @Override
355         protected final void flush0() {
356             // Flush immediately only when there's no pending flush.
357             // If there's a pending flush operation, event loop will call forceFlush() later,
358             // and thus there's no need to call it now.
359             if (isFlushPending()) {
360                 return;
361             }
362             super.flush0();
363         }
364 
365         @Override
366         public final void forceFlush() {
367             // directly call super.flush0() to force a flush now
368             super.flush0();
369         }
370 
371         private boolean isFlushPending() {
372             SelectionKey selectionKey = selectionKey();
373             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
374         }
375     }
376 
377     @Override
378     protected boolean isCompatible(EventLoop loop) {
379         return loop instanceof NioEventLoop;
380     }
381 
382     @Override
383     protected void doRegister() throws Exception {
384         boolean selected = false;
385         for (;;) {
386             try {
387                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
388                 return;
389             } catch (CancelledKeyException e) {
390                 if (!selected) {
391                     // Force the Selector to select now as the "canceled" SelectionKey may still be
392                     // cached and not removed because no Select.select(..) operation was called yet.
393                     eventLoop().selectNow();
394                     selected = true;
395                 } else {
396                     // We forced a select operation on the selector before but the SelectionKey is still cached
397                     // for whatever reason. JDK bug ?
398                     throw e;
399                 }
400             }
401         }
402     }
403 
404     @Override
405     protected void doDeregister() throws Exception {
406         eventLoop().cancel(selectionKey());
407     }
408 
409     @Override
410     protected void doBeginRead() throws Exception {
411         // Channel.read() or ChannelHandlerContext.read() was called
412         final SelectionKey selectionKey = this.selectionKey;
413         if (!selectionKey.isValid()) {
414             return;
415         }
416 
417         readPending = true;
418 
419         final int interestOps = selectionKey.interestOps();
420         if ((interestOps & readInterestOp) == 0) {
421             selectionKey.interestOps(interestOps | readInterestOp);
422         }
423     }
424 
425     /**
426      * Connect to the remote peer
427      */
428     protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
429 
430     /**
431      * Finish the connect
432      */
433     protected abstract void doFinishConnect() throws Exception;
434 
435     /**
436      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
437      * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
438      * but just returns the original {@link ByteBuf}..
439      */
440     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
441         final int readableBytes = buf.readableBytes();
442         if (readableBytes == 0) {
443             ReferenceCountUtil.safeRelease(buf);
444             return Unpooled.EMPTY_BUFFER;
445         }
446 
447         final ByteBufAllocator alloc = alloc();
448         if (alloc.isDirectBufferPooled()) {
449             ByteBuf directBuf = alloc.directBuffer(readableBytes);
450             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
451             ReferenceCountUtil.safeRelease(buf);
452             return directBuf;
453         }
454 
455         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
456         if (directBuf != null) {
457             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
458             ReferenceCountUtil.safeRelease(buf);
459             return directBuf;
460         }
461 
462         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
463         return buf;
464     }
465 
466     /**
467      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
468      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
469      * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
470      * too high, but just returns the original {@link ByteBuf}..
471      */
472     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
473         final int readableBytes = buf.readableBytes();
474         if (readableBytes == 0) {
475             ReferenceCountUtil.safeRelease(holder);
476             return Unpooled.EMPTY_BUFFER;
477         }
478 
479         final ByteBufAllocator alloc = alloc();
480         if (alloc.isDirectBufferPooled()) {
481             ByteBuf directBuf = alloc.directBuffer(readableBytes);
482             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
483             ReferenceCountUtil.safeRelease(holder);
484             return directBuf;
485         }
486 
487         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
488         if (directBuf != null) {
489             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
490             ReferenceCountUtil.safeRelease(holder);
491             return directBuf;
492         }
493 
494         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
495         if (holder != buf) {
496             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
497             buf.retain();
498             ReferenceCountUtil.safeRelease(holder);
499         }
500 
501         return buf;
502     }
503 
504     @Override
505     protected void doClose() throws Exception {
506         ChannelPromise promise = connectPromise;
507         if (promise != null) {
508             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
509             promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
510             connectPromise = null;
511         }
512 
513         ScheduledFuture<?> future = connectTimeoutFuture;
514         if (future != null) {
515             future.cancel(false);
516             connectTimeoutFuture = null;
517         }
518     }
519 }