View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.nio;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.buffer.api.DefaultBufferAllocators;
21  import io.netty5.channel.ChannelMetadata;
22  import io.netty5.channel.RecvBufferAllocator;
23  import io.netty5.util.Resource;
24  import io.netty5.channel.AbstractChannel;
25  import io.netty5.channel.Channel;
26  import io.netty5.channel.ChannelException;
27  import io.netty5.channel.EventLoop;
28  import io.netty5.util.internal.logging.InternalLogger;
29  import io.netty5.util.internal.logging.InternalLoggerFactory;
30  
31  import java.io.IOException;
32  import java.net.SocketAddress;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.Selector;
38  
39  /**
40   * Abstract base class for {@link Channel} implementations which use a Selector based approach.
41   */
42  public abstract class AbstractNioChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
43          extends AbstractChannel<P, L, R> {
44  
    // Class-wide logger; used by the constructor when closing a partially initialized channel fails.
    private static final InternalLogger logger =
            InternalLoggerFactory.getInstance(AbstractNioChannel.class);

    // The underlying JDK channel; the constructor switches it to non-blocking mode.
    private final SelectableChannel ch;
    // Interest-op bit(s) that doBeginRead() adds to, and removeReadOp() removes from, the selection key.
    protected final int readInterestOp;
    // Current selector registration: assigned by nioProcessor.register(...), reset to null by deregister().
    volatile SelectionKey selectionKey;
    // Set to true by doBeginRead(); reset through clearReadPending() / setReadPending(false).
    boolean readPending;
    // Reusable task that clearReadPending() hands to the event loop when the caller is not in the event loop.
    private final Runnable clearReadPendingRunnable = this::clearReadPending0;
53  
54      private final NioProcessor nioProcessor = new NioProcessor() {
55          @Override
56          public void register(Selector selector) throws ClosedChannelException {
57              int interestOps;
58              SelectionKey key = selectionKey;
59              if (key != null) {
60                  interestOps = key.interestOps();
61                  key.cancel();
62              } else {
63                  interestOps = 0;
64              }
65              selectionKey = javaChannel().register(selector, interestOps, this);
66          }
67  
68          @Override
69          public void deregister() {
70              SelectionKey key = selectionKey;
71              if (key != null) {
72                  key.cancel();
73                  selectionKey = null;
74              }
75          }
76  
77          @Override
78          public void handle(SelectionKey k) {
79              if (!k.isValid()) {
80  
81                  // close the channel if the key is not valid anymore
82                  closeTransportNow();
83                  return;
84              }
85  
86              try {
87                  int readyOps = k.readyOps();
88                  // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
89                  // the NIO JDK channel implementation may throw a NotYetConnectedException.
90                  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
91                      // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
92                      // See https://github.com/netty/netty/issues/924
93                      int ops = k.interestOps();
94                      ops &= ~SelectionKey.OP_CONNECT;
95                      k.interestOps(ops);
96  
97                      finishConnectNow();
98                  }
99  
100                 // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
101                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
102                     // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to
103                     // write
104                     forceFlush();
105                 }
106 
107                 // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
108                 // to a spin loop
109                 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
110                     readNow();
111                 }
112             } catch (CancelledKeyException ignored) {
113                 closeTransportNow();
114             }
115         }
116 
117         @Override
118         public void close() {
119             closeTransportNow();
120         }
121     };
122 
123     /**
124      * Create a new instance
125      *
126      * @param parent                the parent {@link Channel} by which this instance was created. May be {@code null}
127      * @param eventLoop             the {@link EventLoop} to use for all I/O.
128      * @param metadata              the {@link ChannelMetadata} to use.
129      * @param defaultRecvAllocator  the default {@link RecvBufferAllocator} to use.
130      * @param ch                    the underlying {@link SelectableChannel} on which it operates
131      * @param readInterestOp        the ops to set to receive data from the {@link SelectableChannel}
132      */
133     protected AbstractNioChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
134                                  RecvBufferAllocator defaultRecvAllocator,
135                                  SelectableChannel ch, int readInterestOp) {
136         super(parent, eventLoop, metadata, defaultRecvAllocator);
137         this.ch = ch;
138         this.readInterestOp = readInterestOp;
139         try {
140             ch.configureBlocking(false);
141         } catch (IOException e) {
142             try {
143                 ch.close();
144             } catch (IOException e2) {
145                 logger.warn(
146                         "Failed to close a partially initialized socket.", e2);
147             }
148 
149             throw new ChannelException("Failed to enter non-blocking mode.", e);
150         }
151     }
152 
153     @Override
154     public boolean isOpen() {
155         return ch.isOpen();
156     }
157 
158     protected SelectableChannel javaChannel() {
159         return ch;
160     }
161 
162     /**
163      * Return the current {@link SelectionKey} or {@code null} if the underlying channel was not registered with the
164      * {@link Selector} yet.
165      */
166     protected SelectionKey selectionKey() {
167         return selectionKey;
168     }
169 
170     /**
171      * @deprecated No longer supported.
172      * No longer supported.
173      */
174     @Deprecated
175     protected boolean isReadPending() {
176         return readPending;
177     }
178 
179     /**
180      * @deprecated Use {@link #clearReadPending()} if appropriate instead.
181      * No longer supported.
182      */
183     @Deprecated
184     protected void setReadPending(final boolean readPending) {
185         if (isRegistered()) {
186             EventLoop eventLoop = executor();
187             if (eventLoop.inEventLoop()) {
188                 setReadPending0(readPending);
189             } else {
190                 eventLoop.execute(() -> setReadPending0(readPending));
191             }
192         } else {
193             // Best effort if we are not registered yet clear readPending.
194             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
195             // not set yet so it would produce an assertion failure.
196             this.readPending = readPending;
197         }
198     }
199 
200     /**
201      * Set read pending to {@code false}.
202      */
203     protected final void clearReadPending() {
204         if (isRegistered()) {
205             EventLoop eventLoop = executor();
206             if (eventLoop.inEventLoop()) {
207                 clearReadPending0();
208             } else {
209                 eventLoop.execute(clearReadPendingRunnable);
210             }
211         } else {
212             // Best effort if we are not registered yet clear readPending. This happens during channel initialization.
213             // NB: We only set the boolean field instead of calling clearReadPending0(), because the SelectionKey is
214             // not set yet so it would produce an assertion failure.
215             readPending = false;
216         }
217     }
218 
219     private void setReadPending0(boolean readPending) {
220         this.readPending = readPending;
221         if (!readPending) {
222             removeReadOp();
223         }
224     }
225 
226     private void clearReadPending0() {
227         readPending = false;
228         removeReadOp();
229     }
230 
231     protected final void removeReadOp() {
232         SelectionKey key = selectionKey();
233         // Check first if the key is still valid as it may be canceled as part of the deregistration
234         // from the EventLoop
235         // See https://github.com/netty/netty/issues/2104
236         if (key == null || !key.isValid()) {
237             return;
238         }
239         int interestOps = key.interestOps();
240         if ((interestOps & readInterestOp) != 0) {
241             // only remove readInterestOp if needed
242             key.interestOps(interestOps & ~readInterestOp);
243         }
244     }
245 
246     @Override
247     protected final void writeFlushed() {
248         // Flush immediately only when there's no pending flush.
249         // If there's a pending flush operation, event loop will call forceFlush() later,
250         // and thus there's no need to call it now.
251         if (!isFlushPending()) {
252             super.writeFlushed();
253         }
254     }
255 
256     final void forceFlush() {
257         // directly call super.flush0() to force a flush now
258         super.writeFlushed();
259     }
260 
261     private boolean isFlushPending() {
262         SelectionKey selectionKey = selectionKey();
263         return selectionKey != null && selectionKey.isValid()
264                 && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
265     }
266 
267     @Override
268     protected void doBeginRead() throws Exception {
269         // Channel.read() or ChannelHandlerContext.read() was called
270         final SelectionKey selectionKey = this.selectionKey;
271         if (!selectionKey.isValid()) {
272             return;
273         }
274 
275         readPending = true;
276 
277         final int interestOps = selectionKey.interestOps();
278         if ((interestOps & readInterestOp) == 0) {
279             selectionKey.interestOps(interestOps | readInterestOp);
280         }
281     }
282 
283     @Override
284     protected void doClose() throws Exception {
285         javaChannel().close();
286     }
287 
288     /**
289      * Allocates a new off-heap copy of the given buffer, unless the cost of doing so is too high.
290      * The given buffer is closed if a copy is created, or returned directly.
291      *
292      * @param buf The buffer to copy.
293      * @return Probably an off-heap copy of the given buffer.
294      */
295     protected final Buffer newDirectBuffer(Buffer buf) {
296         if (buf.readableBytes() == 0) {
297             // Don't bother allocating a zero-sized buffer. They will not cause IO anyway.
298             return buf;
299         }
300 
301         BufferAllocator bufferAllocator = bufferAllocator();
302         if (!bufferAllocator.getAllocationType().isDirect()) {
303             bufferAllocator = DefaultBufferAllocators.offHeapAllocator();
304         }
305         if (bufferAllocator.isPooling()) {
306             try (buf) {
307                 return bufferAllocator.allocate(buf.readableBytes()).writeBytes(buf);
308             }
309         }
310         return buf; // Un-pooled off-heap allocation is too expensive. Give up.
311     }
312 
313     /**
314      * Allocates a new off-heap copy of the given buffer, unless the cost of doing so is too high.
315      * The given holder is closed regardless.
316      *
317      * @param buf The buffer to copy.
318      * @return Probably an off-heap copy of the given buffer.
319      */
320     protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
321         try (holder) {
322             BufferAllocator bufferAllocator = bufferAllocator();
323             if (!bufferAllocator.getAllocationType().isDirect()) {
324                 bufferAllocator = DefaultBufferAllocators.offHeapAllocator();
325             }
326             if (bufferAllocator.isPooling()) {
327                 return bufferAllocator.allocate(buf.readableBytes()).writeBytes(buf);
328             }
329             // Un-pooled off-heap allocation is too expensive. Give up.
330             // Use split() to grab the readable part of the buffer; the remainder will be closed along with its holder.
331             return buf.split();
332         }
333     }
334 
    /**
     * Invoked from the selection-key handler when the key is ready for {@link SelectionKey#OP_READ} or
     * {@link SelectionKey#OP_ACCEPT}, or when its ready set is {@code 0}.
     */
    protected abstract void readNow();
336 
    /**
     * Calls {@code closeTransport(...)} with a newly created promise. Used by the selection-key handler
     * when the key is invalid or cancelled, and when the processor is closed.
     */
    private void closeTransportNow() {
        closeTransport(newPromise());
    }
340 
    /**
     * Delegates to {@code finishConnect()}. Called by the selection-key handler when the key is ready for
     * {@link SelectionKey#OP_CONNECT}.
     */
    private void finishConnectNow() {
        finishConnect();
    }
344 
    /**
     * Returns the {@link NioProcessor} of this channel.
     */
    final NioProcessor nioProcessor() {
        return nioProcessor;
    }
348 }