1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.kqueue;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.buffer.api.DefaultBufferAllocators;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.channel.ChannelShutdownDirection;
23 import io.netty5.channel.RecvBufferAllocator;
24 import io.netty5.channel.socket.SocketProtocolFamily;
25 import io.netty5.channel.unix.IntegerUnixChannelOption;
26 import io.netty5.channel.unix.RawUnixChannelOption;
27 import io.netty5.util.Resource;
28 import io.netty5.channel.AbstractChannel;
29 import io.netty5.channel.ChannelException;
30 import io.netty5.channel.ChannelMetadata;
31 import io.netty5.channel.ChannelOutboundBuffer;
32 import io.netty5.channel.EventLoop;
33 import io.netty5.channel.unix.FileDescriptor;
34 import io.netty5.channel.unix.UnixChannel;
35
36 import java.io.IOException;
37 import java.net.InetSocketAddress;
38 import java.net.SocketAddress;
39 import java.nio.ByteBuffer;
40 import java.nio.channels.UnresolvedAddressException;
41 import java.util.function.Predicate;
42
43 import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
44 import static io.netty5.channel.unix.Limits.SSIZE_MAX;
45 import static io.netty5.channel.unix.UnixChannelUtil.computeRemoteAddr;
46 import static java.lang.Math.min;
47 import static java.util.Objects.requireNonNull;
48
49 abstract class AbstractKQueueChannel<P extends UnixChannel>
50 extends AbstractChannel<P, SocketAddress, SocketAddress> implements UnixChannel {
51
52 final BsdSocket socket;
53
54 protected volatile boolean active;
55
56 protected boolean readPending;
57
58 private long numberBytesPending;
59
60
61
62
63
64
65
66
67
68
69 private final Predicate<RecvBufferAllocator.Handle> maybeMoreData = h -> {
70 if (h.lastBytesRead() > 0) {
71 numberBytesPending -= h.lastBytesRead();
72 }
73 return numberBytesPending != 0;
74 };
75
76 private final Runnable readReadyRunnable = new Runnable() {
77 @Override
78 public void run() {
79 readReadyRunnablePending = false;
80
81
82
83 readReady(numberBytesPending);
84 }
85 };
86
87 private KQueueRegistration registration;
88
89 private boolean readFilterEnabled;
90 private boolean writeFilterEnabled;
91 private boolean readReadyRunnablePending;
92 private boolean inputClosedSeenErrorOnRead;
93
94 private boolean maybeMoreDataToRead;
95
96 private boolean eof;
97
98 private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
99
100 private volatile SocketAddress localAddress;
101 private volatile SocketAddress remoteAddress;
102
103 AbstractKQueueChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
104 RecvBufferAllocator defaultRecvAllocator, BsdSocket fd, boolean active) {
105 super(parent, eventLoop, metadata, defaultRecvAllocator);
106 socket = requireNonNull(fd, "fd");
107 this.active = active;
108 if (active) {
109
110
111 this.localAddress = fd.localAddress();
112 this.remoteAddress = fd.remoteAddress();
113 }
114 }
115
116 AbstractKQueueChannel(P parent, EventLoop eventLoop, ChannelMetadata metadata,
117 RecvBufferAllocator defaultRecvAllocator, BsdSocket fd, SocketAddress remote) {
118 super(parent, eventLoop, metadata, defaultRecvAllocator);
119 socket = requireNonNull(fd, "fd");
120 active = true;
121
122
123 this.localAddress = fd.localAddress();
124 this.remoteAddress = remote;
125 }
126
127 @Override
128 @SuppressWarnings("unchecked")
129 protected <T> T getExtendedOption(ChannelOption<T> option) {
130 try {
131 if (option instanceof IntegerUnixChannelOption) {
132 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
133 return (T) Integer.valueOf(socket.getIntOpt(opt.level(), opt.optname()));
134 }
135 if (option instanceof RawUnixChannelOption) {
136 RawUnixChannelOption opt = (RawUnixChannelOption) option;
137 ByteBuffer out = ByteBuffer.allocate(opt.length());
138 socket.getRawOpt(opt.level(), opt.optname(), out);
139 return (T) out.flip();
140 }
141 } catch (IOException e) {
142 throw new ChannelException(e);
143 }
144 return super.getExtendedOption(option);
145 }
146
147 @Override
148 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
149 try {
150 if (option instanceof IntegerUnixChannelOption) {
151 IntegerUnixChannelOption opt = (IntegerUnixChannelOption) option;
152 socket.setIntOpt(opt.level(), opt.optname(), (Integer) value);
153 return;
154 } else if (option instanceof RawUnixChannelOption) {
155 RawUnixChannelOption opt = (RawUnixChannelOption) option;
156 socket.setRawOpt(opt.level(), opt.optname(), (ByteBuffer) value);
157 return;
158 }
159 } catch (IOException e) {
160 throw new ChannelException(e);
161 }
162 super.setExtendedOption(option, value);
163 }
164
165 @Override
166 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
167 if (option instanceof IntegerUnixChannelOption || option instanceof RawUnixChannelOption) {
168 return true;
169 }
170 return super.isExtendedOptionSupported(option);
171 }
172
173 protected void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
174 this.maxBytesPerGatheringWrite = min(SSIZE_MAX, maxBytesPerGatheringWrite);
175 }
176
177 protected long getMaxBytesPerGatheringWrite() {
178 return maxBytesPerGatheringWrite;
179 }
180
181 @Override
182 protected final void autoReadCleared() {
183 clearReadFilter();
184 }
185
186 static boolean isSoErrorZero(BsdSocket fd) {
187 try {
188 return fd.getSoError() == 0;
189 } catch (IOException e) {
190 throw new ChannelException(e);
191 }
192 }
193
194 protected KQueueRegistration registration() {
195 assert registration != null;
196 return registration;
197 }
198
199 @Override
200 public final FileDescriptor fd() {
201 return socket;
202 }
203
204 @Override
205 public boolean isActive() {
206 return active;
207 }
208
209 @Override
210 protected void doClose() throws Exception {
211 active = false;
212
213
214 inputClosedSeenErrorOnRead = true;
215 socket.close();
216 }
217
218 @Override
219 protected void doDisconnect() throws Exception {
220 doClose();
221 }
222
223 @SuppressWarnings("unchecked")
224 final void resetCachedAddresses() {
225 cacheAddresses(localAddress, null);
226 remoteAddress = null;
227 }
228
229 @Override
230 public final boolean isOpen() {
231 return socket.isOpen();
232 }
233
234 @Override
235 protected final void doBeginRead() {
236
237 readPending = true;
238
239
240
241
242 readFilter(true);
243
244
245
246 if (maybeMoreDataToRead) {
247 executeReadReadyRunnable();
248 }
249 }
250
251 final void register0(KQueueRegistration registration) {
252 this.registration = registration;
253
254
255
256 readReadyRunnablePending = false;
257
258
259 if (writeFilterEnabled) {
260 evSet0(registration, Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
261 }
262 if (readFilterEnabled) {
263 evSet0(registration, Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
264 }
265 evSet0(registration, Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
266 }
267
268 final void deregister0() {
269
270
271 readFilterEnabled = false;
272 writeFilterEnabled = false;
273 }
274
275 final void unregisterFilters() throws Exception {
276
277 readFilter(false);
278 writeFilter(false);
279
280 if (registration != null) {
281 evSet0(registration, Native.EVFILT_SOCK, Native.EV_DELETE, 0);
282 registration = null;
283 }
284 }
285
286
287
288
289 protected final Buffer newDirectBuffer(Buffer buf) {
290 return newDirectBuffer(buf, buf);
291 }
292
293
294
295
296
297 protected final Buffer newDirectBuffer(Resource<?> holder, Buffer buf) {
298 BufferAllocator allocator = ioBufferAllocator();
299 try (holder) {
300 int readableBytes = buf.readableBytes();
301 Buffer directCopy = allocator.allocate(readableBytes);
302 if (readableBytes > 0) {
303 directCopy.writeBytes(buf);
304 }
305 return directCopy;
306 }
307 }
308
309 protected static void checkResolvable(InetSocketAddress addr) {
310 if (addr.isUnresolved()) {
311 throw new UnresolvedAddressException();
312 }
313 }
314
315
316
317
318 protected final int doReadBytes(Buffer buffer) throws Exception {
319 recvBufAllocHandle().attemptedBytesRead(buffer.writableBytes());
320 try (var iterator = buffer.forEachWritable()) {
321 var component = iterator.first();
322 if (component == null) {
323 recvBufAllocHandle().lastBytesRead(0);
324 return 0;
325 }
326 long address = component.writableNativeAddress();
327 assert address != 0;
328 int localReadAmount = socket.readAddress(address, 0, component.writableBytes());
329 recvBufAllocHandle().lastBytesRead(localReadAmount);
330 if (localReadAmount > 0) {
331 component.skipWritableBytes(localReadAmount);
332 }
333 return localReadAmount;
334 }
335 }
336
337 protected final int doWriteBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
338 int initialReaderOffset = buf.readerOffset();
339 buf.forEachReadable(0, (index, component) -> {
340 long address = component.readableNativeAddress();
341 assert address != 0;
342 int written = socket.writeAddress(address, 0, component.readableBytes());
343 if (written > 0) {
344 component.skipReadableBytes(written);
345 }
346 return false;
347 });
348 int readerOffset = buf.readerOffset();
349 if (initialReaderOffset < readerOffset) {
350 buf.readerOffset(initialReaderOffset);
351 int bytesWritten = readerOffset - initialReaderOffset;
352 in.removeBytes(bytesWritten);
353 return 1;
354 }
355 return WRITE_STATUS_SNDBUF_FULL;
356 }
357
358 final boolean shouldBreakReadReady() {
359 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure());
360 }
361
362 private void clearReadFilter() {
363
364 if (isRegistered()) {
365 final EventLoop loop = executor();
366 if (loop.inEventLoop()) {
367 clearReadFilter0();
368 } else {
369
370 loop.execute(() -> {
371 if (readPending && !isAutoRead()) {
372
373 clearReadFilter0();
374 }
375 });
376 }
377 } else {
378
379
380 readFilterEnabled = false;
381 }
382 }
383
384 final void readFilter(boolean readFilterEnabled) {
385 if (this.readFilterEnabled != readFilterEnabled) {
386 this.readFilterEnabled = readFilterEnabled;
387 evSet(Native.EVFILT_READ, readFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
388 }
389 }
390
391 final void writeFilter(boolean writeFilterEnabled) {
392 if (this.writeFilterEnabled != writeFilterEnabled) {
393 this.writeFilterEnabled = writeFilterEnabled;
394 evSet(Native.EVFILT_WRITE, writeFilterEnabled ? Native.EV_ADD_CLEAR_ENABLE : Native.EV_DELETE_DISABLE);
395 }
396 }
397
398 private void evSet(short filter, short flags) {
399 if (isRegistered()) {
400 evSet0(registration, filter, flags);
401 }
402 }
403
404 private void evSet0(KQueueRegistration registration, short filter, short flags) {
405 evSet0(registration, filter, flags, 0);
406 }
407
408 private void evSet0(KQueueRegistration registration, short filter, short flags, int fflags) {
409
410 if (isOpen()) {
411 registration.evSet(filter, flags, fflags);
412 }
413 }
414
415 final void readReady(long numberBytesPending) {
416 RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
417 allocHandle.reset();
418 this.numberBytesPending = numberBytesPending;
419 allocHandle.attemptedBytesRead((int) Math.min(numberBytesPending, Integer.MAX_VALUE));
420
421 assert executor().inEventLoop();
422 if (shouldBreakReadReady()) {
423 clearReadFilter0();
424 return;
425 }
426 maybeMoreDataToRead = false;
427
428 try {
429 readReady(allocHandle, ioBufferAllocator(), maybeMoreData);
430 } finally {
431 maybeMoreDataToRead = this.numberBytesPending != 0;
432
433 if (eof || readPending && maybeMoreDataToRead) {
434
435
436
437
438
439
440
441 executeReadReadyRunnable();
442 } else if (!readPending && !isAutoRead()) {
443
444
445
446
447
448
449 clearReadFilter0();
450 }
451 }
452 }
453
454 abstract void readReady(RecvBufferAllocator.Handle allocHandle, BufferAllocator recvBufferAllocator,
455 Predicate<RecvBufferAllocator.Handle> maybeMoreData);
456
457 final void writeReady() {
458 if (isConnectPending()) {
459
460 finishConnect();
461 } else if (!socket.isOutputShutdown()) {
462
463 super.writeFlushed();
464 }
465 }
466
467
468
469
470 final void shutdownInput(boolean readEOF) {
471
472
473
474
475
476 if (readEOF && isConnectPending()) {
477 finishConnect();
478 }
479 if (!socket.isInputShutdown()) {
480 if (isAllowHalfClosure()) {
481 shutdownTransport(ChannelShutdownDirection.Inbound, newPromise());
482 } else {
483 closeTransport(newPromise());
484 }
485 } else if (!readEOF) {
486 inputClosedSeenErrorOnRead = true;
487 }
488 }
489
490 final void readEOF() {
491
492 final RecvBufferAllocator.Handle allocHandle = recvBufAllocHandle();
493 eof = true;
494
495 if (isActive()) {
496
497
498
499 readReady(allocHandle, ioBufferAllocator(), maybeMoreData);
500 } else {
501
502 shutdownInput(true);
503 }
504 }
505
506 @Override
507 protected final void writeFlushed() {
508
509
510
511 if (!writeFilterEnabled) {
512 super.writeFlushed();
513 }
514 }
515
516 private void executeReadReadyRunnable() {
517 if (readReadyRunnablePending || !isActive() || shouldBreakReadReady()) {
518 return;
519 }
520 readReadyRunnablePending = true;
521 executor().execute(readReadyRunnable);
522 }
523
524 private void clearReadFilter0() {
525 assert executor().inEventLoop();
526 try {
527 readPending = false;
528 readFilter(false);
529 } catch (Throwable e) {
530
531
532 pipeline().fireChannelExceptionCaught(e);
533 closeTransport(newPromise());
534 }
535 }
536
537 @Override
538 protected final boolean doFinishConnect(SocketAddress requestedRemoteAddress) throws Exception {
539 if (socket.finishConnect()) {
540 active = true;
541 writeFilter(false);
542 if (requestedRemoteAddress instanceof InetSocketAddress) {
543 remoteAddress = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
544 } else {
545 remoteAddress = requestedRemoteAddress;
546 }
547 return true;
548 }
549 writeFilter(true);
550 return false;
551 }
552
553 @SuppressWarnings("unchecked")
554 @Override
555 protected void doBind(SocketAddress local) throws Exception {
556 if (local instanceof InetSocketAddress) {
557 checkResolvable((InetSocketAddress) local);
558 }
559 socket.bind(local);
560 if (fetchLocalAddress()) {
561 this.localAddress = socket.localAddress();
562 } else {
563 this.localAddress = local;
564 }
565 }
566
567 protected boolean fetchLocalAddress() {
568 return socket.protocolFamily() != SocketProtocolFamily.UNIX;
569 }
570
571
572
573
574 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
575 if (localAddress instanceof InetSocketAddress) {
576 checkResolvable((InetSocketAddress) localAddress);
577 }
578
579 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
580 ? (InetSocketAddress) remoteAddress : null;
581 if (remoteSocketAddr != null) {
582 checkResolvable(remoteSocketAddr);
583 }
584
585 if (localAddress != null) {
586 doBind(localAddress);
587 }
588
589 boolean connected = doConnect0(remoteAddress, localAddress);
590 if (connected) {
591 active = true;
592 this.remoteAddress = remoteSocketAddr == null?
593 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
594 }
595
596 if (fetchLocalAddress()) {
597
598
599
600 this.localAddress = socket.localAddress();
601 }
602 return connected;
603 }
604
605 protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
606 boolean success = false;
607 try {
608 boolean connected = socket.connect(remoteAddress);
609 if (!connected) {
610 writeFilter(true);
611 }
612 success = true;
613 return connected;
614 } finally {
615 if (!success) {
616 doClose();
617 }
618 }
619 }
620
621 @Override
622 protected SocketAddress localAddress0() {
623 return localAddress;
624 }
625
626 @Override
627 protected SocketAddress remoteAddress0() {
628 return remoteAddress;
629 }
630
631 final void closeTransportNow() {
632 closeTransport(newPromise());
633 }
634
635 private BufferAllocator ioBufferAllocator() {
636 BufferAllocator alloc = bufferAllocator();
637
638 if (!alloc.getAllocationType().isDirect()) {
639 return DefaultBufferAllocators.offHeapAllocator();
640 }
641 return alloc;
642 }
643 }