1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.AddressedEnvelope;
22 import io.netty.channel.ChannelException;
23 import io.netty.channel.ChannelFuture;
24 import io.netty.channel.ChannelMetadata;
25 import io.netty.channel.ChannelOutboundBuffer;
26 import io.netty.channel.ChannelPipeline;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.DefaultAddressedEnvelope;
29 import io.netty.channel.socket.DatagramChannel;
30 import io.netty.channel.socket.DatagramPacket;
31 import io.netty.channel.socket.InternetProtocolFamily;
32 import io.netty.channel.socket.SocketProtocolFamily;
33 import io.netty.channel.unix.Errors;
34 import io.netty.channel.unix.Errors.NativeIoException;
35 import io.netty.channel.unix.UnixChannelUtil;
36 import io.netty.util.ReferenceCountUtil;
37 import io.netty.util.UncheckedBooleanSupplier;
38 import io.netty.util.internal.ObjectUtil;
39 import io.netty.util.internal.RecyclableArrayList;
40 import io.netty.util.internal.StringUtil;
41 import io.netty.util.internal.SystemPropertyUtil;
42 import io.netty.util.internal.logging.InternalLogger;
43 import io.netty.util.internal.logging.InternalLoggerFactory;
44
45 import java.io.IOException;
46 import java.net.Inet4Address;
47 import java.net.InetAddress;
48 import java.net.InetSocketAddress;
49 import java.net.NetworkInterface;
50 import java.net.PortUnreachableException;
51 import java.net.SocketAddress;
52 import java.nio.ByteBuffer;
53 import java.nio.channels.UnresolvedAddressException;
54
55 import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
56
57
58
59
60
61 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollDatagramChannel.class);

// Value the private constructor writes to the socket's IP_MULTICAST_ALL option.
// Defaults to false unless the system property below says otherwise.
private static final boolean IP_MULTICAST_ALL =
        SystemPropertyUtil.getBoolean("io.netty.channel.epoll.ipMulticastAll", false);

// Shared instance handed out by metadata().
private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);

// Suffix appended to the "unsupported message type" error raised by filterOutboundMessage(...).
private static final String EXPECTED_TYPES =
        " (expected: "
                + StringUtil.simpleClassName(DatagramPacket.class)
                + ", "
                + StringUtil.simpleClassName(AddressedEnvelope.class)
                + '<'
                + StringUtil.simpleClassName(ByteBuf.class)
                + ", "
                + StringUtil.simpleClassName(InetSocketAddress.class)
                + ">, "
                + StringUtil.simpleClassName(ByteBuf.class)
                + ')';

private final EpollDatagramChannelConfig config;

// Set by doConnect(...), cleared by doDisconnect() and doClose(); exposed through isConnected().
private volatile boolean connected;

static {
    // Log the effective multicast setting once, when the class is initialised.
    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.channel.epoll.ipMulticastAll: {}", IP_MULTICAST_ALL);
    }
}
81
82
83
84
85
86
87 public static boolean isSegmentedDatagramPacketSupported() {
88 return Epoll.isAvailable() &&
89
90 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
91 }
92
93
94
95
96
/**
 * Creates a new channel without naming a protocol family: delegates to
 * {@link #EpollDatagramChannel(SocketProtocolFamily)} with {@code null}.
 */
public EpollDatagramChannel() {
    this((SocketProtocolFamily) null);
}
100
101
102
103
104
105
106
/**
 * Creates a new, not yet active channel backed by a datagram socket opened for {@code family}.
 *
 * @param family the protocol family passed to {@code newSocketDgram(...)}
 * @deprecated prefer {@link #EpollDatagramChannel(SocketProtocolFamily)}
 */
@Deprecated
public EpollDatagramChannel(InternetProtocolFamily family) {
    this(newSocketDgram(family), false);
}
111
112
113
114
115
/**
 * Creates a new, not yet active channel backed by a datagram socket opened for {@code family}.
 *
 * @param family the protocol family passed to {@code newSocketDgram(...)}
 */
public EpollDatagramChannel(SocketProtocolFamily family) {
    this(newSocketDgram(family), false);
}
119
120
121
122
123
/**
 * Creates a channel around an already existing file descriptor. The channel is constructed with its
 * {@code active} flag set.
 *
 * @param fd the file descriptor to wrap in a {@link LinuxSocket}
 */
public EpollDatagramChannel(int fd) {
    this(new LinuxSocket(fd), true);
}
127
/**
 * Shared constructor: registers the socket with the parent class, applies the configured
 * IP_MULTICAST_ALL setting and creates the channel configuration.
 *
 * @param fd     the socket backing this channel
 * @param active the initial value of the {@code active} flag handed to the parent constructor
 */
private EpollDatagramChannel(LinuxSocket fd, boolean active) {
    super(null, fd, active, EpollIoOps.valueOf(0));

    // Best effort only: a failure to apply the option is logged at debug level and otherwise ignored,
    // so channel creation still succeeds.
    try {
        fd.setIpMulticastAll(IP_MULTICAST_ALL);
    } catch (IOException | ChannelException cause) {
        logger.debug("Failed to set IP_MULTICAST_ALL to {}", IP_MULTICAST_ALL, cause);
    }

    this.config = new EpollDatagramChannelConfig(this);
}
140
141 @Override
142 public InetSocketAddress remoteAddress() {
143 return (InetSocketAddress) super.remoteAddress();
144 }
145
146 @Override
147 public InetSocketAddress localAddress() {
148 return (InetSocketAddress) super.localAddress();
149 }
150
151 @Override
152 public ChannelMetadata metadata() {
153 return METADATA;
154 }
155
156 @Override
157 public boolean isActive() {
158 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
159 }
160
161 @Override
162 public boolean isConnected() {
163 return connected;
164 }
165
166 @Override
167 public ChannelFuture joinGroup(InetAddress multicastAddress) {
168 return joinGroup(multicastAddress, newPromise());
169 }
170
171 @Override
172 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
173 try {
174 NetworkInterface iface = config().getNetworkInterface();
175 if (iface == null) {
176 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
177 }
178 return joinGroup(multicastAddress, iface, null, promise);
179 } catch (IOException e) {
180 promise.setFailure(e);
181 }
182 return promise;
183 }
184
185 @Override
186 public ChannelFuture joinGroup(
187 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
188 return joinGroup(multicastAddress, networkInterface, newPromise());
189 }
190
191 @Override
192 public ChannelFuture joinGroup(
193 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
194 ChannelPromise promise) {
195 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
196 }
197
198 @Override
199 public ChannelFuture joinGroup(
200 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
201 return joinGroup(multicastAddress, networkInterface, source, newPromise());
202 }
203
204 @Override
205 public ChannelFuture joinGroup(
206 final InetAddress multicastAddress, final NetworkInterface networkInterface,
207 final InetAddress source, final ChannelPromise promise) {
208
209 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
210 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
211
212 if (eventLoop().inEventLoop()) {
213 joinGroup0(multicastAddress, networkInterface, source, promise);
214 } else {
215 eventLoop().execute(new Runnable() {
216 @Override
217 public void run() {
218 joinGroup0(multicastAddress, networkInterface, source, promise);
219 }
220 });
221 }
222 return promise;
223 }
224
225 private void joinGroup0(
226 final InetAddress multicastAddress, final NetworkInterface networkInterface,
227 final InetAddress source, final ChannelPromise promise) {
228 assert eventLoop().inEventLoop();
229
230 try {
231 socket.joinGroup(multicastAddress, networkInterface, source);
232 promise.setSuccess();
233 } catch (IOException e) {
234 promise.setFailure(e);
235 }
236 }
237
238 @Override
239 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
240 return leaveGroup(multicastAddress, newPromise());
241 }
242
243 @Override
244 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
245 try {
246 return leaveGroup(
247 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
248 } catch (IOException e) {
249 promise.setFailure(e);
250 }
251 return promise;
252 }
253
254 @Override
255 public ChannelFuture leaveGroup(
256 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
257 return leaveGroup(multicastAddress, networkInterface, newPromise());
258 }
259
260 @Override
261 public ChannelFuture leaveGroup(
262 InetSocketAddress multicastAddress,
263 NetworkInterface networkInterface, ChannelPromise promise) {
264 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
265 }
266
267 @Override
268 public ChannelFuture leaveGroup(
269 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
270 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
271 }
272
273 @Override
274 public ChannelFuture leaveGroup(
275 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
276 final ChannelPromise promise) {
277 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
278 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
279
280 if (eventLoop().inEventLoop()) {
281 leaveGroup0(multicastAddress, networkInterface, source, promise);
282 } else {
283 eventLoop().execute(new Runnable() {
284 @Override
285 public void run() {
286 leaveGroup0(multicastAddress, networkInterface, source, promise);
287 }
288 });
289 }
290 return promise;
291 }
292
293 private void leaveGroup0(
294 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
295 final ChannelPromise promise) {
296 assert eventLoop().inEventLoop();
297
298 try {
299 socket.leaveGroup(multicastAddress, networkInterface, source);
300 promise.setSuccess();
301 } catch (IOException e) {
302 promise.setFailure(e);
303 }
304 }
305
306 @Override
307 public ChannelFuture block(
308 InetAddress multicastAddress, NetworkInterface networkInterface,
309 InetAddress sourceToBlock) {
310 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
311 }
312
313 @Override
314 public ChannelFuture block(
315 final InetAddress multicastAddress, final NetworkInterface networkInterface,
316 final InetAddress sourceToBlock, final ChannelPromise promise) {
317 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
318 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
319 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
320
321 promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
322 return promise;
323 }
324
325 @Override
326 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
327 return block(multicastAddress, sourceToBlock, newPromise());
328 }
329
330 @Override
331 public ChannelFuture block(
332 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
333 try {
334 return block(
335 multicastAddress,
336 NetworkInterface.getByInetAddress(localAddress().getAddress()),
337 sourceToBlock, promise);
338 } catch (Throwable e) {
339 promise.setFailure(e);
340 }
341 return promise;
342 }
343
344 @Override
345 protected AbstractEpollUnsafe newUnsafe() {
346 return new EpollDatagramChannelUnsafe();
347 }
348
349 @Override
350 protected void doBind(SocketAddress localAddress) throws Exception {
351 if (localAddress instanceof InetSocketAddress) {
352 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
353 if (socketAddress.getAddress().isAnyLocalAddress() &&
354 socketAddress.getAddress() instanceof Inet4Address) {
355 if (socket.family() == SocketProtocolFamily.INET6) {
356 localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
357 }
358 }
359 }
360 super.doBind(localAddress);
361 active = true;
362 }
363
364 @Override
365 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
366 int maxMessagesPerWrite = maxMessagesPerWrite();
367 while (maxMessagesPerWrite > 0) {
368 Object msg = in.current();
369 if (msg == null) {
370
371 break;
372 }
373
374 try {
375
376 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
377
378 in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
379 NativeDatagramPacketArray array = cleanDatagramPacketArray();
380 array.add(in, isConnected(), maxMessagesPerWrite);
381 int cnt = array.count();
382
383 if (cnt >= 1) {
384
385 int offset = 0;
386 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
387
388 int send = socket.sendmmsg(packets, offset, cnt);
389 if (send == 0) {
390
391 break;
392 }
393 for (int i = 0; i < send; i++) {
394 in.remove();
395 }
396 maxMessagesPerWrite -= send;
397 continue;
398 }
399 }
400 boolean done = false;
401 for (int i = config().getWriteSpinCount(); i > 0; --i) {
402 if (doWriteMessage(msg)) {
403 done = true;
404 break;
405 }
406 }
407
408 if (done) {
409 in.remove();
410 maxMessagesPerWrite --;
411 } else {
412 break;
413 }
414 } catch (IOException e) {
415 maxMessagesPerWrite --;
416
417
418
419 in.remove(e);
420 }
421 }
422
423 if (in.isEmpty()) {
424
425 clearFlag(Native.EPOLLOUT);
426 } else {
427
428 setFlag(Native.EPOLLOUT);
429 }
430 }
431
432 private boolean doWriteMessage(Object msg) throws Exception {
433 final ByteBuf data;
434 final InetSocketAddress remoteAddress;
435 if (msg instanceof AddressedEnvelope) {
436 @SuppressWarnings("unchecked")
437 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
438 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
439 data = envelope.content();
440 remoteAddress = envelope.recipient();
441 } else {
442 data = (ByteBuf) msg;
443 remoteAddress = null;
444 }
445
446 final int dataLen = data.readableBytes();
447 if (dataLen == 0) {
448 return true;
449 }
450
451 try {
452 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
453 } catch (NativeIoException e) {
454 if (remoteAddress == null) {
455 throw translateForConnected(e);
456 }
457 throw e;
458 }
459 }
460
461 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
462 if (envelope.recipient() instanceof InetSocketAddress
463 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
464 throw new UnresolvedAddressException();
465 }
466 }
467
468 @Override
469 protected Object filterOutboundMessage(Object msg) {
470 if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
471 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
472 throw new UnsupportedOperationException(
473 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
474 }
475 io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
476 checkUnresolved(packet);
477
478 ByteBuf content = packet.content();
479 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
480 packet.replace(newDirectBuffer(packet, content)) : msg;
481 }
482 if (msg instanceof DatagramPacket) {
483 DatagramPacket packet = (DatagramPacket) msg;
484 checkUnresolved(packet);
485
486 ByteBuf content = packet.content();
487 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
488 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
489 }
490
491 if (msg instanceof ByteBuf) {
492 ByteBuf buf = (ByteBuf) msg;
493 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
494 }
495
496 if (msg instanceof AddressedEnvelope) {
497 @SuppressWarnings("unchecked")
498 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
499 checkUnresolved(e);
500
501 if (e.content() instanceof ByteBuf &&
502 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
503
504 ByteBuf content = (ByteBuf) e.content();
505 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
506 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
507 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
508 }
509 }
510
511 throw new UnsupportedOperationException(
512 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
513 }
514
515 @Override
516 public EpollDatagramChannelConfig config() {
517 return config;
518 }
519
520 @Override
521 protected void doDisconnect() throws Exception {
522 socket.disconnect();
523 connected = active = false;
524 resetCachedAddresses();
525 }
526
527 @Override
528 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
529 if (super.doConnect(remoteAddress, localAddress)) {
530 connected = true;
531 return true;
532 }
533 return false;
534 }
535
536 @Override
537 protected void doClose() throws Exception {
538 super.doClose();
539 connected = false;
540 }
541
542 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
543
544 @Override
545 void epollInReady() {
546 assert eventLoop().inEventLoop();
547 EpollDatagramChannelConfig config = config();
548 if (shouldBreakEpollInReady(config)) {
549 clearEpollIn0();
550 return;
551 }
552 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
553 final ChannelPipeline pipeline = pipeline();
554 final ByteBufAllocator allocator = config.getAllocator();
555 allocHandle.reset(config);
556
557 Throwable exception = null;
558 try {
559 try {
560 boolean connected = isConnected();
561 do {
562 final boolean read;
563 int datagramSize = config().getMaxDatagramPayloadSize();
564
565 ByteBuf byteBuf = allocHandle.allocate(allocator);
566
567 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
568 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
569 0;
570 try {
571 if (numDatagram <= 1) {
572 if (!connected || config.isUdpGro()) {
573 read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
574 } else {
575 read = connectedRead(allocHandle, byteBuf, datagramSize);
576 }
577 } else {
578
579 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
580 byteBuf, datagramSize, numDatagram);
581 }
582 } catch (NativeIoException e) {
583 if (connected) {
584 throw translateForConnected(e);
585 }
586 throw e;
587 }
588
589 if (read) {
590 readPending = false;
591 } else {
592 break;
593 }
594
595
596 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
597 } catch (Throwable t) {
598 exception = t;
599 }
600
601 allocHandle.readComplete();
602 pipeline.fireChannelReadComplete();
603
604 if (exception != null) {
605 pipeline.fireExceptionCaught(exception);
606 }
607 } finally {
608 if (shouldStopReading(config)) {
609 clearEpollIn();
610 }
611 }
612 }
613 }
614
615 private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
616 throws Exception {
617 try {
618 int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
619 : byteBuf.writableBytes();
620 allocHandle.attemptedBytesRead(writable);
621
622 int writerIndex = byteBuf.writerIndex();
623 int localReadAmount;
624 if (byteBuf.hasMemoryAddress()) {
625 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
626 } else {
627 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
628 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
629 }
630
631 if (localReadAmount <= 0) {
632 allocHandle.lastBytesRead(localReadAmount);
633
634
635 return false;
636 }
637 byteBuf.writerIndex(writerIndex + localReadAmount);
638
639 allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
640 localReadAmount : writable);
641
642 DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
643 allocHandle.incMessagesRead(1);
644
645 pipeline().fireChannelRead(packet);
646 byteBuf = null;
647 return true;
648 } finally {
649 if (byteBuf != null) {
650 byteBuf.release();
651 }
652 }
653 }
654
655 private IOException translateForConnected(NativeIoException e) {
656
657 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
658 PortUnreachableException error = new PortUnreachableException(e.getMessage());
659 error.initCause(e);
660 return error;
661 }
662 return e;
663 }
664
665 private static void addDatagramPacketToOut(DatagramPacket packet,
666 RecyclableArrayList out) {
667 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
668 io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
669 (io.netty.channel.unix.SegmentedDatagramPacket) packet;
670 ByteBuf content = segmentedDatagramPacket.content();
671 InetSocketAddress recipient = segmentedDatagramPacket.recipient();
672 InetSocketAddress sender = segmentedDatagramPacket.sender();
673 int segmentSize = segmentedDatagramPacket.segmentSize();
674 do {
675 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
676 segmentSize)), recipient, sender));
677 } while (content.isReadable());
678
679 segmentedDatagramPacket.release();
680 } else {
681 out.add(packet);
682 }
683 }
684
685 private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
686 if (byteBuf != null) {
687 byteBuf.release();
688 }
689 if (packetList != null) {
690 for (int i = 0; i < packetList.size(); i++) {
691 ReferenceCountUtil.release(packetList.get(i));
692 }
693 packetList.recycle();
694 }
695 }
696
697 private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
698 int bytesRead, DatagramPacket packet) {
699 handle.lastBytesRead(Math.max(1, bytesRead));
700 handle.incMessagesRead(1);
701 pipeline.fireChannelRead(packet);
702 }
703
704 private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
705 int bytesRead, RecyclableArrayList packetList) {
706 int messagesRead = packetList.size();
707 handle.lastBytesRead(Math.max(1, bytesRead));
708 handle.incMessagesRead(messagesRead);
709 for (int i = 0; i < messagesRead; i++) {
710 pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
711 }
712 }
713
714 private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
715 NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
716 RecyclableArrayList datagramPackets = null;
717 try {
718 int writable = byteBuf.writableBytes();
719
720 boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
721 assert added;
722
723 allocHandle.attemptedBytesRead(writable);
724
725 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
726
727 int bytesReceived = socket.recvmsg(msg);
728 if (!msg.hasSender()) {
729 allocHandle.lastBytesRead(-1);
730 return false;
731 }
732 byteBuf.writerIndex(bytesReceived);
733 InetSocketAddress local = localAddress();
734 DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
735 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
736 processPacket(pipeline(), allocHandle, bytesReceived, packet);
737 } else {
738
739
740
741 datagramPackets = RecyclableArrayList.newInstance();
742 addDatagramPacketToOut(packet, datagramPackets);
743
744 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
745 datagramPackets.recycle();
746 datagramPackets = null;
747 }
748
749 return true;
750 } finally {
751 releaseAndRecycle(byteBuf, datagramPackets);
752 }
753 }
754
755 private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
756 ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
757 RecyclableArrayList datagramPackets = null;
758 try {
759 int offset = byteBuf.writerIndex();
760 for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
761 if (!array.addWritable(byteBuf, offset, datagramSize)) {
762 break;
763 }
764 }
765
766 allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
767
768 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
769
770 int received = socket.recvmmsg(packets, 0, array.count());
771 if (received == 0) {
772 allocHandle.lastBytesRead(-1);
773 return false;
774 }
775
776 InetSocketAddress local = localAddress();
777
778
779 int bytesReceived = received * datagramSize;
780 byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
781
782 if (received == 1) {
783
784 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
785 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
786 processPacket(pipeline(), allocHandle, datagramSize, packet);
787 return true;
788 }
789 }
790
791
792
793 datagramPackets = RecyclableArrayList.newInstance();
794 for (int i = 0; i < received; i++) {
795 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
796
797
798
799 byteBuf.skipBytes(datagramSize);
800 addDatagramPacketToOut(packet, datagramPackets);
801 }
802
803 byteBuf.release();
804 byteBuf = null;
805
806 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
807 datagramPackets.recycle();
808 datagramPackets = null;
809 return true;
810 } finally {
811 releaseAndRecycle(byteBuf, datagramPackets);
812 }
813 }
814
815 private NativeDatagramPacketArray cleanDatagramPacketArray() {
816 return ((NativeArrays) registration().attachment()).cleanDatagramPacketArray();
817 }
818 }