1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.epoll;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.AdaptiveRecvBufferAllocator;
21 import io.netty5.channel.ChannelException;
22 import io.netty5.channel.ChannelMetadata;
23 import io.netty5.channel.ChannelOption;
24 import io.netty5.channel.ChannelOutboundBuffer;
25 import io.netty5.channel.ChannelPipeline;
26 import io.netty5.channel.ChannelShutdownDirection;
27 import io.netty5.channel.DefaultFileRegion;
28 import io.netty5.channel.EventLoop;
29 import io.netty5.channel.FileRegion;
30 import io.netty5.channel.RecvBufferAllocator;
31 import io.netty5.channel.internal.ChannelUtils;
32 import io.netty5.channel.socket.SocketChannel;
33 import io.netty5.channel.socket.SocketProtocolFamily;
34 import io.netty5.channel.unix.DomainSocketReadMode;
35 import io.netty5.channel.unix.FileDescriptor;
36 import io.netty5.channel.unix.IntegerUnixChannelOption;
37 import io.netty5.channel.unix.IovArray;
38 import io.netty5.channel.unix.PeerCredentials;
39 import io.netty5.channel.unix.RawUnixChannelOption;
40 import io.netty5.channel.unix.SocketWritableByteChannel;
41 import io.netty5.channel.unix.UnixChannel;
42 import io.netty5.channel.unix.UnixChannelOption;
43 import io.netty5.channel.unix.UnixChannelUtil;
44 import io.netty5.util.Resource;
45 import io.netty5.util.concurrent.Future;
46 import io.netty5.util.concurrent.GlobalEventExecutor;
47 import io.netty5.util.internal.StringUtil;
48
49 import java.io.IOException;
50 import java.net.InetAddress;
51 import java.net.ProtocolFamily;
52 import java.net.SocketAddress;
53 import java.nio.ByteBuffer;
54 import java.nio.channels.NotYetConnectedException;
55 import java.nio.channels.WritableByteChannel;
56 import java.util.Collection;
57 import java.util.Collections;
58 import java.util.Map;
59 import java.util.Set;
60 import java.util.concurrent.Executor;
61 import java.util.function.Predicate;
62
63 import static io.netty5.channel.ChannelOption.IP_TOS;
64 import static io.netty5.channel.ChannelOption.SO_KEEPALIVE;
65 import static io.netty5.channel.ChannelOption.SO_LINGER;
66 import static io.netty5.channel.ChannelOption.SO_RCVBUF;
67 import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
68 import static io.netty5.channel.ChannelOption.SO_SNDBUF;
69 import static io.netty5.channel.ChannelOption.TCP_NODELAY;
70 import static io.netty5.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_CLIENT;
71 import static io.netty5.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
72 import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
73 import static io.netty5.channel.unix.Limits.SSIZE_MAX;
74 import static io.netty5.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
75 import static java.util.Objects.requireNonNull;
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 public final class EpollSocketChannel
125 extends AbstractEpollChannel<EpollServerSocketChannel>
126 implements SocketChannel {
127
128 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
129 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
130
131 private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
132 private static final String EXPECTED_TYPES =
133 " (expected: " + StringUtil.simpleClassName(Buffer.class) + ", " +
134 StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
135
136
137 private final Runnable flushTask = this::writeFlushed;
138
139 private WritableByteChannel byteChannel;
140 private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;
141
142 private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();
143
144 private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES;
145
146 private volatile boolean tcpFastopen;
147
148 private static final Predicate<RecvBufferAllocator.Handle> MAYBE_MORE_DATA = h ->
149 h.lastBytesRead() == h.attemptedBytesRead();
150
151 private static final Predicate<RecvBufferAllocator.Handle> MAYBE_MORE_DATA_RDHUP = h -> true;
152
153 public EpollSocketChannel(EventLoop eventLoop) {
154 this(eventLoop, (ProtocolFamily) null);
155 }
156
157 public EpollSocketChannel(EventLoop eventLoop, ProtocolFamily protocolFamily) {
158
159 super(null, eventLoop, METADATA, Native.EPOLLRDHUP, new AdaptiveRecvBufferAllocator(),
160 LinuxSocket.newSocket(protocolFamily), false);
161 }
162
163 public EpollSocketChannel(EventLoop eventLoop, int fd, ProtocolFamily family) {
164 this(eventLoop, new LinuxSocket(fd, SocketProtocolFamily.of(family)));
165 }
166
167 private EpollSocketChannel(EventLoop eventLoop, LinuxSocket socket) {
168
169 super(null, eventLoop, METADATA, Native.EPOLLRDHUP, new AdaptiveRecvBufferAllocator(),
170 socket, isSoErrorZero(socket));
171 }
172
173 EpollSocketChannel(EpollServerSocketChannel parent, EventLoop eventLoop,
174 LinuxSocket fd, SocketAddress remoteAddress) {
175
176 super(parent, eventLoop, METADATA, Native.EPOLLRDHUP, new AdaptiveRecvBufferAllocator(), fd, remoteAddress);
177
178 if (fd.protocolFamily() != SocketProtocolFamily.UNIX && parent != null) {
179 tcpMd5SigAddresses = parent.tcpMd5SigAddresses();
180 }
181 }
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 private int writeBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
198 int readableBytes = buf.readableBytes();
199 if (readableBytes == 0) {
200 in.remove();
201 return 0;
202 }
203
204 int readableComponents = buf.countReadableComponents();
205 if (readableComponents == 1) {
206 return doWriteBytes(in, buf);
207 } else {
208 ByteBuffer[] nioBuffers = new ByteBuffer[readableComponents];
209 buf.forEachReadable(0, (index, component) -> {
210 nioBuffers[index] = component.readableBuffer();
211 return true;
212 });
213 return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
214 getMaxBytesPerGatheringWrite());
215 }
216 }
217
218 void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
219 this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
220 }
221
222 long getMaxBytesPerGatheringWrite() {
223 return maxBytesPerGatheringWrite;
224 }
225
226 private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
227
228
229
230 if (attempted == written) {
231 if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
232 setMaxBytesPerGatheringWrite(attempted << 1);
233 }
234 } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
235 setMaxBytesPerGatheringWrite(attempted >>> 1);
236 }
237 }
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254 private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
255 final long expectedWrittenBytes = array.size();
256 assert expectedWrittenBytes != 0;
257 final int cnt = array.count();
258 assert cnt != 0;
259
260 final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
261 if (localWrittenBytes > 0) {
262 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
263 in.removeBytes(localWrittenBytes);
264 return 1;
265 }
266 return WRITE_STATUS_SNDBUF_FULL;
267 }
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287 private int writeBytesMultiple(
288 ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
289 long maxBytesPerGatheringWrite) throws IOException {
290 assert expectedWrittenBytes != 0;
291 if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
292 expectedWrittenBytes = maxBytesPerGatheringWrite;
293 }
294
295 final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
296 if (localWrittenBytes > 0) {
297 adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
298 in.removeBytes(localWrittenBytes);
299 return 1;
300 }
301 return WRITE_STATUS_SNDBUF_FULL;
302 }
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318 private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
319 final long offset = region.transferred();
320 final long regionCount = region.count();
321 if (offset >= regionCount) {
322 in.remove();
323 return 0;
324 }
325
326 final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
327 if (flushedAmount > 0) {
328 in.progress(flushedAmount);
329 if (region.transferred() >= regionCount) {
330 in.remove();
331 }
332 return 1;
333 }
334 if (flushedAmount == 0) {
335 validateFileRegion(region, offset);
336 }
337 return WRITE_STATUS_SNDBUF_FULL;
338 }
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354 private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
355 if (region.transferred() >= region.count()) {
356 in.remove();
357 return 0;
358 }
359
360 if (byteChannel == null) {
361 byteChannel = new EpollSocketWritableByteChannel();
362 }
363 final long flushedAmount = region.transferTo(byteChannel, region.transferred());
364 if (flushedAmount > 0) {
365 in.progress(flushedAmount);
366 if (region.transferred() >= region.count()) {
367 in.remove();
368 }
369 return 1;
370 }
371 return WRITE_STATUS_SNDBUF_FULL;
372 }
373
374 @Override
375 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
376 int writeSpinCount = getWriteSpinCount();
377 do {
378 final int msgCount = in.size();
379
380 if (msgCount > 1 && in.current() instanceof Buffer) {
381 writeSpinCount -= doWriteMultiple(in);
382 } else if (msgCount == 0) {
383
384 clearFlag(Native.EPOLLOUT);
385
386 return;
387 } else {
388 writeSpinCount -= doWriteSingle(in);
389 }
390
391
392
393
394 } while (writeSpinCount > 0);
395
396 if (writeSpinCount == 0) {
397
398
399
400
401 clearFlag(Native.EPOLLOUT);
402
403
404 executor().execute(flushTask);
405 } else {
406
407
408 setFlag(Native.EPOLLOUT);
409 }
410 }
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426 private int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
427
428 Object msg = in.current();
429 if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
430
431 in.remove();
432 return 1;
433 }
434 if (msg instanceof Buffer) {
435 return writeBytes(in, (Buffer) msg);
436 } else if (msg instanceof DefaultFileRegion) {
437 return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
438 } else if (msg instanceof FileRegion) {
439 return writeFileRegion(in, (FileRegion) msg);
440 } else {
441
442 throw new Error();
443 }
444 }
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460 private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
461 final long maxBytesPerGatheringWrite = getMaxBytesPerGatheringWrite();
462 IovArray array = registration().cleanIovArray();
463 array.maxBytes(maxBytesPerGatheringWrite);
464 in.forEachFlushedMessage(array);
465
466 if (array.count() >= 1) {
467 return writeBytesMultiple(in, array);
468 }
469
470 in.removeBytes(0);
471 return 0;
472 }
473
474 @Override
475 protected Object filterOutboundMessage(Object msg) {
476 if (socket.protocolFamily() == SocketProtocolFamily.UNIX && msg instanceof FileDescriptor) {
477 return msg;
478 }
479 if (msg instanceof Buffer) {
480 Buffer buf = (Buffer) msg;
481 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
482 }
483
484 if (msg instanceof FileRegion) {
485 return msg;
486 }
487
488 throw new UnsupportedOperationException(
489 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
490 }
491
492 @Override
493 protected void doShutdown(ChannelShutdownDirection direction) throws Exception {
494 switch (direction) {
495 case Outbound:
496 socket.shutdown(false, true);
497 break;
498 case Inbound:
499 try {
500 socket.shutdown(true, false);
501 } catch (NotYetConnectedException ignore) {
502
503
504 }
505 break;
506 default:
507 throw new AssertionError();
508 }
509 }
510
511 @Override
512 public boolean isShutdown(ChannelShutdownDirection direction) {
513 if (!isActive()) {
514 return true;
515 }
516 switch (direction) {
517 case Outbound:
518 return socket.isOutputShutdown();
519 case Inbound:
520 return socket.isInputShutdown();
521 default:
522 throw new AssertionError();
523 }
524 }
525
526 private void handleReadException(ChannelPipeline pipeline, Buffer buffer, Throwable cause, boolean close,
527 RecvBufferAllocator.Handle allocHandle) {
528 if (buffer.readableBytes() > 0) {
529 readPending = false;
530 pipeline.fireChannelRead(buffer);
531 } else {
532 buffer.close();
533 }
534 allocHandle.readComplete();
535 pipeline.fireChannelReadComplete();
536 pipeline.fireChannelExceptionCaught(cause);
537
538
539
540 if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
541 shutdownInput(false);
542 } else {
543 readIfIsAutoRead();
544 }
545 }
546
547 @Override
548 protected void epollInReady(RecvBufferAllocator.Handle handle, BufferAllocator recvBufferAllocator,
549 boolean receivedRdHup) {
550 if (socket.protocolFamily() == SocketProtocolFamily.UNIX
551 && getReadMode() == DomainSocketReadMode.FILE_DESCRIPTORS) {
552 epollInReadFd(handle, receivedRdHup);
553 } else {
554 epollInReadyBytes(handle, recvBufferAllocator, receivedRdHup);
555 }
556 }
557
558 private static Predicate<RecvBufferAllocator.Handle> maybeMoreData(boolean receivedRdHup) {
559 return receivedRdHup ? MAYBE_MORE_DATA_RDHUP : MAYBE_MORE_DATA;
560 }
561
562 private void epollInReadyBytes(RecvBufferAllocator.Handle recvAlloc, BufferAllocator bufferAllocator,
563 boolean receivedRdHup) {
564 final ChannelPipeline pipeline = pipeline();
565 Predicate<RecvBufferAllocator.Handle> maybeMoreData = maybeMoreData(receivedRdHup);
566
567 Buffer buffer = null;
568 boolean close = false;
569 try {
570 do {
571
572
573 buffer = recvAlloc.allocate(bufferAllocator);
574 doReadBytes(buffer);
575 if (recvAlloc.lastBytesRead() <= 0) {
576
577 Resource.dispose(buffer);
578 buffer = null;
579 close = recvAlloc.lastBytesRead() < 0;
580 if (close) {
581
582 readPending = false;
583 }
584 break;
585 }
586 recvAlloc.incMessagesRead(1);
587 readPending = false;
588 pipeline.fireChannelRead(buffer);
589 buffer = null;
590
591 if (shouldBreakEpollInReady()) {
592
593
594
595
596
597
598
599
600
601
602
603 break;
604 }
605 } while (recvAlloc.continueReading(isAutoRead(), maybeMoreData)
606 && !isShutdown(ChannelShutdownDirection.Inbound));
607
608 recvAlloc.readComplete();
609 pipeline.fireChannelReadComplete();
610
611 if (close) {
612 shutdownInput(false);
613 } else {
614 readIfIsAutoRead();
615 }
616 } catch (Throwable t) {
617 handleReadException(pipeline, buffer, t, close, recvAlloc);
618 }
619 }
620
621 private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
622 EpollSocketWritableByteChannel() {
623 super(socket);
624 }
625
626 @Override
627 protected BufferAllocator alloc() {
628 return bufferAllocator();
629 }
630 }
631
632 @SuppressWarnings("unchecked")
633 @Override
634 protected <T> T getExtendedOption(ChannelOption<T> option) {
635 if (isOptionSupported(socket.protocolFamily(), option)) {
636 if (option == SO_RCVBUF) {
637 return (T) Integer.valueOf(getReceiveBufferSize());
638 }
639 if (option == SO_SNDBUF) {
640 return (T) Integer.valueOf(getSendBufferSize());
641 }
642 if (option == TCP_NODELAY) {
643 return (T) Boolean.valueOf(isTcpNoDelay());
644 }
645 if (option == SO_KEEPALIVE) {
646 return (T) Boolean.valueOf(isKeepAlive());
647 }
648 if (option == SO_REUSEADDR) {
649 return (T) Boolean.valueOf(isReuseAddress());
650 }
651 if (option == SO_LINGER) {
652 return (T) Integer.valueOf(getSoLinger());
653 }
654 if (option == IP_TOS) {
655 return (T) Integer.valueOf(getTrafficClass());
656 }
657 if (option == EpollChannelOption.TCP_CORK) {
658 return (T) Boolean.valueOf(isTcpCork());
659 }
660 if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
661 return (T) Long.valueOf(getTcpNotSentLowAt());
662 }
663 if (option == EpollChannelOption.TCP_KEEPIDLE) {
664 return (T) Integer.valueOf(getTcpKeepIdle());
665 }
666 if (option == EpollChannelOption.TCP_KEEPINTVL) {
667 return (T) Integer.valueOf(getTcpKeepIntvl());
668 }
669 if (option == EpollChannelOption.TCP_KEEPCNT) {
670 return (T) Integer.valueOf(getTcpKeepCnt());
671 }
672 if (option == EpollChannelOption.TCP_USER_TIMEOUT) {
673 return (T) Integer.valueOf(getTcpUserTimeout());
674 }
675 if (option == EpollChannelOption.TCP_QUICKACK) {
676 return (T) Boolean.valueOf(isTcpQuickAck());
677 }
678 if (option == EpollChannelOption.IP_TRANSPARENT) {
679 return (T) Boolean.valueOf(isIpTransparent());
680 }
681 if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
682 return (T) Boolean.valueOf(isTcpFastOpenConnect());
683 }
684 if (option == EpollChannelOption.SO_BUSY_POLL) {
685 return (T) Integer.valueOf(getSoBusyPoll());
686 }
687 if (option == DOMAIN_SOCKET_READ_MODE) {
688 return (T) getReadMode();
689 }
690 if (option == EpollChannelOption.TCP_INFO) {
691 return (T) getTcpInfo();
692 }
693 if (option == UnixChannelOption.SO_PEERCRED) {
694 return (T) getPeerCredentials();
695 }
696 }
697 return super.getExtendedOption(option);
698 }
699
700 @SuppressWarnings("unchecked")
701 @Override
702 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
703 if (isOptionSupported(socket.protocolFamily(), option)) {
704 if (option == SO_RCVBUF) {
705 setReceiveBufferSize((Integer) value);
706 } else if (option == SO_SNDBUF) {
707 setSendBufferSize((Integer) value);
708 } else if (option == TCP_NODELAY) {
709 setTcpNoDelay((Boolean) value);
710 } else if (option == SO_KEEPALIVE) {
711 setKeepAlive((Boolean) value);
712 } else if (option == SO_REUSEADDR) {
713 setReuseAddress((Boolean) value);
714 } else if (option == SO_LINGER) {
715 setSoLinger((Integer) value);
716 } else if (option == IP_TOS) {
717 setTrafficClass((Integer) value);
718 } else if (option == EpollChannelOption.TCP_CORK) {
719 setTcpCork((Boolean) value);
720 } else if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
721 setTcpNotSentLowAt((Long) value);
722 } else if (option == EpollChannelOption.TCP_KEEPIDLE) {
723 setTcpKeepIdle((Integer) value);
724 } else if (option == EpollChannelOption.TCP_KEEPCNT) {
725 setTcpKeepCnt((Integer) value);
726 } else if (option == EpollChannelOption.TCP_KEEPINTVL) {
727 setTcpKeepIntvl((Integer) value);
728 } else if (option == EpollChannelOption.TCP_USER_TIMEOUT) {
729 setTcpUserTimeout((Integer) value);
730 } else if (option == EpollChannelOption.IP_TRANSPARENT) {
731 setIpTransparent((Boolean) value);
732 } else if (option == EpollChannelOption.TCP_MD5SIG) {
733 @SuppressWarnings("unchecked")
734 final Map<InetAddress, byte[]> m = (Map<InetAddress, byte[]>) value;
735 setTcpMd5Sig(m);
736 } else if (option == EpollChannelOption.TCP_QUICKACK) {
737 setTcpQuickAck((Boolean) value);
738 } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
739 setTcpFastOpenConnect((Boolean) value);
740 } else if (option == EpollChannelOption.SO_BUSY_POLL) {
741 setSoBusyPoll((Integer) value);
742 } else if (option == DOMAIN_SOCKET_READ_MODE) {
743 setReadMode((DomainSocketReadMode) value);
744 } else if (option == EpollChannelOption.TCP_INFO) {
745 throw new UnsupportedOperationException("read-only option: " + option);
746 } else if (option == UnixChannelOption.SO_PEERCRED) {
747 throw new UnsupportedOperationException("read-only option: " + option);
748 }
749 } else {
750 super.setExtendedOption(option, value);
751 }
752 }
753
754 private static boolean isOptionSupported(SocketProtocolFamily family, ChannelOption<?> option) {
755 if (family == SocketProtocolFamily.UNIX) {
756 return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
757 }
758 return SUPPORTED_OPTIONS.contains(option);
759 }
760
761 @Override
762 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
763 return isOptionSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
764 }
765
766 private static Set<ChannelOption<?>> supportedOptions() {
767 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER,
768 IP_TOS, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPCNT,
769 EpollChannelOption.TCP_KEEPINTVL, EpollChannelOption.TCP_USER_TIMEOUT,
770 EpollChannelOption.IP_TRANSPARENT, EpollChannelOption.TCP_MD5SIG, EpollChannelOption.TCP_QUICKACK,
771 ChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL,
772 EpollChannelOption.TCP_NOTSENT_LOWAT, EpollChannelOption.TCP_INFO);
773 }
774
775 private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
776 return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, DOMAIN_SOCKET_READ_MODE,
777 UnixChannelOption.SO_PEERCRED);
778 }
779
780 private int getReceiveBufferSize() {
781 try {
782 return socket.getReceiveBufferSize();
783 } catch (IOException e) {
784 throw new ChannelException(e);
785 }
786 }
787
788 private int getSendBufferSize() {
789 try {
790 return socket.getSendBufferSize();
791 } catch (IOException e) {
792 throw new ChannelException(e);
793 }
794 }
795
796 private int getSoLinger() {
797 try {
798 return socket.getSoLinger();
799 } catch (IOException e) {
800 throw new ChannelException(e);
801 }
802 }
803
804 private int getTrafficClass() {
805 try {
806 return socket.getTrafficClass();
807 } catch (IOException e) {
808 throw new ChannelException(e);
809 }
810 }
811
812 private boolean isKeepAlive() {
813 try {
814 return socket.isKeepAlive();
815 } catch (IOException e) {
816 throw new ChannelException(e);
817 }
818 }
819
820 private boolean isReuseAddress() {
821 try {
822 return socket.isReuseAddress();
823 } catch (IOException e) {
824 throw new ChannelException(e);
825 }
826 }
827
828 private boolean isTcpNoDelay() {
829 try {
830 return socket.isTcpNoDelay();
831 } catch (IOException e) {
832 throw new ChannelException(e);
833 }
834 }
835
836
837
838
839 private boolean isTcpCork() {
840 try {
841 return socket.isTcpCork();
842 } catch (IOException e) {
843 throw new ChannelException(e);
844 }
845 }
846
847
848
849
850 private int getSoBusyPoll() {
851 try {
852 return socket.getSoBusyPoll();
853 } catch (IOException e) {
854 throw new ChannelException(e);
855 }
856 }
857
858
859
860
861
862 private long getTcpNotSentLowAt() {
863 try {
864 return socket.getTcpNotSentLowAt();
865 } catch (IOException e) {
866 throw new ChannelException(e);
867 }
868 }
869
870
871
872
873 private int getTcpKeepIdle() {
874 try {
875 return socket.getTcpKeepIdle();
876 } catch (IOException e) {
877 throw new ChannelException(e);
878 }
879 }
880
881
882
883
884 private int getTcpKeepIntvl() {
885 try {
886 return socket.getTcpKeepIntvl();
887 } catch (IOException e) {
888 throw new ChannelException(e);
889 }
890 }
891
892
893
894
895 private int getTcpKeepCnt() {
896 try {
897 return socket.getTcpKeepCnt();
898 } catch (IOException e) {
899 throw new ChannelException(e);
900 }
901 }
902
903
904
905
906 private int getTcpUserTimeout() {
907 try {
908 return socket.getTcpUserTimeout();
909 } catch (IOException e) {
910 throw new ChannelException(e);
911 }
912 }
913
914 private void setKeepAlive(boolean keepAlive) {
915 try {
916 socket.setKeepAlive(keepAlive);
917 } catch (IOException e) {
918 throw new ChannelException(e);
919 }
920 }
921
922 private void setReceiveBufferSize(int receiveBufferSize) {
923 try {
924 socket.setReceiveBufferSize(receiveBufferSize);
925 } catch (IOException e) {
926 throw new ChannelException(e);
927 }
928 }
929
930 private void setReuseAddress(boolean reuseAddress) {
931 try {
932 socket.setReuseAddress(reuseAddress);
933 } catch (IOException e) {
934 throw new ChannelException(e);
935 }
936 }
937
938 private void setSendBufferSize(int sendBufferSize) {
939 try {
940 socket.setSendBufferSize(sendBufferSize);
941 calculateMaxBytesPerGatheringWrite();
942 } catch (IOException e) {
943 throw new ChannelException(e);
944 }
945 }
946
947 private void setSoLinger(int soLinger) {
948 try {
949 socket.setSoLinger(soLinger);
950 } catch (IOException e) {
951 throw new ChannelException(e);
952 }
953 }
954
955 private void setTcpNoDelay(boolean tcpNoDelay) {
956 try {
957 socket.setTcpNoDelay(tcpNoDelay);
958 } catch (IOException e) {
959 throw new ChannelException(e);
960 }
961 }
962
963
964
965
966 private void setTcpCork(boolean tcpCork) {
967 try {
968 socket.setTcpCork(tcpCork);
969 } catch (IOException e) {
970 throw new ChannelException(e);
971 }
972 }
973
974
975
976
977 private void setSoBusyPoll(int loopMicros) {
978 try {
979 socket.setSoBusyPoll(loopMicros);
980 } catch (IOException e) {
981 throw new ChannelException(e);
982 }
983 }
984
985
986
987
988
989 private void setTcpNotSentLowAt(long tcpNotSentLowAt) {
990 try {
991 socket.setTcpNotSentLowAt(tcpNotSentLowAt);
992 } catch (IOException e) {
993 throw new ChannelException(e);
994 }
995 }
996
997 private void setTrafficClass(int trafficClass) {
998 try {
999 socket.setTrafficClass(trafficClass);
1000 } catch (IOException e) {
1001 throw new ChannelException(e);
1002 }
1003 }
1004
1005
1006
1007
1008 private void setTcpKeepIdle(int seconds) {
1009 try {
1010 socket.setTcpKeepIdle(seconds);
1011 } catch (IOException e) {
1012 throw new ChannelException(e);
1013 }
1014 }
1015
1016
1017
1018
1019 private void setTcpKeepIntvl(int seconds) {
1020 try {
1021 socket.setTcpKeepIntvl(seconds);
1022 } catch (IOException e) {
1023 throw new ChannelException(e);
1024 }
1025 }
1026
1027
1028
1029
1030 private void setTcpKeepCnt(int probes) {
1031 try {
1032 socket.setTcpKeepCnt(probes);
1033 } catch (IOException e) {
1034 throw new ChannelException(e);
1035 }
1036 }
1037
1038
1039
1040
1041 private void setTcpUserTimeout(int milliseconds) {
1042 try {
1043 socket.setTcpUserTimeout(milliseconds);
1044 } catch (IOException e) {
1045 throw new ChannelException(e);
1046 }
1047 }
1048
1049
1050
1051
1052
1053 public boolean isIpTransparent() {
1054 try {
1055 return socket.isIpTransparent();
1056 } catch (IOException e) {
1057 throw new ChannelException(e);
1058 }
1059 }
1060
1061
1062
1063
1064
1065 private void setIpTransparent(boolean transparent) {
1066 try {
1067 socket.setIpTransparent(transparent);
1068 } catch (IOException e) {
1069 throw new ChannelException(e);
1070 }
1071 }
1072
1073
1074
1075
1076
1077
1078 private void setTcpQuickAck(boolean quickAck) {
1079 try {
1080 socket.setTcpQuickAck(quickAck);
1081 } catch (IOException e) {
1082 throw new ChannelException(e);
1083 }
1084 }
1085
1086
1087
1088
1089
1090 private boolean isTcpQuickAck() {
1091 try {
1092 return socket.isTcpQuickAck();
1093 } catch (IOException e) {
1094 throw new ChannelException(e);
1095 }
1096 }
1097
1098 private void setReadMode(DomainSocketReadMode mode) {
1099 requireNonNull(mode, "mode");
1100 this.mode = mode;
1101 }
1102
1103 private DomainSocketReadMode getReadMode() {
1104 return mode;
1105 }
1106
1107
1108
1109
1110
1111
1112
1113 private void setTcpFastOpenConnect(boolean fastOpenConnect) {
1114 this.tcpFastopen = fastOpenConnect;
1115 }
1116
1117
1118
1119
1120 private boolean isTcpFastOpenConnect() {
1121 return tcpFastopen;
1122 }
1123
1124 private void calculateMaxBytesPerGatheringWrite() {
1125
1126 int newSendBufferSize = getSendBufferSize() << 1;
1127 if (newSendBufferSize > 0) {
1128 setMaxBytesPerGatheringWrite(newSendBufferSize);
1129 }
1130 }
1131
1132
1133
1134
1135
1136 private EpollTcpInfo getTcpInfo() {
1137 try {
1138 EpollTcpInfo info = new EpollTcpInfo();
1139 socket.getTcpInfo(info);
1140 return info;
1141 } catch (IOException e) {
1142 throw new ChannelException(e);
1143 }
1144 }
1145
1146 @Override
1147 protected boolean doConnect0(SocketAddress remote) throws Exception {
1148 if (IS_SUPPORTING_TCP_FASTOPEN_CLIENT && socket.protocolFamily() != SocketProtocolFamily.UNIX &&
1149 isTcpFastOpenConnect()) {
1150 ChannelOutboundBuffer outbound = outboundBuffer();
1151 outbound.addFlush();
1152 Object curr = outbound.current();
1153 if (curr instanceof Buffer) {
1154
1155
1156 final long localFlushedAmount;
1157 Buffer initialData = (Buffer) curr;
1158 localFlushedAmount = doWriteOrSendBytes(initialData, remote, true);
1159 if (localFlushedAmount > 0) {
1160
1161
1162 outbound.removeBytes(localFlushedAmount);
1163 return true;
1164 }
1165 }
1166 }
1167 return super.doConnect0(remote);
1168 }
1169
1170 @Override
1171 protected Future<Executor> prepareToClose() {
1172 if (socket.protocolFamily() != SocketProtocolFamily.UNIX) {
1173 try {
1174
1175
1176 if (isOpen() && getSoLinger() > 0) {
1177
1178
1179
1180
1181 executor().deregisterForIo(this).map(v -> GlobalEventExecutor.INSTANCE);
1182 }
1183 } catch (Throwable ignore) {
1184
1185
1186
1187 }
1188 }
1189 return null;
1190 }
1191
1192
1193
1194
1195
1196
1197 private void setTcpMd5Sig(Map<InetAddress, byte[]> keys) {
1198
1199 synchronized (this) {
1200 try {
1201 tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
1202 } catch (IOException e) {
1203 throw new ChannelException(e);
1204 }
1205 }
1206 }
1207
1208 private PeerCredentials getPeerCredentials() {
1209 try {
1210 return socket.getPeerCredentials();
1211 } catch (IOException e) {
1212 throw new ChannelException(e);
1213 }
1214 }
1215
1216 private void epollInReadFd(RecvBufferAllocator.Handle allocHandle, boolean receivedRdHup) {
1217 final ChannelPipeline pipeline = pipeline();
1218 Predicate<RecvBufferAllocator.Handle> maybeMoreData = maybeMoreData(receivedRdHup);
1219 try {
1220 readLoop: do {
1221
1222
1223
1224 allocHandle.lastBytesRead(socket.recvFd());
1225 switch(allocHandle.lastBytesRead()) {
1226 case 0:
1227 break readLoop;
1228 case -1:
1229 closeTransport(newPromise());
1230 return;
1231 default:
1232 allocHandle.incMessagesRead(1);
1233 readPending = false;
1234 pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
1235 break;
1236 }
1237 } while (allocHandle.continueReading(isAutoRead(), maybeMoreData)
1238 && !isShutdown(ChannelShutdownDirection.Inbound));
1239
1240 allocHandle.readComplete();
1241 pipeline.fireChannelReadComplete();
1242 } catch (Throwable t) {
1243 allocHandle.readComplete();
1244 pipeline.fireChannelReadComplete();
1245 pipeline.fireChannelExceptionCaught(t);
1246 }
1247 }
1248
1249 @Override
1250 protected boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle) {
1251 return handle.lastBytesRead() == handle.attemptedBytesRead();
1252 }
1253 }