1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.AddressedEnvelope;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.socket.DatagramChannel;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.InternetProtocolFamily;
31 import io.netty.channel.socket.SocketProtocolFamily;
32 import io.netty.channel.unix.Errors;
33 import io.netty.channel.unix.Errors.NativeIoException;
34 import io.netty.channel.unix.UnixChannelUtil;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.UncheckedBooleanSupplier;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.RecyclableArrayList;
39 import io.netty.util.internal.StringUtil;
40
41 import java.io.IOException;
42 import java.net.Inet4Address;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.NetworkInterface;
46 import java.net.PortUnreachableException;
47 import java.net.SocketAddress;
48 import java.nio.ByteBuffer;
49 import java.nio.channels.UnresolvedAddressException;
50
51 import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
52
53
54
55
56
57 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
58 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
59 private static final String EXPECTED_TYPES =
60 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
61 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
62 StringUtil.simpleClassName(ByteBuf.class) + ", " +
63 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
64 StringUtil.simpleClassName(ByteBuf.class) + ')';
65
66 private final EpollDatagramChannelConfig config;
67 private volatile boolean connected;
68
69
70
71
72
73
74 public static boolean isSegmentedDatagramPacketSupported() {
75 return Epoll.isAvailable() &&
76
77 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
78 }
79
80
81
82
83
/**
 * Creates a new instance by delegating to {@link #EpollDatagramChannel(SocketProtocolFamily)}
 * with a {@code null} protocol family.
 */
public EpollDatagramChannel() {
    this((SocketProtocolFamily) null);
}
87
88
89
90
91
92
93
/**
 * Creates a new instance on top of a datagram socket opened for the given protocol family.
 * The channel starts out not marked as active.
 *
 * @param family the protocol family passed to {@code LinuxSocket.newSocketDgram(...)}
 * @deprecated see {@link #EpollDatagramChannel(SocketProtocolFamily)} for the overload taking the
 *             other family type.
 */
@Deprecated
public EpollDatagramChannel(InternetProtocolFamily family) {
    this(LinuxSocket.newSocketDgram(family), false);
}
98
99
100
101
102
/**
 * Creates a new instance on top of a datagram socket opened for the given protocol family.
 * The channel starts out not marked as active.
 *
 * @param family the protocol family passed to {@code LinuxSocket.newSocketDgram(...)}
 */
public EpollDatagramChannel(SocketProtocolFamily family) {
    this(LinuxSocket.newSocketDgram(family), false);
}
106
107
108
109
110
/**
 * Creates a new instance that wraps an existing file descriptor in a {@code LinuxSocket}.
 * The channel is marked active from the start.
 *
 * @param fd the file descriptor to wrap
 */
public EpollDatagramChannel(int fd) {
    this(new LinuxSocket(fd), true);
}
114
/**
 * Shared constructor: hands the socket to the parent class (no parent channel, initial ops
 * {@code EpollIoOps.valueOf(0)}) and creates the channel configuration.
 *
 * @param fd     the socket backing this channel
 * @param active the initial active state forwarded to the parent constructor
 */
private EpollDatagramChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active, EpollIoOps.valueOf(0));
    this.config = new EpollDatagramChannelConfig(this);
}
119
120 @Override
121 public InetSocketAddress remoteAddress() {
122 return (InetSocketAddress) super.remoteAddress();
123 }
124
125 @Override
126 public InetSocketAddress localAddress() {
127 return (InetSocketAddress) super.localAddress();
128 }
129
130 @Override
131 public ChannelMetadata metadata() {
132 return METADATA;
133 }
134
135 @Override
136 public boolean isActive() {
137 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
138 }
139
140 @Override
141 public boolean isConnected() {
142 return connected;
143 }
144
145 @Override
146 public ChannelFuture joinGroup(InetAddress multicastAddress) {
147 return joinGroup(multicastAddress, newPromise());
148 }
149
150 @Override
151 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
152 try {
153 NetworkInterface iface = config().getNetworkInterface();
154 if (iface == null) {
155 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
156 }
157 return joinGroup(multicastAddress, iface, null, promise);
158 } catch (IOException e) {
159 promise.setFailure(e);
160 }
161 return promise;
162 }
163
164 @Override
165 public ChannelFuture joinGroup(
166 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
167 return joinGroup(multicastAddress, networkInterface, newPromise());
168 }
169
170 @Override
171 public ChannelFuture joinGroup(
172 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
173 ChannelPromise promise) {
174 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
175 }
176
177 @Override
178 public ChannelFuture joinGroup(
179 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
180 return joinGroup(multicastAddress, networkInterface, source, newPromise());
181 }
182
183 @Override
184 public ChannelFuture joinGroup(
185 final InetAddress multicastAddress, final NetworkInterface networkInterface,
186 final InetAddress source, final ChannelPromise promise) {
187
188 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
189 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
190
191 if (eventLoop().inEventLoop()) {
192 joinGroup0(multicastAddress, networkInterface, source, promise);
193 } else {
194 eventLoop().execute(new Runnable() {
195 @Override
196 public void run() {
197 joinGroup0(multicastAddress, networkInterface, source, promise);
198 }
199 });
200 }
201 return promise;
202 }
203
204 private void joinGroup0(
205 final InetAddress multicastAddress, final NetworkInterface networkInterface,
206 final InetAddress source, final ChannelPromise promise) {
207 assert eventLoop().inEventLoop();
208
209 try {
210 socket.joinGroup(multicastAddress, networkInterface, source);
211 promise.setSuccess();
212 } catch (IOException e) {
213 promise.setFailure(e);
214 }
215 }
216
217 @Override
218 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
219 return leaveGroup(multicastAddress, newPromise());
220 }
221
222 @Override
223 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
224 try {
225 return leaveGroup(
226 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
227 } catch (IOException e) {
228 promise.setFailure(e);
229 }
230 return promise;
231 }
232
233 @Override
234 public ChannelFuture leaveGroup(
235 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
236 return leaveGroup(multicastAddress, networkInterface, newPromise());
237 }
238
239 @Override
240 public ChannelFuture leaveGroup(
241 InetSocketAddress multicastAddress,
242 NetworkInterface networkInterface, ChannelPromise promise) {
243 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
244 }
245
246 @Override
247 public ChannelFuture leaveGroup(
248 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
249 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
250 }
251
252 @Override
253 public ChannelFuture leaveGroup(
254 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
255 final ChannelPromise promise) {
256 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
257 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
258
259 if (eventLoop().inEventLoop()) {
260 leaveGroup0(multicastAddress, networkInterface, source, promise);
261 } else {
262 eventLoop().execute(new Runnable() {
263 @Override
264 public void run() {
265 leaveGroup0(multicastAddress, networkInterface, source, promise);
266 }
267 });
268 }
269 return promise;
270 }
271
272 private void leaveGroup0(
273 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
274 final ChannelPromise promise) {
275 assert eventLoop().inEventLoop();
276
277 try {
278 socket.leaveGroup(multicastAddress, networkInterface, source);
279 promise.setSuccess();
280 } catch (IOException e) {
281 promise.setFailure(e);
282 }
283 }
284
285 @Override
286 public ChannelFuture block(
287 InetAddress multicastAddress, NetworkInterface networkInterface,
288 InetAddress sourceToBlock) {
289 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
290 }
291
292 @Override
293 public ChannelFuture block(
294 final InetAddress multicastAddress, final NetworkInterface networkInterface,
295 final InetAddress sourceToBlock, final ChannelPromise promise) {
296 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
297 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
298 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
299
300 promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
301 return promise;
302 }
303
304 @Override
305 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
306 return block(multicastAddress, sourceToBlock, newPromise());
307 }
308
309 @Override
310 public ChannelFuture block(
311 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
312 try {
313 return block(
314 multicastAddress,
315 NetworkInterface.getByInetAddress(localAddress().getAddress()),
316 sourceToBlock, promise);
317 } catch (Throwable e) {
318 promise.setFailure(e);
319 }
320 return promise;
321 }
322
323 @Override
324 protected AbstractEpollUnsafe newUnsafe() {
325 return new EpollDatagramChannelUnsafe();
326 }
327
328 @Override
329 protected void doBind(SocketAddress localAddress) throws Exception {
330 if (localAddress instanceof InetSocketAddress) {
331 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
332 if (socketAddress.getAddress().isAnyLocalAddress() &&
333 socketAddress.getAddress() instanceof Inet4Address) {
334 if (socket.family() == SocketProtocolFamily.INET6) {
335 localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
336 }
337 }
338 }
339 super.doBind(localAddress);
340 active = true;
341 }
342
343 @Override
344 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
345 int maxMessagesPerWrite = maxMessagesPerWrite();
346 while (maxMessagesPerWrite > 0) {
347 Object msg = in.current();
348 if (msg == null) {
349
350 break;
351 }
352
353 try {
354
355 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
356
357 in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
358 NativeDatagramPacketArray array = cleanDatagramPacketArray();
359 array.add(in, isConnected(), maxMessagesPerWrite);
360 int cnt = array.count();
361
362 if (cnt >= 1) {
363
364 int offset = 0;
365 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
366
367 int send = socket.sendmmsg(packets, offset, cnt);
368 if (send == 0) {
369
370 break;
371 }
372 for (int i = 0; i < send; i++) {
373 in.remove();
374 }
375 maxMessagesPerWrite -= send;
376 continue;
377 }
378 }
379 boolean done = false;
380 for (int i = config().getWriteSpinCount(); i > 0; --i) {
381 if (doWriteMessage(msg)) {
382 done = true;
383 break;
384 }
385 }
386
387 if (done) {
388 in.remove();
389 maxMessagesPerWrite --;
390 } else {
391 break;
392 }
393 } catch (IOException e) {
394 maxMessagesPerWrite --;
395
396
397
398 in.remove(e);
399 }
400 }
401
402 if (in.isEmpty()) {
403
404 clearFlag(Native.EPOLLOUT);
405 } else {
406
407 setFlag(Native.EPOLLOUT);
408 }
409 }
410
411 private boolean doWriteMessage(Object msg) throws Exception {
412 final ByteBuf data;
413 final InetSocketAddress remoteAddress;
414 if (msg instanceof AddressedEnvelope) {
415 @SuppressWarnings("unchecked")
416 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
417 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
418 data = envelope.content();
419 remoteAddress = envelope.recipient();
420 } else {
421 data = (ByteBuf) msg;
422 remoteAddress = null;
423 }
424
425 final int dataLen = data.readableBytes();
426 if (dataLen == 0) {
427 return true;
428 }
429
430 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
431 }
432
433 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
434 if (envelope.recipient() instanceof InetSocketAddress
435 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
436 throw new UnresolvedAddressException();
437 }
438 }
439
440 @Override
441 protected Object filterOutboundMessage(Object msg) {
442 if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
443 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
444 throw new UnsupportedOperationException(
445 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
446 }
447 io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
448 checkUnresolved(packet);
449
450 ByteBuf content = packet.content();
451 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
452 packet.replace(newDirectBuffer(packet, content)) : msg;
453 }
454 if (msg instanceof DatagramPacket) {
455 DatagramPacket packet = (DatagramPacket) msg;
456 checkUnresolved(packet);
457
458 ByteBuf content = packet.content();
459 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
460 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
461 }
462
463 if (msg instanceof ByteBuf) {
464 ByteBuf buf = (ByteBuf) msg;
465 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
466 }
467
468 if (msg instanceof AddressedEnvelope) {
469 @SuppressWarnings("unchecked")
470 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
471 checkUnresolved(e);
472
473 if (e.content() instanceof ByteBuf &&
474 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
475
476 ByteBuf content = (ByteBuf) e.content();
477 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
478 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
479 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
480 }
481 }
482
483 throw new UnsupportedOperationException(
484 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
485 }
486
487 @Override
488 public EpollDatagramChannelConfig config() {
489 return config;
490 }
491
492 @Override
493 protected void doDisconnect() throws Exception {
494 socket.disconnect();
495 connected = active = false;
496 resetCachedAddresses();
497 }
498
499 @Override
500 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
501 if (super.doConnect(remoteAddress, localAddress)) {
502 connected = true;
503 return true;
504 }
505 return false;
506 }
507
508 @Override
509 protected void doClose() throws Exception {
510 super.doClose();
511 connected = false;
512 }
513
514 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
515
516 @Override
517 void epollInReady() {
518 assert eventLoop().inEventLoop();
519 EpollDatagramChannelConfig config = config();
520 if (shouldBreakEpollInReady(config)) {
521 clearEpollIn0();
522 return;
523 }
524 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
525 final ChannelPipeline pipeline = pipeline();
526 final ByteBufAllocator allocator = config.getAllocator();
527 allocHandle.reset(config);
528
529 Throwable exception = null;
530 try {
531 try {
532 boolean connected = isConnected();
533 do {
534 final boolean read;
535 int datagramSize = config().getMaxDatagramPayloadSize();
536
537 ByteBuf byteBuf = allocHandle.allocate(allocator);
538
539 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
540 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
541 0;
542 try {
543 if (numDatagram <= 1) {
544 if (!connected || config.isUdpGro()) {
545 read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
546 } else {
547 read = connectedRead(allocHandle, byteBuf, datagramSize);
548 }
549 } else {
550
551 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
552 byteBuf, datagramSize, numDatagram);
553 }
554 } catch (NativeIoException e) {
555 if (connected) {
556 throw translateForConnected(e);
557 }
558 throw e;
559 }
560
561 if (read) {
562 readPending = false;
563 } else {
564 break;
565 }
566
567
568 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
569 } catch (Throwable t) {
570 exception = t;
571 }
572
573 allocHandle.readComplete();
574 pipeline.fireChannelReadComplete();
575
576 if (exception != null) {
577 pipeline.fireExceptionCaught(exception);
578 }
579 } finally {
580 if (shouldStopReading(config)) {
581 clearEpollIn();
582 }
583 }
584 }
585 }
586
587 private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
588 throws Exception {
589 try {
590 int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
591 : byteBuf.writableBytes();
592 allocHandle.attemptedBytesRead(writable);
593
594 int writerIndex = byteBuf.writerIndex();
595 int localReadAmount;
596 if (byteBuf.hasMemoryAddress()) {
597 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
598 } else {
599 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
600 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
601 }
602
603 if (localReadAmount <= 0) {
604 allocHandle.lastBytesRead(localReadAmount);
605
606
607 return false;
608 }
609 byteBuf.writerIndex(writerIndex + localReadAmount);
610
611 allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
612 localReadAmount : writable);
613
614 DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
615 allocHandle.incMessagesRead(1);
616
617 pipeline().fireChannelRead(packet);
618 byteBuf = null;
619 return true;
620 } finally {
621 if (byteBuf != null) {
622 byteBuf.release();
623 }
624 }
625 }
626
627 private IOException translateForConnected(NativeIoException e) {
628
629 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
630 PortUnreachableException error = new PortUnreachableException(e.getMessage());
631 error.initCause(e);
632 return error;
633 }
634 return e;
635 }
636
637 private static void addDatagramPacketToOut(DatagramPacket packet,
638 RecyclableArrayList out) {
639 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
640 io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
641 (io.netty.channel.unix.SegmentedDatagramPacket) packet;
642 ByteBuf content = segmentedDatagramPacket.content();
643 InetSocketAddress recipient = segmentedDatagramPacket.recipient();
644 InetSocketAddress sender = segmentedDatagramPacket.sender();
645 int segmentSize = segmentedDatagramPacket.segmentSize();
646 do {
647 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
648 segmentSize)), recipient, sender));
649 } while (content.isReadable());
650
651 segmentedDatagramPacket.release();
652 } else {
653 out.add(packet);
654 }
655 }
656
657 private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
658 if (byteBuf != null) {
659 byteBuf.release();
660 }
661 if (packetList != null) {
662 for (int i = 0; i < packetList.size(); i++) {
663 ReferenceCountUtil.release(packetList.get(i));
664 }
665 packetList.recycle();
666 }
667 }
668
669 private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
670 int bytesRead, DatagramPacket packet) {
671 handle.lastBytesRead(Math.max(1, bytesRead));
672 handle.incMessagesRead(1);
673 pipeline.fireChannelRead(packet);
674 }
675
676 private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
677 int bytesRead, RecyclableArrayList packetList) {
678 int messagesRead = packetList.size();
679 handle.lastBytesRead(Math.max(1, bytesRead));
680 handle.incMessagesRead(messagesRead);
681 for (int i = 0; i < messagesRead; i++) {
682 pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
683 }
684 }
685
686 private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
687 NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
688 RecyclableArrayList datagramPackets = null;
689 try {
690 int writable = byteBuf.writableBytes();
691
692 boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
693 assert added;
694
695 allocHandle.attemptedBytesRead(writable);
696
697 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
698
699 int bytesReceived = socket.recvmsg(msg);
700 if (!msg.hasSender()) {
701 allocHandle.lastBytesRead(-1);
702 return false;
703 }
704 byteBuf.writerIndex(bytesReceived);
705 InetSocketAddress local = localAddress();
706 DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
707 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
708 processPacket(pipeline(), allocHandle, bytesReceived, packet);
709 } else {
710
711
712
713 datagramPackets = RecyclableArrayList.newInstance();
714 addDatagramPacketToOut(packet, datagramPackets);
715
716 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
717 datagramPackets.recycle();
718 datagramPackets = null;
719 }
720
721 return true;
722 } finally {
723 releaseAndRecycle(byteBuf, datagramPackets);
724 }
725 }
726
727 private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
728 ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
729 RecyclableArrayList datagramPackets = null;
730 try {
731 int offset = byteBuf.writerIndex();
732 for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
733 if (!array.addWritable(byteBuf, offset, datagramSize)) {
734 break;
735 }
736 }
737
738 allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
739
740 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
741
742 int received = socket.recvmmsg(packets, 0, array.count());
743 if (received == 0) {
744 allocHandle.lastBytesRead(-1);
745 return false;
746 }
747
748 InetSocketAddress local = localAddress();
749
750
751 int bytesReceived = received * datagramSize;
752 byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
753
754 if (received == 1) {
755
756 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
757 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
758 processPacket(pipeline(), allocHandle, datagramSize, packet);
759 return true;
760 }
761 }
762
763
764
765 datagramPackets = RecyclableArrayList.newInstance();
766 for (int i = 0; i < received; i++) {
767 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
768
769
770
771 byteBuf.skipBytes(datagramSize);
772 addDatagramPacketToOut(packet, datagramPackets);
773 }
774
775 byteBuf.release();
776 byteBuf = null;
777
778 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
779 datagramPackets.recycle();
780 datagramPackets = null;
781 return true;
782 } finally {
783 releaseAndRecycle(byteBuf, datagramPackets);
784 }
785 }
786
787 private NativeDatagramPacketArray cleanDatagramPacketArray() {
788 return registration().ioHandler().cleanDatagramPacketArray();
789 }
790 }