View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.nio;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.AbstractChannel;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelException;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.ConnectTimeoutException;
29  import io.netty.channel.EventLoop;
30  import io.netty.util.ReferenceCountUtil;
31  import io.netty.util.ReferenceCounted;
32  import io.netty.util.internal.ThrowableUtil;
33  import io.netty.util.internal.logging.InternalLogger;
34  import io.netty.util.internal.logging.InternalLoggerFactory;
35  
36  import java.io.IOException;
37  import java.net.SocketAddress;
38  import java.nio.channels.CancelledKeyException;
39  import java.nio.channels.ClosedChannelException;
40  import java.nio.channels.ConnectionPendingException;
41  import java.nio.channels.SelectableChannel;
42  import java.nio.channels.SelectionKey;
43  import java.util.concurrent.ScheduledFuture;
44  import java.util.concurrent.TimeUnit;
45  
46  /**
47   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
48   */
49  public abstract class AbstractNioChannel extends AbstractChannel {
50  
51      private static final InternalLogger logger =
52              InternalLoggerFactory.getInstance(AbstractNioChannel.class);
53  
54      private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
55              new ClosedChannelException(), AbstractNioChannel.class, "doClose()");
56  
57      private final SelectableChannel ch;
58      protected final int readInterestOp;
59      volatile SelectionKey selectionKey;
60      private volatile boolean inputShutdown;
61      private volatile boolean readPending;
62  
63      /**
64       * The future of the current connection attempt.  If not null, subsequent
65       * connection attempts will fail.
66       */
67      private ChannelPromise connectPromise;
68      private ScheduledFuture<?> connectTimeoutFuture;
69      private SocketAddress requestedRemoteAddress;
70  
71      /**
72       * Create a new instance
73       *
74       * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
75       * @param ch                the underlying {@link SelectableChannel} on which it operates
76       * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
77       */
78      protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
79          super(parent);
80          this.ch = ch;
81          this.readInterestOp = readInterestOp;
82          try {
83              ch.configureBlocking(false);
84          } catch (IOException e) {
85              try {
86                  ch.close();
87              } catch (IOException e2) {
88                  if (logger.isWarnEnabled()) {
89                      logger.warn(
90                              "Failed to close a partially initialized socket.", e2);
91                  }
92              }
93  
94              throw new ChannelException("Failed to enter non-blocking mode.", e);
95          }
96      }
97  
98      @Override
99      public boolean isOpen() {
100         return ch.isOpen();
101     }
102 
103     @Override
104     public NioUnsafe unsafe() {
105         return (NioUnsafe) super.unsafe();
106     }
107 
108     protected SelectableChannel javaChannel() {
109         return ch;
110     }
111 
112     @Override
113     public NioEventLoop eventLoop() {
114         return (NioEventLoop) super.eventLoop();
115     }
116 
117     /**
118      * Return the current {@link SelectionKey}
119      */
120     protected SelectionKey selectionKey() {
121         assert selectionKey != null;
122         return selectionKey;
123     }
124 
125     protected boolean isReadPending() {
126         return readPending;
127     }
128 
129     protected void setReadPending(boolean readPending) {
130         this.readPending = readPending;
131     }
132 
133     /**
134      * Return {@code true} if the input of this {@link Channel} is shutdown
135      */
136     protected boolean isInputShutdown() {
137         return inputShutdown;
138     }
139 
140     /**
141      * Shutdown the input of this {@link Channel}.
142      */
143     void setInputShutdown() {
144         inputShutdown = true;
145     }
146 
147     /**
148      * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel}
149      */
150     public interface NioUnsafe extends Unsafe {
151         /**
152          * Return underlying {@link SelectableChannel}
153          */
154         SelectableChannel ch();
155 
156         /**
157          * Finish connect
158          */
159         void finishConnect();
160 
161         /**
162          * Read from underlying {@link SelectableChannel}
163          */
164         void read();
165 
166         void forceFlush();
167     }
168 
169     protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
170 
171         protected final void removeReadOp() {
172             SelectionKey key = selectionKey();
173             // Check first if the key is still valid as it may be canceled as part of the deregistration
174             // from the EventLoop
175             // See https://github.com/netty/netty/issues/2104
176             if (!key.isValid()) {
177                 return;
178             }
179             int interestOps = key.interestOps();
180             if ((interestOps & readInterestOp) != 0) {
181                 // only remove readInterestOp if needed
182                 key.interestOps(interestOps & ~readInterestOp);
183             }
184         }
185 
186         @Override
187         public final SelectableChannel ch() {
188             return javaChannel();
189         }
190 
191         @Override
192         public final void connect(
193                 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
194             if (!promise.setUncancellable() || !ensureOpen(promise)) {
195                 return;
196             }
197 
198             try {
199                 if (connectPromise != null) {
200                     // Already a connect in process.
201                     throw new ConnectionPendingException();
202                 }
203 
204                 boolean wasActive = isActive();
205                 if (doConnect(remoteAddress, localAddress)) {
206                     fulfillConnectPromise(promise, wasActive);
207                 } else {
208                     connectPromise = promise;
209                     requestedRemoteAddress = remoteAddress;
210 
211                     // Schedule connect timeout.
212                     int connectTimeoutMillis = config().getConnectTimeoutMillis();
213                     if (connectTimeoutMillis > 0) {
214                         connectTimeoutFuture = eventLoop().schedule(new Runnable() {
215                             @Override
216                             public void run() {
217                                 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
218                                 ConnectTimeoutException cause =
219                                         new ConnectTimeoutException("connection timed out: " + remoteAddress);
220                                 if (connectPromise != null && connectPromise.tryFailure(cause)) {
221                                     close(voidPromise());
222                                 }
223                             }
224                         }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
225                     }
226 
227                     promise.addListener(new ChannelFutureListener() {
228                         @Override
229                         public void operationComplete(ChannelFuture future) throws Exception {
230                             if (future.isCancelled()) {
231                                 if (connectTimeoutFuture != null) {
232                                     connectTimeoutFuture.cancel(false);
233                                 }
234                                 connectPromise = null;
235                                 close(voidPromise());
236                             }
237                         }
238                     });
239                 }
240             } catch (Throwable t) {
241                 promise.tryFailure(annotateConnectException(t, remoteAddress));
242                 closeIfClosed();
243             }
244         }
245 
246         private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
247             if (promise == null) {
248                 // Closed via cancellation and the promise has been notified already.
249                 return;
250             }
251 
252             // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
253             // We still need to ensure we call fireChannelActive() in this case.
254             boolean active = isActive();
255 
256             // trySuccess() will return false if a user cancelled the connection attempt.
257             boolean promiseSet = promise.trySuccess();
258 
259             // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
260             // because what happened is what happened.
261             if (!wasActive && active) {
262                 pipeline().fireChannelActive();
263             }
264 
265             // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
266             if (!promiseSet) {
267                 close(voidPromise());
268             }
269         }
270 
271         private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
272             if (promise == null) {
273                 // Closed via cancellation and the promise has been notified already.
274                 return;
275             }
276 
277             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
278             promise.tryFailure(cause);
279             closeIfClosed();
280         }
281 
282         @Override
283         public final void finishConnect() {
284             // Note this method is invoked by the event loop only if the connection attempt was
285             // neither cancelled nor timed out.
286 
287             assert eventLoop().inEventLoop();
288 
289             try {
290                 boolean wasActive = isActive();
291                 doFinishConnect();
292                 fulfillConnectPromise(connectPromise, wasActive);
293             } catch (Throwable t) {
294                 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
295             } finally {
296                 // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
297                 // See https://github.com/netty/netty/issues/1770
298                 if (connectTimeoutFuture != null) {
299                     connectTimeoutFuture.cancel(false);
300                 }
301                 connectPromise = null;
302             }
303         }
304 
305         @Override
306         protected final void flush0() {
307             // Flush immediately only when there's no pending flush.
308             // If there's a pending flush operation, event loop will call forceFlush() later,
309             // and thus there's no need to call it now.
310             if (isFlushPending()) {
311                 return;
312             }
313             super.flush0();
314         }
315 
316         @Override
317         public final void forceFlush() {
318             // directly call super.flush0() to force a flush now
319             super.flush0();
320         }
321 
322         private boolean isFlushPending() {
323             SelectionKey selectionKey = selectionKey();
324             return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
325         }
326     }
327 
328     @Override
329     protected boolean isCompatible(EventLoop loop) {
330         return loop instanceof NioEventLoop;
331     }
332 
333     @Override
334     protected void doRegister() throws Exception {
335         boolean selected = false;
336         for (;;) {
337             try {
338                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
339                 return;
340             } catch (CancelledKeyException e) {
341                 if (!selected) {
342                     // Force the Selector to select now as the "canceled" SelectionKey may still be
343                     // cached and not removed because no Select.select(..) operation was called yet.
344                     eventLoop().selectNow();
345                     selected = true;
346                 } else {
347                     // We forced a select operation on the selector before but the SelectionKey is still cached
348                     // for whatever reason. JDK bug ?
349                     throw e;
350                 }
351             }
352         }
353     }
354 
355     @Override
356     protected void doDeregister() throws Exception {
357         eventLoop().cancel(selectionKey());
358     }
359 
360     @Override
361     protected void doBeginRead() throws Exception {
362         // Channel.read() or ChannelHandlerContext.read() was called
363         if (inputShutdown) {
364             return;
365         }
366 
367         final SelectionKey selectionKey = this.selectionKey;
368         if (!selectionKey.isValid()) {
369             return;
370         }
371 
372         readPending = true;
373 
374         final int interestOps = selectionKey.interestOps();
375         if ((interestOps & readInterestOp) == 0) {
376             selectionKey.interestOps(interestOps | readInterestOp);
377         }
378     }
379 
380     /**
381      * Connect to the remote peer
382      */
383     protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
384 
385     /**
386      * Finish the connect
387      */
388     protected abstract void doFinishConnect() throws Exception;
389 
390     /**
391      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the original one.
392      * Note that this method does not create an off-heap copy if the allocation / deallocation cost is too high,
393      * but just returns the original {@link ByteBuf}..
394      */
395     protected final ByteBuf newDirectBuffer(ByteBuf buf) {
396         final int readableBytes = buf.readableBytes();
397         if (readableBytes == 0) {
398             ReferenceCountUtil.safeRelease(buf);
399             return Unpooled.EMPTY_BUFFER;
400         }
401 
402         final ByteBufAllocator alloc = alloc();
403         if (alloc.isDirectBufferPooled()) {
404             ByteBuf directBuf = alloc.directBuffer(readableBytes);
405             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
406             ReferenceCountUtil.safeRelease(buf);
407             return directBuf;
408         }
409 
410         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
411         if (directBuf != null) {
412             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
413             ReferenceCountUtil.safeRelease(buf);
414             return directBuf;
415         }
416 
417         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
418         return buf;
419     }
420 
421     /**
422      * Returns an off-heap copy of the specified {@link ByteBuf}, and releases the specified holder.
423      * The caller must ensure that the holder releases the original {@link ByteBuf} when the holder is released by
424      * this method.  Note that this method does not create an off-heap copy if the allocation / deallocation cost is
425      * too high, but just returns the original {@link ByteBuf}..
426      */
427     protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
428         final int readableBytes = buf.readableBytes();
429         if (readableBytes == 0) {
430             ReferenceCountUtil.safeRelease(holder);
431             return Unpooled.EMPTY_BUFFER;
432         }
433 
434         final ByteBufAllocator alloc = alloc();
435         if (alloc.isDirectBufferPooled()) {
436             ByteBuf directBuf = alloc.directBuffer(readableBytes);
437             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
438             ReferenceCountUtil.safeRelease(holder);
439             return directBuf;
440         }
441 
442         final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
443         if (directBuf != null) {
444             directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
445             ReferenceCountUtil.safeRelease(holder);
446             return directBuf;
447         }
448 
449         // Allocating and deallocating an unpooled direct buffer is very expensive; give up.
450         if (holder != buf) {
451             // Ensure to call holder.release() to give the holder a chance to release other resources than its content.
452             buf.retain();
453             ReferenceCountUtil.safeRelease(holder);
454         }
455 
456         return buf;
457     }
458 
459     @Override
460     protected void doClose() throws Exception {
461         ChannelPromise promise = connectPromise;
462         if (promise != null) {
463             // Use tryFailure() instead of setFailure() to avoid the race against cancel().
464             promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
465             connectPromise = null;
466         }
467 
468         ScheduledFuture<?> future = connectTimeoutFuture;
469         if (future != null) {
470             future.cancel(false);
471             connectTimeoutFuture = null;
472         }
473     }
474 }