1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.AddressedEnvelope;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.socket.DatagramChannel;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.InternetProtocolFamily;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.Errors.NativeIoException;
33 import io.netty.channel.unix.UnixChannelUtil;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.UncheckedBooleanSupplier;
36 import io.netty.util.internal.ObjectUtil;
37 import io.netty.util.internal.RecyclableArrayList;
38 import io.netty.util.internal.StringUtil;
39
40 import java.io.IOException;
41 import java.net.Inet4Address;
42 import java.net.InetAddress;
43 import java.net.InetSocketAddress;
44 import java.net.NetworkInterface;
45 import java.net.PortUnreachableException;
46 import java.net.SocketAddress;
47 import java.nio.ByteBuffer;
48 import java.nio.channels.UnresolvedAddressException;
49
50 import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
51
52
53
54
55
56 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
57 private static final ChannelMetadata METADATA = new ChannelMetadata(true, 16);
58 private static final String EXPECTED_TYPES =
59 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
60 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
61 StringUtil.simpleClassName(ByteBuf.class) + ", " +
62 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
63 StringUtil.simpleClassName(ByteBuf.class) + ')';
64
65 private final EpollDatagramChannelConfig config;
66 private volatile boolean connected;
67
68
69
70
71
72
73 public static boolean isSegmentedDatagramPacketSupported() {
74 return Epoll.isAvailable() &&
75
76 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
77 }
78
79
80
81
82
83 public EpollDatagramChannel() {
84 this(null);
85 }
86
87
88
89
90
91 public EpollDatagramChannel(InternetProtocolFamily family) {
92 this(newSocketDgram(family), false);
93 }
94
95
96
97
98
99 public EpollDatagramChannel(int fd) {
100 this(new LinuxSocket(fd), true);
101 }
102
103 private EpollDatagramChannel(LinuxSocket fd, boolean active) {
104 super(null, fd, active);
105 config = new EpollDatagramChannelConfig(this);
106 }
107
108 @Override
109 public InetSocketAddress remoteAddress() {
110 return (InetSocketAddress) super.remoteAddress();
111 }
112
113 @Override
114 public InetSocketAddress localAddress() {
115 return (InetSocketAddress) super.localAddress();
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123 @Override
124 public boolean isActive() {
125 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
126 }
127
128 @Override
129 public boolean isConnected() {
130 return connected;
131 }
132
133 @Override
134 public ChannelFuture joinGroup(InetAddress multicastAddress) {
135 return joinGroup(multicastAddress, newPromise());
136 }
137
138 @Override
139 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
140 try {
141 NetworkInterface iface = config().getNetworkInterface();
142 if (iface == null) {
143 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
144 }
145 return joinGroup(multicastAddress, iface, null, promise);
146 } catch (IOException e) {
147 promise.setFailure(e);
148 }
149 return promise;
150 }
151
152 @Override
153 public ChannelFuture joinGroup(
154 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
155 return joinGroup(multicastAddress, networkInterface, newPromise());
156 }
157
158 @Override
159 public ChannelFuture joinGroup(
160 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
161 ChannelPromise promise) {
162 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
163 }
164
165 @Override
166 public ChannelFuture joinGroup(
167 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
168 return joinGroup(multicastAddress, networkInterface, source, newPromise());
169 }
170
171 @Override
172 public ChannelFuture joinGroup(
173 final InetAddress multicastAddress, final NetworkInterface networkInterface,
174 final InetAddress source, final ChannelPromise promise) {
175
176 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
177 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
178
179 if (eventLoop().inEventLoop()) {
180 joinGroup0(multicastAddress, networkInterface, source, promise);
181 } else {
182 eventLoop().execute(new Runnable() {
183 @Override
184 public void run() {
185 joinGroup0(multicastAddress, networkInterface, source, promise);
186 }
187 });
188 }
189 return promise;
190 }
191
192 private void joinGroup0(
193 final InetAddress multicastAddress, final NetworkInterface networkInterface,
194 final InetAddress source, final ChannelPromise promise) {
195 assert eventLoop().inEventLoop();
196
197 try {
198 socket.joinGroup(multicastAddress, networkInterface, source);
199 promise.setSuccess();
200 } catch (IOException e) {
201 promise.setFailure(e);
202 }
203 }
204
205 @Override
206 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
207 return leaveGroup(multicastAddress, newPromise());
208 }
209
210 @Override
211 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
212 try {
213 return leaveGroup(
214 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
215 } catch (IOException e) {
216 promise.setFailure(e);
217 }
218 return promise;
219 }
220
221 @Override
222 public ChannelFuture leaveGroup(
223 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
224 return leaveGroup(multicastAddress, networkInterface, newPromise());
225 }
226
227 @Override
228 public ChannelFuture leaveGroup(
229 InetSocketAddress multicastAddress,
230 NetworkInterface networkInterface, ChannelPromise promise) {
231 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
232 }
233
234 @Override
235 public ChannelFuture leaveGroup(
236 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
237 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
238 }
239
240 @Override
241 public ChannelFuture leaveGroup(
242 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
243 final ChannelPromise promise) {
244 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
245 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
246
247 if (eventLoop().inEventLoop()) {
248 leaveGroup0(multicastAddress, networkInterface, source, promise);
249 } else {
250 eventLoop().execute(new Runnable() {
251 @Override
252 public void run() {
253 leaveGroup0(multicastAddress, networkInterface, source, promise);
254 }
255 });
256 }
257 return promise;
258 }
259
260 private void leaveGroup0(
261 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
262 final ChannelPromise promise) {
263 assert eventLoop().inEventLoop();
264
265 try {
266 socket.leaveGroup(multicastAddress, networkInterface, source);
267 promise.setSuccess();
268 } catch (IOException e) {
269 promise.setFailure(e);
270 }
271 }
272
273 @Override
274 public ChannelFuture block(
275 InetAddress multicastAddress, NetworkInterface networkInterface,
276 InetAddress sourceToBlock) {
277 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
278 }
279
280 @Override
281 public ChannelFuture block(
282 final InetAddress multicastAddress, final NetworkInterface networkInterface,
283 final InetAddress sourceToBlock, final ChannelPromise promise) {
284 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
285 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
286 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
287
288 promise.setFailure(new UnsupportedOperationException("Multicast block not supported"));
289 return promise;
290 }
291
292 @Override
293 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
294 return block(multicastAddress, sourceToBlock, newPromise());
295 }
296
297 @Override
298 public ChannelFuture block(
299 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
300 try {
301 return block(
302 multicastAddress,
303 NetworkInterface.getByInetAddress(localAddress().getAddress()),
304 sourceToBlock, promise);
305 } catch (Throwable e) {
306 promise.setFailure(e);
307 }
308 return promise;
309 }
310
311 @Override
312 protected AbstractEpollUnsafe newUnsafe() {
313 return new EpollDatagramChannelUnsafe();
314 }
315
316 @Override
317 protected void doBind(SocketAddress localAddress) throws Exception {
318 if (localAddress instanceof InetSocketAddress) {
319 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
320 if (socketAddress.getAddress().isAnyLocalAddress() &&
321 socketAddress.getAddress() instanceof Inet4Address) {
322 if (socket.family() == InternetProtocolFamily.IPv6) {
323 localAddress = new InetSocketAddress(Native.INET6_ANY, socketAddress.getPort());
324 }
325 }
326 }
327 super.doBind(localAddress);
328 active = true;
329 }
330
331 @Override
332 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
333 int maxMessagesPerWrite = maxMessagesPerWrite();
334 while (maxMessagesPerWrite > 0) {
335 Object msg = in.current();
336 if (msg == null) {
337
338 break;
339 }
340
341 try {
342
343 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
344
345 in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
346 NativeDatagramPacketArray array = cleanDatagramPacketArray();
347 array.add(in, isConnected(), maxMessagesPerWrite);
348 int cnt = array.count();
349
350 if (cnt >= 1) {
351
352 int offset = 0;
353 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
354
355 int send = socket.sendmmsg(packets, offset, cnt);
356 if (send == 0) {
357
358 break;
359 }
360 for (int i = 0; i < send; i++) {
361 in.remove();
362 }
363 maxMessagesPerWrite -= send;
364 continue;
365 }
366 }
367 boolean done = false;
368 for (int i = config().getWriteSpinCount(); i > 0; --i) {
369 if (doWriteMessage(msg)) {
370 done = true;
371 break;
372 }
373 }
374
375 if (done) {
376 in.remove();
377 maxMessagesPerWrite --;
378 } else {
379 break;
380 }
381 } catch (IOException e) {
382 maxMessagesPerWrite --;
383
384
385
386 in.remove(e);
387 }
388 }
389
390 if (in.isEmpty()) {
391
392 clearFlag(Native.EPOLLOUT);
393 } else {
394
395 setFlag(Native.EPOLLOUT);
396 }
397 }
398
399 private boolean doWriteMessage(Object msg) throws Exception {
400 final ByteBuf data;
401 final InetSocketAddress remoteAddress;
402 if (msg instanceof AddressedEnvelope) {
403 @SuppressWarnings("unchecked")
404 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
405 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
406 data = envelope.content();
407 remoteAddress = envelope.recipient();
408 } else {
409 data = (ByteBuf) msg;
410 remoteAddress = null;
411 }
412
413 final int dataLen = data.readableBytes();
414 if (dataLen == 0) {
415 return true;
416 }
417
418 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
419 }
420
421 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
422 if (envelope.recipient() instanceof InetSocketAddress
423 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
424 throw new UnresolvedAddressException();
425 }
426 }
427
428 @Override
429 protected Object filterOutboundMessage(Object msg) {
430 if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
431 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
432 throw new UnsupportedOperationException(
433 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
434 }
435 io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
436 checkUnresolved(packet);
437
438 ByteBuf content = packet.content();
439 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
440 packet.replace(newDirectBuffer(packet, content)) : msg;
441 }
442 if (msg instanceof DatagramPacket) {
443 DatagramPacket packet = (DatagramPacket) msg;
444 checkUnresolved(packet);
445
446 ByteBuf content = packet.content();
447 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
448 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
449 }
450
451 if (msg instanceof ByteBuf) {
452 ByteBuf buf = (ByteBuf) msg;
453 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
454 }
455
456 if (msg instanceof AddressedEnvelope) {
457 @SuppressWarnings("unchecked")
458 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
459 checkUnresolved(e);
460
461 if (e.content() instanceof ByteBuf &&
462 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
463
464 ByteBuf content = (ByteBuf) e.content();
465 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
466 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
467 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
468 }
469 }
470
471 throw new UnsupportedOperationException(
472 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
473 }
474
475 @Override
476 public EpollDatagramChannelConfig config() {
477 return config;
478 }
479
480 @Override
481 protected void doDisconnect() throws Exception {
482 socket.disconnect();
483 connected = active = false;
484 resetCachedAddresses();
485 }
486
487 @Override
488 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
489 if (super.doConnect(remoteAddress, localAddress)) {
490 connected = true;
491 return true;
492 }
493 return false;
494 }
495
496 @Override
497 protected void doClose() throws Exception {
498 super.doClose();
499 connected = false;
500 }
501
502 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
503
504 @Override
505 void epollInReady() {
506 assert eventLoop().inEventLoop();
507 EpollDatagramChannelConfig config = config();
508 if (shouldBreakEpollInReady(config)) {
509 clearEpollIn0();
510 return;
511 }
512 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
513 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
514
515 final ChannelPipeline pipeline = pipeline();
516 final ByteBufAllocator allocator = config.getAllocator();
517 allocHandle.reset(config);
518 epollInBefore();
519
520 Throwable exception = null;
521 try {
522 try {
523 boolean connected = isConnected();
524 do {
525 final boolean read;
526 int datagramSize = config().getMaxDatagramPayloadSize();
527
528 ByteBuf byteBuf = allocHandle.allocate(allocator);
529
530 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
531 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
532 0;
533 try {
534 if (numDatagram <= 1) {
535 if (!connected || config.isUdpGro()) {
536 read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
537 } else {
538 read = connectedRead(allocHandle, byteBuf, datagramSize);
539 }
540 } else {
541
542 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
543 byteBuf, datagramSize, numDatagram);
544 }
545 } catch (NativeIoException e) {
546 if (connected) {
547 throw translateForConnected(e);
548 }
549 throw e;
550 }
551
552 if (read) {
553 readPending = false;
554 } else {
555 break;
556 }
557
558
559 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
560 } catch (Throwable t) {
561 exception = t;
562 }
563
564 allocHandle.readComplete();
565 pipeline.fireChannelReadComplete();
566
567 if (exception != null) {
568 pipeline.fireExceptionCaught(exception);
569 }
570 } finally {
571 epollInFinally(config);
572 }
573 }
574 }
575
576 private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
577 throws Exception {
578 try {
579 int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
580 : byteBuf.writableBytes();
581 allocHandle.attemptedBytesRead(writable);
582
583 int writerIndex = byteBuf.writerIndex();
584 int localReadAmount;
585 if (byteBuf.hasMemoryAddress()) {
586 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
587 } else {
588 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
589 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
590 }
591
592 if (localReadAmount <= 0) {
593 allocHandle.lastBytesRead(localReadAmount);
594
595
596 return false;
597 }
598 byteBuf.writerIndex(writerIndex + localReadAmount);
599
600 allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
601 localReadAmount : writable);
602
603 DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
604 allocHandle.incMessagesRead(1);
605
606 pipeline().fireChannelRead(packet);
607 byteBuf = null;
608 return true;
609 } finally {
610 if (byteBuf != null) {
611 byteBuf.release();
612 }
613 }
614 }
615
616 private IOException translateForConnected(NativeIoException e) {
617
618 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
619 PortUnreachableException error = new PortUnreachableException(e.getMessage());
620 error.initCause(e);
621 return error;
622 }
623 return e;
624 }
625
626 private static void addDatagramPacketToOut(DatagramPacket packet,
627 RecyclableArrayList out) {
628 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
629 io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
630 (io.netty.channel.unix.SegmentedDatagramPacket) packet;
631 ByteBuf content = segmentedDatagramPacket.content();
632 InetSocketAddress recipient = segmentedDatagramPacket.recipient();
633 InetSocketAddress sender = segmentedDatagramPacket.sender();
634 int segmentSize = segmentedDatagramPacket.segmentSize();
635 do {
636 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
637 segmentSize)), recipient, sender));
638 } while (content.isReadable());
639
640 segmentedDatagramPacket.release();
641 } else {
642 out.add(packet);
643 }
644 }
645
646 private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
647 if (byteBuf != null) {
648 byteBuf.release();
649 }
650 if (packetList != null) {
651 for (int i = 0; i < packetList.size(); i++) {
652 ReferenceCountUtil.release(packetList.get(i));
653 }
654 packetList.recycle();
655 }
656 }
657
658 private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
659 int bytesRead, DatagramPacket packet) {
660 handle.lastBytesRead(Math.max(1, bytesRead));
661 handle.incMessagesRead(1);
662 pipeline.fireChannelRead(packet);
663 }
664
665 private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
666 int bytesRead, RecyclableArrayList packetList) {
667 int messagesRead = packetList.size();
668 handle.lastBytesRead(Math.max(1, bytesRead));
669 handle.incMessagesRead(messagesRead);
670 for (int i = 0; i < messagesRead; i++) {
671 pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
672 }
673 }
674
675 private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
676 NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
677 RecyclableArrayList datagramPackets = null;
678 try {
679 int writable = byteBuf.writableBytes();
680
681 boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
682 assert added;
683
684 allocHandle.attemptedBytesRead(writable);
685
686 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
687
688 int bytesReceived = socket.recvmsg(msg);
689 if (!msg.hasSender()) {
690 allocHandle.lastBytesRead(-1);
691 return false;
692 }
693 byteBuf.writerIndex(bytesReceived);
694 InetSocketAddress local = localAddress();
695 DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
696 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
697 processPacket(pipeline(), allocHandle, bytesReceived, packet);
698 } else {
699
700
701
702 datagramPackets = RecyclableArrayList.newInstance();
703 addDatagramPacketToOut(packet, datagramPackets);
704
705 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
706 datagramPackets.recycle();
707 datagramPackets = null;
708 }
709
710 return true;
711 } finally {
712 releaseAndRecycle(byteBuf, datagramPackets);
713 }
714 }
715
716 private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
717 ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
718 RecyclableArrayList datagramPackets = null;
719 try {
720 int offset = byteBuf.writerIndex();
721 for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
722 if (!array.addWritable(byteBuf, offset, datagramSize)) {
723 break;
724 }
725 }
726
727 allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
728
729 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
730
731 int received = socket.recvmmsg(packets, 0, array.count());
732 if (received == 0) {
733 allocHandle.lastBytesRead(-1);
734 return false;
735 }
736
737 InetSocketAddress local = localAddress();
738
739
740 int bytesReceived = received * datagramSize;
741 byteBuf.writerIndex(byteBuf.writerIndex() + bytesReceived);
742
743 if (received == 1) {
744
745 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
746 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
747 processPacket(pipeline(), allocHandle, datagramSize, packet);
748 return true;
749 }
750 }
751
752
753
754 datagramPackets = RecyclableArrayList.newInstance();
755 for (int i = 0; i < received; i++) {
756 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf, local);
757
758
759
760 byteBuf.skipBytes(datagramSize);
761 addDatagramPacketToOut(packet, datagramPackets);
762 }
763
764 byteBuf.release();
765 byteBuf = null;
766
767 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
768 datagramPackets.recycle();
769 datagramPackets = null;
770 return true;
771 } finally {
772 releaseAndRecycle(byteBuf, datagramPackets);
773 }
774 }
775
776 private NativeDatagramPacketArray cleanDatagramPacketArray() {
777 return ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
778 }
779 }