1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramChannelConfig;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.InternetProtocolFamily;
29 import io.netty.channel.unix.DatagramSocketAddress;
30 import io.netty.channel.unix.Errors;
31 import io.netty.channel.unix.IovArray;
32 import io.netty.channel.unix.UnixChannelUtil;
33 import io.netty.util.UncheckedBooleanSupplier;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.StringUtil;
36
37 import java.io.IOException;
38 import java.net.InetAddress;
39 import java.net.InetSocketAddress;
40 import java.net.NetworkInterface;
41 import java.net.PortUnreachableException;
42 import java.net.SocketAddress;
43 import java.net.SocketException;
44 import java.nio.ByteBuffer;
45 import java.nio.channels.UnresolvedAddressException;
46
47 import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
48
49 public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
50 private static final String EXPECTED_TYPES =
51 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
52 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53 StringUtil.simpleClassName(ByteBuf.class) + ", " +
54 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
55 StringUtil.simpleClassName(ByteBuf.class) + ')';
56
57 private volatile boolean connected;
58 private final KQueueDatagramChannelConfig config;
59
/**
 * Creates a channel on top of a socket obtained from {@code BsdSocket.newSocketDgram()}.
 * The channel is created with its {@code active} flag set to {@code false}.
 */
public KQueueDatagramChannel() {
    this(newSocketDgram(), false);
}
64
/**
 * Creates a channel on top of a socket obtained from {@code BsdSocket.newSocketDgram(protocol)}.
 * The channel is created with its {@code active} flag set to {@code false}.
 *
 * @param protocol the protocol family handed to the socket factory
 */
public KQueueDatagramChannel(InternetProtocolFamily protocol) {
    this(newSocketDgram(protocol), false);
}
69
/**
 * Creates a channel that wraps the given file descriptor in a {@code BsdSocket}.
 * The channel is created with its {@code active} flag set to {@code true}.
 *
 * @param fd the file descriptor to wrap
 */
public KQueueDatagramChannel(int fd) {
    super(null, new BsdSocket(fd), true);
    config = new KQueueDatagramChannelConfig(this);
}
73
/**
 * Package-private constructor that takes an already-created socket.
 *
 * @param socket the socket backing this channel
 * @param active the initial value of the {@code active} flag passed to the superclass
 */
KQueueDatagramChannel(BsdSocket socket, boolean active) {
    super(null, socket, active);
    this.config = new KQueueDatagramChannelConfig(this);
}
78
79 @Override
80 public InetSocketAddress remoteAddress() {
81 return (InetSocketAddress) super.remoteAddress();
82 }
83
84 @Override
85 public InetSocketAddress localAddress() {
86 return (InetSocketAddress) super.localAddress();
87 }
88
89 @Override
90 @SuppressWarnings("deprecation")
91 public boolean isActive() {
92 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
93 }
94
95 @Override
96 public boolean isConnected() {
97 return connected;
98 }
99
100 @Override
101 public ChannelFuture joinGroup(InetAddress multicastAddress) {
102 return joinGroup(multicastAddress, newPromise());
103 }
104
105 @Override
106 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
107 try {
108 NetworkInterface iface = config().getNetworkInterface();
109 if (iface == null) {
110 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
111 }
112 return joinGroup(multicastAddress, iface, null, promise);
113 } catch (SocketException e) {
114 promise.setFailure(e);
115 }
116 return promise;
117 }
118
119 @Override
120 public ChannelFuture joinGroup(
121 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
122 return joinGroup(multicastAddress, networkInterface, newPromise());
123 }
124
125 @Override
126 public ChannelFuture joinGroup(
127 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
128 ChannelPromise promise) {
129 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
130 }
131
132 @Override
133 public ChannelFuture joinGroup(
134 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
135 return joinGroup(multicastAddress, networkInterface, source, newPromise());
136 }
137
138 @Override
139 public ChannelFuture joinGroup(
140 final InetAddress multicastAddress, final NetworkInterface networkInterface,
141 final InetAddress source, final ChannelPromise promise) {
142
143 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
144 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
145
146 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
147 return promise;
148 }
149
150 @Override
151 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
152 return leaveGroup(multicastAddress, newPromise());
153 }
154
155 @Override
156 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
157 try {
158 return leaveGroup(
159 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
160 } catch (SocketException e) {
161 promise.setFailure(e);
162 }
163 return promise;
164 }
165
166 @Override
167 public ChannelFuture leaveGroup(
168 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
169 return leaveGroup(multicastAddress, networkInterface, newPromise());
170 }
171
172 @Override
173 public ChannelFuture leaveGroup(
174 InetSocketAddress multicastAddress,
175 NetworkInterface networkInterface, ChannelPromise promise) {
176 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
177 }
178
179 @Override
180 public ChannelFuture leaveGroup(
181 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
182 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
183 }
184
185 @Override
186 public ChannelFuture leaveGroup(
187 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
188 final ChannelPromise promise) {
189 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
190 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
191
192 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
193
194 return promise;
195 }
196
197 @Override
198 public ChannelFuture block(
199 InetAddress multicastAddress, NetworkInterface networkInterface,
200 InetAddress sourceToBlock) {
201 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
202 }
203
204 @Override
205 public ChannelFuture block(
206 final InetAddress multicastAddress, final NetworkInterface networkInterface,
207 final InetAddress sourceToBlock, final ChannelPromise promise) {
208 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
209 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
210 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
211 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
212 return promise;
213 }
214
215 @Override
216 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
217 return block(multicastAddress, sourceToBlock, newPromise());
218 }
219
220 @Override
221 public ChannelFuture block(
222 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
223 try {
224 return block(
225 multicastAddress,
226 NetworkInterface.getByInetAddress(localAddress().getAddress()),
227 sourceToBlock, promise);
228 } catch (Throwable e) {
229 promise.setFailure(e);
230 }
231 return promise;
232 }
233
234 @Override
235 protected AbstractKQueueUnsafe newUnsafe() {
236 return new KQueueDatagramChannelUnsafe();
237 }
238
239 @Override
240 protected void doBind(SocketAddress localAddress) throws Exception {
241 super.doBind(localAddress);
242 active = true;
243 }
244
245 @Override
246 protected boolean doWriteMessage(Object msg) throws Exception {
247 final ByteBuf data;
248 InetSocketAddress remoteAddress;
249 if (msg instanceof AddressedEnvelope) {
250 @SuppressWarnings("unchecked")
251 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
252 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
253 data = envelope.content();
254 remoteAddress = envelope.recipient();
255 } else {
256 data = (ByteBuf) msg;
257 remoteAddress = null;
258 }
259
260 final int dataLen = data.readableBytes();
261 if (dataLen == 0) {
262 return true;
263 }
264
265 final long writtenBytes;
266 if (data.hasMemoryAddress()) {
267 long memoryAddress = data.memoryAddress();
268 if (remoteAddress == null) {
269 try {
270 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
271 } catch (Errors.NativeIoException e) {
272 throw translateForConnected(e);
273 }
274 } else {
275 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
276 remoteAddress.getAddress(), remoteAddress.getPort());
277 }
278 } else if (data.nioBufferCount() > 1) {
279 IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
280 array.add(data, data.readerIndex(), data.readableBytes());
281 int cnt = array.count();
282 assert cnt != 0;
283
284 if (remoteAddress == null) {
285 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
286 } else {
287 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
288 remoteAddress.getAddress(), remoteAddress.getPort());
289 }
290 } else {
291 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
292 if (remoteAddress == null) {
293 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
294 } else {
295 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
296 remoteAddress.getAddress(), remoteAddress.getPort());
297 }
298 }
299
300 return writtenBytes > 0;
301 }
302
303 private static IOException translateForConnected(Errors.NativeIoException e) {
304
305 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
306 PortUnreachableException error = new PortUnreachableException(e.getMessage());
307 error.initCause(e);
308 return error;
309 }
310 return e;
311 }
312
313 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
314 if (envelope.recipient() instanceof InetSocketAddress
315 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
316 throw new UnresolvedAddressException();
317 }
318 }
319
320 @Override
321 protected Object filterOutboundMessage(Object msg) {
322 if (msg instanceof DatagramPacket) {
323 DatagramPacket packet = (DatagramPacket) msg;
324 checkUnresolved(packet);
325 ByteBuf content = packet.content();
326 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
327 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
328 }
329
330 if (msg instanceof ByteBuf) {
331 ByteBuf buf = (ByteBuf) msg;
332 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
333 }
334
335 if (msg instanceof AddressedEnvelope) {
336 @SuppressWarnings("unchecked")
337 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
338 checkUnresolved(e);
339
340 if (e.content() instanceof ByteBuf &&
341 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
342
343 ByteBuf content = (ByteBuf) e.content();
344 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
345 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
346 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
347 }
348 }
349
350 throw new UnsupportedOperationException(
351 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
352 }
353
354 @Override
355 public KQueueDatagramChannelConfig config() {
356 return config;
357 }
358
359 @Override
360 protected void doDisconnect() throws Exception {
361 socket.disconnect();
362 connected = active = false;
363 resetCachedAddresses();
364 }
365
366 @Override
367 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
368 if (super.doConnect(remoteAddress, localAddress)) {
369 connected = true;
370 return true;
371 }
372 return false;
373 }
374
375 @Override
376 protected void doClose() throws Exception {
377 super.doClose();
378 connected = false;
379 }
380
381 final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
382
383 @Override
384 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
385 assert eventLoop().inEventLoop();
386 final DatagramChannelConfig config = config();
387 if (shouldBreakReadReady(config)) {
388 clearReadFilter0();
389 return;
390 }
391 final ChannelPipeline pipeline = pipeline();
392 final ByteBufAllocator allocator = config.getAllocator();
393 allocHandle.reset(config);
394 readReadyBefore();
395
396 Throwable exception = null;
397 try {
398 ByteBuf byteBuf = null;
399 try {
400 boolean connected = isConnected();
401 do {
402 byteBuf = allocHandle.allocate(allocator);
403 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
404
405 final DatagramPacket packet;
406 if (connected) {
407 try {
408 allocHandle.lastBytesRead(doReadBytes(byteBuf));
409 } catch (Errors.NativeIoException e) {
410
411 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
412 PortUnreachableException error = new PortUnreachableException(e.getMessage());
413 error.initCause(e);
414 throw error;
415 }
416 throw e;
417 }
418 if (allocHandle.lastBytesRead() <= 0) {
419
420 byteBuf.release();
421 byteBuf = null;
422 break;
423 }
424 packet = new DatagramPacket(byteBuf,
425 (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
426 } else {
427 final DatagramSocketAddress remoteAddress;
428 if (byteBuf.hasMemoryAddress()) {
429
430 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
431 byteBuf.capacity());
432 } else {
433 ByteBuffer nioData = byteBuf.internalNioBuffer(
434 byteBuf.writerIndex(), byteBuf.writableBytes());
435 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
436 }
437
438 if (remoteAddress == null) {
439 allocHandle.lastBytesRead(-1);
440 byteBuf.release();
441 byteBuf = null;
442 break;
443 }
444 InetSocketAddress localAddress = remoteAddress.localAddress();
445 if (localAddress == null) {
446 localAddress = (InetSocketAddress) localAddress();
447 }
448 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
449 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
450
451 packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
452 }
453
454 allocHandle.incMessagesRead(1);
455
456 readPending = false;
457 pipeline.fireChannelRead(packet);
458
459 byteBuf = null;
460
461
462
463 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
464 } catch (Throwable t) {
465 if (byteBuf != null) {
466 byteBuf.release();
467 }
468 exception = t;
469 }
470
471 allocHandle.readComplete();
472 pipeline.fireChannelReadComplete();
473
474 if (exception != null) {
475 pipeline.fireExceptionCaught(exception);
476 }
477 } finally {
478 readReadyFinally(config);
479 }
480 }
481 }
482 }