1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.AddressedEnvelope;
22 import io.netty.channel.ChannelException;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultAddressedEnvelope;
29 import io.netty.channel.socket.DatagramChannel;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.InternetProtocolFamily;
32 import io.netty.channel.socket.SocketProtocolFamily;
33 import io.netty.channel.unix.Errors;
34 import io.netty.channel.unix.Errors.NativeIoException;
35 import io.netty.channel.unix.UnixChannelUtil;
36 import io.netty.util.ReferenceCountUtil;
37 import io.netty.util.UncheckedBooleanSupplier;
38 import io.netty.util.internal.ObjectUtil;
39 import io.netty.util.internal.RecyclableArrayList;
40 import io.netty.util.internal.StringUtil;
41 import io.netty.util.internal.SystemPropertyUtil;
42 import io.netty.util.internal.logging.InternalLogger;
43 import io.netty.util.internal.logging.InternalLoggerFactory;
44
45 import java.io.IOException;
46 import java.net.Inet4Address;
47 import java.net.InetAddress;
48 import java.net.InetSocketAddress;
49 import java.net.NetworkInterface;
50 import java.net.PortUnreachableException;
51 import java.net.SocketAddress;
52 import java.nio.ByteBuffer;
53 import java.nio.channels.UnresolvedAddressException;
54
55 import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
56
57
58
59
60 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
61 private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollDatagramChannel.class);
62 private static final boolean IP_MULTICAST_ALL =
63 SystemPropertyUtil.getBoolean("io.netty.channel.epoll.ipMulticastAll", false);
64 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
65 private static final String EXPECTED_TYPES =
66 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
67 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
68 StringUtil.simpleClassName(ByteBuf.class) + ", " +
69 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
70 StringUtil.simpleClassName(ByteBuf.class) + ')';
71
72 private final EpollDatagramChannelConfig config;
73 private volatile boolean connected;
74
75 static {
76 if (logger.isDebugEnabled()) {
77 logger.debug("-Dio.netty.channel.epoll.ipMulticastAll: {}", IP_MULTICAST_ALL);
78 }
79 }
80
81
82
83
84
85
86 public static boolean isSegmentedDatagramPacketSupported() {
87 return Epoll.isAvailable() &&
88
89 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
90 }
91
92
93
94
95
96 public EpollDatagramChannel() {
97 this((SocketProtocolFamily) null);
98 }
99
100
101
102
103
104
105
106 @Deprecated
107 public EpollDatagramChannel(InternetProtocolFamily family) {
108 this(newSocketDgram(family), false);
109 }
110
111
112
113
114
115 public EpollDatagramChannel(SocketProtocolFamily family) {
116 this(newSocketDgram(family), false);
117 }
118
119
120
121
122
123 public EpollDatagramChannel(int fd) {
124 this(new LinuxSocket(fd), true);
125 }
126
127 private EpollDatagramChannel(LinuxSocket fd, boolean active) {
128 super(null, fd, active, EpollIoOps.valueOf(0));
129
130
131 try {
132 fd.setIpMulticastAll(IP_MULTICAST_ALL);
133 } catch (IOException | ChannelException e) {
134 logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, e);
135 }
136
137 config = new EpollDatagramChannelConfig(this);
138 }
139
140 @Override
141 public InetSocketAddress remoteAddress() {
142 return (InetSocketAddress) super.remoteAddress();
143 }
144
145 @Override
146 public InetSocketAddress localAddress() {
147 return (InetSocketAddress) super.localAddress();
148 }
149
150 @Override
151 public ChannelMetadata metadata() {
152 return METADATA;
153 }
154
155 @Override
156 public boolean isActive() {
157 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
158 }
159
160 @Override
161 public boolean isConnected() {
162 return connected;
163 }
164
165 @Override
166 public ChannelFuture joinGroup(InetAddress multicastAddress) {
167 return joinGroup(multicastAddress, newPromise());
168 }
169
170 @Override
171 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
172 try {
173 NetworkInterface iface = config().getNetworkInterface();
174 if (iface == null) {
175 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
176 }
177 return joinGroup(multicastAddress, iface, null, promise);
178 } catch (IOException e) {
179 promise.setFailure(e);
180 }
181 return promise;
182 }
183
184 @Override
185 public ChannelFuture joinGroup(
186 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
187 return joinGroup(multicastAddress, networkInterface, newPromise());
188 }
189
190 @Override
191 public ChannelFuture joinGroup(
192 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
193 ChannelPromise promise) {
194 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
195 }
196
197 @Override
198 public ChannelFuture joinGroup(
199 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
200 return joinGroup(multicastAddress, networkInterface, source, newPromise());
201 }
202
203 @Override
204 public ChannelFuture joinGroup(
205 final InetAddress multicastAddress, final NetworkInterface networkInterface,
206 final InetAddress source, final ChannelPromise promise) {
207
208 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
209 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
210
211 if (eventLoop().inEventLoop()) {
212 joinGroup0(multicastAddress, networkInterface, source, promise);
213 } else {
214 eventLoop().execute(new Runnable() {
215 @Override
216 public void run() {
217 joinGroup0(multicastAddress, networkInterface, source, promise);
218 }
219 });
220 }
221 return promise;
222 }
223
224 private void joinGroup0(
225 final InetAddress multicastAddress, final NetworkInterface networkInterface,
226 final InetAddress source, final ChannelPromise promise) {
227 assert eventLoop().inEventLoop();
228
229 try {
230 socket.joinGroup(multicastAddress, networkInterface, source);
231 promise.setSuccess();
232 } catch (IOException e) {
233 promise.setFailure(e);
234 }
235 }
236
237 @Override
238 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
239 return leaveGroup(multicastAddress, newPromise());
240 }
241
242 @Override
243 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
244 try {
245 return leaveGroup(
246 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
247 } catch (IOException e) {
248 promise.setFailure(e);
249 }
250 return promise;
251 }
252
253 @Override
254 public ChannelFuture leaveGroup(
255 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
256 return leaveGroup(multicastAddress, networkInterface, newPromise());
257 }
258
259 @Override
260 public ChannelFuture leaveGroup(
261 InetSocketAddress multicastAddress,
262 NetworkInterface networkInterface, ChannelPromise promise) {
263 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
264 }
265
266 @Override
267 public ChannelFuture leaveGroup(
268 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
269 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
270 }
271
272 @Override
273 public ChannelFuture leaveGroup(
274 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
275 final ChannelPromise promise) {
276 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
277 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
278
279 if (eventLoop().inEventLoop()) {
280 leaveGroup0(multicastAddress, networkInterface, source, promise);
281 } else {
282 eventLoop().execute(new Runnable() {
283 @Override
284 public void run() {
285 leaveGroup0(multicastAddress, networkInterface, source, promise);
286 }
287 });
288 }
289 return promise;
290 }
291
292 private void leaveGroup0(
293 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
294 final ChannelPromise promise) {
295 assert eventLoop().inEventLoop();
296
297 try {
298 socket.leaveGroup(multicastAddress, networkInterface, source);
299 promise.setSuccess();
300 } catch (IOException e) {
301 promise.setFailure(e);
302 }
303 }
304
305 @Override
306 public ChannelFuture block(
307 InetAddress multicastAddress, NetworkInterface networkInterface,
308 InetAddress sourceToBlock) {
309 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
310 }
311
312 @Override
313 public ChannelFuture block(
314 final InetAddress multicastAddress, final NetworkInterface networkInterface,
315 final InetAddress sourceToBlock, final ChannelPromise promise) {
316 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
317 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
318 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
319
320 promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
321 return promise;
322 }
323
324 @Override
325 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
326 return block(multicastAddress, sourceToBlock, newPromise());
327 }
328
329 @Override
330 public ChannelFuture block(
331 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
332 try {
333 return block(
334 multicastAddress,
335 NetworkInterface.getByInetAddress(localAddress().getAddress()),
336 sourceToBlock, promise);
337 } catch (Throwable e) {
338 promise.setFailure(e);
339 }
340 return promise;
341 }
342
343 @Override
344 protected AbstractEpollUnsafe newUnsafe() {
345 return new EpollDatagramChannelUnsafe();
346 }
347
348 @Override
349 protected void doRegister(ChannelPromise promise) {
350 super.doRegister(promise);
351 promise.addListener(f -> {
352 if (f.isSuccess() && isRegistered()) {
353
354
355 submitCurrentOps();
356 }
357 });
358 }
359
360 @Override
361 protected void doBind(SocketAddress localAddress) throws Exception {
362 if (localAddress instanceof InetSocketAddress) {
363 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
364 if (socketAddress.getAddress().isAnyLocalAddress() &&
365 socketAddress.getAddress() instanceof Inet4Address) {
366 if (socket.family() == SocketProtocolFamily.INET6) {
367 localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
368 }
369 }
370 }
371 super.doBind(localAddress);
372 active = true;
373 }
374
375 @Override
376 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
377 int maxMessagesPerWrite = maxMessagesPerWrite();
378 while (maxMessagesPerWrite > 0) {
379 Object msg = in.current();
380 if (msg == null) {
381
382 break;
383 }
384
385 try {
386
387 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
388
389 in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
390 NativeDatagramPacketArray array = cleanDatagramPacketArray();
391 array.add(in, isConnected(), maxMessagesPerWrite);
392 int cnt = array.count();
393
394 if (cnt >= 1) {
395
396 int offset = 0;
397 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
398
399 int send = socket.sendmmsg(packets, offset, cnt);
400 if (send == 0) {
401
402 break;
403 }
404 for (int i = 0; i < send; i++) {
405 in.remove();
406 }
407 maxMessagesPerWrite -= send;
408 continue;
409 }
410 }
411 boolean done = false;
412 for (int i = config().getWriteSpinCount(); i > 0; --i) {
413 if (doWriteMessage(msg)) {
414 done = true;
415 break;
416 }
417 }
418
419 if (done) {
420 in.remove();
421 maxMessagesPerWrite --;
422 } else {
423 break;
424 }
425 } catch (IOException e) {
426 maxMessagesPerWrite --;
427
428
429
430 in.remove(e);
431 }
432 }
433
434 if (in.isEmpty()) {
435
436 clearFlag(Native.EPOLLOUT);
437 } else {
438
439 setFlag(Native.EPOLLOUT);
440 }
441 }
442
443 private boolean doWriteMessage(Object msg) throws Exception {
444 final ByteBuf data;
445 final InetSocketAddress remoteAddress;
446 if (msg instanceof AddressedEnvelope) {
447 @SuppressWarnings("unchecked")
448 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
449 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
450 data = envelope.content();
451 remoteAddress = envelope.recipient();
452 } else {
453 data = (ByteBuf) msg;
454 remoteAddress = null;
455 }
456
457 final int dataLen = data.readableBytes();
458 if (dataLen == 0) {
459 return true;
460 }
461
462 try {
463 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
464 } catch (NativeIoException e) {
465 if (remoteAddress == null) {
466 throw translateForConnected(e);
467 }
468 throw e;
469 }
470 }
471
472 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
473 if (envelope.recipient() instanceof InetSocketAddress
474 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
475 throw new UnresolvedAddressException();
476 }
477 }
478
479 @Override
480 protected Object filterOutboundMessage(Object msg) {
481 if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
482 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
483 throw new UnsupportedOperationException(
484 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
485 }
486 io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
487 checkUnresolved(packet);
488
489 ByteBuf content = packet.content();
490 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
491 packet.replace(newDirectBuffer(packet, content)) : msg;
492 }
493 if (msg instanceof DatagramPacket) {
494 DatagramPacket packet = (DatagramPacket) msg;
495 checkUnresolved(packet);
496
497 ByteBuf content = packet.content();
498 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
499 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
500 }
501
502 if (msg instanceof ByteBuf) {
503 ByteBuf buf = (ByteBuf) msg;
504 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
505 }
506
507 if (msg instanceof AddressedEnvelope) {
508 @SuppressWarnings("unchecked")
509 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
510 checkUnresolved(e);
511
512 if (e.content() instanceof ByteBuf &&
513 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
514
515 ByteBuf content = (ByteBuf) e.content();
516 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
517 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
518 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
519 }
520 }
521
522 throw new UnsupportedOperationException(
523 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
524 }
525
526 @Override
527 public EpollDatagramChannelConfig config() {
528 return config;
529 }
530
531 @Override
532 protected void doDisconnect() throws Exception {
533 socket.disconnect();
534 connected = active = false;
535 resetCachedAddresses();
536 }
537
538 @Override
539 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
540 if (super.doConnect(remoteAddress, localAddress)) {
541 connected = true;
542 return true;
543 }
544 return false;
545 }
546
547 @Override
548 protected void doClose() throws Exception {
549 super.doClose();
550 connected = false;
551 }
552
553 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
554
555 @Override
556 void epollInReady() {
557 assert eventLoop().inEventLoop();
558 EpollDatagramChannelConfig config = config();
559 if (shouldBreakEpollInReady(config)) {
560 clearEpollIn0();
561 return;
562 }
563 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
564 final ChannelPipeline pipeline = pipeline();
565 final ByteBufAllocator allocator = config.getAllocator();
566 allocHandle.reset(config);
567
568 Throwable exception = null;
569 try {
570 try {
571 boolean connected = isConnected();
572 do {
573 final boolean read;
574 int datagramSize = config().getMaxDatagramPayloadSize();
575
576 ByteBuf byteBuf = allocHandle.allocate(allocator);
577
578 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
579 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
580 0;
581 try {
582 if (numDatagram <= 1) {
583 if (!connected || config.isUdpGro()) {
584 read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
585 } else {
586 read = connectedRead(allocHandle, byteBuf, datagramSize);
587 }
588 } else {
589
590 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
591 byteBuf, datagramSize, numDatagram);
592 }
593 } catch (NativeIoException e) {
594 if (connected) {
595 throw translateForConnected(e);
596 }
597 throw e;
598 }
599
600 if (read) {
601 readPending = false;
602 } else {
603 break;
604 }
605
606
607 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
608 } catch (Throwable t) {
609 exception = t;
610 }
611
612 allocHandle.readComplete();
613 pipeline.fireChannelReadComplete();
614
615 if (exception != null) {
616 pipeline.fireExceptionCaught(exception);
617 }
618 } finally {
619 if (shouldStopReading(config)) {
620 clearEpollIn();
621 }
622 }
623 }
624 }
625
626 private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
627 throws Exception {
628 try {
629 int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
630 : byteBuf.writableBytes();
631 allocHandle.attemptedBytesRead(writable);
632
633 int writerIndex = byteBuf.writerIndex();
634 int localReadAmount;
635 if (byteBuf.hasMemoryAddress()) {
636 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
637 } else {
638 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
639 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
640 }
641
642 if (localReadAmount <= 0) {
643 allocHandle.lastBytesRead(localReadAmount);
644
645
646 return false;
647 }
648 byteBuf.writerIndex(writerIndex + localReadAmount);
649
650 allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
651 localReadAmount : writable);
652
653 DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
654 allocHandle.incMessagesRead(1);
655
656 pipeline().fireChannelRead(packet);
657 byteBuf = null;
658 return true;
659 } finally {
660 if (byteBuf != null) {
661 byteBuf.release();
662 }
663 }
664 }
665
666 private IOException translateForConnected(NativeIoException e) {
667
668 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
669 PortUnreachableException error = new PortUnreachableException(e.getMessage());
670 error.initCause(e);
671 return error;
672 }
673 return e;
674 }
675
676 private static void addDatagramPacketToOut(DatagramPacket packet,
677 RecyclableArrayList out) {
678 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
679 io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
680 (io.netty.channel.unix.SegmentedDatagramPacket) packet;
681 ByteBuf content = segmentedDatagramPacket.content();
682 InetSocketAddress recipient = segmentedDatagramPacket.recipient();
683 InetSocketAddress sender = segmentedDatagramPacket.sender();
684 int segmentSize = segmentedDatagramPacket.segmentSize();
685 do {
686 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
687 segmentSize)), recipient, sender));
688 } while (content.isReadable());
689
690 segmentedDatagramPacket.release();
691 } else {
692 out.add(packet);
693 }
694 }
695
696 private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
697 if (byteBuf != null) {
698 byteBuf.release();
699 }
700 if (packetList != null) {
701 for (int i = 0; i < packetList.size(); i++) {
702 ReferenceCountUtil.release(packetList.get(i));
703 }
704 packetList.recycle();
705 }
706 }
707
708 private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
709 int bytesRead, DatagramPacket packet) {
710 handle.lastBytesRead(Math.max(1, bytesRead));
711 handle.incMessagesRead(1);
712 pipeline.fireChannelRead(packet);
713 }
714
715 private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
716 int bytesRead, RecyclableArrayList packetList) {
717 int messagesRead = packetList.size();
718 handle.lastBytesRead(Math.max(1, bytesRead));
719 handle.incMessagesRead(messagesRead);
720 for (int i = 0; i < messagesRead; i++) {
721 pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
722 }
723 }
724
725 private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
726 NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
727 RecyclableArrayList datagramPackets = null;
728 try {
729 int writable = byteBuf.writableBytes();
730
731 boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
732 assert added;
733
734 allocHandle.attemptedBytesRead(writable);
735
736 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
737
738 int bytesReceived = socket.recvmsg(msg);
739 if (!msg.hasSender()) {
740 allocHandle.lastBytesRead(-1);
741 return false;
742 }
743 byteBuf.writerIndex(bytesReceived);
744 InetSocketAddress local = localAddress();
745 DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
746 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
747 processPacket(pipeline(), allocHandle, bytesReceived, packet);
748 } else {
749
750
751
752 datagramPackets = RecyclableArrayList.newInstance();
753 addDatagramPacketToOut(packet, datagramPackets);
754
755 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
756 datagramPackets.recycle();
757 datagramPackets = null;
758 }
759
760 return true;
761 } finally {
762 releaseAndRecycle(byteBuf, datagramPackets);
763 }
764 }
765
766 private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
767 ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
768 RecyclableArrayList datagramPackets = null;
769 try {
770 int offset = byteBuf.writerIndex();
771 for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
772 if (!array.addWritable(byteBuf, offset, datagramSize)) {
773 break;
774 }
775 }
776
777 allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
778
779 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
780
781 int received = socket.recvmmsg(packets, 0, array.count());
782 if (received == 0) {
783 allocHandle.lastBytesRead(-1);
784 return false;
785 }
786
787 InetSocketAddress local = localAddress();
788
789
790 int bytesReceived = received * datagramSize;
791 byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
792
793 if (received == 1) {
794
795 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
796 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
797 processPacket(pipeline(), allocHandle, datagramSize, packet);
798 return true;
799 }
800 }
801
802
803
804 datagramPackets = RecyclableArrayList.newInstance();
805 for (int i = 0; i < received; i++) {
806 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
807
808
809
810 byteBuf.skipBytes(datagramSize);
811 addDatagramPacketToOut(packet, datagramPackets);
812 }
813
814 byteBuf.release();
815 byteBuf = null;
816
817 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
818 datagramPackets.recycle();
819 datagramPackets = null;
820 return true;
821 } finally {
822 releaseAndRecycle(byteBuf, datagramPackets);
823 }
824 }
825
826 private NativeDatagramPacketArray cleanDatagramPacketArray() {
827 return ((NativeArrays) registration().attachment()).cleanDatagramPacketArray();
828 }
829 }