1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramChannelConfig;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.InternetProtocolFamily;
29 import io.netty.channel.unix.DatagramSocketAddress;
30 import io.netty.channel.unix.Errors;
31 import io.netty.channel.unix.IovArray;
32 import io.netty.channel.unix.UnixChannelUtil;
33 import io.netty.util.UncheckedBooleanSupplier;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.StringUtil;
36
37 import java.net.InetAddress;
38 import java.net.InetSocketAddress;
39 import java.net.NetworkInterface;
40 import java.net.PortUnreachableException;
41 import java.net.SocketAddress;
42 import java.net.SocketException;
43 import java.nio.ByteBuffer;
44 import java.nio.channels.UnresolvedAddressException;
45
46 import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
47
48 public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
49 private static final String EXPECTED_TYPES =
50 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
51 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
52 StringUtil.simpleClassName(ByteBuf.class) + ", " +
53 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
54 StringUtil.simpleClassName(ByteBuf.class) + ')';
55
56 private volatile boolean connected;
57 private final KQueueDatagramChannelConfig config;
58
59 public KQueueDatagramChannel() {
60 super(null, newSocketDgram(), false);
61 config = new KQueueDatagramChannelConfig(this);
62 }
63
64 public KQueueDatagramChannel(InternetProtocolFamily protocol) {
65 super(null, newSocketDgram(protocol), false);
66 config = new KQueueDatagramChannelConfig(this);
67 }
68
69 public KQueueDatagramChannel(int fd) {
70 this(new BsdSocket(fd), true);
71 }
72
73 KQueueDatagramChannel(BsdSocket socket, boolean active) {
74 super(null, socket, active);
75 config = new KQueueDatagramChannelConfig(this);
76 }
77
78 @Override
79 public InetSocketAddress remoteAddress() {
80 return (InetSocketAddress) super.remoteAddress();
81 }
82
83 @Override
84 public InetSocketAddress localAddress() {
85 return (InetSocketAddress) super.localAddress();
86 }
87
88 @Override
89 @SuppressWarnings("deprecation")
90 public boolean isActive() {
91 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
92 }
93
94 @Override
95 public boolean isConnected() {
96 return connected;
97 }
98
99 @Override
100 public ChannelFuture joinGroup(InetAddress multicastAddress) {
101 return joinGroup(multicastAddress, newPromise());
102 }
103
104 @Override
105 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
106 try {
107 NetworkInterface iface = config().getNetworkInterface();
108 if (iface == null) {
109 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
110 }
111 return joinGroup(multicastAddress, iface, null, promise);
112 } catch (SocketException e) {
113 promise.setFailure(e);
114 }
115 return promise;
116 }
117
118 @Override
119 public ChannelFuture joinGroup(
120 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
121 return joinGroup(multicastAddress, networkInterface, newPromise());
122 }
123
124 @Override
125 public ChannelFuture joinGroup(
126 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
127 ChannelPromise promise) {
128 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
129 }
130
131 @Override
132 public ChannelFuture joinGroup(
133 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
134 return joinGroup(multicastAddress, networkInterface, source, newPromise());
135 }
136
137 @Override
138 public ChannelFuture joinGroup(
139 final InetAddress multicastAddress, final NetworkInterface networkInterface,
140 final InetAddress source, final ChannelPromise promise) {
141
142 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
143 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
144
145 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
146 return promise;
147 }
148
149 @Override
150 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
151 return leaveGroup(multicastAddress, newPromise());
152 }
153
154 @Override
155 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
156 try {
157 return leaveGroup(
158 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
159 } catch (SocketException e) {
160 promise.setFailure(e);
161 }
162 return promise;
163 }
164
165 @Override
166 public ChannelFuture leaveGroup(
167 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
168 return leaveGroup(multicastAddress, networkInterface, newPromise());
169 }
170
171 @Override
172 public ChannelFuture leaveGroup(
173 InetSocketAddress multicastAddress,
174 NetworkInterface networkInterface, ChannelPromise promise) {
175 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
176 }
177
178 @Override
179 public ChannelFuture leaveGroup(
180 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
181 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
182 }
183
184 @Override
185 public ChannelFuture leaveGroup(
186 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
187 final ChannelPromise promise) {
188 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
189 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
190
191 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
192
193 return promise;
194 }
195
196 @Override
197 public ChannelFuture block(
198 InetAddress multicastAddress, NetworkInterface networkInterface,
199 InetAddress sourceToBlock) {
200 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
201 }
202
203 @Override
204 public ChannelFuture block(
205 final InetAddress multicastAddress, final NetworkInterface networkInterface,
206 final InetAddress sourceToBlock, final ChannelPromise promise) {
207 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
208 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
209 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
210 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
211 return promise;
212 }
213
214 @Override
215 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
216 return block(multicastAddress, sourceToBlock, newPromise());
217 }
218
219 @Override
220 public ChannelFuture block(
221 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
222 try {
223 return block(
224 multicastAddress,
225 NetworkInterface.getByInetAddress(localAddress().getAddress()),
226 sourceToBlock, promise);
227 } catch (Throwable e) {
228 promise.setFailure(e);
229 }
230 return promise;
231 }
232
233 @Override
234 protected AbstractKQueueUnsafe newUnsafe() {
235 return new KQueueDatagramChannelUnsafe();
236 }
237
238 @Override
239 protected void doBind(SocketAddress localAddress) throws Exception {
240 super.doBind(localAddress);
241 active = true;
242 }
243
244 @Override
245 protected boolean doWriteMessage(Object msg) throws Exception {
246 final ByteBuf data;
247 InetSocketAddress remoteAddress;
248 if (msg instanceof AddressedEnvelope) {
249 @SuppressWarnings("unchecked")
250 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
251 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
252 data = envelope.content();
253 remoteAddress = envelope.recipient();
254 } else {
255 data = (ByteBuf) msg;
256 remoteAddress = null;
257 }
258
259 final int dataLen = data.readableBytes();
260 if (dataLen == 0) {
261 return true;
262 }
263
264 final long writtenBytes;
265 if (data.hasMemoryAddress()) {
266 long memoryAddress = data.memoryAddress();
267 if (remoteAddress == null) {
268 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
269 } else {
270 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
271 remoteAddress.getAddress(), remoteAddress.getPort());
272 }
273 } else if (data.nioBufferCount() > 1) {
274 IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
275 array.add(data, data.readerIndex(), data.readableBytes());
276 int cnt = array.count();
277 assert cnt != 0;
278
279 if (remoteAddress == null) {
280 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
281 } else {
282 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
283 remoteAddress.getAddress(), remoteAddress.getPort());
284 }
285 } else {
286 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
287 if (remoteAddress == null) {
288 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
289 } else {
290 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
291 remoteAddress.getAddress(), remoteAddress.getPort());
292 }
293 }
294
295 return writtenBytes > 0;
296 }
297
298 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
299 if (envelope.recipient() instanceof InetSocketAddress
300 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
301 throw new UnresolvedAddressException();
302 }
303 }
304
305 @Override
306 protected Object filterOutboundMessage(Object msg) {
307 if (msg instanceof DatagramPacket) {
308 DatagramPacket packet = (DatagramPacket) msg;
309 checkUnresolved(packet);
310 ByteBuf content = packet.content();
311 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
312 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
313 }
314
315 if (msg instanceof ByteBuf) {
316 ByteBuf buf = (ByteBuf) msg;
317 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
318 }
319
320 if (msg instanceof AddressedEnvelope) {
321 @SuppressWarnings("unchecked")
322 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
323 checkUnresolved(e);
324
325 if (e.content() instanceof ByteBuf &&
326 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
327
328 ByteBuf content = (ByteBuf) e.content();
329 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
330 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
331 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
332 }
333 }
334
335 throw new UnsupportedOperationException(
336 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
337 }
338
339 @Override
340 public KQueueDatagramChannelConfig config() {
341 return config;
342 }
343
344 @Override
345 protected void doDisconnect() throws Exception {
346 socket.disconnect();
347 connected = active = false;
348 resetCachedAddresses();
349 }
350
351 @Override
352 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
353 if (super.doConnect(remoteAddress, localAddress)) {
354 connected = true;
355 return true;
356 }
357 return false;
358 }
359
360 @Override
361 protected void doClose() throws Exception {
362 super.doClose();
363 connected = false;
364 }
365
366 final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
367
368 @Override
369 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
370 assert eventLoop().inEventLoop();
371 final DatagramChannelConfig config = config();
372 if (shouldBreakReadReady(config)) {
373 clearReadFilter0();
374 return;
375 }
376 final ChannelPipeline pipeline = pipeline();
377 final ByteBufAllocator allocator = config.getAllocator();
378 allocHandle.reset(config);
379 readReadyBefore();
380
381 Throwable exception = null;
382 try {
383 ByteBuf byteBuf = null;
384 try {
385 boolean connected = isConnected();
386 do {
387 byteBuf = allocHandle.allocate(allocator);
388 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
389
390 final DatagramPacket packet;
391 if (connected) {
392 try {
393 allocHandle.lastBytesRead(doReadBytes(byteBuf));
394 } catch (Errors.NativeIoException e) {
395
396 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
397 PortUnreachableException error = new PortUnreachableException(e.getMessage());
398 error.initCause(e);
399 throw error;
400 }
401 throw e;
402 }
403 if (allocHandle.lastBytesRead() <= 0) {
404
405 byteBuf.release();
406 byteBuf = null;
407 break;
408 }
409 packet = new DatagramPacket(byteBuf,
410 (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
411 } else {
412 final DatagramSocketAddress remoteAddress;
413 if (byteBuf.hasMemoryAddress()) {
414
415 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
416 byteBuf.capacity());
417 } else {
418 ByteBuffer nioData = byteBuf.internalNioBuffer(
419 byteBuf.writerIndex(), byteBuf.writableBytes());
420 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
421 }
422
423 if (remoteAddress == null) {
424 allocHandle.lastBytesRead(-1);
425 byteBuf.release();
426 byteBuf = null;
427 break;
428 }
429 InetSocketAddress localAddress = remoteAddress.localAddress();
430 if (localAddress == null) {
431 localAddress = (InetSocketAddress) localAddress();
432 }
433 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
434 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
435
436 packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
437 }
438
439 allocHandle.incMessagesRead(1);
440
441 readPending = false;
442 pipeline.fireChannelRead(packet);
443
444 byteBuf = null;
445
446
447
448 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
449 } catch (Throwable t) {
450 if (byteBuf != null) {
451 byteBuf.release();
452 }
453 exception = t;
454 }
455
456 allocHandle.readComplete();
457 pipeline.fireChannelReadComplete();
458
459 if (exception != null) {
460 pipeline.fireExceptionCaught(exception);
461 }
462 } finally {
463 readReadyFinally(config);
464 }
465 }
466 }
467 }