1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramChannelConfig;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.InternetProtocolFamily;
29 import io.netty.channel.socket.SocketProtocolFamily;
30 import io.netty.channel.unix.DatagramSocketAddress;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.IovArray;
33 import io.netty.channel.unix.UnixChannelUtil;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.ObjectUtil;
36 import io.netty.util.internal.StringUtil;
37
38 import java.io.IOException;
39 import java.net.InetAddress;
40 import java.net.InetSocketAddress;
41 import java.net.NetworkInterface;
42 import java.net.PortUnreachableException;
43 import java.net.SocketAddress;
44 import java.net.SocketException;
45 import java.nio.ByteBuffer;
46 import java.nio.channels.UnresolvedAddressException;
47
48 import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
49
50 public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
51 private static final String EXPECTED_TYPES =
52 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54 StringUtil.simpleClassName(ByteBuf.class) + ", " +
55 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56 StringUtil.simpleClassName(ByteBuf.class) + ')';
57
58 private volatile boolean connected;
59 private final KQueueDatagramChannelConfig config;
60
61 public KQueueDatagramChannel() {
62 super(null, newSocketDgram(), false);
63 config = new KQueueDatagramChannelConfig(this);
64 }
65
66
67
68
69 @Deprecated
70 public KQueueDatagramChannel(InternetProtocolFamily protocol) {
71 super(null, newSocketDgram(protocol), false);
72 config = new KQueueDatagramChannelConfig(this);
73 }
74
75 public KQueueDatagramChannel(SocketProtocolFamily protocol) {
76 super(null, newSocketDgram(protocol), false);
77 config = new KQueueDatagramChannelConfig(this);
78 }
79
80 public KQueueDatagramChannel(int fd) {
81 this(new BsdSocket(fd), true);
82 }
83
84 KQueueDatagramChannel(BsdSocket socket, boolean active) {
85 super(null, socket, active);
86 config = new KQueueDatagramChannelConfig(this);
87 }
88
89 @Override
90 public InetSocketAddress remoteAddress() {
91 return (InetSocketAddress) super.remoteAddress();
92 }
93
94 @Override
95 public InetSocketAddress localAddress() {
96 return (InetSocketAddress) super.localAddress();
97 }
98
99 @Override
100 @SuppressWarnings("deprecation")
101 public boolean isActive() {
102 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
103 }
104
105 @Override
106 public boolean isConnected() {
107 return connected;
108 }
109
110 @Override
111 public ChannelFuture joinGroup(InetAddress multicastAddress) {
112 return joinGroup(multicastAddress, newPromise());
113 }
114
115 @Override
116 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
117 try {
118 NetworkInterface iface = config().getNetworkInterface();
119 if (iface == null) {
120 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
121 }
122 return joinGroup(multicastAddress, iface, null, promise);
123 } catch (SocketException e) {
124 promise.setFailure(e);
125 }
126 return promise;
127 }
128
129 @Override
130 public ChannelFuture joinGroup(
131 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
132 return joinGroup(multicastAddress, networkInterface, newPromise());
133 }
134
135 @Override
136 public ChannelFuture joinGroup(
137 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
138 ChannelPromise promise) {
139 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
140 }
141
142 @Override
143 public ChannelFuture joinGroup(
144 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
145 return joinGroup(multicastAddress, networkInterface, source, newPromise());
146 }
147
148 @Override
149 public ChannelFuture joinGroup(
150 final InetAddress multicastAddress, final NetworkInterface networkInterface,
151 final InetAddress source, final ChannelPromise promise) {
152
153 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
154 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
155
156 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
157 return promise;
158 }
159
160 @Override
161 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
162 return leaveGroup(multicastAddress, newPromise());
163 }
164
165 @Override
166 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
167 try {
168 return leaveGroup(
169 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
170 } catch (SocketException e) {
171 promise.setFailure(e);
172 }
173 return promise;
174 }
175
176 @Override
177 public ChannelFuture leaveGroup(
178 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
179 return leaveGroup(multicastAddress, networkInterface, newPromise());
180 }
181
182 @Override
183 public ChannelFuture leaveGroup(
184 InetSocketAddress multicastAddress,
185 NetworkInterface networkInterface, ChannelPromise promise) {
186 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
187 }
188
189 @Override
190 public ChannelFuture leaveGroup(
191 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
192 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
193 }
194
195 @Override
196 public ChannelFuture leaveGroup(
197 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
198 final ChannelPromise promise) {
199 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
200 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
201
202 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
203
204 return promise;
205 }
206
207 @Override
208 public ChannelFuture block(
209 InetAddress multicastAddress, NetworkInterface networkInterface,
210 InetAddress sourceToBlock) {
211 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
212 }
213
214 @Override
215 public ChannelFuture block(
216 final InetAddress multicastAddress, final NetworkInterface networkInterface,
217 final InetAddress sourceToBlock, final ChannelPromise promise) {
218 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
219 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
220 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
221 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
222 return promise;
223 }
224
225 @Override
226 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
227 return block(multicastAddress, sourceToBlock, newPromise());
228 }
229
230 @Override
231 public ChannelFuture block(
232 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
233 try {
234 return block(
235 multicastAddress,
236 NetworkInterface.getByInetAddress(localAddress().getAddress()),
237 sourceToBlock, promise);
238 } catch (Throwable e) {
239 promise.setFailure(e);
240 }
241 return promise;
242 }
243
244 @Override
245 protected AbstractKQueueUnsafe newUnsafe() {
246 return new KQueueDatagramChannelUnsafe();
247 }
248
249 @Override
250 protected void doBind(SocketAddress localAddress) throws Exception {
251 super.doBind(localAddress);
252 active = true;
253 }
254
255 @Override
256 protected boolean doWriteMessage(Object msg) throws Exception {
257 final ByteBuf data;
258 InetSocketAddress remoteAddress;
259 if (msg instanceof AddressedEnvelope) {
260 @SuppressWarnings("unchecked")
261 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
262 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
263 data = envelope.content();
264 remoteAddress = envelope.recipient();
265 } else {
266 data = (ByteBuf) msg;
267 remoteAddress = null;
268 }
269
270 final int dataLen = data.readableBytes();
271 if (dataLen == 0) {
272 return true;
273 }
274
275 final long writtenBytes;
276 if (data.hasMemoryAddress()) {
277 long memoryAddress = data.memoryAddress();
278 if (remoteAddress == null) {
279 try {
280 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
281 } catch (Errors.NativeIoException e) {
282 throw translateForConnected(e);
283 }
284 } else {
285 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
286 remoteAddress.getAddress(), remoteAddress.getPort());
287 }
288 } else if (data.nioBufferCount() > 1) {
289 IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
290 array.add(data, data.readerIndex(), data.readableBytes());
291 int cnt = array.count();
292 assert cnt != 0;
293
294 if (remoteAddress == null) {
295 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
296 } else {
297 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
298 remoteAddress.getAddress(), remoteAddress.getPort());
299 }
300 } else {
301 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
302 if (remoteAddress == null) {
303 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
304 } else {
305 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
306 remoteAddress.getAddress(), remoteAddress.getPort());
307 }
308 }
309
310 return writtenBytes > 0;
311 }
312
313 private static IOException translateForConnected(Errors.NativeIoException e) {
314
315 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
316 PortUnreachableException error = new PortUnreachableException(e.getMessage());
317 error.initCause(e);
318 return error;
319 }
320 return e;
321 }
322
323 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
324 if (envelope.recipient() instanceof InetSocketAddress
325 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
326 throw new UnresolvedAddressException();
327 }
328 }
329
330 @Override
331 protected Object filterOutboundMessage(Object msg) {
332 if (msg instanceof DatagramPacket) {
333 DatagramPacket packet = (DatagramPacket) msg;
334 checkUnresolved(packet);
335 ByteBuf content = packet.content();
336 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
337 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
338 }
339
340 if (msg instanceof ByteBuf) {
341 ByteBuf buf = (ByteBuf) msg;
342 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
343 }
344
345 if (msg instanceof AddressedEnvelope) {
346 @SuppressWarnings("unchecked")
347 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
348 checkUnresolved(e);
349
350 if (e.content() instanceof ByteBuf &&
351 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
352
353 ByteBuf content = (ByteBuf) e.content();
354 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
355 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
356 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
357 }
358 }
359
360 throw new UnsupportedOperationException(
361 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
362 }
363
364 @Override
365 public KQueueDatagramChannelConfig config() {
366 return config;
367 }
368
369 @Override
370 protected void doDisconnect() throws Exception {
371 socket.disconnect();
372 connected = active = false;
373 resetCachedAddresses();
374 }
375
376 @Override
377 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
378 if (super.doConnect(remoteAddress, localAddress)) {
379 connected = true;
380 return true;
381 }
382 return false;
383 }
384
385 @Override
386 protected void doClose() throws Exception {
387 super.doClose();
388 connected = false;
389 }
390
391 final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
392
393 @Override
394 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
395 assert eventLoop().inEventLoop();
396 final DatagramChannelConfig config = config();
397 if (shouldBreakReadReady(config)) {
398 clearReadFilter0();
399 return;
400 }
401 final ChannelPipeline pipeline = pipeline();
402 final ByteBufAllocator allocator = config.getAllocator();
403 allocHandle.reset(config);
404
405 Throwable exception = null;
406 try {
407 ByteBuf byteBuf = null;
408 try {
409 boolean connected = isConnected();
410 do {
411 byteBuf = allocHandle.allocate(allocator);
412 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
413
414 final DatagramPacket packet;
415 if (connected) {
416 try {
417 allocHandle.lastBytesRead(doReadBytes(byteBuf));
418 } catch (Errors.NativeIoException e) {
419
420 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
421 PortUnreachableException error = new PortUnreachableException(e.getMessage());
422 error.initCause(e);
423 throw error;
424 }
425 throw e;
426 }
427 if (allocHandle.lastBytesRead() <= 0) {
428
429 byteBuf.release();
430 byteBuf = null;
431 break;
432 }
433 packet = new DatagramPacket(byteBuf,
434 (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
435 } else {
436 final DatagramSocketAddress remoteAddress;
437 if (byteBuf.hasMemoryAddress()) {
438
439 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
440 byteBuf.capacity());
441 } else {
442 ByteBuffer nioData = byteBuf.internalNioBuffer(
443 byteBuf.writerIndex(), byteBuf.writableBytes());
444 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
445 }
446
447 if (remoteAddress == null) {
448 allocHandle.lastBytesRead(-1);
449 byteBuf.release();
450 byteBuf = null;
451 break;
452 }
453 InetSocketAddress localAddress = remoteAddress.localAddress();
454 if (localAddress == null) {
455 localAddress = (InetSocketAddress) localAddress();
456 }
457 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
458 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
459
460 packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
461 }
462
463 allocHandle.incMessagesRead(1);
464
465 readPending = false;
466 pipeline.fireChannelRead(packet);
467
468 byteBuf = null;
469
470
471
472 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
473 } catch (Throwable t) {
474 if (byteBuf != null) {
475 byteBuf.release();
476 }
477 exception = t;
478 }
479
480 allocHandle.readComplete();
481 pipeline.fireChannelReadComplete();
482
483 if (exception != null) {
484 pipeline.fireExceptionCaught(exception);
485 }
486 } finally {
487 if (shouldStopReading(config)) {
488 clearReadFilter0();
489 }
490 }
491 }
492 }
493 }