1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.AddressedEnvelope;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.socket.DatagramChannel;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.InternetProtocolFamily;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.Errors.NativeIoException;
33 import io.netty.channel.unix.UnixChannelUtil;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.UncheckedBooleanSupplier;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.RecyclableArrayList;
38 import io.netty.util.internal.StringUtil;
39
40 import java.io.IOException;
41 import java.net.Inet4Address;
42 import java.net.InetAddress;
43 import java.net.InetSocketAddress;
44 import java.net.NetworkInterface;
45 import java.net.PortUnreachableException;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.UnresolvedAddressException;
49
50 import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
51
52
53
54
55
56 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
57 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
58 private static final String EXPECTED_TYPES =
59 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
60 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
61 StringUtil.simpleClassName(ByteBuf.class) + ", " +
62 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
63 StringUtil.simpleClassName(ByteBuf.class) + ')';
64
65 private final EpollDatagramChannelConfig config;
66 private volatile boolean connected;
67
68
69
70
71
72
73 public static boolean isSegmentedDatagramPacketSupported() {
74 return Epoll.isAvailable() &&
75
76 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
77 }
78
79
80
81
82
83 public EpollDatagramChannel() {
84 this(null);
85 }
86
87
88
89
90
91 public EpollDatagramChannel(InternetProtocolFamily family) {
92 this(newSocketDgram(family), false);
93 }
94
95
96
97
98
99 public EpollDatagramChannel(int fd) {
100 this(new LinuxSocket(fd), true);
101 }
102
103 private EpollDatagramChannel(LinuxSocket fd, boolean active) {
104 super(null, fd, active);
105 config = new EpollDatagramChannelConfig(this);
106 }
107
108 @Override
109 public InetSocketAddress remoteAddress() {
110 return (InetSocketAddress) super.remoteAddress();
111 }
112
113 @Override
114 public InetSocketAddress localAddress() {
115 return (InetSocketAddress) super.localAddress();
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123 @Override
124 public boolean isActive() {
125 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
126 }
127
128 @Override
129 public boolean isConnected() {
130 return connected;
131 }
132
133 @Override
134 public ChannelFuture joinGroup(InetAddress multicastAddress) {
135 return joinGroup(multicastAddress, newPromise());
136 }
137
138 @Override
139 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140 try {
141 NetworkInterface iface = config().getNetworkInterface();
142 if (iface == null) {
143 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
144 }
145 return joinGroup(multicastAddress, iface, null, promise);
146 } catch (IOException e) {
147 promise.setFailure(e);
148 }
149 return promise;
150 }
151
152 @Override
153 public ChannelFuture joinGroup(
154 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
155 return joinGroup(multicastAddress, networkInterface, newPromise());
156 }
157
158 @Override
159 public ChannelFuture joinGroup(
160 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
161 ChannelPromise promise) {
162 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
163 }
164
165 @Override
166 public ChannelFuture joinGroup(
167 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
168 return joinGroup(multicastAddress, networkInterface, source, newPromise());
169 }
170
171 @Override
172 public ChannelFuture joinGroup(
173 final InetAddress multicastAddress, final NetworkInterface networkInterface,
174 final InetAddress source, final ChannelPromise promise) {
175
176 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
177 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
178
179 if (eventLoop().inEventLoop()) {
180 joinGroup0(multicastAddress, networkInterface, source, promise);
181 } else {
182 eventLoop().execute(new Runnable() {
183 @Override
184 public void run() {
185 joinGroup0(multicastAddress, networkInterface, source, promise);
186 }
187 });
188 }
189 return promise;
190 }
191
192 private void joinGroup0(
193 final InetAddress multicastAddress, final NetworkInterface networkInterface,
194 final InetAddress source, final ChannelPromise promise) {
195 assert eventLoop().inEventLoop();
196
197 try {
198 socket.joinGroup(multicastAddress, networkInterface, source);
199 promise.setSuccess();
200 } catch (IOException e) {
201 promise.setFailure(e);
202 }
203 }
204
205 @Override
206 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
207 return leaveGroup(multicastAddress, newPromise());
208 }
209
210 @Override
211 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
212 try {
213 return leaveGroup(
214 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
215 } catch (IOException e) {
216 promise.setFailure(e);
217 }
218 return promise;
219 }
220
221 @Override
222 public ChannelFuture leaveGroup(
223 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
224 return leaveGroup(multicastAddress, networkInterface, newPromise());
225 }
226
227 @Override
228 public ChannelFuture leaveGroup(
229 InetSocketAddress multicastAddress,
230 NetworkInterface networkInterface, ChannelPromise promise) {
231 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
232 }
233
234 @Override
235 public ChannelFuture leaveGroup(
236 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
237 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
238 }
239
240 @Override
241 public ChannelFuture leaveGroup(
242 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
243 final ChannelPromise promise) {
244 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
245 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
246
247 if (eventLoop().inEventLoop()) {
248 leaveGroup0(multicastAddress, networkInterface, source, promise);
249 } else {
250 eventLoop().execute(new Runnable() {
251 @Override
252 public void run() {
253 leaveGroup0(multicastAddress, networkInterface, source, promise);
254 }
255 });
256 }
257 return promise;
258 }
259
260 private void leaveGroup0(
261 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
262 final ChannelPromise promise) {
263 assert eventLoop().inEventLoop();
264
265 try {
266 socket.leaveGroup(multicastAddress, networkInterface, source);
267 promise.setSuccess();
268 } catch (IOException e) {
269 promise.setFailure(e);
270 }
271 }
272
273 @Override
274 public ChannelFuture block(
275 InetAddress multicastAddress, NetworkInterface networkInterface,
276 InetAddress sourceToBlock) {
277 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
278 }
279
280 @Override
281 public ChannelFuture block(
282 final InetAddress multicastAddress, final NetworkInterface networkInterface,
283 final InetAddress sourceToBlock, final ChannelPromise promise) {
284 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
285 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
286 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
287
288 promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
289 return promise;
290 }
291
292 @Override
293 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
294 return block(multicastAddress, sourceToBlock, newPromise());
295 }
296
297 @Override
298 public ChannelFuture block(
299 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
300 try {
301 return block(
302 multicastAddress,
303 NetworkInterface.getByInetAddress(localAddress().getAddress()),
304 sourceToBlock, promise);
305 } catch (Throwable e) {
306 promise.setFailure(e);
307 }
308 return promise;
309 }
310
311 @Override
312 protected AbstractEpollUnsafe newUnsafe() {
313 return new EpollDatagramChannelUnsafe();
314 }
315
316 @Override
317 protected void doBind(SocketAddress localAddress) throws Exception {
318 if (localAddress instanceof InetSocketAddress) {
319 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
320 if (socketAddress.getAddress().isAnyLocalAddress() &&
321 socketAddress.getAddress() instanceof Inet4Address) {
322 if (socket.family() == InternetProtocolFamily.IPv6) {
323 localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
324 }
325 }
326 }
327 super.doBind(localAddress);
328 active = true;
329 }
330
331 @Override
332 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
333 int maxMessagesPerWrite = maxMessagesPerWrite();
334 while (maxMessagesPerWrite > 0) {
335 Object msg = in.current();
336 if (msg == null) {
337
338 break;
339 }
340
341 try {
342
343 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
344
345 in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
346 NativeDatagramPacketArray array = cleanDatagramPacketArray();
347 array.add(in, isConnected(), maxMessagesPerWrite);
348 int cnt = array.count();
349
350 if (cnt >= 1) {
351
352 int offset = 0;
353 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
354
355 int send = socket.sendmmsg(packets, offset, cnt);
356 if (send == 0) {
357
358 break;
359 }
360 for (int i = 0; i < send; i++) {
361 in.remove();
362 }
363 maxMessagesPerWrite -= send;
364 continue;
365 }
366 }
367 boolean done = false;
368 for (int i = config().getWriteSpinCount(); i > 0; --i) {
369 if (doWriteMessage(msg)) {
370 done = true;
371 break;
372 }
373 }
374
375 if (done) {
376 in.remove();
377 maxMessagesPerWrite --;
378 } else {
379 break;
380 }
381 } catch (IOException e) {
382 maxMessagesPerWrite --;
383
384
385
386 in.remove(e);
387 }
388 }
389
390 if (in.isEmpty()) {
391
392 clearFlag(Native.EPOLLOUT);
393 } else {
394
395 setFlag(Native.EPOLLOUT);
396 }
397 }
398
399 private boolean doWriteMessage(Object msg) throws Exception {
400 final ByteBuf data;
401 final InetSocketAddress remoteAddress;
402 if (msg instanceof AddressedEnvelope) {
403 @SuppressWarnings("unchecked")
404 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
405 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
406 data = envelope.content();
407 remoteAddress = envelope.recipient();
408 } else {
409 data = (ByteBuf) msg;
410 remoteAddress = null;
411 }
412
413 final int dataLen = data.readableBytes();
414 if (dataLen == 0) {
415 return true;
416 }
417
418 try {
419 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
420 } catch (NativeIoException e) {
421 if (remoteAddress == null) {
422 throw translateForConnected(e);
423 }
424 throw e;
425 }
426 }
427
428 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
429 if (envelope.recipient() instanceof InetSocketAddress
430 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
431 throw new UnresolvedAddressException();
432 }
433 }
434
435 @Override
436 protected Object filterOutboundMessage(Object msg) {
437 if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
438 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
439 throw new UnsupportedOperationException(
440 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
441 }
442 io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
443 checkUnresolved(packet);
444
445 ByteBuf content = packet.content();
446 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
447 packet.replace(newDirectBuffer(packet, content)) : msg;
448 }
449 if (msg instanceof DatagramPacket) {
450 DatagramPacket packet = (DatagramPacket) msg;
451 checkUnresolved(packet);
452
453 ByteBuf content = packet.content();
454 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
455 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
456 }
457
458 if (msg instanceof ByteBuf) {
459 ByteBuf buf = (ByteBuf) msg;
460 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
461 }
462
463 if (msg instanceof AddressedEnvelope) {
464 @SuppressWarnings("unchecked")
465 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
466 checkUnresolved(e);
467
468 if (e.content() instanceof ByteBuf &&
469 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
470
471 ByteBuf content = (ByteBuf) e.content();
472 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
473 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
474 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
475 }
476 }
477
478 throw new UnsupportedOperationException(
479 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
480 }
481
482 @Override
483 public EpollDatagramChannelConfig config() {
484 return config;
485 }
486
487 @Override
488 protected void doDisconnect() throws Exception {
489 socket.disconnect();
490 connected = active = false;
491 resetCachedAddresses();
492 }
493
494 @Override
495 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
496 if (super.doConnect(remoteAddress, localAddress)) {
497 connected = true;
498 return true;
499 }
500 return false;
501 }
502
503 @Override
504 protected void doClose() throws Exception {
505 super.doClose();
506 connected = false;
507 }
508
509 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
510
511 @Override
512 void epollInReady() {
513 assert eventLoop().inEventLoop();
514 EpollDatagramChannelConfig config = config();
515 if (shouldBreakEpollInReady(config)) {
516 clearEpollIn0();
517 return;
518 }
519 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
520 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
521
522 final ChannelPipeline pipeline = pipeline();
523 final ByteBufAllocator allocator = config.getAllocator();
524 allocHandle.reset(config);
525 epollInBefore();
526
527 Throwable exception = null;
528 try {
529 try {
530 boolean connected = isConnected();
531 do {
532 final boolean read;
533 int datagramSize = config().getMaxDatagramPayloadSize();
534
535 ByteBuf byteBuf = allocHandle.allocate(allocator);
536
537 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
538 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
539 0;
540 try {
541 if (numDatagram <= 1) {
542 if (!connected || config.isUdpGro()) {
543 read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
544 } else {
545 read = connectedRead(allocHandle, byteBuf, datagramSize);
546 }
547 } else {
548
549 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
550 byteBuf, datagramSize, numDatagram);
551 }
552 } catch (NativeIoException e) {
553 if (connected) {
554 throw translateForConnected(e);
555 }
556 throw e;
557 }
558
559 if (read) {
560 readPending = false;
561 } else {
562 break;
563 }
564
565
566 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
567 } catch (Throwable t) {
568 exception = t;
569 }
570
571 allocHandle.readComplete();
572 pipeline.fireChannelReadComplete();
573
574 if (exception != null) {
575 pipeline.fireExceptionCaught(exception);
576 }
577 } finally {
578 epollInFinally(config);
579 }
580 }
581 }
582
583 private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
584 throws Exception {
585 try {
586 int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
587 : byteBuf.writableBytes();
588 allocHandle.attemptedBytesRead(writable);
589
590 int writerIndex = byteBuf.writerIndex();
591 int localReadAmount;
592 if (byteBuf.hasMemoryAddress()) {
593 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
594 } else {
595 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
596 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
597 }
598
599 if (localReadAmount <= 0) {
600 allocHandle.lastBytesRead(localReadAmount);
601
602
603 return false;
604 }
605 byteBuf.writerIndex(writerIndex + localReadAmount);
606
607 allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
608 localReadAmount : writable);
609
610 DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
611 allocHandle.incMessagesRead(1);
612
613 pipeline().fireChannelRead(packet);
614 byteBuf = null;
615 return true;
616 } finally {
617 if (byteBuf != null) {
618 byteBuf.release();
619 }
620 }
621 }
622
623 private IOException translateForConnected(NativeIoException e) {
624
625 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
626 PortUnreachableException error = new PortUnreachableException(e.getMessage());
627 error.initCause(e);
628 return error;
629 }
630 return e;
631 }
632
633 private static void addDatagramPacketToOut(DatagramPacket packet,
634 RecyclableArrayList out) {
635 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
636 io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
637 (io.netty.channel.unix.SegmentedDatagramPacket) packet;
638 ByteBuf content = segmentedDatagramPacket.content();
639 InetSocketAddress recipient = segmentedDatagramPacket.recipient();
640 InetSocketAddress sender = segmentedDatagramPacket.sender();
641 int segmentSize = segmentedDatagramPacket.segmentSize();
642 do {
643 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
644 segmentSize)), recipient, sender));
645 } while (content.isReadable());
646
647 segmentedDatagramPacket.release();
648 } else {
649 out.add(packet);
650 }
651 }
652
653 private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
654 if (byteBuf != null) {
655 byteBuf.release();
656 }
657 if (packetList != null) {
658 for (int i = 0; i < packetList.size(); i++) {
659 ReferenceCountUtil.release(packetList.get(i));
660 }
661 packetList.recycle();
662 }
663 }
664
665 private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
666 int bytesRead, DatagramPacket packet) {
667 handle.lastBytesRead(Math.max(1, bytesRead));
668 handle.incMessagesRead(1);
669 pipeline.fireChannelRead(packet);
670 }
671
672 private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
673 int bytesRead, RecyclableArrayList packetList) {
674 int messagesRead = packetList.size();
675 handle.lastBytesRead(Math.max(1, bytesRead));
676 handle.incMessagesRead(messagesRead);
677 for (int i = 0; i < messagesRead; i++) {
678 pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
679 }
680 }
681
682 private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
683 NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
684 RecyclableArrayList datagramPackets = null;
685 try {
686 int writable = byteBuf.writableBytes();
687
688 boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
689 assert added;
690
691 allocHandle.attemptedBytesRead(writable);
692
693 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
694
695 int bytesReceived = socket.recvmsg(msg);
696 if (!msg.hasSender()) {
697 allocHandle.lastBytesRead(-1);
698 return false;
699 }
700 byteBuf.writerIndex(bytesReceived);
701 InetSocketAddress local = localAddress();
702 DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
703 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
704 processPacket(pipeline(), allocHandle, bytesReceived, packet);
705 } else {
706
707
708
709 datagramPackets = RecyclableArrayList.newInstance();
710 addDatagramPacketToOut(packet, datagramPackets);
711
712 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
713 datagramPackets.recycle();
714 datagramPackets = null;
715 }
716
717 return true;
718 } finally {
719 releaseAndRecycle(byteBuf, datagramPackets);
720 }
721 }
722
723 private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
724 ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
725 RecyclableArrayList datagramPackets = null;
726 try {
727 int offset = byteBuf.writerIndex();
728 for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
729 if (!array.addWritable(byteBuf, offset, datagramSize)) {
730 break;
731 }
732 }
733
734 allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
735
736 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
737
738 int received = socket.recvmmsg(packets, 0, array.count());
739 if (received == 0) {
740 allocHandle.lastBytesRead(-1);
741 return false;
742 }
743
744 InetSocketAddress local = localAddress();
745
746
747 int bytesReceived = received * datagramSize;
748 byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
749
750 if (received == 1) {
751
752 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
753 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
754 processPacket(pipeline(), allocHandle, datagramSize, packet);
755 return true;
756 }
757 }
758
759
760
761 datagramPackets = RecyclableArrayList.newInstance();
762 for (int i = 0; i < received; i++) {
763 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
764
765
766
767 byteBuf.skipBytes(datagramSize);
768 addDatagramPacketToOut(packet, datagramPackets);
769 }
770
771 byteBuf.release();
772 byteBuf = null;
773
774 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
775 datagramPackets.recycle();
776 datagramPackets = null;
777 return true;
778 } finally {
779 releaseAndRecycle(byteBuf, datagramPackets);
780 }
781 }
782
783 private NativeDatagramPacketArray cleanDatagramPacketArray() {
784 return ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
785 }
786 }