1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramChannelConfig;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.InternetProtocolFamily;
29 import io.netty.channel.socket.SocketProtocolFamily;
30 import io.netty.channel.unix.DatagramSocketAddress;
31 import io.netty.channel.unix.Errors;
32 import io.netty.channel.unix.IovArray;
33 import io.netty.channel.unix.UnixChannelUtil;
34 import io.netty.util.UncheckedBooleanSupplier;
35 import io.netty.util.internal.ObjectUtil;
36 import io.netty.util.internal.StringUtil;
37
38 import java.net.InetAddress;
39 import java.net.InetSocketAddress;
40 import java.net.NetworkInterface;
41 import java.net.PortUnreachableException;
42 import java.net.SocketAddress;
43 import java.net.SocketException;
44 import java.nio.ByteBuffer;
45 import java.nio.channels.UnresolvedAddressException;
46
47 import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
48
49 public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
50 private static final String EXPECTED_TYPES =
51 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
52 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
53 StringUtil.simpleClassName(ByteBuf.class) + ", " +
54 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
55 StringUtil.simpleClassName(ByteBuf.class) + ')';
56
57 private volatile boolean connected;
58 private final KQueueDatagramChannelConfig config;
59
60 public KQueueDatagramChannel() {
61 super(null, newSocketDgram(), false);
62 config = new KQueueDatagramChannelConfig(this);
63 }
64
65
66
67
68 @Deprecated
69 public KQueueDatagramChannel(InternetProtocolFamily protocol) {
70 super(null, newSocketDgram(protocol), false);
71 config = new KQueueDatagramChannelConfig(this);
72 }
73
74 public KQueueDatagramChannel(SocketProtocolFamily protocol) {
75 super(null, newSocketDgram(protocol), false);
76 config = new KQueueDatagramChannelConfig(this);
77 }
78
79 public KQueueDatagramChannel(int fd) {
80 this(new BsdSocket(fd), true);
81 }
82
83 KQueueDatagramChannel(BsdSocket socket, boolean active) {
84 super(null, socket, active);
85 config = new KQueueDatagramChannelConfig(this);
86 }
87
88 @Override
89 public InetSocketAddress remoteAddress() {
90 return (InetSocketAddress) super.remoteAddress();
91 }
92
93 @Override
94 public InetSocketAddress localAddress() {
95 return (InetSocketAddress) super.localAddress();
96 }
97
98 @Override
99 @SuppressWarnings("deprecation")
100 public boolean isActive() {
101 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
102 }
103
104 @Override
105 public boolean isConnected() {
106 return connected;
107 }
108
109 @Override
110 public ChannelFuture joinGroup(InetAddress multicastAddress) {
111 return joinGroup(multicastAddress, newPromise());
112 }
113
114 @Override
115 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
116 try {
117 NetworkInterface iface = config().getNetworkInterface();
118 if (iface == null) {
119 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
120 }
121 return joinGroup(multicastAddress, iface, null, promise);
122 } catch (SocketException e) {
123 promise.setFailure(e);
124 }
125 return promise;
126 }
127
128 @Override
129 public ChannelFuture joinGroup(
130 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
131 return joinGroup(multicastAddress, networkInterface, newPromise());
132 }
133
134 @Override
135 public ChannelFuture joinGroup(
136 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
137 ChannelPromise promise) {
138 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
139 }
140
141 @Override
142 public ChannelFuture joinGroup(
143 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
144 return joinGroup(multicastAddress, networkInterface, source, newPromise());
145 }
146
147 @Override
148 public ChannelFuture joinGroup(
149 final InetAddress multicastAddress, final NetworkInterface networkInterface,
150 final InetAddress source, final ChannelPromise promise) {
151
152 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
153 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
154
155 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
156 return promise;
157 }
158
159 @Override
160 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
161 return leaveGroup(multicastAddress, newPromise());
162 }
163
164 @Override
165 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
166 try {
167 return leaveGroup(
168 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
169 } catch (SocketException e) {
170 promise.setFailure(e);
171 }
172 return promise;
173 }
174
175 @Override
176 public ChannelFuture leaveGroup(
177 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
178 return leaveGroup(multicastAddress, networkInterface, newPromise());
179 }
180
181 @Override
182 public ChannelFuture leaveGroup(
183 InetSocketAddress multicastAddress,
184 NetworkInterface networkInterface, ChannelPromise promise) {
185 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
186 }
187
188 @Override
189 public ChannelFuture leaveGroup(
190 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
191 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
192 }
193
194 @Override
195 public ChannelFuture leaveGroup(
196 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
197 final ChannelPromise promise) {
198 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
199 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
200
201 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
202
203 return promise;
204 }
205
206 @Override
207 public ChannelFuture block(
208 InetAddress multicastAddress, NetworkInterface networkInterface,
209 InetAddress sourceToBlock) {
210 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
211 }
212
213 @Override
214 public ChannelFuture block(
215 final InetAddress multicastAddress, final NetworkInterface networkInterface,
216 final InetAddress sourceToBlock, final ChannelPromise promise) {
217 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
218 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
219 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
220 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
221 return promise;
222 }
223
224 @Override
225 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
226 return block(multicastAddress, sourceToBlock, newPromise());
227 }
228
229 @Override
230 public ChannelFuture block(
231 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
232 try {
233 return block(
234 multicastAddress,
235 NetworkInterface.getByInetAddress(localAddress().getAddress()),
236 sourceToBlock, promise);
237 } catch (Throwable e) {
238 promise.setFailure(e);
239 }
240 return promise;
241 }
242
243 @Override
244 protected AbstractKQueueUnsafe newUnsafe() {
245 return new KQueueDatagramChannelUnsafe();
246 }
247
248 @Override
249 protected void doBind(SocketAddress localAddress) throws Exception {
250 super.doBind(localAddress);
251 active = true;
252 }
253
254 @Override
255 protected boolean doWriteMessage(Object msg) throws Exception {
256 final ByteBuf data;
257 InetSocketAddress remoteAddress;
258 if (msg instanceof AddressedEnvelope) {
259 @SuppressWarnings("unchecked")
260 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
261 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
262 data = envelope.content();
263 remoteAddress = envelope.recipient();
264 } else {
265 data = (ByteBuf) msg;
266 remoteAddress = null;
267 }
268
269 final int dataLen = data.readableBytes();
270 if (dataLen == 0) {
271 return true;
272 }
273
274 final long writtenBytes;
275 if (data.hasMemoryAddress()) {
276 long memoryAddress = data.memoryAddress();
277 if (remoteAddress == null) {
278 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
279 } else {
280 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
281 remoteAddress.getAddress(), remoteAddress.getPort());
282 }
283 } else if (data.nioBufferCount() > 1) {
284 IovArray array = registration().ioHandler().cleanArray();
285 array.add(data, data.readerIndex(), data.readableBytes());
286 int cnt = array.count();
287 assert cnt != 0;
288
289 if (remoteAddress == null) {
290 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
291 } else {
292 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
293 remoteAddress.getAddress(), remoteAddress.getPort());
294 }
295 } else {
296 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
297 if (remoteAddress == null) {
298 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
299 } else {
300 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
301 remoteAddress.getAddress(), remoteAddress.getPort());
302 }
303 }
304
305 return writtenBytes > 0;
306 }
307
308 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
309 if (envelope.recipient() instanceof InetSocketAddress
310 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
311 throw new UnresolvedAddressException();
312 }
313 }
314
315 @Override
316 protected Object filterOutboundMessage(Object msg) {
317 if (msg instanceof DatagramPacket) {
318 DatagramPacket packet = (DatagramPacket) msg;
319 checkUnresolved(packet);
320 ByteBuf content = packet.content();
321 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
322 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
323 }
324
325 if (msg instanceof ByteBuf) {
326 ByteBuf buf = (ByteBuf) msg;
327 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
328 }
329
330 if (msg instanceof AddressedEnvelope) {
331 @SuppressWarnings("unchecked")
332 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
333 checkUnresolved(e);
334
335 if (e.content() instanceof ByteBuf &&
336 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
337
338 ByteBuf content = (ByteBuf) e.content();
339 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
340 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
341 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
342 }
343 }
344
345 throw new UnsupportedOperationException(
346 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
347 }
348
349 @Override
350 public KQueueDatagramChannelConfig config() {
351 return config;
352 }
353
354 @Override
355 protected void doDisconnect() throws Exception {
356 socket.disconnect();
357 connected = active = false;
358 resetCachedAddresses();
359 }
360
361 @Override
362 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
363 if (super.doConnect(remoteAddress, localAddress)) {
364 connected = true;
365 return true;
366 }
367 return false;
368 }
369
370 @Override
371 protected void doClose() throws Exception {
372 super.doClose();
373 connected = false;
374 }
375
376 final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
377
378 @Override
379 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
380 assert eventLoop().inEventLoop();
381 final DatagramChannelConfig config = config();
382 if (shouldBreakReadReady(config)) {
383 clearReadFilter0();
384 return;
385 }
386 final ChannelPipeline pipeline = pipeline();
387 final ByteBufAllocator allocator = config.getAllocator();
388 allocHandle.reset(config);
389 readReadyBefore();
390
391 Throwable exception = null;
392 try {
393 ByteBuf byteBuf = null;
394 try {
395 boolean connected = isConnected();
396 do {
397 byteBuf = allocHandle.allocate(allocator);
398 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
399
400 final DatagramPacket packet;
401 if (connected) {
402 try {
403 allocHandle.lastBytesRead(doReadBytes(byteBuf));
404 } catch (Errors.NativeIoException e) {
405
406 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
407 PortUnreachableException error = new PortUnreachableException(e.getMessage());
408 error.initCause(e);
409 throw error;
410 }
411 throw e;
412 }
413 if (allocHandle.lastBytesRead() <= 0) {
414
415 byteBuf.release();
416 byteBuf = null;
417 break;
418 }
419 packet = new DatagramPacket(byteBuf,
420 (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
421 } else {
422 final DatagramSocketAddress remoteAddress;
423 if (byteBuf.hasMemoryAddress()) {
424
425 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
426 byteBuf.capacity());
427 } else {
428 ByteBuffer nioData = byteBuf.internalNioBuffer(
429 byteBuf.writerIndex(), byteBuf.writableBytes());
430 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
431 }
432
433 if (remoteAddress == null) {
434 allocHandle.lastBytesRead(-1);
435 byteBuf.release();
436 byteBuf = null;
437 break;
438 }
439 InetSocketAddress localAddress = remoteAddress.localAddress();
440 if (localAddress == null) {
441 localAddress = (InetSocketAddress) localAddress();
442 }
443 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
444 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
445
446 packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
447 }
448
449 allocHandle.incMessagesRead(1);
450
451 readPending = false;
452 pipeline.fireChannelRead(packet);
453
454 byteBuf = null;
455
456
457
458 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
459 } catch (Throwable t) {
460 if (byteBuf != null) {
461 byteBuf.release();
462 }
463 exception = t;
464 }
465
466 allocHandle.readComplete();
467 pipeline.fireChannelReadComplete();
468
469 if (exception != null) {
470 pipeline.fireExceptionCaught(exception);
471 }
472 } finally {
473 readReadyFinally(config);
474 }
475 }
476 }
477 }