1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package io.netty5.channel.nio;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.buffer.api.DefaultBufferAllocators;
21  import io.netty5.channel.ChannelMetadata;
22  import io.netty5.channel.RecvBufferAllocator;
23  import io.netty5.util.Resource;
24  import io.netty5.channel.AbstractChannel;
25  import io.netty5.channel.Channel;
26  import io.netty5.channel.ChannelException;
27  import io.netty5.channel.EventLoop;
28  import io.netty5.util.internal.logging.InternalLogger;
29  import io.netty5.util.internal.logging.InternalLoggerFactory;
30  
31  import java.io.IOException;
32  import java.net.SocketAddress;
33  import java.nio.channels.CancelledKeyException;
34  import java.nio.channels.ClosedChannelException;
35  import java.nio.channels.SelectableChannel;
36  import java.nio.channels.SelectionKey;
37  import java.nio.channels.Selector;
38  
39  
40  
41  
42  public abstract class AbstractNioChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
43          extends AbstractChannel<P, L, R> {
44  
45      private static final InternalLogger logger =
46              InternalLoggerFactory.getInstance(AbstractNioChannel.class);
47  
48      private final SelectableChannel ch;
49      protected final int readInterestOp;
50      volatile SelectionKey selectionKey;
51      boolean readPending;
52      private final Runnable clearReadPendingRunnable = this::clearReadPending0;
53  
54      private final NioProcessor nioProcessor = new NioProcessor() {
55          @Override
56          public void register(Selector selector) throws ClosedChannelException {
57              int interestOps;
58              SelectionKey key = selectionKey;
59              if (key != null) {
60                  interestOps = key.interestOps();
61                  key.cancel();
62              } else {
63                  interestOps = 0;
64              }
65              selectionKey = javaChannel().register(selector, interestOps, this);
66          }
67  
68          @Override
69          public void deregister() {
70              SelectionKey key = selectionKey;
71              if (key != null) {
72                  key.cancel();
73                  selectionKey = null;
74              }
75          }
76  
77          @Override
78          public void handle(SelectionKey k) {
79              if (!k.isValid()) {
80  
81                  
82                  closeTransportNow();
83                  return;
84              }
85  
86              try {
87                  int readyOps = k.readyOps();
88                  
89                  
90                  if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
91                      
92                      
93                      int ops = k.interestOps();
94                      ops &= ~SelectionKey.OP_CONNECT;
95                      k.interestOps(ops);
96  
97                      finishConnectNow();
98                  }
99  
100                 
101                 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
102                     
103                     
104                     forceFlush();
105                 }
106 
107                 
108                 
109                 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
110                     readNow();
111                 }
112             } catch (CancelledKeyException ignored) {
113                 closeTransportNow();
114             }
115         }
116 
117         @Override
118         public void close() {
119             closeTransportNow();
120         }
121     };
122 
123     
124 
125 
126 
127 
128 
129 
130 
131 
132 
133     protected AbstractNioChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
134                                  RecvBufferAllocator defaultRecvAllocator,
135                                  SelectableChannel ch, int readInterestOp) {
136         super(parent, eventLoop, metadata, defaultRecvAllocator);
137         this.ch = ch;
138         this.readInterestOp = readInterestOp;
139         try {
140             ch.configureBlocking(false);
141         } catch (IOException e) {
142             try {
143                 ch.close();
144             } catch (IOException e2) {
145                 logger.warn(
146                         "Failed to close a partially initialized socket.", e2);
147             }
148 
149             throw new ChannelException("Failed to enter non-blocking mode.", e);
150         }
151     }
152 
153     @Override
154     public boolean isOpen() {
155         return ch.isOpen();
156     }
157 
158     protected SelectableChannel javaChannel() {
159         return ch;
160     }
161 
162     
163 
164 
165 
166     protected SelectionKey selectionKey() {
167         return selectionKey;
168     }
169 
170     
171 
172 
173 
174     @Deprecated
175     protected boolean isReadPending() {
176         return readPending;
177     }
178 
179     
180 
181 
182 
183     @Deprecated
184     protected void setReadPending(final boolean readPending) {
185         if (isRegistered()) {
186             EventLoop eventLoop = executor();
187             if (eventLoop.inEventLoop()) {
188                 setReadPending0(readPending);
189             } else {
190                 eventLoop.execute(() -> setReadPending0(readPending));
191             }
192         } else {
193             
194             
195             
196             this.readPending = readPending;
197         }
198     }
199 
200     
201 
202 
203     protected final void clearReadPending() {
204         if (isRegistered()) {
205             EventLoop eventLoop = executor();
206             if (eventLoop.inEventLoop()) {
207                 clearReadPending0();
208             } else {
209                 eventLoop.execute(clearReadPendingRunnable);
210             }
211         } else {
212             
213             
214             
215             readPending = false;
216         }
217     }
218 
219     private void setReadPending0(boolean readPending) {
220         this.readPending = readPending;
221         if (!readPending) {
222             removeReadOp();
223         }
224     }
225 
226     private void clearReadPending0() {
227         readPending = false;
228         removeReadOp();
229     }
230 
231     protected final void removeReadOp() {
232         SelectionKey key = selectionKey();
233         
234         
235         
236         if (key == null || !key.isValid()) {
237             return;
238         }
239         int interestOps = key.interestOps();
240         if ((interestOps & readInterestOp) != 0) {
241             
242             key.interestOps(interestOps & ~readInterestOp);
243         }
244     }
245 
246     @Override
247     protected final void writeFlushed() {
248         
249         
250         
251         if (!isFlushPending()) {
252             super.writeFlushed();
253         }
254     }
255 
256     final void forceFlush() {
257         
258         super.writeFlushed();
259     }
260 
261     private boolean isFlushPending() {
262         SelectionKey selectionKey = selectionKey();
263         return selectionKey != null && selectionKey.isValid()
264                 && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
265     }
266 
267     @Override
268     protected void doBeginRead() throws Exception {
269         
270         final SelectionKey selectionKey = this.selectionKey;
271         if (!selectionKey.isValid()) {
272             return;
273         }
274 
275         readPending = true;
276 
277         final int interestOps = selectionKey.interestOps();
278         if ((interestOps & readInterestOp) == 0) {
279             selectionKey.interestOps(interestOps | readInterestOp);
280         }
281     }
282 
283     @Override
284     protected void doClose() throws Exception {
285         javaChannel().close();
286     }
287 
288     
289 
290 
291 
292 
293 
294 
295     protected final Buffer newDirectBuffer(Buffer buf) {
296         if (buf.readableBytes() == 0) {
297             
298             return buf;
299         }
300 
301         BufferAllocator bufferAllocator = bufferAllocator();
302         if (!bufferAllocator.getAllocationType().isDirect()) {
303             bufferAllocator = DefaultBufferAllocators.offHeapAllocator();
304         }
305         if (bufferAllocator.isPooling()) {
306             try (buf) {
307                 return bufferAllocator.allocate(buf.readableBytes()).writeBytes(buf);
308             }
309         }
310         return buf; 
311     }
312 
313     
314 
315 
316 
317 
318 
319 
320     protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
321         try (holder) {
322             BufferAllocator bufferAllocator = bufferAllocator();
323             if (!bufferAllocator.getAllocationType().isDirect()) {
324                 bufferAllocator = DefaultBufferAllocators.offHeapAllocator();
325             }
326             if (bufferAllocator.isPooling()) {
327                 return bufferAllocator.allocate(buf.readableBytes()).writeBytes(buf);
328             }
329             
330             
331             return buf.split();
332         }
333     }
334 
335     protected abstract void readNow();
336 
337     private void closeTransportNow() {
338         closeTransport(newPromise());
339     }
340 
341     private void finishConnectNow() {
342         finishConnect();
343     }
344 
345     final NioProcessor nioProcessor() {
346         return nioProcessor;
347     }
348 }