1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.channel.epoll;
17
18 import io.netty5.buffer.api.Buffer;
19 import io.netty5.buffer.api.BufferAllocator;
20 import io.netty5.channel.ChannelException;
21 import io.netty5.channel.ChannelOption;
22 import io.netty5.channel.ChannelShutdownDirection;
23 import io.netty5.channel.FixedRecvBufferAllocator;
24 import io.netty5.channel.RecvBufferAllocator;
25 import io.netty5.channel.socket.DomainSocketAddress;
26 import io.netty5.channel.socket.SocketProtocolFamily;
27 import io.netty5.channel.unix.DomainDatagramSocketAddress;
28 import io.netty5.channel.unix.IntegerUnixChannelOption;
29 import io.netty5.channel.unix.RawUnixChannelOption;
30 import io.netty5.channel.unix.RecvFromAddressDomainSocket;
31 import io.netty5.channel.unix.UnixChannel;
32 import io.netty5.channel.unix.UnixChannelOption;
33 import io.netty5.util.Resource;
34 import io.netty5.channel.AddressedEnvelope;
35 import io.netty5.channel.ChannelMetadata;
36 import io.netty5.channel.ChannelOutboundBuffer;
37 import io.netty5.channel.ChannelPipeline;
38 import io.netty5.channel.DefaultBufferAddressedEnvelope;
39 import io.netty5.channel.EventLoop;
40 import io.netty5.channel.socket.DatagramChannel;
41 import io.netty5.channel.socket.DatagramPacket;
42 import io.netty5.channel.unix.Errors;
43 import io.netty5.channel.unix.Errors.NativeIoException;
44 import io.netty5.channel.unix.SegmentedDatagramPacket;
45 import io.netty5.channel.unix.UnixChannelUtil;
46 import io.netty5.util.concurrent.Future;
47 import io.netty5.util.concurrent.Promise;
48 import io.netty5.util.internal.ObjectUtil;
49 import io.netty5.util.internal.RecyclableArrayList;
50 import io.netty5.util.internal.SilentDispose;
51 import io.netty5.util.internal.StringUtil;
52 import io.netty5.util.internal.logging.InternalLogger;
53 import io.netty5.util.internal.logging.InternalLoggerFactory;
54
55 import java.io.IOException;
56 import java.net.Inet4Address;
57 import java.net.InetAddress;
58 import java.net.InetSocketAddress;
59 import java.net.NetworkInterface;
60 import java.net.PortUnreachableException;
61 import java.net.ProtocolFamily;
62 import java.net.SocketAddress;
63 import java.net.SocketException;
64 import java.util.Set;
65 import java.util.function.Predicate;
66
67 import static java.util.Objects.requireNonNull;
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public final class EpollDatagramChannel extends AbstractEpollChannel<UnixChannel> implements DatagramChannel {
102 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollDatagramChannel.class);
103 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
104 private static final String EXPECTED_TYPES =
105 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
106 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
107 StringUtil.simpleClassName(Buffer.class) + ", " +
108 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
109 StringUtil.simpleClassName(Buffer.class) + ')';
110
111 private static final String EXPECTED_TYPES_DOMAIN_SOCKET =
112 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
113 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
114 StringUtil.simpleClassName(Buffer.class) + ", " +
115 StringUtil.simpleClassName(DomainSocketAddress.class) + ">, " +
116 StringUtil.simpleClassName(Buffer.class) + ')';
117 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
118
119 private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();
120 private static final Predicate<RecvBufferAllocator.Handle> TRUE_SUPPLIER = h -> true;
121
122 private volatile boolean activeOnOpen;
123 private volatile int maxDatagramSize;
124 private volatile boolean gro;
125
126 private volatile boolean connected;
127 private volatile boolean inputShutdown;
128 private volatile boolean outputShutdown;
129
130
131
132
133
134
135 public static boolean isSegmentedDatagramPacketSupported() {
136 return Epoll.isAvailable() &&
137
138 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
139 }
140
141
142
143
144
/**
 * Creates a new channel without an explicit protocol family; delegates to
 * {@link #EpollDatagramChannel(EventLoop, ProtocolFamily)} with a {@code null} family.
 *
 * @param eventLoop the event loop passed on to the delegated constructor
 */
public EpollDatagramChannel(EventLoop eventLoop) {
    this(eventLoop, (ProtocolFamily) null);
}
148
149
150
151
152
/**
 * Creates a new channel backed by a freshly created datagram socket of the given family.
 * The channel starts out not active.
 *
 * @param eventLoop the event loop passed on to the internal constructor
 * @param family    the protocol family handed to {@code LinuxSocket.newDatagramSocket}; may be {@code null}
 */
public EpollDatagramChannel(EventLoop eventLoop, ProtocolFamily family) {
    this(
            eventLoop,
            LinuxSocket.newDatagramSocket(family),
            false);
}
156
157
158
159
160
/**
 * Creates a new channel that wraps an existing file descriptor. The channel is
 * created in the active state.
 *
 * @param eventLoop the event loop passed on to the internal constructor
 * @param fd        the file descriptor of an already existing socket
 * @param family    the protocol family of that socket, converted with {@code SocketProtocolFamily.of}
 */
public EpollDatagramChannel(EventLoop eventLoop, int fd, ProtocolFamily family) {
    this(
            eventLoop,
            new LinuxSocket(fd, SocketProtocolFamily.of(family)),
            true);
}
164
/**
 * Shared constructor: hands the socket to the super class together with the channel
 * metadata and a fixed receive-buffer allocator of 2048 bytes.
 *
 * @param eventLoop the event loop the channel belongs to
 * @param fd        the socket backing this channel
 * @param active    initial active state handed to the super class
 */
private EpollDatagramChannel(EventLoop eventLoop, LinuxSocket fd, boolean active) {
    // NOTE(review): the leading null and the 0 are super-class parameters whose meaning is not visible here.
    super(
            null,
            eventLoop,
            METADATA,
            0,
            new FixedRecvBufferAllocator(2048),
            fd,
            active);
}
168
169 @Override
170 public boolean isActive() {
171 return socket.isOpen() && (getActiveOnOpen() && isRegistered() || active);
172 }
173
174 @Override
175 public boolean isConnected() {
176 return connected;
177 }
178
179 private NetworkInterface networkInterface() throws SocketException {
180 NetworkInterface iface = getNetworkInterface();
181 if (iface == null) {
182 SocketAddress localAddress = localAddress();
183 if (localAddress instanceof InetSocketAddress) {
184 return NetworkInterface.getByInetAddress(((InetSocketAddress) localAddress()).getAddress());
185 }
186 }
187 return null;
188 }
189
190 @Override
191 public Future<Void> joinGroup(InetAddress multicastAddress) {
192 try {
193 return joinGroup(multicastAddress, networkInterface(), null);
194 } catch (IOException | UnsupportedOperationException e) {
195 return newFailedFuture(e);
196 }
197 }
198
199 @Override
200 public Future<Void> joinGroup(
201 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
202 requireNonNull(multicastAddress, "multicastAddress");
203 requireNonNull(networkInterface, "networkInterface");
204
205 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
206 return newFailedFuture(new UnsupportedOperationException("Multicast not supported"));
207 }
208
209 Promise<Void> promise = newPromise();
210 if (executor().inEventLoop()) {
211 joinGroup0(multicastAddress, networkInterface, source, promise);
212 } else {
213 executor().execute(() -> joinGroup0(multicastAddress, networkInterface, source, promise));
214 }
215 return promise.asFuture();
216 }
217
218 private void joinGroup0(InetAddress multicastAddress, NetworkInterface networkInterface,
219 InetAddress source, Promise<Void> promise) {
220 assertEventLoop();
221
222 try {
223 socket.joinGroup(multicastAddress, networkInterface, source);
224 } catch (IOException e) {
225 promise.setFailure(e);
226 return;
227 }
228 promise.setSuccess(null);
229 }
230
231 @Override
232 public Future<Void> leaveGroup(InetAddress multicastAddress) {
233 try {
234 return leaveGroup(multicastAddress, networkInterface(), null);
235 } catch (IOException | UnsupportedOperationException e) {
236 return newFailedFuture(e);
237 }
238 }
239
240 @Override
241 public Future<Void> leaveGroup(
242 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
243 requireNonNull(multicastAddress, "multicastAddress");
244 requireNonNull(networkInterface, "networkInterface");
245
246 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
247 return newFailedFuture(new UnsupportedOperationException("Multicast not supported"));
248 }
249
250 Promise<Void> promise = newPromise();
251 if (executor().inEventLoop()) {
252 leaveGroup0(multicastAddress, networkInterface, source, promise);
253 } else {
254 executor().execute(() -> leaveGroup0(multicastAddress, networkInterface, source, promise));
255 }
256 return promise.asFuture();
257 }
258
259 private void leaveGroup0(
260 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
261 final Promise<Void> promise) {
262 assert executor().inEventLoop();
263
264 try {
265 socket.leaveGroup(multicastAddress, networkInterface, source);
266 } catch (IOException e) {
267 promise.setFailure(e);
268 return;
269 }
270 promise.setSuccess(null);
271 }
272
273 @Override
274 public Future<Void> block(
275 InetAddress multicastAddress, NetworkInterface networkInterface,
276 InetAddress sourceToBlock) {
277 requireNonNull(multicastAddress, "multicastAddress");
278 requireNonNull(sourceToBlock, "sourceToBlock");
279 requireNonNull(networkInterface, "networkInterface");
280 return newFailedFuture(new UnsupportedOperationException("Multicast block not supported"));
281 }
282
283 @Override
284 public Future<Void> block(
285 InetAddress multicastAddress, InetAddress sourceToBlock) {
286 try {
287 return block(
288 multicastAddress,
289 networkInterface(),
290 sourceToBlock);
291 } catch (IOException | UnsupportedOperationException e) {
292 return newFailedFuture(e);
293 }
294 }
295
296 @Override
297 protected void doShutdown(ChannelShutdownDirection direction) {
298 switch (direction) {
299 case Inbound:
300 inputShutdown = true;
301 break;
302 case Outbound:
303 outputShutdown = true;
304 break;
305 default:
306 throw new IllegalStateException();
307 }
308 }
309
310 @Override
311 public boolean isShutdown(ChannelShutdownDirection direction) {
312 if (!isActive()) {
313 return true;
314 }
315 switch (direction) {
316 case Inbound:
317 return inputShutdown;
318 case Outbound:
319 return outputShutdown;
320 default:
321 throw new AssertionError();
322 }
323 }
324
325 @Override
326 protected void doBind(SocketAddress localAddress) throws Exception {
327 if (localAddress instanceof InetSocketAddress) {
328 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
329 if (socketAddress.getAddress().isAnyLocalAddress() &&
330 socketAddress.getAddress() instanceof Inet4Address) {
331 if (socket.protocolFamily() == SocketProtocolFamily.INET6) {
332 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
333 }
334 }
335 }
336 super.doBind(localAddress);
337 active = true;
338 }
339
340 @Override
341 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
342 int maxMessagesPerWrite = getMaxMessagesPerWrite();
343 while (maxMessagesPerWrite > 0) {
344 Object msg = in.current();
345 if (msg == null) {
346
347 break;
348 }
349
350 try {
351
352 if (Native.IS_SUPPORTING_SENDMMSG && socket.protocolFamily() != SocketProtocolFamily.UNIX &&
353 in.size() > 1 ||
354
355 in.current() instanceof SegmentedDatagramPacket) {
356 NativeDatagramPacketArray array = cleanDatagramPacketArray();
357 array.add(in, isConnected(), maxMessagesPerWrite);
358 int cnt = array.count();
359
360 if (cnt >= 1) {
361
362 int offset = 0;
363 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
364
365 int send = socket.sendmmsg(packets, offset, cnt);
366 if (send == 0) {
367
368 break;
369 }
370 for (int i = 0; i < send; i++) {
371 in.remove();
372 }
373 maxMessagesPerWrite -= send;
374 continue;
375 }
376 }
377 boolean done = false;
378 for (int i = getWriteSpinCount(); i > 0; --i) {
379 if (doWriteMessage(msg)) {
380 done = true;
381 break;
382 }
383 }
384
385 if (done) {
386 in.remove();
387 maxMessagesPerWrite --;
388 } else {
389 break;
390 }
391 } catch (IOException e) {
392 maxMessagesPerWrite --;
393
394
395
396 in.remove(e);
397 }
398 }
399
400 if (in.isEmpty()) {
401
402 clearFlag(Native.EPOLLOUT);
403 } else {
404
405 setFlag(Native.EPOLLOUT);
406 }
407 }
408
409 private boolean doWriteMessage(Object msg) throws Exception {
410 final Buffer data;
411 final SocketAddress remoteAddress;
412 if (msg instanceof AddressedEnvelope) {
413 @SuppressWarnings("unchecked")
414 AddressedEnvelope<?, SocketAddress> envelope = (AddressedEnvelope<?, SocketAddress>) msg;
415 data = (Buffer) envelope.content();
416 remoteAddress = envelope.recipient();
417 } else {
418 data = (Buffer) msg;
419 remoteAddress = null;
420 }
421
422 if (data.readableBytes() == 0) {
423 return true;
424 }
425 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
426 }
427
428 @Override
429 protected Object filterOutboundMessage(Object msg) {
430 if (socket.protocolFamily() == SocketProtocolFamily.UNIX) {
431 return filterOutboundMessage0(msg, DomainSocketAddress.class, EXPECTED_TYPES_DOMAIN_SOCKET);
432 }
433 return filterOutboundMessage0(msg, InetSocketAddress.class, EXPECTED_TYPES);
434 }
435
436 private Object filterOutboundMessage0(Object msg, Class<? extends SocketAddress> recipientClass,
437 String expectedTypes) {
438 if (msg instanceof SegmentedDatagramPacket) {
439 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
440 throw new UnsupportedOperationException(
441 "Unsupported message type: " + StringUtil.simpleClassName(msg) + expectedTypes);
442 }
443 SegmentedDatagramPacket packet = (SegmentedDatagramPacket) msg;
444 if (recipientClass.isInstance(packet.recipient())) {
445 Buffer content = packet.content();
446 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
447 packet.replace(newDirectBuffer(packet, content)) : msg;
448 }
449 } else if (msg instanceof DatagramPacket) {
450 DatagramPacket packet = (DatagramPacket) msg;
451 if (recipientClass.isInstance(packet.recipient())) {
452 Buffer content = packet.content();
453 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
454 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
455 }
456 } else if (msg instanceof Buffer) {
457 Buffer buf = (Buffer) msg;
458 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
459 } else if (msg instanceof AddressedEnvelope) {
460 @SuppressWarnings("unchecked")
461 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
462 if (recipientClass.isInstance(e.recipient())) {
463 InetSocketAddress recipient = (InetSocketAddress) e.recipient();
464 Object content = e.content();
465 if (content instanceof Buffer) {
466 Buffer buf = (Buffer) content;
467 if (UnixChannelUtil.isBufferCopyNeededForWrite(buf)) {
468 try {
469 return new DefaultBufferAddressedEnvelope<>(newDirectBuffer(buf), recipient);
470 } finally {
471 SilentDispose.dispose(e, logger);
472 }
473 }
474 return e;
475 }
476 }
477 }
478
479 throw new UnsupportedOperationException(
480 "unsupported message type: " + StringUtil.simpleClassName(msg) + expectedTypes);
481 }
482
483 @Override
484 protected void doDisconnect() throws Exception {
485 socket.disconnect();
486 connected = active = false;
487 resetCachedAddresses();
488 }
489
490 @Override
491 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
492 if (super.doConnect(remoteAddress, localAddress)) {
493 connected = true;
494 return true;
495 }
496 return false;
497 }
498
499 @Override
500 protected void doClose() throws Exception {
501 super.doClose();
502 connected = false;
503 }
504
505 @Override
506 protected void epollInReady(RecvBufferAllocator.Handle handle, BufferAllocator recvBufferAllocator,
507 boolean receivedRdHup) {
508 final ChannelPipeline pipeline = pipeline();
509 Throwable exception = socket.protocolFamily() == SocketProtocolFamily.UNIX ?
510 doReadBufferDomainSocket(handle, recvBufferAllocator) : doReadBuffer(handle, recvBufferAllocator);
511 handle.readComplete();
512 pipeline.fireChannelReadComplete();
513
514 if (exception != null) {
515 pipeline.fireChannelExceptionCaught(exception);
516 }
517 readIfIsAutoRead();
518 }
519
520 @Override
521 protected boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle) {
522 return handle.lastBytesRead() > 0;
523 }
524
525 private Throwable doReadBufferDomainSocket(RecvBufferAllocator.Handle allocHandle,
526 BufferAllocator allocator) {
527 Buffer buf = null;
528 try {
529 boolean connected = isConnected();
530 do {
531 buf = allocHandle.allocate(allocator);
532 allocHandle.attemptedBytesRead(buf.writableBytes());
533
534 final DatagramPacket packet;
535 if (connected) {
536 doReadBytes(buf);
537 if (allocHandle.lastBytesRead() <= 0) {
538
539 buf.close();
540 break;
541 }
542 packet = new DatagramPacket(buf, localAddress(), remoteAddress());
543 } else {
544 final RecvFromAddressDomainSocket recvFrom = new RecvFromAddressDomainSocket(socket);
545 buf.forEachWritable(0, recvFrom);
546 final DomainDatagramSocketAddress remoteAddress = recvFrom.remoteAddress();
547
548 if (remoteAddress == null) {
549 allocHandle.lastBytesRead(-1);
550 buf.close();
551 break;
552 }
553 DomainSocketAddress localAddress = remoteAddress.localAddress();
554 if (localAddress == null) {
555 localAddress = (DomainSocketAddress) localAddress();
556 }
557 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
558 buf.skipWritableBytes(allocHandle.lastBytesRead());
559
560 packet = new DatagramPacket(buf, localAddress, remoteAddress);
561 }
562
563 allocHandle.incMessagesRead(1);
564
565 readPending = false;
566 pipeline().fireChannelRead(packet);
567
568 buf = null;
569
570
571
572 } while (allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER));
573 } catch (Throwable t) {
574 if (buf != null) {
575 buf.close();
576 }
577 return t;
578 }
579 return null;
580 }
581
582 private Throwable doReadBuffer(RecvBufferAllocator.Handle allocHandle, BufferAllocator allocator) {
583 try {
584 boolean connected = isConnected();
585 do {
586 final boolean read;
587 int datagramSize = getMaxDatagramPayloadSize();
588
589 Buffer buf = allocHandle.allocate(allocator);
590
591 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
592 datagramSize == 0 ? 1 : buf.writableBytes() / datagramSize :
593 0;
594 try {
595 if (numDatagram <= 1) {
596 if (!connected || isUdpGro()) {
597 read = recvmsg(allocHandle, allocator, cleanDatagramPacketArray(), buf);
598 } else {
599 read = connectedRead(allocHandle, buf, datagramSize);
600 }
601 } else {
602
603 read = scatteringRead(allocHandle, allocator, cleanDatagramPacketArray(),
604 buf, datagramSize, numDatagram);
605 }
606 } catch (NativeIoException e) {
607 if (connected) {
608 throw translateForConnected(e);
609 }
610 throw e;
611 }
612
613 if (read) {
614 readPending = false;
615 } else {
616 break;
617 }
618
619
620 } while (allocHandle.continueReading(isAutoRead(), TRUE_SUPPLIER));
621 } catch (Throwable t) {
622 return t;
623 }
624 return null;
625 }
626
627 private boolean connectedRead(RecvBufferAllocator.Handle allocHandle, Buffer buf,
628 int maxDatagramPacketSize) throws Exception {
629 try {
630 int writable = maxDatagramPacketSize != 0 ? Math.min(buf.writableBytes(), maxDatagramPacketSize)
631 : buf.writableBytes();
632 allocHandle.attemptedBytesRead(writable);
633
634 int initialWritableBytes = buf.writableBytes();
635 buf.forEachWritable(0, (index, component) -> {
636 long address = component.writableNativeAddress();
637 assert address != 0;
638 int bytesRead = socket.readAddress(address, 0, component.writableBytes());
639 allocHandle.lastBytesRead(bytesRead);
640 if (bytesRead <= 0) {
641 return false;
642 }
643 component.skipWritableBytes(bytesRead);
644 return true;
645 });
646 final int totalBytesRead = initialWritableBytes - buf.writableBytes();
647 if (totalBytesRead == 0) {
648
649 return false;
650 }
651 if (maxDatagramPacketSize > 0) {
652 allocHandle.lastBytesRead(totalBytesRead);
653 }
654 DatagramPacket packet = new DatagramPacket(buf, localAddress(), remoteAddress());
655 allocHandle.incMessagesRead(1);
656
657 pipeline().fireChannelRead(packet);
658 buf = null;
659 return true;
660 } finally {
661 if (buf != null) {
662 buf.close();
663 }
664 }
665 }
666
667 private IOException translateForConnected(NativeIoException e) {
668
669 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
670 PortUnreachableException error = new PortUnreachableException(e.getMessage());
671 error.initCause(e);
672 return error;
673 }
674 return e;
675 }
676
677 private static void addDatagramPacketToOut(AddressedEnvelope<?, ?> packet, RecyclableArrayList out) {
678 if (packet instanceof SegmentedDatagramPacket) {
679 try (SegmentedDatagramPacket segmentedDatagramPacket = (SegmentedDatagramPacket) packet) {
680 Buffer content = segmentedDatagramPacket.content();
681 SocketAddress recipient = segmentedDatagramPacket.recipient();
682 SocketAddress sender = segmentedDatagramPacket.sender();
683 int segmentSize = segmentedDatagramPacket.segmentSize();
684 do {
685 out.add(new DatagramPacket(content.readSplit(segmentSize), recipient, sender));
686 } while (content.readableBytes() > 0);
687 }
688 } else {
689 out.add(packet);
690 }
691 }
692
693 private static void releaseAndRecycle(Object buffer, RecyclableArrayList packetList) {
694 Resource.dispose(buffer);
695 if (packetList != null) {
696 for (int i = 0; i < packetList.size(); i++) {
697 Resource.dispose(packetList.get(i));
698 }
699 packetList.recycle();
700 }
701 }
702
703 private static void processPacket(ChannelPipeline pipeline, RecvBufferAllocator.Handle handle,
704 int bytesRead, AddressedEnvelope<?, ?> packet) {
705 handle.lastBytesRead(bytesRead);
706 handle.incMessagesRead(1);
707 pipeline.fireChannelRead(packet);
708 }
709
710 private static void processPacketList(ChannelPipeline pipeline, RecvBufferAllocator.Handle handle,
711 BufferAllocator allocator, int bytesRead, RecyclableArrayList packetList) {
712 int messagesRead = packetList.size();
713 handle.lastBytesRead(bytesRead);
714 handle.incMessagesRead(messagesRead);
715 for (int i = 0; i < messagesRead; i++) {
716 pipeline.fireChannelRead(packetList.set(i, allocator.allocate(0)));
717 }
718 }
719
720 private boolean recvmsg(RecvBufferAllocator.Handle allocHandle, BufferAllocator allocator,
721 NativeDatagramPacketArray array, Buffer buf) throws IOException {
722 RecyclableArrayList datagramPackets = null;
723 try {
724 int initialWriterOffset = buf.writerOffset();
725
726 boolean added = array.addWritable(buf, 0, null);
727 assert added;
728
729 allocHandle.attemptedBytesRead(buf.writerOffset() - initialWriterOffset);
730
731 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
732
733 int bytesReceived = socket.recvmsg(msg);
734 if (bytesReceived == 0) {
735 allocHandle.lastBytesRead(-1);
736 return false;
737 }
738 buf.writerOffset(initialWriterOffset + bytesReceived);
739 InetSocketAddress local = (InetSocketAddress) localAddress();
740 DatagramPacket packet = msg.newDatagramPacket(buf, local);
741 if (!(packet instanceof SegmentedDatagramPacket)) {
742 processPacket(pipeline(), allocHandle, bytesReceived, packet);
743 buf = null;
744 } else {
745
746
747
748 datagramPackets = RecyclableArrayList.newInstance();
749 addDatagramPacketToOut(packet, datagramPackets);
750
751
752 buf = null;
753
754 processPacketList(pipeline(), allocHandle, allocator, bytesReceived, datagramPackets);
755 datagramPackets.recycle();
756 datagramPackets = null;
757 }
758
759 return true;
760 } finally {
761 releaseAndRecycle(buf, datagramPackets);
762 }
763 }
764
765 private boolean scatteringRead(RecvBufferAllocator.Handle allocHandle, BufferAllocator allocator,
766 NativeDatagramPacketArray array, Buffer buf, int datagramSize, int numDatagram)
767 throws IOException {
768 RecyclableArrayList datagramPackets = null;
769 try {
770 int initialWriterOffset = buf.writerOffset();
771 for (int i = 0; i < numDatagram; i++) {
772 if (!array.addWritable(buf, datagramSize, null)) {
773 break;
774 }
775 }
776
777 allocHandle.attemptedBytesRead(buf.writerOffset() - initialWriterOffset);
778
779 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
780
781 int received = socket.recvmmsg(packets, 0, array.count());
782 if (received == 0) {
783 allocHandle.lastBytesRead(-1);
784 return false;
785 }
786 int bytesReceived = received * datagramSize;
787 buf.writerOffset(initialWriterOffset + bytesReceived);
788 InetSocketAddress local = (InetSocketAddress) localAddress();
789 if (received == 1) {
790
791 DatagramPacket packet = packets[0].newDatagramPacket(buf, local);
792 if (!(packet instanceof SegmentedDatagramPacket)) {
793 processPacket(pipeline(), allocHandle, datagramSize, packet);
794 buf = null;
795 return true;
796 }
797 }
798
799
800
801 datagramPackets = RecyclableArrayList.newInstance();
802 for (int i = 0; i < received; i++) {
803 DatagramPacket packet = packets[i].newDatagramPacket(buf.readSplit(datagramSize), local);
804 addDatagramPacketToOut(packet, datagramPackets);
805 }
806
807 buf.close();
808 buf = null;
809
810 processPacketList(pipeline(), allocHandle, allocator, bytesReceived, datagramPackets);
811 datagramPackets.recycle();
812 datagramPackets = null;
813 return true;
814 } finally {
815 releaseAndRecycle(buf, datagramPackets);
816 }
817 }
818
819 private NativeDatagramPacketArray cleanDatagramPacketArray() {
820 return registration().cleanDatagramPacketArray();
821 }
822
823 @SuppressWarnings("unchecked")
824 @Override
825 protected <T> T getExtendedOption(ChannelOption<T> option) {
826 if (isOptionSupported(socket.protocolFamily(), option)) {
827 if (option == ChannelOption.SO_BROADCAST) {
828 return (T) Boolean.valueOf(isBroadcast());
829 }
830 if (option == ChannelOption.SO_RCVBUF) {
831 return (T) Integer.valueOf(getReceiveBufferSize());
832 }
833 if (option == ChannelOption.SO_SNDBUF) {
834 return (T) Integer.valueOf(getSendBufferSize());
835 }
836 if (option == ChannelOption.SO_REUSEADDR) {
837 return (T) Boolean.valueOf(isReuseAddress());
838 }
839 if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
840 return (T) Boolean.valueOf(isLoopbackModeDisabled());
841 }
842 if (option == ChannelOption.IP_MULTICAST_IF) {
843 return (T) getNetworkInterface();
844 }
845 if (option == ChannelOption.IP_MULTICAST_TTL) {
846 return (T) Integer.valueOf(getTimeToLive());
847 }
848 if (option == ChannelOption.IP_TOS) {
849 return (T) Integer.valueOf(getTrafficClass());
850 }
851 if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
852 return (T) Boolean.valueOf(activeOnOpen);
853 }
854 if (option == UnixChannelOption.SO_REUSEPORT) {
855 return (T) Boolean.valueOf(isReusePort());
856 }
857 if (option == EpollChannelOption.IP_TRANSPARENT) {
858 return (T) Boolean.valueOf(isIpTransparent());
859 }
860 if (option == EpollChannelOption.IP_FREEBIND) {
861 return (T) Boolean.valueOf(isFreeBind());
862 }
863 if (option == EpollChannelOption.IP_RECVORIGDSTADDR) {
864 return (T) Boolean.valueOf(isIpRecvOrigDestAddr());
865 }
866 if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
867 return (T) Integer.valueOf(getMaxDatagramPayloadSize());
868 }
869 if (option == EpollChannelOption.UDP_GRO) {
870 return (T) Boolean.valueOf(isUdpGro());
871 }
872 }
873 return super.getExtendedOption(option);
874 }
875
876 @Override
877 protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
878 if (isOptionSupported(socket.protocolFamily(), option)) {
879 if (option == ChannelOption.SO_BROADCAST) {
880 setBroadcast((Boolean) value);
881 } else if (option == ChannelOption.SO_RCVBUF) {
882 setReceiveBufferSize((Integer) value);
883 } else if (option == ChannelOption.SO_SNDBUF) {
884 setSendBufferSize((Integer) value);
885 } else if (option == ChannelOption.SO_REUSEADDR) {
886 setReuseAddress((Boolean) value);
887 } else if (option == ChannelOption.IP_MULTICAST_LOOP_DISABLED) {
888 setLoopbackModeDisabled((Boolean) value);
889 } else if (option == ChannelOption.IP_MULTICAST_IF) {
890 setNetworkInterface((NetworkInterface) value);
891 } else if (option == ChannelOption.IP_MULTICAST_TTL) {
892 setTimeToLive((Integer) value);
893 } else if (option == ChannelOption.IP_TOS) {
894 setTrafficClass((Integer) value);
895 } else if (option == ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION) {
896 setActiveOnOpen((Boolean) value);
897 } else if (option == UnixChannelOption.SO_REUSEPORT) {
898 setReusePort((Boolean) value);
899 } else if (option == EpollChannelOption.IP_FREEBIND) {
900 setFreeBind((Boolean) value);
901 } else if (option == EpollChannelOption.IP_TRANSPARENT) {
902 setIpTransparent((Boolean) value);
903 } else if (option == EpollChannelOption.IP_RECVORIGDSTADDR) {
904 setIpRecvOrigDestAddr((Boolean) value);
905 } else if (option == EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE) {
906 setMaxDatagramPayloadSize((Integer) value);
907 } else if (option == EpollChannelOption.UDP_GRO) {
908 setUdpGro((Boolean) value);
909 }
910 } else {
911 super.setExtendedOption(option, value);
912 }
913 }
914
915 private static Set<ChannelOption<?>> supportedOptions() {
916 return newSupportedIdentityOptionsSet(
917 ChannelOption.SO_BROADCAST, ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
918 ChannelOption.SO_REUSEADDR, ChannelOption.IP_MULTICAST_LOOP_DISABLED,
919 ChannelOption.IP_MULTICAST_IF, ChannelOption.IP_MULTICAST_TTL, ChannelOption.IP_TOS,
920 ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION, UnixChannelOption.SO_REUSEPORT,
921 EpollChannelOption.IP_FREEBIND, EpollChannelOption.IP_TRANSPARENT,
922 EpollChannelOption.IP_RECVORIGDSTADDR, EpollChannelOption.MAX_DATAGRAM_PAYLOAD_SIZE,
923 EpollChannelOption.UDP_GRO);
924 }
925
926 private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
927 return newSupportedIdentityOptionsSet(ChannelOption.SO_RCVBUF, ChannelOption.SO_SNDBUF,
928 ChannelOption.DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATION);
929 }
930
931 private static boolean isOptionSupported(SocketProtocolFamily family, ChannelOption<?> option) {
932 if (family == SocketProtocolFamily.UNIX) {
933 return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
934 }
935 return SUPPORTED_OPTIONS.contains(option);
936 }
937
938 @Override
939 protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
940 return isOptionSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
941 }
942
943 private void setActiveOnOpen(boolean activeOnOpen) {
944 if (isRegistered()) {
945 throw new IllegalStateException("Can only changed before channel was registered");
946 }
947 this.activeOnOpen = activeOnOpen;
948 }
949
950 boolean getActiveOnOpen() {
951 return activeOnOpen;
952 }
953
954 private int getSendBufferSize() {
955 try {
956 return socket.getSendBufferSize();
957 } catch (IOException e) {
958 throw new ChannelException(e);
959 }
960 }
961
962 private void setSendBufferSize(int sendBufferSize) {
963 try {
964 socket.setSendBufferSize(sendBufferSize);
965 } catch (IOException e) {
966 throw new ChannelException(e);
967 }
968 }
969
970 private int getReceiveBufferSize() {
971 try {
972 return socket.getReceiveBufferSize();
973 } catch (IOException e) {
974 throw new ChannelException(e);
975 }
976 }
977
978 private void setReceiveBufferSize(int receiveBufferSize) {
979 try {
980 socket.setReceiveBufferSize(receiveBufferSize);
981 } catch (IOException e) {
982 throw new ChannelException(e);
983 }
984 }
985
986 private int getTrafficClass() {
987 try {
988 return socket.getTrafficClass();
989 } catch (IOException e) {
990 throw new ChannelException(e);
991 }
992 }
993
994 private void setTrafficClass(int trafficClass) {
995 try {
996 socket.setTrafficClass(trafficClass);
997 } catch (IOException e) {
998 throw new ChannelException(e);
999 }
1000 }
1001
1002 private boolean isReuseAddress() {
1003 try {
1004 return socket.isReuseAddress();
1005 } catch (IOException e) {
1006 throw new ChannelException(e);
1007 }
1008 }
1009
1010 private void setReuseAddress(boolean reuseAddress) {
1011 try {
1012 socket.setReuseAddress(reuseAddress);
1013 } catch (IOException e) {
1014 throw new ChannelException(e);
1015 }
1016 }
1017
1018 private boolean isBroadcast() {
1019 try {
1020 return socket.isBroadcast();
1021 } catch (IOException e) {
1022 throw new ChannelException(e);
1023 }
1024 }
1025
1026 private void setBroadcast(boolean broadcast) {
1027 try {
1028 socket.setBroadcast(broadcast);
1029 } catch (IOException e) {
1030 throw new ChannelException(e);
1031 }
1032 }
1033
1034 private boolean isLoopbackModeDisabled() {
1035 try {
1036 return socket.isLoopbackModeDisabled();
1037 } catch (IOException e) {
1038 throw new ChannelException(e);
1039 }
1040 }
1041
1042 private void setLoopbackModeDisabled(boolean loopbackModeDisabled) {
1043 try {
1044 socket.setLoopbackModeDisabled(loopbackModeDisabled);
1045 } catch (IOException e) {
1046 throw new ChannelException(e);
1047 }
1048 }
1049
1050 private int getTimeToLive() {
1051 try {
1052 return socket.getTimeToLive();
1053 } catch (IOException e) {
1054 throw new ChannelException(e);
1055 }
1056 }
1057
1058 private void setTimeToLive(int ttl) {
1059 try {
1060 socket.setTimeToLive(ttl);
1061 } catch (IOException e) {
1062 throw new ChannelException(e);
1063 }
1064 }
1065
1066 private NetworkInterface getNetworkInterface() {
1067 try {
1068 return socket.getNetworkInterface();
1069 } catch (IOException e) {
1070 throw new ChannelException(e);
1071 }
1072 }
1073
1074 private void setNetworkInterface(NetworkInterface networkInterface) {
1075 try {
1076 socket.setNetworkInterface(networkInterface);
1077 } catch (IOException e) {
1078 throw new ChannelException(e);
1079 }
1080 }
1081
1082
1083
1084
1085 private boolean isReusePort() {
1086 try {
1087 return socket.isReusePort();
1088 } catch (IOException e) {
1089 throw new ChannelException(e);
1090 }
1091 }
1092
1093
1094
1095
1096
1097
1098
1099
1100 private void setReusePort(boolean reusePort) {
1101 try {
1102 socket.setReusePort(reusePort);
1103 } catch (IOException e) {
1104 throw new ChannelException(e);
1105 }
1106 }
1107
1108
1109
1110
1111
1112 private boolean isIpTransparent() {
1113 try {
1114 return socket.isIpTransparent();
1115 } catch (IOException e) {
1116 throw new ChannelException(e);
1117 }
1118 }
1119
1120
1121
1122
1123
1124 private void setIpTransparent(boolean ipTransparent) {
1125 try {
1126 socket.setIpTransparent(ipTransparent);
1127 } catch (IOException e) {
1128 throw new ChannelException(e);
1129 }
1130 }
1131
1132
1133
1134
1135
1136 private boolean isFreeBind() {
1137 try {
1138 return socket.isIpFreeBind();
1139 } catch (IOException e) {
1140 throw new ChannelException(e);
1141 }
1142 }
1143
1144
1145
1146
1147
1148 private void setFreeBind(boolean freeBind) {
1149 try {
1150 socket.setIpFreeBind(freeBind);
1151 } catch (IOException e) {
1152 throw new ChannelException(e);
1153 }
1154 }
1155
1156
1157
1158
1159
1160 private boolean isIpRecvOrigDestAddr() {
1161 try {
1162 return socket.isIpRecvOrigDestAddr();
1163 } catch (IOException e) {
1164 throw new ChannelException(e);
1165 }
1166 }
1167
1168
1169
1170
1171
1172 private void setIpRecvOrigDestAddr(boolean ipTransparent) {
1173 try {
1174 socket.setIpRecvOrigDestAddr(ipTransparent);
1175 } catch (IOException e) {
1176 throw new ChannelException(e);
1177 }
1178 }
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188 private void setMaxDatagramPayloadSize(int maxDatagramSize) {
1189 this.maxDatagramSize = ObjectUtil.checkPositiveOrZero(maxDatagramSize, "maxDatagramSize");
1190 }
1191
1192
1193
1194
1195 private int getMaxDatagramPayloadSize() {
1196 return maxDatagramSize;
1197 }
1198
1199
1200
1201
1202
1203 private void setUdpGro(boolean gro) {
1204 try {
1205 socket.setUdpGro(gro);
1206 } catch (IOException e) {
1207 throw new ChannelException(e);
1208 }
1209 this.gro = gro;
1210 }
1211
1212
1213
1214
1215
1216 private boolean isUdpGro() {
1217
1218
1219 return gro;
1220 }
1221 }