1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.nio;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.buffer.api.DefaultBufferAllocators;
21 import io.netty5.channel.ChannelMetadata;
22 import io.netty5.channel.RecvBufferAllocator;
23 import io.netty5.util.Resource;
24 import io.netty5.channel.AbstractChannel;
25 import io.netty5.channel.Channel;
26 import io.netty5.channel.ChannelException;
27 import io.netty5.channel.EventLoop;
28 import io.netty5.util.internal.logging.InternalLogger;
29 import io.netty5.util.internal.logging.InternalLoggerFactory;
30
31 import java.io.IOException;
32 import java.net.SocketAddress;
33 import java.nio.channels.CancelledKeyException;
34 import java.nio.channels.ClosedChannelException;
35 import java.nio.channels.SelectableChannel;
36 import java.nio.channels.SelectionKey;
37 import java.nio.channels.Selector;
38
39
40
41
42 public abstract class AbstractNioChannel<P extends Channel, L extends SocketAddress, R extends SocketAddress>
43 extends AbstractChannel<P, L, R> {
44
45 private static final InternalLogger logger =
46 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
47
48 private final SelectableChannel ch;
49 protected final int readInterestOp;
50 volatile SelectionKey selectionKey;
51 boolean readPending;
52 private final Runnable clearReadPendingRunnable = this::clearReadPending0;
53
54 private final NioProcessor nioProcessor = new NioProcessor() {
55 @Override
56 public void register(Selector selector) throws ClosedChannelException {
57 int interestOps;
58 SelectionKey key = selectionKey;
59 if (key != null) {
60 interestOps = key.interestOps();
61 key.cancel();
62 } else {
63 interestOps = 0;
64 }
65 selectionKey = javaChannel().register(selector, interestOps, this);
66 }
67
68 @Override
69 public void deregister() {
70 SelectionKey key = selectionKey;
71 if (key != null) {
72 key.cancel();
73 selectionKey = null;
74 }
75 }
76
77 @Override
78 public void handle(SelectionKey k) {
79 if (!k.isValid()) {
80
81
82 closeTransportNow();
83 return;
84 }
85
86 try {
87 int readyOps = k.readyOps();
88
89
90 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
91
92
93 int ops = k.interestOps();
94 ops &= ~SelectionKey.OP_CONNECT;
95 k.interestOps(ops);
96
97 finishConnectNow();
98 }
99
100
101 if ((readyOps & SelectionKey.OP_WRITE) != 0) {
102
103
104 forceFlush();
105 }
106
107
108
109 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
110 readNow();
111 }
112 } catch (CancelledKeyException ignored) {
113 closeTransportNow();
114 }
115 }
116
117 @Override
118 public void close() {
119 closeTransportNow();
120 }
121 };
122
123
124
125
126
127
128
129
130
131
132
133 protected AbstractNioChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
134 RecvBufferAllocator defaultRecvAllocator,
135 SelectableChannel ch, int readInterestOp) {
136 super(parent, eventLoop, metadata, defaultRecvAllocator);
137 this.ch = ch;
138 this.readInterestOp = readInterestOp;
139 try {
140 ch.configureBlocking(false);
141 } catch (IOException e) {
142 try {
143 ch.close();
144 } catch (IOException e2) {
145 logger.warn(
146 "Failed to close a partially initialized socket.", e2);
147 }
148
149 throw new ChannelException("Failed to enter non-blocking mode.", e);
150 }
151 }
152
153 @Override
154 public boolean isOpen() {
155 return ch.isOpen();
156 }
157
158 protected SelectableChannel javaChannel() {
159 return ch;
160 }
161
162
163
164
165
166 protected SelectionKey selectionKey() {
167 return selectionKey;
168 }
169
170
171
172
173
174 @Deprecated
175 protected boolean isReadPending() {
176 return readPending;
177 }
178
179
180
181
182
183 @Deprecated
184 protected void setReadPending(final boolean readPending) {
185 if (isRegistered()) {
186 EventLoop eventLoop = executor();
187 if (eventLoop.inEventLoop()) {
188 setReadPending0(readPending);
189 } else {
190 eventLoop.execute(() -> setReadPending0(readPending));
191 }
192 } else {
193
194
195
196 this.readPending = readPending;
197 }
198 }
199
200
201
202
203 protected final void clearReadPending() {
204 if (isRegistered()) {
205 EventLoop eventLoop = executor();
206 if (eventLoop.inEventLoop()) {
207 clearReadPending0();
208 } else {
209 eventLoop.execute(clearReadPendingRunnable);
210 }
211 } else {
212
213
214
215 readPending = false;
216 }
217 }
218
219 private void setReadPending0(boolean readPending) {
220 this.readPending = readPending;
221 if (!readPending) {
222 removeReadOp();
223 }
224 }
225
226 private void clearReadPending0() {
227 readPending = false;
228 removeReadOp();
229 }
230
231 protected final void removeReadOp() {
232 SelectionKey key = selectionKey();
233
234
235
236 if (key == null || !key.isValid()) {
237 return;
238 }
239 int interestOps = key.interestOps();
240 if ((interestOps & readInterestOp) != 0) {
241
242 key.interestOps(interestOps & ~readInterestOp);
243 }
244 }
245
246 @Override
247 protected final void writeFlushed() {
248
249
250
251 if (!isFlushPending()) {
252 super.writeFlushed();
253 }
254 }
255
256 final void forceFlush() {
257
258 super.writeFlushed();
259 }
260
261 private boolean isFlushPending() {
262 SelectionKey selectionKey = selectionKey();
263 return selectionKey != null && selectionKey.isValid()
264 && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
265 }
266
267 @Override
268 protected void doBeginRead() throws Exception {
269
270 final SelectionKey selectionKey = this.selectionKey;
271 if (!selectionKey.isValid()) {
272 return;
273 }
274
275 readPending = true;
276
277 final int interestOps = selectionKey.interestOps();
278 if ((interestOps & readInterestOp) == 0) {
279 selectionKey.interestOps(interestOps | readInterestOp);
280 }
281 }
282
283 @Override
284 protected void doClose() throws Exception {
285 javaChannel().close();
286 }
287
288
289
290
291
292
293
294
295 protected final Buffer newDirectBuffer(Buffer buf) {
296 if (buf.readableBytes() == 0) {
297
298 return buf;
299 }
300
301 BufferAllocator bufferAllocator = bufferAllocator();
302 if (!bufferAllocator.getAllocationType().isDirect()) {
303 bufferAllocator = DefaultBufferAllocators.offHeapAllocator();
304 }
305 if (bufferAllocator.isPooling()) {
306 try (buf) {
307 return bufferAllocator.allocate(buf.readableBytes()).writeBytes(buf);
308 }
309 }
310 return buf;
311 }
312
313
314
315
316
317
318
319
320 protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
321 try (holder) {
322 BufferAllocator bufferAllocator = bufferAllocator();
323 if (!bufferAllocator.getAllocationType().isDirect()) {
324 bufferAllocator = DefaultBufferAllocators.offHeapAllocator();
325 }
326 if (bufferAllocator.isPooling()) {
327 return bufferAllocator.allocate(buf.readableBytes()).writeBytes(buf);
328 }
329
330
331 return buf.split();
332 }
333 }
334
335 protected abstract void readNow();
336
337 private void closeTransportNow() {
338 closeTransport(newPromise());
339 }
340
341 private void finishConnectNow() {
342 finishConnect();
343 }
344
345 final NioProcessor nioProcessor() {
346 return nioProcessor;
347 }
348 }