1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.kqueue;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.AddressedEnvelope;
21 import io.netty.channel.ChannelFuture;
22 import io.netty.channel.ChannelPipeline;
23 import io.netty.channel.ChannelPromise;
24 import io.netty.channel.DefaultAddressedEnvelope;
25 import io.netty.channel.socket.DatagramChannel;
26 import io.netty.channel.socket.DatagramChannelConfig;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.InternetProtocolFamily;
29 import io.netty.channel.unix.DatagramSocketAddress;
30 import io.netty.channel.unix.Errors;
31 import io.netty.channel.unix.IovArray;
32 import io.netty.channel.unix.UnixChannelUtil;
33 import io.netty.util.UncheckedBooleanSupplier;
34 import io.netty.util.internal.ObjectUtil;
35 import io.netty.util.internal.StringUtil;
36 import io.netty.util.internal.UnstableApi;
37
38 import java.net.InetAddress;
39 import java.net.InetSocketAddress;
40 import java.net.NetworkInterface;
41 import java.net.PortUnreachableException;
42 import java.net.SocketAddress;
43 import java.net.SocketException;
44 import java.nio.ByteBuffer;
45 import java.nio.channels.UnresolvedAddressException;
46
47 import static io.netty.channel.kqueue.BsdSocket.newSocketDgram;
48
49 @UnstableApi
50 public final class KQueueDatagramChannel extends AbstractKQueueDatagramChannel implements DatagramChannel {
51 private static final String EXPECTED_TYPES =
52 " (expected: " + StringUtil.simpleClassName(DatagramPacket.class) + ", " +
53 StringUtil.simpleClassName(AddressedEnvelope.class) + '<' +
54 StringUtil.simpleClassName(ByteBuf.class) + ", " +
55 StringUtil.simpleClassName(InetSocketAddress.class) + ">, " +
56 StringUtil.simpleClassName(ByteBuf.class) + ')';
57
58 private volatile boolean connected;
59 private final KQueueDatagramChannelConfig config;
60
61 public KQueueDatagramChannel() {
62 super(null, newSocketDgram(), false);
63 config = new KQueueDatagramChannelConfig(this);
64 }
65
66 public KQueueDatagramChannel(InternetProtocolFamily protocol) {
67 super(null, newSocketDgram(protocol), false);
68 config = new KQueueDatagramChannelConfig(this);
69 }
70
71 public KQueueDatagramChannel(int fd) {
72 this(new BsdSocket(fd), true);
73 }
74
75 KQueueDatagramChannel(BsdSocket socket, boolean active) {
76 super(null, socket, active);
77 config = new KQueueDatagramChannelConfig(this);
78 }
79
80 @Override
81 public InetSocketAddress remoteAddress() {
82 return (InetSocketAddress) super.remoteAddress();
83 }
84
85 @Override
86 public InetSocketAddress localAddress() {
87 return (InetSocketAddress) super.localAddress();
88 }
89
90 @Override
91 @SuppressWarnings("deprecation")
92 public boolean isActive() {
93 return socket.isOpen() && (config.getActiveOnOpen() && isRegistered() || active);
94 }
95
96 @Override
97 public boolean isConnected() {
98 return connected;
99 }
100
101 @Override
102 public ChannelFuture joinGroup(InetAddress multicastAddress) {
103 return joinGroup(multicastAddress, newPromise());
104 }
105
106 @Override
107 public ChannelFuture joinGroup(InetAddress multicastAddress, ChannelPromise promise) {
108 try {
109 NetworkInterface iface = config().getNetworkInterface();
110 if (iface == null) {
111 iface = NetworkInterface.getByInetAddress(localAddress().getAddress());
112 }
113 return joinGroup(multicastAddress, iface, null, promise);
114 } catch (SocketException e) {
115 promise.setFailure(e);
116 }
117 return promise;
118 }
119
120 @Override
121 public ChannelFuture joinGroup(
122 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
123 return joinGroup(multicastAddress, networkInterface, newPromise());
124 }
125
126 @Override
127 public ChannelFuture joinGroup(
128 InetSocketAddress multicastAddress, NetworkInterface networkInterface,
129 ChannelPromise promise) {
130 return joinGroup(multicastAddress.getAddress(), networkInterface, null, promise);
131 }
132
133 @Override
134 public ChannelFuture joinGroup(
135 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
136 return joinGroup(multicastAddress, networkInterface, source, newPromise());
137 }
138
139 @Override
140 public ChannelFuture joinGroup(
141 final InetAddress multicastAddress, final NetworkInterface networkInterface,
142 final InetAddress source, final ChannelPromise promise) {
143
144 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
145 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
146
147 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
148 return promise;
149 }
150
151 @Override
152 public ChannelFuture leaveGroup(InetAddress multicastAddress) {
153 return leaveGroup(multicastAddress, newPromise());
154 }
155
156 @Override
157 public ChannelFuture leaveGroup(InetAddress multicastAddress, ChannelPromise promise) {
158 try {
159 return leaveGroup(
160 multicastAddress, NetworkInterface.getByInetAddress(localAddress().getAddress()), null, promise);
161 } catch (SocketException e) {
162 promise.setFailure(e);
163 }
164 return promise;
165 }
166
167 @Override
168 public ChannelFuture leaveGroup(
169 InetSocketAddress multicastAddress, NetworkInterface networkInterface) {
170 return leaveGroup(multicastAddress, networkInterface, newPromise());
171 }
172
173 @Override
174 public ChannelFuture leaveGroup(
175 InetSocketAddress multicastAddress,
176 NetworkInterface networkInterface, ChannelPromise promise) {
177 return leaveGroup(multicastAddress.getAddress(), networkInterface, null, promise);
178 }
179
180 @Override
181 public ChannelFuture leaveGroup(
182 InetAddress multicastAddress, NetworkInterface networkInterface, InetAddress source) {
183 return leaveGroup(multicastAddress, networkInterface, source, newPromise());
184 }
185
186 @Override
187 public ChannelFuture leaveGroup(
188 final InetAddress multicastAddress, final NetworkInterface networkInterface, final InetAddress source,
189 final ChannelPromise promise) {
190 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
191 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
192
193 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
194
195 return promise;
196 }
197
198 @Override
199 public ChannelFuture block(
200 InetAddress multicastAddress, NetworkInterface networkInterface,
201 InetAddress sourceToBlock) {
202 return block(multicastAddress, networkInterface, sourceToBlock, newPromise());
203 }
204
205 @Override
206 public ChannelFuture block(
207 final InetAddress multicastAddress, final NetworkInterface networkInterface,
208 final InetAddress sourceToBlock, final ChannelPromise promise) {
209 ObjectUtil.checkNotNull(multicastAddress, "multicastAddress");
210 ObjectUtil.checkNotNull(sourceToBlock, "sourceToBlock");
211 ObjectUtil.checkNotNull(networkInterface, "networkInterface");
212 promise.setFailure(new UnsupportedOperationException("Multicast not supported"));
213 return promise;
214 }
215
216 @Override
217 public ChannelFuture block(InetAddress multicastAddress, InetAddress sourceToBlock) {
218 return block(multicastAddress, sourceToBlock, newPromise());
219 }
220
221 @Override
222 public ChannelFuture block(
223 InetAddress multicastAddress, InetAddress sourceToBlock, ChannelPromise promise) {
224 try {
225 return block(
226 multicastAddress,
227 NetworkInterface.getByInetAddress(localAddress().getAddress()),
228 sourceToBlock, promise);
229 } catch (Throwable e) {
230 promise.setFailure(e);
231 }
232 return promise;
233 }
234
235 @Override
236 protected AbstractKQueueUnsafe newUnsafe() {
237 return new KQueueDatagramChannelUnsafe();
238 }
239
240 @Override
241 protected void doBind(SocketAddress localAddress) throws Exception {
242 super.doBind(localAddress);
243 active = true;
244 }
245
246 @Override
247 protected boolean doWriteMessage(Object msg) throws Exception {
248 final ByteBuf data;
249 InetSocketAddress remoteAddress;
250 if (msg instanceof AddressedEnvelope) {
251 @SuppressWarnings("unchecked")
252 AddressedEnvelope<ByteBuf, InetSocketAddress> envelope =
253 (AddressedEnvelope<ByteBuf, InetSocketAddress>) msg;
254 data = envelope.content();
255 remoteAddress = envelope.recipient();
256 } else {
257 data = (ByteBuf) msg;
258 remoteAddress = null;
259 }
260
261 final int dataLen = data.readableBytes();
262 if (dataLen == 0) {
263 return true;
264 }
265
266 final long writtenBytes;
267 if (data.hasMemoryAddress()) {
268 long memoryAddress = data.memoryAddress();
269 if (remoteAddress == null) {
270 writtenBytes = socket.writeAddress(memoryAddress, data.readerIndex(), data.writerIndex());
271 } else {
272 writtenBytes = socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
273 remoteAddress.getAddress(), remoteAddress.getPort());
274 }
275 } else if (data.nioBufferCount() > 1) {
276 IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
277 array.add(data, data.readerIndex(), data.readableBytes());
278 int cnt = array.count();
279 assert cnt != 0;
280
281 if (remoteAddress == null) {
282 writtenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
283 } else {
284 writtenBytes = socket.sendToAddresses(array.memoryAddress(0), cnt,
285 remoteAddress.getAddress(), remoteAddress.getPort());
286 }
287 } else {
288 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
289 if (remoteAddress == null) {
290 writtenBytes = socket.write(nioData, nioData.position(), nioData.limit());
291 } else {
292 writtenBytes = socket.sendTo(nioData, nioData.position(), nioData.limit(),
293 remoteAddress.getAddress(), remoteAddress.getPort());
294 }
295 }
296
297 return writtenBytes > 0;
298 }
299
300 private static void checkUnresolved(AddressedEnvelope<?, ?> envelope) {
301 if (envelope.recipient() instanceof InetSocketAddress
302 && (((InetSocketAddress) envelope.recipient()).isUnresolved())) {
303 throw new UnresolvedAddressException();
304 }
305 }
306
307 @Override
308 protected Object filterOutboundMessage(Object msg) {
309 if (msg instanceof DatagramPacket) {
310 DatagramPacket packet = (DatagramPacket) msg;
311 checkUnresolved(packet);
312 ByteBuf content = packet.content();
313 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
314 new DatagramPacket(newDirectBuffer(packet, content), packet.recipient()) : msg;
315 }
316
317 if (msg instanceof ByteBuf) {
318 ByteBuf buf = (ByteBuf) msg;
319 return UnixChannelUtil.isBufferCopyNeededForWrite(buf) ? newDirectBuffer(buf) : buf;
320 }
321
322 if (msg instanceof AddressedEnvelope) {
323 @SuppressWarnings("unchecked")
324 AddressedEnvelope<Object, SocketAddress> e = (AddressedEnvelope<Object, SocketAddress>) msg;
325 checkUnresolved(e);
326
327 if (e.content() instanceof ByteBuf &&
328 (e.recipient() == null || e.recipient() instanceof InetSocketAddress)) {
329
330 ByteBuf content = (ByteBuf) e.content();
331 return UnixChannelUtil.isBufferCopyNeededForWrite(content) ?
332 new DefaultAddressedEnvelope<ByteBuf, InetSocketAddress>(
333 newDirectBuffer(e, content), (InetSocketAddress) e.recipient()) : e;
334 }
335 }
336
337 throw new UnsupportedOperationException(
338 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
339 }
340
341 @Override
342 public KQueueDatagramChannelConfig config() {
343 return config;
344 }
345
346 @Override
347 protected void doDisconnect() throws Exception {
348 socket.disconnect();
349 connected = active = false;
350 resetCachedAddresses();
351 }
352
353 @Override
354 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
355 if (super.doConnect(remoteAddress, localAddress)) {
356 connected = true;
357 return true;
358 }
359 return false;
360 }
361
362 @Override
363 protected void doClose() throws Exception {
364 super.doClose();
365 connected = false;
366 }
367
368 final class KQueueDatagramChannelUnsafe extends AbstractKQueueUnsafe {
369
370 @Override
371 void readReady(KQueueRecvByteAllocatorHandle allocHandle) {
372 assert eventLoop().inEventLoop();
373 final DatagramChannelConfig config = config();
374 if (shouldBreakReadReady(config)) {
375 clearReadFilter0();
376 return;
377 }
378 final ChannelPipeline pipeline = pipeline();
379 final ByteBufAllocator allocator = config.getAllocator();
380 allocHandle.reset(config);
381 readReadyBefore();
382
383 Throwable exception = null;
384 try {
385 ByteBuf byteBuf = null;
386 try {
387 boolean connected = isConnected();
388 do {
389 byteBuf = allocHandle.allocate(allocator);
390 allocHandle.attemptedBytesRead(byteBuf.writableBytes());
391
392 final DatagramPacket packet;
393 if (connected) {
394 try {
395 allocHandle.lastBytesRead(doReadBytes(byteBuf));
396 } catch (Errors.NativeIoException e) {
397
398 if (e.expectedErr() == Errors.ERROR_ECONNREFUSED_NEGATIVE) {
399 PortUnreachableException error = new PortUnreachableException(e.getMessage());
400 error.initCause(e);
401 throw error;
402 }
403 throw e;
404 }
405 if (allocHandle.lastBytesRead() <= 0) {
406
407 byteBuf.release();
408 byteBuf = null;
409 break;
410 }
411 packet = new DatagramPacket(byteBuf,
412 (InetSocketAddress) localAddress(), (InetSocketAddress) remoteAddress());
413 } else {
414 final DatagramSocketAddress remoteAddress;
415 if (byteBuf.hasMemoryAddress()) {
416
417 remoteAddress = socket.recvFromAddress(byteBuf.memoryAddress(), byteBuf.writerIndex(),
418 byteBuf.capacity());
419 } else {
420 ByteBuffer nioData = byteBuf.internalNioBuffer(
421 byteBuf.writerIndex(), byteBuf.writableBytes());
422 remoteAddress = socket.recvFrom(nioData, nioData.position(), nioData.limit());
423 }
424
425 if (remoteAddress == null) {
426 allocHandle.lastBytesRead(-1);
427 byteBuf.release();
428 byteBuf = null;
429 break;
430 }
431 InetSocketAddress localAddress = remoteAddress.localAddress();
432 if (localAddress == null) {
433 localAddress = (InetSocketAddress) localAddress();
434 }
435 allocHandle.lastBytesRead(remoteAddress.receivedAmount());
436 byteBuf.writerIndex(byteBuf.writerIndex() + allocHandle.lastBytesRead());
437
438 packet = new DatagramPacket(byteBuf, localAddress, remoteAddress);
439 }
440
441 allocHandle.incMessagesRead(1);
442
443 readPending = false;
444 pipeline.fireChannelRead(packet);
445
446 byteBuf = null;
447
448
449
450 } while (allocHandle.continueReading(UncheckedBooleanSupplier.TRUE_SUPPLIER));
451 } catch (Throwable t) {
452 if (byteBuf != null) {
453 byteBuf.release();
454 }
455 exception = t;
456 }
457
458 allocHandle.readComplete();
459 pipeline.fireChannelReadComplete();
460
461 if (exception != null) {
462 pipeline.fireExceptionCaught(exception);
463 }
464 } finally {
465 readReadyFinally(config);
466 }
467 }
468 }
469 }