1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.AddressedEnvelope;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelMetadata;
22 import io.netty.channel.ChannelOutboundBuffer;
23 import io.netty.channel.ChannelPipeline;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.DefaultAddressedEnvelope;
26 import io.netty.channel.RecvByteBufAllocator;
27 import io.netty.channel.socket.DatagramChannel;
28 import io.netty.channel.socket.DatagramChannelConfig;
29 import io.netty.channel.socket.DatagramPacket;
30 import io.netty.channel.unix.DatagramSocketAddress;
31 import io.netty.channel.unix.FileDescriptor;
32 import io.netty.channel.unix.Socket;
33 import io.netty.util.internal.StringUtil;
34
35 import java.io.IOException;
36 import java.net.InetAddress;
37 import java.net.InetSocketAddress;
38 import java.net.NetworkInterface;
39 import java.net.SocketAddress;
40 import java.net.SocketException;
41 import java.nio.ByteBuffer;
42
43 import static io.netty.channel.unix.Socket.newSocketDgram;
44
45
46
47
48
49 public final class EpollDatagramChannel extends AbstractEpollChannel implements DatagramChannel {
50 private static final ChannelMetadata METADATA = new ChannelMetadata(true);
51 private static final String EXPECTED_TYPES =
52 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54 StringUtil.simpleClassName(ByteBuf.class) + ", " +
55 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56 StringUtil.simpleClassName(ByteBuf.class) + ')';
57
58 private final EpollDatagramChannelConfig config;
59 private volatile boolean connected;
60
/**
 * Creates a channel on top of a new datagram socket obtained from {@code newSocketDgram()},
 * handing {@code Native.EPOLLIN} to the superclass constructor.
 */
public EpollDatagramChannel() {
    super(newSocketDgram(), Native.EPOLLIN);
    this.config = new EpollDatagramChannelConfig(this);
}
65
66
67
68
/**
 * Creates a channel from an existing file descriptor by wrapping its integer value in a
 * {@link Socket}.
 *
 * @param fd the descriptor whose {@code intValue()} identifies the socket
 * @deprecated this constructor only delegates to {@link #EpollDatagramChannel(Socket)}; call that one directly.
 */
@Deprecated
public EpollDatagramChannel(FileDescriptor fd) {
    this(new Socket(fd.intValue()));
}
73
/**
 * Creates a channel on top of the given, already existing socket.
 * The superclass receives a {@code null} first argument, {@code Native.EPOLLIN} and {@code true}.
 *
 * @param fd the socket this channel operates on
 */
public EpollDatagramChannel(Socket fd) {
    super(null, fd, Native.EPOLLIN, true);
    this.config = new EpollDatagramChannelConfig(this);
}
78
79 @Override
80 public InetSocketAddress remoteAddress() {
81 return (InetSocketAddress) super.remoteAddress();
82 }
83
84 @Override
85 public InetSocketAddress localAddress() {
86 return (InetSocketAddress) super.localAddress();
87 }
88
89 @Override
90 public ChannelMetadata metadata() {
91 return METADATA;
92 }
93
94 @Override
95 @SuppressWarnings("deprecation")
96 public boolean isActive() {
97 return fd().isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
98 }
99
100 @Override
101 public boolean isConnected() {
102 return connected;
103 }
104
105 @Override
106 public ChannelFuture joinGroup(InetAddress multicastAddress) {
107 return joinGroup(multicastAddress, newPromise());
108 }
109
110 @Override
111 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
112 try {
113 return joinGroup(
114 multicastAddress,
115 NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
116 } catch (SocketException e) {
117 promise.setFailure(e);
118 }
119 return promise;
120 }
121
122 @Override
123 public ChannelFuture joinGroup(
124 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
125 return joinGroup(multicastAddress, networkInterface, newPromise());
126 }
127
128 @Override
129 public ChannelFuture joinGroup(
130 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
131 ChannelPromise promise) {
132 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
133 }
134
135 @Override
136 public ChannelFuture joinGroup(
137 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
138 return joinGroup(multicastAddress, networkInterface, source, newPromise());
139 }
140
141 @Override
142 public ChannelFuture joinGroup(
143 final InetAddress multicastAddress, final NetworkInterface networkInterface,
144 final InetAddress source, final ChannelPromise promise) {
145
146 if (multicastAddress == null) {
147 throw new NullPointerException("multicastAddress");
148 }
149
150 if (networkInterface == null) {
151 throw new NullPointerException("networkInterface");
152 }
153
154 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
155 return promise;
156 }
157
158 @Override
159 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
160 return leaveGroup(multicastAddress, newPromise());
161 }
162
163 @Override
164 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
165 try {
166 return leaveGroup(
167 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
168 } catch (SocketException e) {
169 promise.setFailure(e);
170 }
171 return promise;
172 }
173
174 @Override
175 public ChannelFuture leaveGroup(
176 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
177 return leaveGroup(multicastAddress, networkInterface, newPromise());
178 }
179
180 @Override
181 public ChannelFuture leaveGroup(
182 InetSocketAddress multicastAddress,
183 NetworkInterface networkInterface, ChannelPromise promise) {
184 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
185 }
186
187 @Override
188 public ChannelFuture leaveGroup(
189 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
190 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
191 }
192
193 @Override
194 public ChannelFuture leaveGroup(
195 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
196 final ChannelPromise promise) {
197 if (multicastAddress == null) {
198 throw new NullPointerException("multicastAddress");
199 }
200 if (networkInterface == null) {
201 throw new NullPointerException("networkInterface");
202 }
203
204 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
205
206 return promise;
207 }
208
209 @Override
210 public ChannelFuture block(
211 InetAddress multicastAddress, NetworkInterface networkInterface,
212 InetAddress sourceToBlock) {
213 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
214 }
215
216 @Override
217 public ChannelFuture block(
218 final InetAddress multicastAddress, final NetworkInterface networkInterface,
219 final InetAddress sourceToBlock, final ChannelPromise promise) {
220 if (multicastAddress == null) {
221 throw new NullPointerException("multicastAddress");
222 }
223 if (sourceToBlock == null) {
224 throw new NullPointerException("sourceToBlock");
225 }
226
227 if (networkInterface == null) {
228 throw new NullPointerException("networkInterface");
229 }
230 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
231 return promise;
232 }
233
234 @Override
235 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
236 return block(multicastAddress, sourceToBlock, newPromise());
237 }
238
239 @Override
240 public ChannelFuture block(
241 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
242 try {
243 return block(
244 multicastAddress,
245 NetworkInterface.getByInetAddress(localAddress().getAddress()),
246 sourceToBlock, promise);
247 } catch (Throwable e) {
248 promise.setFailure(e);
249 }
250 return promise;
251 }
252
253 @Override
254 protected AbstractEpollUnsafe newUnsafe() {
255 return new EpollDatagramChannelUnsafe();
256 }
257
258 @Override
259 protected void doBind(SocketAddress localAddress) throws Exception {
260 super.doBind(localAddress);
261 active = true;
262 }
263
264 @Override
265 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
266 for (;;) {
267 Object msg = in.current();
268 if (msg == null) {
269
270 clearFlag(Native.EPOLLOUT);
271 break;
272 }
273
274 try {
275
276 if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
277 NativeDatagramPacketArray array = NativeDatagramPacketArray.getInstance(in);
278 int cnt = array.count();
279
280 if (cnt >= 1) {
281
282 int offset = 0;
283 NativeDatagramPacketArray.NativeDatagramPacket[] packets = array.packets();
284
285 while (cnt > 0) {
286 int send = Native.sendmmsg(fd().intValue(), packets, offset, cnt);
287 if (send == 0) {
288
289 setFlag(Native.EPOLLOUT);
290 return;
291 }
292 for (int i = 0; i < send; i++) {
293 in.remove();
294 }
295 cnt -= send;
296 offset += send;
297 }
298 continue;
299 }
300 }
301 boolean done = false;
302 for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
303 if (doWriteMessage(msg)) {
304 done = true;
305 break;
306 }
307 }
308
309 if (done) {
310 in.remove();
311 } else {
312
313 setFlag(Native.EPOLLOUT);
314 break;
315 }
316 } catch (IOException e) {
317
318
319
320 in.remove(e);
321 }
322 }
323 }
324
325 private boolean doWriteMessage(Object msg) throws Exception {
326 final ByteBuf data;
327 InetSocketAddress remoteAddress;
328 if (msg instanceof AddressedEnvelope) {
329 @SuppressWarnings("unchecked")
330 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
331 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
332 data = envelope.content();
333 remoteAddress = envelope.recipient();
334 } else {
335 data = (ByteBuf) msg;
336 remoteAddress = null;
337 }
338
339 final int dataLen = data.readableBytes();
340 if (dataLen == 0) {
341 return true;
342 }
343
344 final long writtenBytes;
345 if (data.hasMemoryAddress()) {
346 long memoryAddress = data.memoryAddress();
347 if (remoteAddress == null) {
348 writtenBytes = fd().writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
349 } else {
350 writtenBytes = fd().sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
351 remoteAddress.getAddress(), remoteAddress.getPort());
352 }
353 } else if (data.nioBufferCount() > 1) {
354 IovArray array = ((EpollEventLoop) eventLoop()).cleanArray();
355 array.add(data);
356 int cnt = array.count();
357 assert cnt != 0;
358
359 if (remoteAddress == null) {
360 writtenBytes = fd().writevAddresses(array.memoryAddress(0), cnt);
361 } else {
362 writtenBytes = fd().sendToAddresses(array.memoryAddress(0), cnt,
363 remoteAddress.getAddress(), remoteAddress.getPort());
364 }
365 } else {
366 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
367 if (remoteAddress == null) {
368 writtenBytes = fd().write(nioData, nioData.position(), nioData.limit());
369 } else {
370 writtenBytes = fd().sendTo(nioData, nioData.position(), nioData.limit(),
371 remoteAddress.getAddress(), remoteAddress.getPort());
372 }
373 }
374
375 return writtenBytes > 0;
376 }
377
378 @Override
379 protected Object filterOutboundMessage(Object msg) {
380 if (msg instanceof DatagramPacket) {
381 DatagramPacket packet = (DatagramPacket) msg;
382 ByteBuf content = packet.content();
383 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
384 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
385 }
386
387 if (msg instanceof ByteBuf) {
388 ByteBuf buf = (ByteBuf) msg;
389 return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
390 }
391
392 if (msg instanceof AddressedEnvelope) {
393 @SuppressWarnings("unchecked")
394 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
395 if (e.content() instanceof ByteBuf &&
396 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
397
398 ByteBuf content = (ByteBuf) e.content();
399 return UnixChannelUtil.isBufferCopyNeededForWrite(content)?
400 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
401 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
402 }
403 }
404
405 throw new UnsupportedOperationException(
406 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
407 }
408
409 @Override
410 public EpollDatagramChannelConfig config() {
411 return config;
412 }
413
414 @Override
415 protected void doDisconnect() throws Exception {
416 fd().disconnect();
417 connected = active = false;
418 }
419
420 @Override
421 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
422 if (super.doConnect(remoteAddress, localAddress)) {
423 connected = true;
424 return true;
425 }
426 return false;
427 }
428
429 @Override
430 protected void doClose() throws Exception {
431 super.doClose();
432 connected = false;
433 }
434
435 final class EpollDatagramChannelUnsafe extends AbstractEpollUnsafe {
436 private RecvByteBufAllocator.Handle allocHandle;
437
438 @Override
439 void epollInReady() {
440 assert eventLoop().inEventLoop();
441 if (fd().isInputShutdown()) {
442 return;
443 }
444 DatagramChannelConfig config = config();
445 boolean edgeTriggered = isFlagSet(Native.EPOLLET);
446
447 if (!readPending && !edgeTriggered && !config.isAutoRead()) {
448
449 clearEpollIn0();
450 return;
451 }
452
453 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
454 if (allocHandle == null) {
455 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
456 }
457
458 final ChannelPipeline pipeline = pipeline();
459 Throwable exception = null;
460 try {
461
462 final int maxMessagesPerRead = edgeTriggered
463 ? Integer.MAX_VALUE : config.getMaxMessagesPerRead();
464 int messages = 0;
465 do {
466 ByteBuf data = null;
467 try {
468 data = allocHandle.allocate(config.getAllocator());
469 int writerIndex = data.writerIndex();
470 DatagramSocketAddress remoteAddress;
471 if (data.hasMemoryAddress()) {
472
473 remoteAddress = fd().recvFromAddress(data.memoryAddress(), data.writerIndex(),
474 data.capacity());
475 } else {
476 ByteBuffer nioData = data.internalNioBuffer(data.writerIndex(), data.writableBytes());
477 remoteAddress = fd().recvFrom(nioData, nioData.position(), nioData.limit());
478 }
479
480 if (remoteAddress == null) {
481 break;
482 }
483
484 int readBytes = remoteAddress.receivedAmount();
485 data.writerIndex(data.writerIndex() + readBytes);
486 allocHandle.record(readBytes);
487 readPending = false;
488
489 readPending = false;
490 pipeline.fireChannelRead(
491 new DatagramPacket(data, (InetSocketAddress) localAddress(), remoteAddress));
492
493 data = null;
494 } catch (Throwable t) {
495
496
497 exception = t;
498 } finally {
499 if (data != null) {
500 data.release();
501 }
502 if (!edgeTriggered && !config.isAutoRead()) {
503
504
505
506 break;
507 }
508 }
509 } while (++ messages < maxMessagesPerRead || isRdHup());
510
511 pipeline.fireChannelReadComplete();
512
513 if (exception != null) {
514 pipeline.fireExceptionCaught(exception);
515 }
516 } finally {
517
518
519
520
521
522
523 if (!readPending && !config.isAutoRead()) {
524 clearEpollIn();
525 }
526 }
527 }
528 }
529 }