1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.Unpooled;
21 import io.netty.channel.AddressedEnvelope;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelMetadata;
24 import io.netty.channel.ChannelOutboundBuffer;
25 import io.netty.channel.ChannelPipeline;
26 import io.netty.channel.ChannelPromise;
27 import io.netty.channel.DefaultAddressedEnvelope;
28 import io.netty.channel.socket.DatagramChannel;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.socket.InternetProtocolFamily;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.Errors.NativeIoException;
33 import io.netty.channel.unix.Socket;
34 import io.netty.channel.unix.UnixChannelUtil;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.UncheckedBooleanSupplier;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.RecyclableArrayList;
39 import io.netty.util.internal.StringUtil;
40
41 import java.io.IOException;
42 import java.net.Inet4Address;
43 import java.net.InetAddress;
44 import java.net.InetSocketAddress;
45 import java.net.NetworkInterface;
46 import java.net.PortUnreachableException;
47 import java.net.SocketAddress;
48 import java.nio.ByteBuffer;
49
50 import static io.netty.channel.epoll.LinuxSocket.newSocketDgram;
51
52
53
54
55
56 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
57 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
58 private static final String EXPECTED_TYPES =
59 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
60 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
61 StringUtil.simpleClassName(ByteBuf.class) + ", " +
62 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
63 StringUtil.simpleClassName(ByteBuf.class) + ')';
64
65 private final EpollDatagramChannelConfig config;
66 private volatile boolean connected;
67
68
69
70
71
72
73 public static boolean isSegmentedDatagramPacketSupported() {
74 return Epoll.isAvailable() &&
75
76 Native.IS_SUPPORTING_SENDMMSG && Native.IS_SUPPORTING_UDP_SEGMENT;
77 }
78
79
80
81
82
83 public EpollDatagramChannel() {
84 this(null);
85 }
86
87
88
89
90
91 public EpollDatagramChannel(InternetProtocolFamily family) {
92 this(newSocketDgram(family), false);
93 }
94
95
96
97
98
99 public EpollDatagramChannel(int fd) {
100 this(new LinuxSocket(fd), true);
101 }
102
103 private EpollDatagramChannel(LinuxSocket fd, boolean active) {
104 super(null, fd, active);
105 config = new EpollDatagramChannelConfig(this);
106 }
107
108 @Override
109 public InetSocketAddress remoteAddress() {
110 return (InetSocketAddress) super.remoteAddress();
111 }
112
113 @Override
114 public InetSocketAddress localAddress() {
115 return (InetSocketAddress) super.localAddress();
116 }
117
118 @Override
119 public ChannelMetadata metadata() {
120 return METADATA;
121 }
122
123 @Override
124 @SuppressWarnings("deprecation")
125 public boolean isActive() {
126 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
127 }
128
129 @Override
130 public boolean isConnected() {
131 return connected;
132 }
133
134 @Override
135 public ChannelFuture joinGroup(InetAddress multicastAddress) {
136 return joinGroup(multicastAddress, newPromise());
137 }
138
139 @Override
140 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
141 try {
142 NetworkInterface iface = config().getNetworkInterface();
143 if (iface == null) {
144 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
145 }
146 return joinGroup(multicastAddress, iface, null, promise);
147 } catch (IOException e) {
148 promise.setFailure(e);
149 }
150 return promise;
151 }
152
153 @Override
154 public ChannelFuture joinGroup(
155 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
156 return joinGroup(multicastAddress, networkInterface, newPromise());
157 }
158
159 @Override
160 public ChannelFuture joinGroup(
161 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
162 ChannelPromise promise) {
163 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
164 }
165
166 @Override
167 public ChannelFuture joinGroup(
168 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
169 return joinGroup(multicastAddress, networkInterface, source, newPromise());
170 }
171
172 @Override
173 public ChannelFuture joinGroup(
174 final InetAddress multicastAddress, final NetworkInterface networkInterface,
175 final InetAddress source, final ChannelPromise promise) {
176
177 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
178 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
179
180 try {
181 socket.joinGroup(multicastAddress, networkInterface, source);
182 promise.setSuccess();
183 } catch (IOException e) {
184 promise.setFailure(e);
185 }
186 return promise;
187 }
188
189 @Override
190 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
191 return leaveGroup(multicastAddress, newPromise());
192 }
193
194 @Override
195 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
196 try {
197 return leaveGroup(
198 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
199 } catch (IOException e) {
200 promise.setFailure(e);
201 }
202 return promise;
203 }
204
205 @Override
206 public ChannelFuture leaveGroup(
207 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
208 return leaveGroup(multicastAddress, networkInterface, newPromise());
209 }
210
211 @Override
212 public ChannelFuture leaveGroup(
213 InetSocketAddress multicastAddress,
214 NetworkInterface networkInterface, ChannelPromise promise) {
215 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
216 }
217
218 @Override
219 public ChannelFuture leaveGroup(
220 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
221 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
222 }
223
224 @Override
225 public ChannelFuture leaveGroup(
226 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
227 final ChannelPromise promise) {
228 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
229 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
230
231 try {
232 socket.leaveGroup(multicastAddress, networkInterface, source);
233 promise.setSuccess();
234 } catch (IOException e) {
235 promise.setFailure(e);
236 }
237 return promise;
238 }
239
240 @Override
241 public ChannelFuture block(
242 InetAddress multicastAddress, NetworkInterface networkInterface,
243 InetAddress sourceToBlock) {
244 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
245 }
246
247 @Override
248 public ChannelFuture block(
249 final InetAddress multicastAddress, final NetworkInterface networkInterface,
250 final InetAddress sourceToBlock, final ChannelPromise promise) {
251 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
252 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
253 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
254
255 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
256 return promise;
257 }
258
259 @Override
260 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
261 return block(multicastAddress, sourceToBlock, newPromise());
262 }
263
264 @Override
265 public ChannelFuture block(
266 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
267 try {
268 return block(
269 multicastAddress,
270 NetworkInterface.getByInetAddress(localAddress().getAddress()),
271 sourceToBlock, promise);
272 } catch (Throwable e) {
273 promise.setFailure(e);
274 }
275 return promise;
276 }
277
278 @Override
279 protected AbstractEpollUnsafe newUnsafe() {
280 return new EpollDatagramChannelUnsafe();
281 }
282
283 @Override
284 protected void doBind(SocketAddress localAddress) throws Exception {
285 if (localAddress instanceof InetSocketAddress) {
286 InetSocketAddress socketAddress = (InetSocketAddress) localAddress;
287 if (socketAddress.getAddress().isAnyLocalAddress() &&
288 socketAddress.getAddress() instanceof Inet4Address) {
289 if (socket.family() == InternetProtocolFamily.IPv6) {
290 localAddress = new InetSocketAddress(LinuxSocket.INET6_ANY, socketAddress.getPort());
291 }
292 }
293 }
294 super.doBind(localAddress);
295 active = true;
296 }
297
298 @Override
299 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
300 int maxMessagesPerWrite = maxMessagesPerWrite();
301 while (maxMessagesPerWrite > 0) {
302 Object msg = in.current();
303 if (msg == null) {
304
305 break;
306 }
307
308 try {
309
310 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1 ||
311
312 in.current() instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
313 NativeDatagramPacketArray array = cleanDatagramPacketArray();
314 array.add(in, isConnected(), maxMessagesPerWrite);
315 int cnt = array.count();
316
317 if (cnt >= 1) {
318
319 int offset = 0;
320 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
321
322 int send = socket.sendmmsg(packets, offset, cnt);
323 if (send == 0) {
324
325 break;
326 }
327 for (int i = 0; i < send; i++) {
328 in.remove();
329 }
330 maxMessagesPerWrite -= send;
331 continue;
332 }
333 }
334 boolean done = false;
335 for (int i = config().getWriteSpinCount(); i > 0; --i) {
336 if (doWriteMessage(msg)) {
337 done = true;
338 break;
339 }
340 }
341
342 if (done) {
343 in.remove();
344 maxMessagesPerWrite --;
345 } else {
346 break;
347 }
348 } catch (IOException e) {
349 maxMessagesPerWrite --;
350
351
352
353 in.remove(e);
354 }
355 }
356
357 if (in.isEmpty()) {
358
359 clearFlag(Native.EPOLLOUT);
360 } else {
361
362 setFlag(Native.EPOLLOUT);
363 }
364 }
365
366 private boolean doWriteMessage(Object msg) throws Exception {
367 final ByteBuf data;
368 final InetSocketAddress remoteAddress;
369 if (msg instanceof AddressedEnvelope) {
370 @SuppressWarnings("unchecked")
371 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
372 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
373 data = envelope.content();
374 remoteAddress = envelope.recipient();
375 } else {
376 data = (ByteBuf) msg;
377 remoteAddress = null;
378 }
379
380 final int dataLen = data.readableBytes();
381 if (dataLen == 0) {
382 return true;
383 }
384
385 return doWriteOrSendBytes(data, remoteAddress, false) > 0;
386 }
387
388 @Override
389 protected Object filterOutboundMessage(Object msg) {
390 if (msg instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
391 if (!Native.IS_SUPPORTING_UDP_SEGMENT) {
392 throw new UnsupportedOperationException(
393 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
394 }
395 io.netty.channel.unix.SegmentedDatagramPacket packet = (io.netty.channel.unix.SegmentedDatagramPacket) msg;
396 ByteBuf content = packet.content();
397 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
398 packet.replace(newDirectBuffer(packet, content)) : msg;
399 }
400 if (msg instanceof DatagramPacket) {
401 DatagramPacket packet = (DatagramPacket) msg;
402 ByteBuf content = packet.content();
403 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
404 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
405 }
406
407 if (msg instanceof ByteBuf) {
408 ByteBuf buf = (ByteBuf) msg;
409 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
410 }
411
412 if (msg instanceof AddressedEnvelope) {
413 @SuppressWarnings("unchecked")
414 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
415 if (e.content() instanceof ByteBuf &&
416 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
417
418 ByteBuf content = (ByteBuf) e.content();
419 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
420 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
421 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
422 }
423 }
424
425 throw new UnsupportedOperationException(
426 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
427 }
428
429 @Override
430 public EpollDatagramChannelConfig config() {
431 return config;
432 }
433
434 @Override
435 protected void doDisconnect() throws Exception {
436 socket.disconnect();
437 connected = active = false;
438 resetCachedAddresses();
439 }
440
441 @Override
442 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
443 if (super.doConnect(remoteAddress, localAddress)) {
444 connected = true;
445 return true;
446 }
447 return false;
448 }
449
450 @Override
451 protected void doClose() throws Exception {
452 super.doClose();
453 connected = false;
454 }
455
456 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
457
458 @Override
459 void epollInReady() {
460 assert eventLoop().inEventLoop();
461 EpollDatagramChannelConfig config = config();
462 if (shouldBreakEpollInReady(config)) {
463 clearEpollIn0();
464 return;
465 }
466 final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
467 allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
468
469 final ChannelPipeline pipeline = pipeline();
470 final ByteBufAllocator allocator = config.getAllocator();
471 allocHandle.reset(config);
472 epollInBefore();
473
474 Throwable exception = null;
475 try {
476 try {
477 boolean connected = isConnected();
478 do {
479 final boolean read;
480 int datagramSize = config().getMaxDatagramPayloadSize();
481
482 ByteBuf byteBuf = allocHandle.allocate(allocator);
483
484 int numDatagram = Native.IS_SUPPORTING_RECVMMSG ?
485 datagramSize == 0 ? 1 : byteBuf.writableBytes() / datagramSize :
486 0;
487 try {
488 if (numDatagram <= 1) {
489 if (!connected || config.isUdpGro()) {
490 read = recvmsg(allocHandle, cleanDatagramPacketArray(), byteBuf);
491 } else {
492 read = connectedRead(allocHandle, byteBuf, datagramSize);
493 }
494 } else {
495
496 read = scatteringRead(allocHandle, cleanDatagramPacketArray(),
497 byteBuf, datagramSize, numDatagram);
498 }
499 } catch (NativeIoException e) {
500 if (connected) {
501 throw translateForConnected(e);
502 }
503 throw e;
504 }
505
506 if (read) {
507 readPending = false;
508 } else {
509 break;
510 }
511
512
513 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
514 } catch (Throwable t) {
515 exception = t;
516 }
517
518 allocHandle.readComplete();
519 pipeline.fireChannelReadComplete();
520
521 if (exception != null) {
522 pipeline.fireExceptionCaught(exception);
523 }
524 } finally {
525 epollInFinally(config);
526 }
527 }
528 }
529
530 private boolean connectedRead(EpollRecvByteAllocatorHandle allocHandle, ByteBuf byteBuf, int maxDatagramPacketSize)
531 throws Exception {
532 try {
533 int writable = maxDatagramPacketSize != 0 ? Math.min(byteBuf.writableBytes(), maxDatagramPacketSize)
534 : byteBuf.writableBytes();
535 allocHandle.attemptedBytesRead(writable);
536
537 int writerIndex = byteBuf.writerIndex();
538 int localReadAmount;
539 if (byteBuf.hasMemoryAddress()) {
540 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, writerIndex + writable);
541 } else {
542 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, writable);
543 localReadAmount = socket.read(buf, buf.position(), buf.limit());
544 }
545
546 if (localReadAmount <= 0) {
547 allocHandle.lastBytesRead(localReadAmount);
548
549
550 return false;
551 }
552 byteBuf.writerIndex(writerIndex + localReadAmount);
553
554 allocHandle.lastBytesRead(maxDatagramPacketSize <= 0 ?
555 localReadAmount : writable);
556
557 DatagramPacket packet = new DatagramPacket(byteBuf, localAddress(), remoteAddress());
558 allocHandle.incMessagesRead(1);
559
560 pipeline().fireChannelRead(packet);
561 byteBuf = null;
562 return true;
563 } finally {
564 if (byteBuf != null) {
565 byteBuf.release();
566 }
567 }
568 }
569
570 private IOException translateForConnected(NativeIoException e) {
571
572 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
573 PortUnreachableException error = new PortUnreachableException(e.getMessage());
574 error.initCause(e);
575 return error;
576 }
577 return e;
578 }
579
580 private static void addDatagramPacketToOut(DatagramPacket packet,
581 RecyclableArrayList out) {
582 if (packet instanceof io.netty.channel.unix.SegmentedDatagramPacket) {
583 io.netty.channel.unix.SegmentedDatagramPacket segmentedDatagramPacket =
584 (io.netty.channel.unix.SegmentedDatagramPacket) packet;
585 ByteBuf content = segmentedDatagramPacket.content();
586 InetSocketAddress recipient = segmentedDatagramPacket.recipient();
587 InetSocketAddress sender = segmentedDatagramPacket.sender();
588 int segmentSize = segmentedDatagramPacket.segmentSize();
589 do {
590 out.add(new DatagramPacket(content.readRetainedSlice(Math.min(content.readableBytes(),
591 segmentSize)), recipient, sender));
592 } while (content.isReadable());
593
594 segmentedDatagramPacket.release();
595 } else {
596 out.add(packet);
597 }
598 }
599
600 private static void releaseAndRecycle(ByteBuf byteBuf, RecyclableArrayList packetList) {
601 if (byteBuf != null) {
602 byteBuf.release();
603 }
604 if (packetList != null) {
605 for (int i = 0; i < packetList.size(); i++) {
606 ReferenceCountUtil.release(packetList.get(i));
607 }
608 packetList.recycle();
609 }
610 }
611
612 private static void processPacket(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
613 int bytesRead, DatagramPacket packet) {
614 handle.lastBytesRead(bytesRead);
615 handle.incMessagesRead(1);
616 pipeline.fireChannelRead(packet);
617 }
618
619 private static void processPacketList(ChannelPipeline pipeline, EpollRecvByteAllocatorHandle handle,
620 int bytesRead, RecyclableArrayList packetList) {
621 int messagesRead = packetList.size();
622 handle.lastBytesRead(bytesRead);
623 handle.incMessagesRead(messagesRead);
624 for (int i = 0; i < messagesRead; i++) {
625 pipeline.fireChannelRead(packetList.set(i, Unpooled.EMPTY_BUFFER));
626 }
627 }
628
629 private boolean recvmsg(EpollRecvByteAllocatorHandle allocHandle,
630 NativeDatagramPacketArray array, ByteBuf byteBuf) throws IOException {
631 RecyclableArrayList datagramPackets = null;
632 try {
633 int writable = byteBuf.writableBytes();
634
635 boolean added = array.addWritable(byteBuf, byteBuf.writerIndex(), writable);
636 assert added;
637
638 allocHandle.attemptedBytesRead(writable);
639
640 NativeDatagramPacketArray.NativeDatagramPacket msg = array.packets()[0];
641
642 int bytesReceived = socket.recvmsg(msg);
643 if (bytesReceived == 0) {
644 allocHandle.lastBytesRead(-1);
645 return false;
646 }
647 byteBuf.writerIndex(bytesReceived);
648 InetSocketAddress local = localAddress();
649 DatagramPacket packet = msg.newDatagramPacket(byteBuf, local);
650 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
651 processPacket(pipeline(), allocHandle, bytesReceived, packet);
652 byteBuf = null;
653 } else {
654
655
656
657 datagramPackets = RecyclableArrayList.newInstance();
658 addDatagramPacketToOut(packet, datagramPackets);
659
660
661 byteBuf = null;
662
663 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
664 datagramPackets.recycle();
665 datagramPackets = null;
666 }
667
668 return true;
669 } finally {
670 releaseAndRecycle(byteBuf, datagramPackets);
671 }
672 }
673
674 private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeDatagramPacketArray array,
675 ByteBuf byteBuf, int datagramSize, int numDatagram) throws IOException {
676 RecyclableArrayList datagramPackets = null;
677 try {
678 int offset = byteBuf.writerIndex();
679 for (int i = 0; i < numDatagram; i++, offset += datagramSize) {
680 if (!array.addWritable(byteBuf, offset, datagramSize)) {
681 break;
682 }
683 }
684
685 allocHandle.attemptedBytesRead(offset - byteBuf.writerIndex());
686
687 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
688
689 int received = socket.recvmmsg(packets, 0, array.count());
690 if (received == 0) {
691 allocHandle.lastBytesRead(-1);
692 return false;
693 }
694 int bytesReceived = received * datagramSize;
695 byteBuf.writerIndex(bytesReceived);
696 InetSocketAddress local = localAddress();
697 if (received == 1) {
698
699 DatagramPacket packet = packets[0].newDatagramPacket(byteBuf, local);
700 if (!(packet instanceof io.netty.channel.unix.SegmentedDatagramPacket)) {
701 processPacket(pipeline(), allocHandle, datagramSize, packet);
702 byteBuf = null;
703 return true;
704 }
705 }
706
707
708
709 datagramPackets = RecyclableArrayList.newInstance();
710 for (int i = 0; i < received; i++) {
711 DatagramPacket packet = packets[i].newDatagramPacket(byteBuf.readRetainedSlice(datagramSize), local);
712 addDatagramPacketToOut(packet, datagramPackets);
713 }
714
715 byteBuf.release();
716 byteBuf = null;
717
718 processPacketList(pipeline(), allocHandle, bytesReceived, datagramPackets);
719 datagramPackets.recycle();
720 datagramPackets = null;
721 return true;
722 } finally {
723 releaseAndRecycle(byteBuf, datagramPackets);
724 }
725 }
726
727 private NativeDatagramPacketArray cleanDatagramPacketArray() {
728 return ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
729 }
730 }