/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.ConnectTimeoutException;
29  import io.netty.channel.EventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.internal.ThrowableUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.channels.CancelledKeyException;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.ConnectionPendingException;
41  import java.nio.channels.SelectableChannel;
42  import java.nio.channels.SelectionKey;
43  import java.util.concurrent.ScheduledFuture;
44  import java.util.concurrent.TimeUnit;
45  
46  /**
47   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
48   */
49  public abstract class AbstractNioChannel extends AbstractChannel {
50  
51      private static final InternalLogger logger =
52              InternalLoggerFactory.getInstance(AbstractNioChannel.class);
53  
54      private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
55              new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
56  
57      private final SelectableChannel ch;
58      protected final int readInterestOp;
59      volatile SelectionKey selectionKey;
60      boolean readPending;
61      private final Runnable clearReadPendingRunnable = new Runnable() {
62          @Override
63          public void run() {
64              clearReadPending0();
65          }
66      };
67  
68      /**
69       * The future of the current connection attempt.  If not null, subsequent
70       * connection attempts will fail.
71       */
72      private ChannelPromise connectPromise;
73      private ScheduledFuture<?> connectTimeoutFuture;
74      private SocketAddress requestedRemoteAddress;
75  
76      /**
77       * Create a new instance
78       *
79       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
80       * @param ch                the underlying {@link SelectableChannel} on which it operates
81       * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
82       */
83      protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
84          super(parent);
85          this.ch = ch;
86          this.readInterestOp = readInterestOp;
87          try {
88              ch.configureBlocking(false);
89          } catch (IOException e) {
90              try {
91                  ch.close();
92              } catch (IOException e2) {
93                  if (logger.isWarnEnabled()) {
94                      logger.warn(
95                              "Failed to close a partially initialized socket.", e2);
96                  }
97              }
98  
99              throw new ChannelException("Failed to enter non-blocking mode.", e);
100         }
101     }
102 
103     @Override
104     public boolean isOpen() {
105         return ch.isOpen();
106     }
107 
108     @Override
109     public NioUnsafe unsafe() {
110         return (NioUnsafe) super.unsafe();
111     }
112 
113     protected SelectableChannel javaChannel() {
114         return ch;
115     }
116 
117     @Override
118     public NioEventLoop eventLoop() {
119         return (NioEventLoop) super.eventLoop();
120     }
121 
122     /**
123      * Return the current {@link SelectionKey}
124      */
125     protected SelectionKey selectionKey() {
126         assert selectionKey != null;
127         return selectionKey;
128     }
129 
130     /**
131      * @deprecated No longer supported.
132      * No longer supported.
133      */
134     @Deprecated
135     protected boolean isReadPending() {
136         return readPending;
137     }
138 
139     /**
140      * @deprecated Use {@link #clearReadPending()} if appropriate instead.
141      * No longer supported.
142      */
143     @Deprecated
144     protected void setReadPending(final boolean readPending) {
145         if (isRegistered()) {
146             EventLoop eventLoop = eventLoop();
147             if (eventLoop.inEventLoop()) {
148                 setReadPending0(readPending);
149             } else {
150                 eventLoop.execute(new Runnable() {
151                     @Override
152                     public void run() {
153                         setReadPending0(readPending);
154                     }
155                 });
156             }
157         } else {
158             // Best effort if we are not registered yet clear readPending.
159             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
160             // not set yet so it would produce an assertion failure.
161             this.readPending = readPending;
162         }
163     }
164 
165     /**
166      * Set read pending to {@code false}.
167      */
168     protected final void clearReadPending() {
169         if (isRegistered()) {
170             EventLoop eventLoop = eventLoop();
171             if (eventLoop.inEventLoop()) {
172                 clearReadPending0();
173             } else {
174                 eventLoop.execute(clearReadPendingRunnable);
175             }
176         } else {
177             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
178             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
179             // not set yet so it would produce an assertion failure.
180             readPending = false;
181         }
182     }
183 
184     private void setReadPending0(boolean readPending) {
185         this.readPending = readPending;
186         if (!readPending) {
187             ((AbstractNioUnsafe) unsafe()).removeReadOp();
188         }
189     }
190 
191     private void clearReadPending0() {
192         readPending = false;
193         ((AbstractNioUnsafe) unsafe()).removeReadOp();
194     }
195 
196     /**
197      * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
198      */
199     public interface NioUnsafe extends Unsafe {
200         /**
201          * Return underlying {@link SelectableChannel}
202          */
203         SelectableChannel ch();
204 
205         /**
206          * Finish connect
207          */
208         void finishConnect();
209 
210         /**
211          * Read from underlying {@link SelectableChannel}
212          */
213         void read();
214 
215         void forceFlush();
216     }
217 
218     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
219 
220         protected final void removeReadOp() {
221             SelectionKey key = selectionKey();
222             // Check first if the key is still valid as it may be canceled as part of the deregistration
223             // from the EventLoop
224             // See https://github.com/netty/netty/issues/2104
225             if (!key.isValid()) {
226                 return;
227             }
228             int interestOps = key.interestOps();
229             if ((interestOps & readInterestOp) != 0) {
230                 // only remove readInterestOp if needed
231                 key.interestOps(interestOps & ~readInterestOp);
232             }
233         }
234 
235         @Override
236         public final SelectableChannel ch() {
237             return javaChannel();
238         }
239 
240         @Override
241         public final void connect(
242                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
243             if (!promise.setUncancellable() || !ensureOpen(promise)) {
244                 return;
245             }
246 
247             try {
248                 if (connectPromise != null) {
249                     // Already a connect in process.
250                     throw new ConnectionPendingException();
251                 }
252 
253                 boolean wasActive = isActive();
254                 if (doConnect(remoteAddress, localAddress)) {
255                     fulfillConnectPromise(promise, wasActive);
256                 } else {
257                     connectPromise = promise;
258                     requestedRemoteAddress = remoteAddress;
259 
260                     // Schedule connect timeout.
261                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
262                     if (connectTimeoutMillis > 0) {
263                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
264                             @Override
265                             public void run() {
266                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
267                                 ConnectTimeoutException cause =
268                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
269                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
270                                     close(voidPromise());
271                                 }
272                             }
273                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
274                     }
275 
276                     promise.addListener(new ChannelFutureListener() {
277                         @Override
278                         public void operationComplete(ChannelFuture future) throws Exception {
279                             if (future.isCancelled()) {
280                                 if (connectTimeoutFuture != null) {
281                                     connectTimeoutFuture.cancel(false);
282                                 }
283                                 connectPromise = null;
284                                 close(voidPromise());
285                             }
286                         }
287                     });
288                 }
289             } catch (Throwable t) {
290                 promise.tryFailure(annotateConnectException(t, remoteAddress));
291                 closeIfClosed();
292             }
293         }
294 
295         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
296             if (promise == null) {
297                 // Closed via cancellation and the promise has been notified already.
298                 return;
299             }
300 
301             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
302             // We still need to ensure we call fireChannelActive() in this case.
303             boolean active = isActive();
304 
305             // trySuccess() will return false if a user cancelled the connection attempt.
306             boolean promiseSet = promise.trySuccess();
307 
308             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
309             // because what happened is what happened.
310             if (!wasActive && active) {
311                 pipeline().fireChannelActive();
312             }
313 
314             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
315             if (!promiseSet) {
316                 close(voidPromise());
317             }
318         }
319 
320         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
321             if (promise == null) {
322                 // Closed via cancellation and the promise has been notified already.
323                 return;
324             }
325 
326             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
327             promise.tryFailure(cause);
328             closeIfClosed();
329         }
330 
331         @Override
332         public final void finishConnect() {
333             // Note this method is invoked by the event loop only if the connection attempt was
334             // neither cancelled nor timed out.
335 
336             assert eventLoop().inEventLoop();
337 
338             try {
339                 boolean wasActive = isActive();
340                 doFinishConnect();
341                 fulfillConnectPromise(connectPromise, wasActive);
342             } catch (Throwable t) {
343                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
344             } finally {
345                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
346                 // See https://github.com/netty/netty/issues/1770
347                 if (connectTimeoutFuture != null) {
348                     connectTimeoutFuture.cancel(false);
349                 }
350                 connectPromise = null;
351             }
352         }
353 
354         @Override
355         protected final void flush0() {
356             // Flush immediately only when there's no pending flush.
357             // If there's a pending flush operation, event loop will call forceFlush() later,
358             // and thus there's no need to call it now.
359             if (!isFlushPending()) {
360                 super.flush0();
361             }
362         }
363 
364         @Override
365         public final void forceFlush() {
366             // directly call super.flush0() to force a flush now
367             super.flush0();
368         }
369 
370         private boolean isFlushPending() {
371             SelectionKey selectionKey = selectionKey();
372             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
373         }
374     }
375 
376     @Override
377     protected boolean isCompatible(EventLoop loop) {
378         return loop instanceof NioEventLoop;
379     }
380 
381     @Override
382     protected void doRegister() throws Exception {
383         boolean selected = false;
384         for (;;) {
385             try {
386                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
387                 return;
388             } catch (CancelledKeyException e) {
389                 if (!selected) {
390                     // Force the Selector to select now as the "canceled" SelectionKey may still be
391                     // cached and not removed because no Select.select(..) operation was called yet.
392                     eventLoop().selectNow();
393                     selected = true;
394                 } else {
395                     // We forced a select operation on the selector before but the SelectionKey is still cached
396                     // for whatever reason. JDK bug ?
397                     throw e;
398                 }
399             }
400         }
401     }
402 
403     @Override
404     protected void doDeregister() throws Exception {
405         eventLoop().cancel(selectionKey());
406     }
407 
408     @Override
409     protected void doBeginRead() throws Exception {
410         // Channel.read() or ChannelHandlerContext.read() was called
411         final SelectionKey selectionKey = this.selectionKey;
412         if (!selectionKey.isValid()) {
413             return;
414         }
415 
416         readPending = true;
417 
418         final int interestOps = selectionKey.interestOps();
419         if ((interestOps & readInterestOp) == 0) {
420             selectionKey.interestOps(interestOps | readInterestOp);
421         }
422     }
423 
424     /**
425      * Connect to the remote peer
426      */
427     protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
428 
429     /**
430      * Finish the connect
431      */
432     protected abstract void doFinishConnect() throws Exception;
433 
434     /**
435      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
436      * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
437      * but just returns the original {@link ByteBuf}..
438      */
439     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
440         final int readableBytes = buf.readableBytes();
441         if (readableBytes == 0) {
442             ReferenceCountUtil.safeRelease(buf);
443             return Unpooled.EMPTY_BUFFER;
444         }
445 
446         final ByteBufAllocator alloc = alloc();
447         if (alloc.isDirectBufferPooled()) {
448             ByteBuf directBuf = alloc.directBuffer(readableBytes);
449             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
450             ReferenceCountUtil.safeRelease(buf);
451             return directBuf;
452         }
453 
454         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
455         if (directBuf != null) {
456             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
457             ReferenceCountUtil.safeRelease(buf);
458             return directBuf;
459         }
460 
461         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
462         return buf;
463     }
464 
465     /**
466      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
467      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
468      * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
469      * too high, but just returns the original {@link ByteBuf}..
470      */
471     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
472         final int readableBytes = buf.readableBytes();
473         if (readableBytes == 0) {
474             ReferenceCountUtil.safeRelease(holder);
475             return Unpooled.EMPTY_BUFFER;
476         }
477 
478         final ByteBufAllocator alloc = alloc();
479         if (alloc.isDirectBufferPooled()) {
480             ByteBuf directBuf = alloc.directBuffer(readableBytes);
481             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
482             ReferenceCountUtil.safeRelease(holder);
483             return directBuf;
484         }
485 
486         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
487         if (directBuf != null) {
488             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
489             ReferenceCountUtil.safeRelease(holder);
490             return directBuf;
491         }
492 
493         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
494         if (holder != buf) {
495             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
496             buf.retain();
497             ReferenceCountUtil.safeRelease(holder);
498         }
499 
500         return buf;
501     }
502 
503     @Override
504     protected void doClose() throws Exception {
505         ChannelPromise promise = connectPromise;
506         if (promise != null) {
507             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
508             promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
509             connectPromise = null;
510         }
511 
512         ScheduledFuture<?> future = connectTimeoutFuture;
513         if (future != null) {
514             future.cancel(false);
515             connectTimeoutFuture = null;
516         }
517     }
518 }